Golang RPC服务注册与发现实现示例
在微服务架构中,服务之间的远程调用是家常便饭。为了让服务能够彼此发现并通信,服务注册与发现机制成为了基础设施中不可或缺的一环。本文将使用 Go 语言,结合 Consul 作为注册中心,演示如何实现一个基本的 RPC 服务注册与发现示例。
什么是 RPC?
RPC(Remote Procedure Call)即远程过程调用,允许客户端像调用本地方法一样调用远程服务。在 Go 中,标准库 net/rpc 提供了简洁的 RPC 实现,开发者只需定义好服务结构体和方法,注册后即可通过 TCP/HTTP 供客户端调用。
服务注册与发现
服务注册是指服务启动时将自身的元数据(IP、端口、健康状态等)注册到注册中心;服务发现则是消费者从注册中心获取可用服务列表,从而进行调用。Consul、Etcd、ZooKeeper 是常见的注册中心。本文选用 Consul,因为它提供了 HTTP API,集成简单。
项目结构
rpc-registry-demo/ ├── server/ │ └── main.go # 服务端:注册服务到Consul并启动RPC ├── client/ │ └── main.go # 客户端:从Consul发现服务并发起RPC调用 └── common/ └── service.go # 定义RPC服务接口和参数
定义服务接口 (common/service.go)
我们先定义一个简单的算术服务,包含一个远程方法 Multiply,接收两个整数参数并返回乘积。
package common
// Args 定义RPC调用参数
type Args struct {
A, B int
}
// ArithService 算术运算服务
type ArithService struct{}
// Multiply 乘法运算
func (t *ArithService) Multiply(args *Args, reply *int) error {
*reply = args.A * args.B
return nil
}注意:标准库 net/rpc 要求方法符合签名 func (t *T) MethodName(argType T1, replyType *T2) error,且参数和返回值类型需为导出类型或内置类型。
服务端实现 (server/main.go)
服务端负责初始化 RPC 服务,注册到 Consul,并启动 TCP 监听。
package main
import (
"fmt"
"log"
"net"
"net/rpc"
"os"
"os/signal"
"syscall"
consulapi "github.com/hashicorp/consul/api"
"rpc-registry-demo/common"
)
const (
consulAddress = "127.0.0.1:8500"
serviceName = "arith-service"
servicePort = 1234
)
func main() {
// 1. 创建RPC服务并注册
arith := new(common.ArithService)
rpc.Register(arith)
// 2. 监听TCP端口
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", servicePort))
if err != nil {
log.Fatalf("Listen error: %v", err)
}
defer listener.Close()
// 3. 注册服务到Consul
go registerToConsul()
// 4. 处理信号,优雅退出时反注册
go handleGracefulShutdown()
fmt.Printf("RPC server is running on port %d...\n", servicePort)
for {
conn, err := listener.Accept()
if err != nil {
log.Printf("Accept error: %v", err)
continue
}
go rpc.ServeConn(conn)
}
}
func registerToConsul() {
config := consulapi.DefaultConfig()
config.Address = consulAddress
client, err := consulapi.NewClient(config)
if err != nil {
log.Fatalf("Create consul client error: %v", err)
}
// 获取本机IP
ip := getLocalIP()
registration := &consulapi.AgentServiceRegistration{
ID: fmt.Sprintf("%s-%s-%d", serviceName, ip, servicePort),
Name: serviceName,
Address: ip,
Port: servicePort,
Check: &consulapi.AgentServiceCheck{
TCP: fmt.Sprintf("%s:%d", ip, servicePort), // TCP健康检查
Interval: "10s",
Timeout: "1s",
},
}
if err := client.Agent().ServiceRegister(registration); err != nil {
log.Fatalf("Register service error: %v", err)
}
fmt.Println("Service registered to Consul.")
}
func getLocalIP() string {
addrs, err := net.InterfaceAddrs()
if err != nil {
return "127.0.0.1"
}
for _, addr := range addrs {
if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() && ipnet.IP.To4() != nil {
return ipnet.IP.String()
}
}
return "127.0.0.1"
}
func handleGracefulShutdown() {
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
fmt.Println("\nShutting down server...")
deregisterFromConsul()
os.Exit(0)
}
func deregisterFromConsul() {
config := consulapi.DefaultConfig()
config.Address = consulAddress
client, err := consulapi.NewClient(config)
if err != nil {
log.Printf("Create consul client error: %v", err)
return
}
ip := getLocalIP()
serviceID := fmt.Sprintf("%s-%s-%d", serviceName, ip, servicePort)
if err := client.Agent().ServiceDeregister(serviceID); err != nil {
log.Printf("Deregister service error: %v", err)
} else {
fmt.Println("Service deregistered from Consul.")
}
}服务启动后会自动向 Consul 注册,并周期性地进行 TCP 健康检查。当程序收到终止信号时会优雅地注销服务。
客户端实现 (client/main.go)
客户端从 Consul 查询可用服务实例,选择一个发起 RPC 调用。
package main
import (
"fmt"
"log"
"net/rpc"
consulapi "github.com/hashicorp/consul/api"
"rpc-registry-demo/common"
)
const (
consulAddress = "127.0.0.1:8500"
serviceName = "arith-service"
)
func main() {
serviceAddr, err := discoverService()
if err != nil {
log.Fatalf("Discover service error: %v", err)
}
fmt.Printf("Discovered service at: %s\n", serviceAddr)
// 连接RPC服务
client, err := rpc.Dial("tcp", serviceAddr)
if err != nil {
log.Fatalf("Dial error: %v", err)
}
defer client.Close()
// 调用远程方法
args := &common.Args{A: 7, B: 8}
var reply int
err = client.Call("ArithService.Multiply", args, &reply)
if err != nil {
log.Fatalf("RPC call error: %v", err)
}
fmt.Printf("Result: %d * %d = %d\n", args.A, args.B, reply)
}
func discoverService() (string, error) {
config := consulapi.DefaultConfig()
config.Address = consulAddress
client, err := consulapi.NewClient(config)
if err != nil {
return "", err
}
services, _, err := client.Health().Service(serviceName, "", true, nil)
if err != nil {
return "", err
}
if len(services) == 0 {
return "", fmt.Errorf("no healthy service found")
}
// 简单负载均衡:取第一个健康实例
s := services[0].Service
return fmt.Sprintf("%s:%d", s.Address, s.Port), nil
}客户端通过 Consul 的健康检查接口过滤出健康实例,然后建立 TCP 连接并发起 RPC 调用。实际生产环境中可增加连接池、重试、负载均衡等策略。
运行与测试
启动 Consul(开发模式):
consul agent -dev。启动服务端:
go run server/main.go,观察控制台输出注册成功信息。启动客户端:
go run client/main.go,应能看到服务发现后的地址和计算结果。
注意事项
确保 Go 环境已配置,并安装依赖:
go get github.com/hashicorp/consul/api。服务端 IP 获取逻辑需根据部署环境调整,容器环境下可能需要特殊处理。
健康检查采用了 TCP 方式,确保端口可达即认为健康,可根据业务需求改为 HTTP 检查。
示例中未处理网络分区、重连等场景,生产环境需进一步完善。
总结
本文展示了如何使用 Go 的标准库 net/rpc 配合 Consul 实现基本的服务注册与发现。通过将注册逻辑集成到服务生命周期中,并让客户端动态发现服务地址,我们构建了一个灵活、可扩展的微服务通信基础。你可以在此基础上引入 gRPC、服务网格等更高级的技术,但原理都是相通的。