Go语言WaitGroup复用报错及RWLock并发控制优化问询
正确的解决方案:利用RWMutex实现优先级互斥
sync.RWMutex的特性刚好匹配你的需求:
- 写锁(Lock()):独占锁,同一时间只能有一个goroutine持有,会把所有其他读/写锁请求都挡住
- 读锁(RLock()):共享锁,多个goroutine可以同时持有,只有写锁会被它挡住
对应到你的场景:
- LocalSource执行关键操作(获取代理+写入队列)时,拿写锁,确保这段时间内没有remote能碰队列
- RemoteSource写入队列时,拿读锁,这样多个remote可以同时写,只要没有local在占用写锁
另外,直接把之前误用的WaitGroup删掉就行,它的作用是等待一批goroutine完成,和你的互斥需求完全不搭。
优化后的完整代码
package main import ( "sync" "time" // 导入你的其他依赖,比如日志、自定义类型包 ) // 这里定义你的全局常量和类型(示例) var DURATION_FORCE_UPDATE = 30 * time.Second type ProxySource struct { Id string WatchWait time.Duration // 其他字段... } type ProxyProvider struct { receivingProxyBC struct{ In() chan<- Proxy } // 你的队列结构 workingProxyBC struct{ Len() int } // 其他字段... } type Proxy struct { /* 你的代理结构体 */ } // 假设你的日志函数已定义 func LogInfo(msg ...interface{}) {} func LogError(msg ...interface{}) {} func main() { // 初始化你的localSources、remoteSources、proxyProvider实例... // 核心互斥锁:控制local和remote的写入权限 proxyAccessLock := sync.RWMutex{} // 启动localSource协程 for _, proxySource := range localSources { go func(src *ProxySource) { lastGetTime := time.Now() firstLoad := true for { now := time.Now() totalProxies := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len() // 代理足够且未到强制更新时间,先睡会儿 if totalProxies > 100 && now.Sub(lastGetTime) < DURATION_FORCE_UPDATE && !firstLoad { time.Sleep(src.WatchWait) continue } firstLoad = false // 关键操作:获取代理+写入队列,全程持有写锁 proxyAccessLock.Lock() defer proxyAccessLock.Unlock() // 确保锁一定会释放,避免死锁 proxies, err := src.GetProxies() if err != nil { LogError("Failed to get proxies from source ", src.Id, ": ", err) time.Sleep(5 * time.Second) continue } LogInfo("Adding proxies from local source ", src.Id) for _, p := range proxies { proxyProvider.receivingProxyBC.In() <- p } LogInfo("Finished adding proxies from local source ", src.Id) lastGetTime = now time.Sleep(20 * time.Second) LogInfo("Local source ", src.Id, " entering watch period") } }(proxySource) } // 启动remoteSource协程 for _, proxySource := range remoteSources { go func(src *ProxySource) { // 延迟启动,给local源一点初始化时间 time.Sleep(2 * time.Second) lastGetTime := time.Now() firstLoad := true for { now := time.Now() totalProxies := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len() // 代理足够且未到强制更新时间,先睡会儿 if totalProxies > 100 && now.Sub(lastGetTime) < DURATION_FORCE_UPDATE && !firstLoad { time.Sleep(src.WatchWait) continue } firstLoad = false // 获取代理的操作不需要互斥,只有写入队列需要 proxies, err := src.GetProxies() if err != nil { LogError("Failed to get proxies from remote source ", src.Id, ": ", err) time.Sleep(5 * time.Second) continue } // 写入队列时持有读锁:多个remote可以同时写,只要没有local在占用写锁 proxyAccessLock.RLock() LogInfo("Adding proxies from remote source ", src.Id) for _, p := range proxies { proxyProvider.receivingProxyBC.In() <- p } proxyAccessLock.RUnlock() LogInfo("Finished adding proxies from remote source ", src.Id) lastGetTime = now time.Sleep(20 * time.Second) LogInfo("Remote source ", src.Id, " entering watch period") } }(proxySource) } // 阻塞主协程(根据你的程序需求调整,比如用信号监听) select {} }
关键优化点说明
- 抛弃错误的WaitGroup:完全移除之前误用的WaitGroup,改用RWMutex实现正确的互斥逻辑
- 锁的正确使用:
- Local源在「获取代理+写入队列」的整个流程中持有写锁,确保这段时间内没有任何remote能写入队列
- Remote源仅在写入队列时持有读锁,多个remote可以同时获取读锁,实现并行写入
- 锁的安全释放:Local源中用
defer释放写锁,即使GetProxies()出错,锁也能正确释放,避免死锁 - 缩小锁的范围:Remote源的
GetProxies()操作不需要加锁,只有写入队列时才加锁,提升性能
内容的提问来源于stack exchange,提问作者o0omycomputero0o




