gRPC支持四种通信模式,其中流式传输包含客户端流、服务端流和双向流三种,适合传输大体积或分批次的XML数据,避免单次请求包过大导致的性能问题。不同流模式的传输方向和处理逻辑存在明显差异,需要结合业务场景选择。

gRPC流式上传XML数据实现步骤
1. 定义Proto服务
首先需要在proto文件中定义支持流式传输的服务方法,指定请求和响应的流类型,同时定义XML数据的承载结构。
syntax = "proto3";
package xml_upload;
// 定义XML数据块结构,分片传输时使用
message XmlChunk {
bytes xml_content = 1; // 单块XML内容
int32 chunk_index = 2; // 分片序号
bool is_last = 3; // 是否为最后一块
}
// 定义上传响应结构
message UploadResponse {
bool success = 1;
string message = 2;
}
// 定义服务,UploadXml为客户端流方法,GetXmlStream为服务端流方法
service XmlStreamService {
// 客户端流:客户端持续发送XML分片,服务端接收完成后返回响应
rpc UploadXml (stream XmlChunk) returns (UploadResponse);
// 服务端流:客户端发送请求,服务端持续返回XML分片
rpc GetXmlStream (XmlChunk) returns (stream XmlChunk);
}
2. 客户端流上传XML实现(Go语言示例)
客户端流模式下,客户端可以分多次发送XML数据分片,服务端在接收到所有分片后统一处理并返回结果。
package main
import (
"context"
"fmt"
"io/ioutil"
"log"
"time"
pb "path/to/proto" // 替换为实际proto生成的包路径
"google.golang.org/grpc"
)
func main() {
// 建立gRPC连接
conn, err := grpc.Dial("127.0.0.1:50051", grpc.WithInsecure())
if err != nil {
log.Fatalf("连接失败: %v", err)
}
defer conn.Close()
client := pb.NewXmlStreamServiceClient(conn)
// 读取本地XML文件
xmlData, err := ioutil.ReadFile("test.xml")
if err != nil {
log.Fatalf("读取XML文件失败: %v", err)
}
// 将XML数据拆分为每块1024字节的分片
chunkSize := 1024
var chunks []string
for i := 0; i < len(xmlData); i += chunkSize {
end := i + chunkSize
if end > len(xmlData) {
end = len(xmlData)
}
chunks = append(chunks, string(xmlData[i:end]))
}
// 调用客户端流方法
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.UploadXml(ctx)
if err != nil {
log.Fatalf("调用UploadXml失败: %v", err)
}
// 循环发送所有分片
for index, chunk := range chunks {
req := &pb.XmlChunk{
XmlContent: []byte(chunk),
ChunkIndex: int32(index),
IsLast: index == len(chunks)-1,
}
if err := stream.Send(req); err != nil {
log.Fatalf("发送分片失败: %v", err)
}
}
// 接收服务端响应
resp, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("接收响应失败: %v", err)
}
fmt.Printf("上传结果: %v, 消息: %sn", resp.Success, resp.Message)
}
3. 服务端流返回XML实现(Go语言示例)
服务端流模式下,客户端发送一次请求,服务端可以分多次返回XML数据分片,适合服务端需要主动推送批量XML数据的场景。
package main
import (
"fmt"
"io/ioutil"
"log"
"net"
pb "path/to/proto" // 替换为实际proto生成的包路径
"google.golang.org/grpc"
)
type server struct {
pb.UnimplementedXmlStreamServiceServer
}
// 实现客户端流上传方法
func (s *server) UploadXml(stream pb.XmlStreamService_UploadXmlServer) error {
var xmlBuilder []byte
// 循环接收客户端发送的所有分片
for {
chunk, err := stream.Recv()
if err != nil {
// 接收完成,返回响应
return stream.SendAndClose(&pb.UploadResponse{
Success: true,
Message: fmt.Sprintf("共接收%d字节XML数据", len(xmlBuilder)),
})
}
xmlBuilder = append(xmlBuilder, chunk.XmlContent...)
// 如果是最后一块,可提前做校验
if chunk.IsLast {
// 这里可以加入XML格式校验逻辑
}
}
}
// 实现服务端流获取方法
func (s *server) GetXmlStream(req *pb.XmlChunk, stream pb.XmlStreamService_GetXmlStreamServer) error {
// 读取本地XML文件,模拟服务端数据
xmlData, err := ioutil.ReadFile("test.xml")
if err != nil {
return err
}
chunkSize := 1024
// 分片发送XML数据
for i := 0; i < len(xmlData); i += chunkSize {
end := i + chunkSize
if end > len(xmlData) {
end = len(xmlData)
}
chunk := &pb.XmlChunk{
XmlContent: xmlData[i:end],
ChunkIndex: int32(i / chunkSize),
IsLast: end == len(xmlData),
}
if err := stream.Send(chunk); err != nil {
return err
}
}
return nil
}
func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("监听失败: %v", err)
}
s := grpc.NewServer()
pb.RegisterXmlStreamServiceServer(s, &server{})
log.Printf("服务启动,监听端口: 50051")
if err := s.Serve(lis); err != nil {
log.Fatalf("服务启动失败: %v", err)
}
}
客户端流与服务器流的核心区别
| 对比维度 | 客户端流 | 服务器流 |
|---|---|---|
| 数据传输方向 | 客户端向服务端单向持续发送数据 | 服务端向客户端单向持续发送数据 |
| 请求发送次数 | 客户端可发送多次请求消息 | 客户端仅发送一次请求消息 |
| 响应返回次数 | 服务端仅返回一次响应消息 | 服务端可返回多次响应消息 |
| 适用场景 | 大XML文件上传、批量XML数据上报 | 大XML文件下载、服务端主动推送批量XML数据 |
| 结束时机 | 客户端调用CloseAndRecv后结束发送,等待响应 | 服务端发送完所有数据后结束流 |
注意事项
- XML分片时尽量保证分片不包含不完整的XML标签,避免服务端拼接后解析失败
- 流式传输需要设置合理的超时时间,避免长时间占用连接
- 如果XML数据需要校验格式,建议在服务端接收完所有分片后统一校验,避免分片校验的复杂度
- 双向流模式可以同时支持客户端和服务端双向发送数据,适合需要实时交互的XML数据传输场景