在Go语言的后端开发中,数据库操作往往是性能瓶颈之一,单线程串行执行数据库调用会导致大量时间浪费在等待IO上,而Goroutine的轻量级特性非常适合用来并发处理多个独立的数据库请求,Channel则可以作为Goroutine之间通信和协调的核心工具,合理搭配两者能大幅提升数据库操作的效率。

Goroutine与Channel的基础概念
Goroutine是Go语言实现的用户态轻量级线程,创建和切换成本极低,适合用来并发执行多个独立的数据库查询任务。Channel是Go语言中用于Goroutine之间传递数据的管道,分为无缓冲Channel和有缓冲Channel两种,能安全地在不同Goroutine之间传递结果或者控制信号,避免共享内存带来的数据竞争问题。
无关联数据库调用的并发实现
当多个数据库调用之间没有依赖关系,比如需要同时查询用户基本信息、用户订单列表、用户积分记录这三个独立的表,就可以为每个查询启动一个Goroutine,通过Channel收集结果。
以下是具体的实现示例:
package main
import (
"context"
"database/sql"
"fmt"
"time"
_ "github.com/go-sql-driver/mysql"
)
// 模拟用户基本信息结构体
type UserInfo struct {
ID int
Name string
Age int
}
// 模拟用户订单结构体
type UserOrder struct {
OrderID int
UserID int
Amount float64
}
// 模拟用户积分结构体
type UserScore struct {
UserID int
Score int
}
// 查询用户基本信息
func queryUserInfo(ctx context.Context, db *sql.DB, userID int, resultChan chan<- UserInfo, errChan chan<- error) {
// 模拟查询逻辑,实际场景替换为真实的SQL查询
time.Sleep(100 * time.Millisecond)
info := UserInfo{ID: userID, Name: "张三", Age: 25}
resultChan <- info
errChan <- nil
}
// 查询用户订单
func queryUserOrder(ctx context.Context, db *sql.DB, userID int, resultChan chan<- []UserOrder, errChan chan<- error) {
time.Sleep(120 * time.Millisecond)
orders := []UserOrder{
{OrderID: 1, UserID: userID, Amount: 99.9},
{OrderID: 2, UserID: userID, Amount: 199.8},
}
resultChan <- orders
errChan <- nil
}
// 查询用户积分
func queryUserScore(ctx context.Context, db *sql.DB, userID int, resultChan chan<- UserScore, errChan chan<- error) {
time.Sleep(80 * time.Millisecond)
score := UserScore{UserID: userID, Score: 1500}
resultChan <- score
errChan <- nil
}
func main() {
// 初始化数据库连接,实际场景替换为真实的DSN
db, err := sql.Open("mysql", "root:password@tcp(127.0.0.1:3306)/test")
if err != nil {
panic(err)
}
defer db.Close()
ctx := context.Background()
userID := 1
// 创建结果Channel和错误Channel
userInfoChan := make(chan UserInfo, 1)
userOrderChan := make(chan []UserOrder, 1)
userScoreChan := make(chan UserScore, 1)
errChan1 := make(chan error, 1)
errChan2 := make(chan error, 1)
errChan3 := make(chan error, 1)
// 启动三个Goroutine并发执行查询
go queryUserInfo(ctx, db, userID, userInfoChan, errChan1)
go queryUserOrder(ctx, db, userID, userOrderChan, errChan2)
go queryUserScore(ctx, db, userID, userScoreChan, errChan3)
// 收集结果
userInfo := <-userInfoChan
userOrders := <-userOrderChan
userScore := <-userScoreChan
// 收集错误
err1 := <-errChan1
err2 := <-errChan2
err3 := <-errChan3
// 处理错误
if err1 != nil {
fmt.Printf("查询用户信息失败: %vn", err1)
}
if err2 != nil {
fmt.Printf("查询用户订单失败: %vn", err2)
}
if err3 != nil {
fmt.Printf("查询用户积分失败: %vn", err3)
}
fmt.Printf("用户信息: %+vn", userInfo)
fmt.Printf("用户订单: %+vn", userOrders)
fmt.Printf("用户积分: %+vn", userScore)
}
有依赖关系的并发数据库调用
如果数据库调用之间存在依赖,比如需要先查询用户ID,再根据用户ID查询对应的订单,这时候可以通过有缓冲Channel传递中间结果,协调两个Goroutine的执行顺序。
示例代码如下:
package main
import (
"context"
"database/sql"
"fmt"
"time"
_ "github.com/go-sql-driver/mysql"
)
type User struct {
ID int
Name string
}
type Order struct {
OrderID int
UserID int
Amount float64
}
// 第一步:查询用户列表
func queryUserList(ctx context.Context, db *sql.DB, userChan chan<- []User, errChan chan<- error) {
time.Sleep(100 * time.Millisecond)
users := []User{
{ID: 1, Name: "张三"},
{ID: 2, Name: "李四"},
}
userChan <- users
errChan <- nil
}
// 第二步:根据用户列表查询对应的订单
func queryOrderByUserIDs(ctx context.Context, db *sql.DB, userChan <-chan []User, orderChan chan<- map[int][]Order, errChan chan<- error) {
users := <-userChan
userIDMap := make(map[int][]Order)
for _, user := range users {
// 模拟根据单个用户ID查询订单
time.Sleep(50 * time.Millisecond)
orders := []Order{
{OrderID: user.ID * 10, UserID: user.ID, Amount: 100.0},
}
userIDMap[user.ID] = orders
}
orderChan <- userIDMap
errChan <- nil
}
func main() {
db, err := sql.Open("mysql", "root:password@tcp(127.0.0.1:3306)/test")
if err != nil {
panic(err)
}
defer db.Close()
ctx := context.Background()
userChan := make(chan []User, 1)
orderChan := make(chan map[int][]Order, 1)
errChan1 := make(chan error, 1)
errChan2 := make(chan error, 1)
// 启动第一个Goroutine查询用户列表
go queryUserList(ctx, db, userChan, errChan1)
// 启动第二个Goroutine,等待用户列表结果后查询订单
go queryOrderByUserIDs(ctx, db, userChan, orderChan, errChan2)
// 收集结果和错误
users := <-userChan
orders := <-orderChan
err1 := <-errChan1
err2 := <-errChan2
if err1 != nil {
fmt.Printf("查询用户列表失败: %vn", err1)
}
if err2 != nil {
fmt.Printf("查询订单失败: %vn", err2)
}
fmt.Printf("用户列表: %+vn", users)
fmt.Printf("用户订单映射: %+vn", orders)
}
需要注意的问题
- 控制Goroutine的数量,避免无限制创建Goroutine导致数据库连接池耗尽,可以通过带缓冲的Channel实现简单的信号量来控制并发数。
- Channel使用后要及时关闭,避免Goroutine泄露,不过要注意不要在多个Goroutine中同时关闭同一个Channel。
- 数据库操作要设置合理的超时时间,通过
context.WithTimeout控制查询的最大执行时间,避免Goroutine长时间阻塞。 - 错误要单独通过Channel传递,不要直接忽略Goroutine中的错误,否则出现问题很难排查。
并发数控制示例
如果需要并发查询100个用户的订单,不能启动100个Goroutine,可以通过信号量控制最大并发数为10:
package main
import (
"context"
"database/sql"
"fmt"
"sync"
"time"
_ "github.com/go-sql-driver/mysql"
)
type Order struct {
OrderID int
UserID int
}
func querySingleUserOrder(ctx context.Context, db *sql.DB, userID int, wg *sync.WaitGroup, resultChan chan<- Order, errChan chan<- error) {
defer wg.Done()
// 模拟查询单个用户订单
time.Sleep(100 * time.Millisecond)
order := Order{OrderID: userID * 100, UserID: userID}
resultChan <- order
errChan <- nil
}
func main() {
db, err := sql.Open("mysql", "root:password@tcp(127.0.0.1:3306)/test")
if err != nil {
panic(err)
}
defer db.Close()
ctx := context.Background()
// 最大并发数
maxWorker := 10
// 信号量,控制并发数
sem := make(chan struct{}, maxWorker)
// 结果Channel
resultChan := make(chan Order, 100)
// 错误Channel
errChan := make(chan error, 100)
var wg sync.WaitGroup
// 模拟100个用户ID
userIDs := make([]int, 100)
for i := 0; i < 100; i++ {
userIDs[i] = i + 1
}
for _, userID := range userIDs {
wg.Add(1)
// 获取信号量,如果满了会阻塞
sem <- struct{}{}
go func(uid int) {
defer func() { <-sem }() // 释放信号量
querySingleUserOrder(ctx, db, uid, &wg, resultChan, errChan)
}(userID)
}
// 等待所有Goroutine执行完成
wg.Wait()
close(resultChan)
close(errChan)
// 统计结果
successCount := 0
for range resultChan {
successCount++
}
fmt.Printf("成功查询%d个用户的订单n", successCount)
}