gRPC提供了四种通信模式,其中双向流模式可以让客户端和服务端在单个连接上同时双向发送消息序列,非常适合需要实时交互的场景。实现该功能需要先定义protobuf接口,再通过Golang代码完成服务端和客户端的逻辑开发。

环境准备
在开始开发前需要安装必要的依赖工具,首先确保本地已经安装Golang环境,然后安装protobuf编译器和Golang的gRPC相关插件。
- 安装protobuf编译器,不同系统可以通过对应的包管理工具安装,安装完成后执行protoc --version验证是否安装成功
- 安装Golang的protobuf和gRPC插件,执行以下命令安装:
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
定义protobuf接口
首先需要编写proto文件定义双向流的服务接口,这里以简单的消息推送场景为例,定义客户端和服务端互相发送消息的接口。
syntax = "proto3";
package chat;
option go_package = "./chat";
// 定义消息结构
message ChatMessage {
string sender = 1;
string content = 2;
int64 timestamp = 3;
}
// 定义双向流服务
service ChatService {
// 双向流方法,客户端和服务端都可以发送多个ChatMessage
rpc ChatStream (stream ChatMessage) returns (stream ChatMessage);
}
执行以下命令将proto文件编译为Golang代码:
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative chat.proto
实现服务端逻辑
服务端需要实现proto定义的服务接口,处理客户端发送的流消息,同时可以向客户端发送响应消息。
package main
import (
"context"
"fmt"
"io"
"log"
"net"
"google.golang.org/grpc"
pb "your_module_path/chat" // 替换为实际的chat包路径
)
// 定义服务结构体,实现ChatServiceServer接口
type chatServer struct {
pb.UnimplementedChatServiceServer
}
// 实现ChatStream方法
func (s *chatServer) ChatStream(stream pb.ChatService_ChatStreamServer) error {
// 启动一个goroutine处理客户端发送的消息
go func() {
for {
// 接收客户端发送的消息
in, err := stream.Recv()
if err == io.EOF {
// 客户端发送结束
return
}
if err != nil {
log.Printf("接收客户端消息失败: %v", err)
return
}
log.Printf("收到客户端消息: 发送者=%s, 内容=%s", in.Sender, in.Content)
}
}()
// 向客户端发送多条消息
for i := 0; i < 5; i++ {
msg := &pb.ChatMessage{
Sender: "服务端",
Content: fmt.Sprintf("这是服务端发送的第%d条消息", i+1),
Timestamp: 0,
}
if err := stream.Send(msg); err != nil {
log.Printf("向客户端发送消息失败: %v", err)
return err
}
}
// 等待流结束
return nil
}
func main() {
// 监听端口
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("监听端口失败: %v", err)
}
// 创建gRPC服务端
s := grpc.NewServer()
pb.RegisterChatServiceServer(s, &chatServer{})
log.Printf("服务端启动,监听端口: 50051")
// 启动服务端
if err := s.Serve(lis); err != nil {
log.Fatalf("启动服务端失败: %v", err)
}
}
实现客户端逻辑
客户端需要连接服务端,同时发送流消息和接收服务端的响应消息。
package main
import (
"context"
"io"
"log"
"time"
"google.golang.org/grpc"
pb "your_module_path/chat" // 替换为实际的chat包路径
)
func main() {
// 连接服务端
conn, err := grpc.Dial("127.0.0.1:50051", grpc.WithInsecure())
if err != nil {
log.Fatalf("连接服务端失败: %v", err)
}
defer conn.Close()
// 创建客户端
client := pb.NewChatServiceClient(conn)
// 创建双向流
stream, err := client.ChatStream(context.Background())
if err != nil {
log.Fatalf("创建流失败: %v", err)
}
// 启动goroutine接收服务端发送的消息
go func() {
for {
msg, err := stream.Recv()
if err == io.EOF {
log.Println("服务端流结束")
return
}
if err != nil {
log.Printf("接收服务端消息失败: %v", err)
return
}
log.Printf("收到服务端消息: 发送者=%s, 内容=%s", msg.Sender, msg.Content)
}
}()
// 向服务端发送多条消息
for i := 0; i < 3; i++ {
msg := &pb.ChatMessage{
Sender: "客户端",
Content: fmt.Sprintf("这是客户端发送的第%d条消息", i+1),
Timestamp: time.Now().Unix(),
}
if err := stream.Send(msg); err != nil {
log.Fatalf("向服务端发送消息失败: %v", err)
}
time.Sleep(time.Second)
}
// 关闭客户端的发送流
stream.CloseSend()
// 等待接收完毕
time.Sleep(2 * time.Second)
}
运行与验证
先启动服务端程序,再启动客户端程序,可以看到服务端会打印收到的客户端消息,客户端会打印收到的服务端消息,说明双向流功能已经正常工作。
注意事项
- 双向流中服务端和客户端发送消息是并行的,不需要等待对方响应后再发送下一条消息
- 当一端调用Send方法发送消息后,另一端通过Recv方法接收,流的生命周期由双方共同控制
- 如果出现连接断开等错误,需要做好重连和错误处理的逻辑
- 生产环境中需要使用安全的连接配置,替换示例中的grpc.WithInsecure()选项