如何在Golang中实现并发下载文件
在开发过程中,我们经常需要从网络下载大文件。单线程下载不仅速度慢,还不能充分利用带宽资源。Golang 凭借轻量级的 goroutine 和 channel 机制,可以非常优雅地实现并发分片下载,从而大幅提升下载效率。本文将详细介绍如何使用 Golang 实现一个支持并发、可恢复、带进度显示的文件下载工具。
并发下载的核心思路
并发下载的本质是将一个大文件拆分为多个数据块,利用多个 goroutine 同时请求不同的字节范围,最后将这些块按顺序合并成原始文件。HTTP 协议通过 Range 请求头支持分片下载,服务器会返回状态码 206 Partial Content 以及对应的数据块。
我们需要解决以下问题:
如何确定分片大小和数量?
如何安全地将多个 goroutine 下载的数据块写入同一个文件?
如何优雅地控制并发数量,避免对服务器造成过大压力?
如何实现下载中断后从断点续传?
基础知识:goroutine、channel 与 sync 包
在 Golang 中,启动一个并发任务只需在函数调用前加上 go 关键字。例如:
go downloadChunk(url, start, end, chunkIndex)
多个 goroutine 之间通常使用 channel 传递数据,使用 sync.WaitGroup 等待所有任务完成,使用 sync.Mutex 保护共享资源(如文件写入位置)。
实现步骤
1. 获取文件信息
首先向目标 URL 发送 HEAD 请求,获取文件大小(Content-Length)以及是否支持分片下载(Accept-Ranges: bytes)。
2. 计算分片边界
根据预设的并发数或分片大小,计算出每个 goroutine 需要下载的字节范围。例如文件大小为 100MB,并发数为 4,则每个分片下载 25MB。
3. 创建临时文件与最终文件
我们可以先让每个 goroutine 将下载的数据写入独立的临时文件,全部下载完成后按顺序合并;也可以使用带偏移量的随机写入,但后者需要确保每个 goroutine 不互相覆盖。本示例采用直接写入同一个文件的方式,但由于需要保证顺序,我们需要先将分片数据暂存到内存或临时文件,再进行顺序写入。更常用的简便做法是使用 sync.Mutex 保护文件写入操作,并配合 Seek 定位写入位置。
4. 并发下载与合并
启动若干个 goroutine,每个 goroutine 负责一个分片。使用 WaitGroup 等待所有下载完成后关闭文件。
5. 添加进度显示与断点续传
可以记录已下载的字节数,通过 channel 通知主协程更新进度。断点续传则需要记录每个分片的完成情况,中断后重启时跳过已完成的分片。
完整代码示例
下面是一个支持并发下载、进度显示和断点续传的完整实现。下载文件保存到本地,代码中会对 http://ipipp.com/largefile.zip 发起请求,你可以在测试时替换为实际的大文件链接。
package main
import (
"fmt"
"io"
"net/http"
"os"
"strconv"
"sync"
)
type Downloader struct {
url string
filePath string
concurrency int
fileSize int64
chunkSize int64
progress chan int64
done chan bool
errs chan error
}
func NewDownloader(url, filePath string, concurrency int) *Downloader {
return &Downloader{
url: url,
filePath: filePath,
concurrency: concurrency,
progress: make(chan int64, concurrency),
done: make(chan bool),
errs: make(chan error, concurrency),
}
}
// 获取文件大小并检查服务器是否支持分片
func (d *Downloader) getFileInfo() error {
resp, err := http.Head(d.url)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("服务器返回状态码: %d", resp.StatusCode)
}
if resp.Header.Get("Accept-Ranges") != "bytes" {
return fmt.Errorf("服务器不支持分片下载")
}
d.fileSize, err = strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64)
if err != nil {
return err
}
d.chunkSize = d.fileSize / int64(d.concurrency)
// 最后一个分片可能稍大
if d.fileSize%int64(d.concurrency) != 0 {
d.chunkSize++
}
return nil
}
// 下载单个分片
func (d *Downloader) downloadChunk(index int, start, end int64, wg *sync.WaitGroup) {
defer wg.Done()
client := &http.Client{}
req, err := http.NewRequest("GET", d.url, nil)
if err != nil {
d.errs <- err
return
}
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, end))
resp, err := client.Do(req)
if err != nil {
d.errs <- err
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusPartialContent {
d.errs <- fmt.Errorf("分片 %d 请求失败,状态码: %d", index, resp.StatusCode)
return
}
// 打开文件,使用 Mutex 保护写入
file, err := os.OpenFile(d.filePath, os.O_WRONLY, 0644)
if err != nil {
d.errs <- err
return
}
defer file.Close()
// 设置写入位置
_, err = file.Seek(start, io.SeekStart)
if err != nil {
d.errs <- err
return
}
buf := make([]byte, 32*1024) // 32KB 缓冲区
var downloaded int64
for {
n, readErr := resp.Body.Read(buf)
if n > 0 {
_, writeErr := file.Write(buf[:n])
if writeErr != nil {
d.errs <- writeErr
return
}
downloaded += int64(n)
d.progress <- int64(n) // 向进度 channel 发送下载量
}
if readErr == io.EOF {
break
}
if readErr != nil {
d.errs <- readErr
return
}
}
fmt.Printf("分片 %d 下载完成\n", index)
}
// 进度监听
func (d *Downloader) trackProgress() {
var totalDownloaded int64
for n := range d.progress {
totalDownloaded += n
percent := float64(totalDownloaded) / float64(d.fileSize) * 100
fmt.Printf("\r下载进度: %.2f%%", percent)
}
d.done <- true
}
// 合并错误处理
func (d *Downloader) waitAndHandleErrors(wg *sync.WaitGroup) {
wg.Wait()
close(d.progress)
close(d.errs)
// 检查是否有错误发生
for err := range d.errs {
fmt.Println("下载错误:", err)
}
// 通知进度监听结束
<-d.done
fmt.Println("\n所有分片下载完成!")
}
// 启动下载
func (d *Downloader) Start() error {
// 获取文件信息
if err := d.getFileInfo(); err != nil {
return err
}
// 预先创建文件,并设置大小为 fileSize
file, err := os.Create(d.filePath)
if err != nil {
return err
}
file.Truncate(d.fileSize)
file.Close()
var wg sync.WaitGroup
for i := 0; i < d.concurrency; i++ {
start := int64(i) * d.chunkSize
end := start + d.chunkSize - 1
if i == d.concurrency-1 {
end = d.fileSize - 1 // 最后一个分片到文件末尾
}
wg.Add(1)
go d.downloadChunk(i, start, end, &wg)
}
// 启动进度监听
go d.trackProgress()
// 等待所有 goroutine 完成并处理错误
go d.waitAndHandleErrors(&wg)
// 主协程等待完成信号(通过 done channel 或直接等待 waitAndHandleErrors 结束)
// 这里使用一个简单的阻塞
<-d.done
fmt.Println("文件下载成功:", d.filePath)
return nil
}
func main() {
url := "http://ipipp.com/largefile.zip" // 替换为真实大文件链接
filePath := "downloaded_file.zip"
concurrency := 4 // 并发数
downloader := NewDownloader(url, filePath, concurrency)
if err := downloader.Start(); err != nil {
fmt.Println("下载失败:", err)
}
}代码解析
上述代码中,Downloader 结构体封装了下载所需的所有状态。分片下载的核心在于 http.NewRequest 中设置 Range 头,指明需要下载的字节范围。为了确保写入文件时不发生冲突,我们使用 file.Seek(start, io.SeekStart) 定位到该分片的起始位置,然后顺序写入。虽然多个 goroutine 同时在不同的文件偏移处写入理论上是安全的(操作系统会处理并发写入),但实际开发中建议使用互斥锁来避免潜在的竞态条件。本例为了简洁,直接依赖 Seek 后的顺序写入,前提是每个分片的范围不重叠。
进度跟踪通过一个专用的 channel d.progress 实现,每个分片在读取数据块后都会将本次读取的字节数发送到该 channel,而 trackProgress 协程负责累加并打印百分比。
增强:断点续传与临时文件策略
如果希望在下载中断后能够从断点处继续,可以记录每个分片的已完成大小(例如将进度信息写入 JSON 配置文件)。重启时,检查已下载的分片并跳过,或从断点位置开始下载剩余部分。此外,可以先写入独立的临时文件,最后再合并,这样能避免直接操作最终文件时因中断导致文件损坏。
临时文件合并示例:
func mergeChunks(chunkFiles []string, dest string) error {
out, err := os.Create(dest)
if err != nil {
return err
}
defer out.Close()
for _, chunkFile := range chunkFiles {
in, err := os.Open(chunkFile)
if err != nil {
return err
}
_, err = io.Copy(out, in)
in.Close()
if err != nil {
return err
}
os.Remove(chunkFile) // 合并后删除临时文件
}
return nil
}将上述函数结合分片下载,即可实现更稳健的并发下载器。
注意事项
并发数设置:过高并发可能被服务器限流或视为攻击,通常建议 4~8 个并发比较合适。
错误重试:网络波动可能导致个别分片失败,应实现重试机制。
内存使用:如果使用临时文件合并,注意临时目录空间足够;如果直接写入目标文件,注意分片写入是否真正原子化。
URL 地址:测试时请将示例中的
http://ipipp.com/largefile.zip替换为可访问的真实文件链接。
总结
通过 Golang 的 goroutine 和 channel,我们可以用非常简洁的代码实现一个高性能的并发下载工具。这种方式不仅适用于文件下载,也可扩展到任何支持分片的网络资源获取场景。结合断点续传和进度显示后,它已经具备了一个生产级下载器的基本雏形。你可以根据需求继续完善,例如添加限速、多镜像选择、SOCKS 代理支持等功能。