在Go语言开发的HTTP服务中,文件上传和下载功能如果缺少速率限制,可能会导致单个请求占满服务器带宽,影响其他正常请求的处理。通过合理的速率控制,可以平衡不同请求的带宽占用,提升服务的整体可用性。

速率限制的核心思路
文件传输的速率限制本质是控制单位时间内允许传输的字节数。我们可以通过自定义io.Reader和io.Writer接口的实现,在每次读写数据时暂停合适的时间,从而将传输速率控制在目标范围内。核心逻辑是记录已传输的字节数和时间,当单位时间内传输的字节数达到阈值时,就休眠一段时间再继续传输。
限制文件下载速率
文件下载的场景中,服务器需要向客户端发送文件数据,我们可以封装一个带速率限制的io.Writer,在写入响应的时候控制速率。
速率限制写入器实现
首先实现一个RateLimitedWriter结构体,它包装了原始的io.Writer,并维护速率限制相关的状态:
package main
import (
"io"
"time"
)
// RateLimitedWriter 带速率限制的写入器
type RateLimitedWriter struct {
writer io.Writer // 原始写入器
rate int64 // 目标速率,单位:字节/秒
totalBytes int64 // 已写入的总字节数
startTime time.Time // 写入开始时间
currentSecond int64 // 当前统计的秒数
secondBytes int64 // 当前秒内已写入的字节数
}
// NewRateLimitedWriter 创建速率限制写入器
func NewRateLimitedWriter(writer io.Writer, rate int64) *RateLimitedWriter {
return &RateLimitedWriter{
writer: writer,
rate: rate,
startTime: time.Now(),
}
}
// Write 实现io.Writer接口的Write方法
func (w *RateLimitedWriter) Write(p []byte) (n int, err error) {
// 如果速率设置为0,不做限制,直接写入
if w.rate <= 0 {
return w.writer.Write(p)
}
// 分段处理写入的数据,避免一次写入过多导致速率超标
chunkSize := int(w.rate / 10) // 每次最多写入速率的十分之一,减少休眠次数
if chunkSize <= 0 {
chunkSize = 1
}
totalWritten := 0
for totalWritten < len(p) {
end := totalWritten + chunkSize
if end > len(p) {
end = len(p)
}
chunk := p[totalWritten:end]
// 写入当前分段
n, err = w.writer.Write(chunk)
if err != nil {
return totalWritten, err
}
totalWritten += n
w.totalBytes += int64(n)
w.secondBytes += int64(n)
// 计算当前经过的秒数
elapsed := time.Since(w.startTime).Seconds()
currentSecond := int64(elapsed)
// 如果进入新的秒,重置当前秒的字节数
if currentSecond > w.currentSecond {
w.currentSecond = currentSecond
w.secondBytes = 0
}
// 如果当前秒写入的字节数超过速率限制,计算需要休眠的时间
if w.secondBytes > w.rate {
over := w.secondBytes - w.rate
// 计算需要休眠的秒数,避免过度休眠
sleepTime := time.Duration(float64(over)/float64(w.rate)*1000) * time.Millisecond
if sleepTime > 0 {
time.Sleep(sleepTime)
}
// 休眠后重置当前秒的字节数
w.secondBytes = 0
w.startTime = time.Now()
w.currentSecond = int64(w.startTime.Seconds())
}
}
return totalWritten, nil
}
下载接口实现示例
在HTTP下载接口中使用上面的RateLimitedWriter,限制下载速率为每秒100KB:
package main
import (
"net/http"
"os"
)
func downloadHandler(w http.ResponseWriter, r *http.Request) {
// 打开要下载的文件
file, err := os.Open("./test_file.zip")
if err != nil {
http.Error(w, "文件打开失败", http.StatusInternalServerError)
return
}
defer file.Close()
// 获取文件信息,设置响应头
fileInfo, err := file.Stat()
if err != nil {
http.Error(w, "获取文件信息失败", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Disposition", "attachment; filename=test_file.zip")
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Content-Length", string(rune(fileInfo.Size())))
// 创建速率限制写入器,限制速率为100KB/s,即102400字节/秒
rateLimitedWriter := NewRateLimitedWriter(w, 102400)
// 将文件内容写入速率限制写入器
_, err = io.Copy(rateLimitedWriter, file)
if err != nil {
http.Error(w, "文件传输失败", http.StatusInternalServerError)
return
}
}
func main() {
http.HandleFunc("/download", downloadHandler)
http.ListenAndServe(":8080", nil)
}
限制文件上传速率
文件上传的场景中,客户端向服务器发送文件数据,我们可以封装一个带速率限制的io.Reader,在读取请求体的时候控制速率。
速率限制读取器实现
实现RateLimitedReader结构体,包装原始的io.Reader:
package main
import (
"io"
"time"
)
// RateLimitedReader 带速率限制的读取器
type RateLimitedReader struct {
reader io.Reader // 原始读取器
rate int64 // 目标速率,单位:字节/秒
totalBytes int64 // 已读取的总字节数
startTime time.Time // 读取开始时间
currentSecond int64 // 当前统计的秒数
secondBytes int64 // 当前秒内已读取的字节数
}
// NewRateLimitedReader 创建速率限制读取器
func NewRateLimitedReader(reader io.Reader, rate int64) *RateLimitedReader {
return &RateLimitedReader{
reader: reader,
rate: rate,
startTime: time.Now(),
}
}
// Read 实现io.Reader接口的Read方法
func (r *RateLimitedReader) Read(p []byte) (n int, err error) {
// 如果速率设置为0,不做限制,直接读取
if r.rate <= 0 {
return r.reader.Read(p)
}
// 分段读取,每次最多读取速率的十分之一
chunkSize := int(r.rate / 10)
if chunkSize <= 0 {
chunkSize = 1
}
if len(p) > chunkSize {
p = p[:chunkSize]
}
// 读取数据
n, err = r.reader.Read(p)
if n > 0 {
r.totalBytes += int64(n)
r.secondBytes += int64(n)
// 计算当前经过的秒数
elapsed := time.Since(r.startTime).Seconds()
currentSecond := int64(elapsed)
// 如果进入新的秒,重置当前秒的字节数
if currentSecond > r.currentSecond {
r.currentSecond = currentSecond
r.secondBytes = 0
}
// 如果当前秒读取的字节数超过速率限制,计算需要休眠的时间
if r.secondBytes > r.rate {
over := r.secondBytes - r.rate
sleepTime := time.Duration(float64(over)/float64(r.rate)*1000) * time.Millisecond
if sleepTime > 0 {
time.Sleep(sleepTime)
}
// 休眠后重置当前秒的字节数
r.secondBytes = 0
r.startTime = time.Now()
r.currentSecond = int64(r.startTime.Seconds())
}
}
return n, err
}
上传接口实现示例
在HTTP上传接口中使用RateLimitedReader,限制上传速率为每秒100KB:
package main
import (
"net/http"
"os"
)
func uploadHandler(w http.ResponseWriter, r *http.Request) {
// 限制请求体大小,避免过大请求
r.Body = http.MaxBytesReader(w, r.Body, 100<<20) // 限制最大100MB
// 创建速率限制读取器,限制速率为100KB/s
rateLimitedReader := NewRateLimitedReader(r.Body, 102400)
// 创建保存上传文件的路径
file, err := os.Create("./uploaded_file.bin")
if err != nil {
http.Error(w, "创建文件失败", http.StatusInternalServerError)
return
}
defer file.Close()
// 将速率限制读取器的内容拷贝到文件
_, err = io.Copy(file, rateLimitedReader)
if err != nil {
http.Error(w, "文件上传失败", http.StatusInternalServerError)
return
}
w.Write([]byte("文件上传成功"))
}
func main() {
http.HandleFunc("/upload", uploadHandler)
http.ListenAndServe(":8080", nil)
}
注意事项
- 速率限制的精度受休眠时间和分段大小的影响,分段越小精度越高,但会增加休眠次数,需要在精度和性能之间做平衡。
- 如果服务同时处理多个上传或下载请求,每个请求应该单独创建速率限制器,避免互相影响。
- 实际场景中可以根据用户等级、文件类型等动态调整速率限制值,提升服务的灵活性。
- 速率限制只是带宽控制的一部分,还可以结合连接数限制、请求频率限制等手段进一步提升服务的稳定性。