在Go中实现可动态更新URL列表的定时轮询任务,核心是通过goroutine管理轮询逻辑,用channel实现URL列表的动态同步,同时结合context控制任务的启停与生命周期,避免资源泄漏。下面会逐步拆解实现思路并给出完整示例。

核心设计思路
整个任务的运行逻辑可以分为三个部分:
- 轮询任务管理器:负责启动轮询goroutine,维护当前的URL列表,接收外部的列表更新请求
- 定时轮询执行器:按照设定的时间间隔,遍历当前URL列表发起请求,处理请求结果
- 动态更新通道:外部通过向通道发送更新指令,实时修改正在运行的轮询任务的URL列表
数据结构定义
首先定义URL信息和更新指令的结构体,方便后续逻辑处理:
package main
import (
"context"
"fmt"
"net/http"
"sync"
"time"
)
// URLInfo 存储单个URL的轮询信息
type URLInfo struct {
URL string // 待轮询的URL
Timeout time.Duration // 单个请求的超时时间
}
// UpdateOp 定义更新操作的类型
type UpdateOp int
const (
AddOp UpdateOp = iota // 新增URL
DelOp // 删除URL
UpdateOp // 更新URL的超时时间
)
// UpdateCmd 更新指令结构体
type UpdateCmd struct {
Op UpdateOp // 操作类型
URL URLInfo // 操作的URL信息
}轮询任务管理器实现
管理器会维护当前的URL列表,启动定时轮询逻辑,同时监听更新通道的指令:
// PollingManager 轮询任务管理器
type PollingManager struct {
urls map[string]URLInfo // 存储当前URL列表,key为URL字符串
updateCh chan UpdateCmd // 接收更新指令的通道
ctx context.Context
cancel context.CancelFunc
interval time.Duration // 轮询间隔
wg sync.WaitGroup
mu sync.RWMutex // 保护urls的并发读写
}
// NewPollingManager 创建新的轮询管理器
func NewPollingManager(ctx context.Context, interval time.Duration) *PollingManager {
subCtx, cancel := context.WithCancel(ctx)
return &PollingManager{
urls: make(map[string]URLInfo),
updateCh: make(chan UpdateCmd, 10),
ctx: subCtx,
cancel: cancel,
interval: interval,
}
}核心轮询与更新逻辑
管理器的启动方法会启动两个goroutine,一个负责定时轮询,一个负责处理更新指令:
// Start 启动轮询任务
func (pm *PollingManager) Start() {
pm.wg.Add(2)
// 启动定时轮询goroutine
go pm.pollLoop()
// 启动更新指令处理goroutine
go pm.updateLoop()
}
// pollLoop 定时轮询逻辑
func (pm *PollingManager) pollLoop() {
defer pm.wg.Done()
ticker := time.NewTicker(pm.interval)
defer ticker.Stop()
for {
select {
case <-pm.ctx.Done():
fmt.Println("轮询任务已停止")
return
case <-ticker.C:
pm.doPoll()
}
}
}
// doPoll 执行单次轮询
func (pm *PollingManager) doPoll() {
pm.mu.RLock()
// 复制当前URL列表,避免长时间持有读锁
currentURLs := make([]URLInfo, 0, len(pm.urls))
for _, info := range pm.urls {
currentURLs = append(currentURLs, info)
}
pm.mu.RUnlock()
for _, info := range currentURLs {
pm.pollSingleURL(info)
}
}
// pollSingleURL 轮询单个URL
func (pm *PollingManager) pollSingleURL(info URLInfo) {
ctx, cancel := context.WithTimeout(pm.ctx, info.Timeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "GET", info.URL, nil)
if err != nil {
fmt.Printf("创建请求失败,URL: %s, 错误: %v\n", info.URL, err)
return
}
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
fmt.Printf("请求失败,URL: %s, 错误: %v\n", info.URL, err)
return
}
defer resp.Body.Close()
fmt.Printf("轮询成功,URL: %s, 状态码: %d\n", info.URL, resp.StatusCode)
}
// updateLoop 处理更新指令的循环
func (pm *PollingManager) updateLoop() {
defer pm.wg.Done()
for {
select {
case <-pm.ctx.Done():
fmt.Println("更新监听已停止")
return
case cmd := <-pm.updateCh:
pm.handleUpdateCmd(cmd)
}
}
}
// handleUpdateCmd 处理单条更新指令
func (pm *PollingManager) handleUpdateCmd(cmd UpdateCmd) {
pm.mu.Lock()
defer pm.mu.Unlock()
switch cmd.Op {
case AddOp:
if _, exists := pm.urls[cmd.URL.URL]; !exists {
pm.urls[cmd.URL.URL] = cmd.URL
fmt.Printf("新增URL成功: %s\n", cmd.URL.URL)
}
case DelOp:
if _, exists := pm.urls[cmd.URL.URL]; exists {
delete(pm.urls, cmd.URL.URL)
fmt.Printf("删除URL成功: %s\n", cmd.URL.URL)
}
case UpdateOp:
if _, exists := pm.urls[cmd.URL.URL]; exists {
pm.urls[cmd.URL.URL] = cmd.URL
fmt.Printf("更新URL超时时间成功: %s, 新超时: %v\n", cmd.URL.URL, cmd.URL.Timeout)
}
}
}
// AddURL 对外暴露的新增URL方法
func (pm *PollingManager) AddURL(url string, timeout time.Duration) {
pm.updateCh <- UpdateCmd{
Op: AddOp,
URL: URLInfo{URL: url, Timeout: timeout},
}
}
// DelURL 对外暴露的删除URL方法
func (pm *PollingManager) DelURL(url string) {
pm.updateCh <- UpdateCmd{
Op: DelOp,
URL: URLInfo{URL: url},
}
}
// UpdateURLTimeout 对外暴露的更新URL超时时间方法
func (pm *PollingManager) UpdateURLTimeout(url string, timeout time.Duration) {
pm.updateCh <- UpdateCmd{
Op: UpdateOp,
URL: URLInfo{URL: url, Timeout: timeout},
}
}
// Stop 停止轮询任务
func (pm *PollingManager) Stop() {
pm.cancel()
pm.wg.Wait()
fmt.Println("轮询管理器已完全停止")
}使用示例
下面是完整的main函数示例,展示如何启动任务、动态更新URL列表:
func main() {
// 创建根context
ctx := context.Background()
// 创建轮询管理器,轮询间隔为5秒
manager := NewPollingManager(ctx, 5*time.Second)
// 启动任务
manager.Start()
// 初始添加两个URL
manager.AddURL("https://ipipp.com", 3*time.Second)
manager.AddURL("https://www.baidu.com", 2*time.Second)
// 运行10秒后动态新增URL
time.Sleep(10 * time.Second)
manager.AddURL("https://www.sina.com", 3*time.Second)
// 再运行10秒后删除第一个URL
time.Sleep(10 * time.Second)
manager.DelURL("https://ipipp.com")
// 再运行10秒后更新第二个URL的超时时间
time.Sleep(10 * time.Second)
manager.UpdateURLTimeout("https://www.baidu.com", 5*time.Second)
// 总共运行35秒后停止任务
time.Sleep(5 * time.Second)
manager.Stop()
}注意事项
- URL列表的读写使用了读写锁
sync.RWMutex,避免并发读写map导致的panic,同时读锁不会阻塞其他读操作,提升性能 - 更新通道设置了缓冲大小,避免发送更新指令时阻塞调用方
- 所有goroutine都通过context控制生命周期,停止时可以确保所有goroutine正常退出,不会泄漏
- 轮询单个URL时使用了请求级别的context超时,避免单个请求阻塞过长时间影响整体轮询节奏