You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Go语言WaitGroup复用报错及RWLock并发控制优化问询

正确的解决方案:利用RWMutex实现优先级互斥

sync.RWMutex的特性刚好匹配你的需求:

  • 写锁(Lock()):独占锁,同一时间只能有一个goroutine持有,会把所有其他读/写锁请求都挡住
  • 读锁(RLock()):共享锁,多个goroutine可以同时持有,只有写锁会被它挡住

对应到你的场景:

  • LocalSource执行关键操作(获取代理+写入队列)时,拿写锁,确保这段时间内没有remote能碰队列
  • RemoteSource写入队列时,拿读锁,这样多个remote可以同时写,只要没有local在占用写锁

另外,直接把之前误用的WaitGroup删掉就行,它的作用是等待一批goroutine完成,和你的互斥需求完全不搭。


优化后的完整代码

package main

import (
    "sync"
    "time"
    // 导入你的其他依赖,比如日志、自定义类型包
)

// 这里定义你的全局常量和类型(示例)
var DURATION_FORCE_UPDATE = 30 * time.Second

type ProxySource struct {
    Id         string
    WatchWait  time.Duration
    // 其他字段...
}

type ProxyProvider struct {
    receivingProxyBC struct{ In() chan<- Proxy } // 你的队列结构
    workingProxyBC   struct{ Len() int }
    // 其他字段...
}

type Proxy struct { /* 你的代理结构体 */ }

// 假设你的日志函数已定义
func LogInfo(msg ...interface{})  {}
func LogError(msg ...interface{}) {}

func main() {
    // 初始化你的localSources、remoteSources、proxyProvider实例...

    // 核心互斥锁:控制local和remote的写入权限
    proxyAccessLock := sync.RWMutex{}

    // 启动localSource协程
    for _, proxySource := range localSources {
        go func(src *ProxySource) {
            lastGetTime := time.Now()
            firstLoad := true

            for {
                now := time.Now()
                totalProxies := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()

                // 代理足够且未到强制更新时间,先睡会儿
                if totalProxies > 100 && now.Sub(lastGetTime) < DURATION_FORCE_UPDATE && !firstLoad {
                    time.Sleep(src.WatchWait)
                    continue
                }
                firstLoad = false

                // 关键操作:获取代理+写入队列,全程持有写锁
                proxyAccessLock.Lock()
                defer proxyAccessLock.Unlock() // 确保锁一定会释放,避免死锁

                proxies, err := src.GetProxies()
                if err != nil {
                    LogError("Failed to get proxies from source ", src.Id, ": ", err)
                    time.Sleep(5 * time.Second)
                    continue
                }

                LogInfo("Adding proxies from local source ", src.Id)
                for _, p := range proxies {
                    proxyProvider.receivingProxyBC.In() <- p
                }
                LogInfo("Finished adding proxies from local source ", src.Id)

                lastGetTime = now
                time.Sleep(20 * time.Second)
                LogInfo("Local source ", src.Id, " entering watch period")
            }
        }(proxySource)
    }

    // 启动remoteSource协程
    for _, proxySource := range remoteSources {
        go func(src *ProxySource) {
            // 延迟启动,给local源一点初始化时间
            time.Sleep(2 * time.Second)
            lastGetTime := time.Now()
            firstLoad := true

            for {
                now := time.Now()
                totalProxies := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()

                // 代理足够且未到强制更新时间,先睡会儿
                if totalProxies > 100 && now.Sub(lastGetTime) < DURATION_FORCE_UPDATE && !firstLoad {
                    time.Sleep(src.WatchWait)
                    continue
                }
                firstLoad = false

                // 获取代理的操作不需要互斥,只有写入队列需要
                proxies, err := src.GetProxies()
                if err != nil {
                    LogError("Failed to get proxies from remote source ", src.Id, ": ", err)
                    time.Sleep(5 * time.Second)
                    continue
                }

                // 写入队列时持有读锁:多个remote可以同时写,只要没有local在占用写锁
                proxyAccessLock.RLock()
                LogInfo("Adding proxies from remote source ", src.Id)
                for _, p := range proxies {
                    proxyProvider.receivingProxyBC.In() <- p
                }
                proxyAccessLock.RUnlock()
                LogInfo("Finished adding proxies from remote source ", src.Id)

                lastGetTime = now
                time.Sleep(20 * time.Second)
                LogInfo("Remote source ", src.Id, " entering watch period")
            }
        }(proxySource)
    }

    // 阻塞主协程(根据你的程序需求调整,比如用信号监听)
    select {}
}

关键优化点说明

  1. 抛弃错误的WaitGroup:完全移除之前误用的WaitGroup,改用RWMutex实现正确的互斥逻辑
  2. 锁的正确使用
    • Local源在「获取代理+写入队列」的整个流程中持有写锁,确保这段时间内没有任何remote能写入队列
    • Remote源仅在写入队列时持有读锁,多个remote可以同时获取读锁,实现并行写入
  3. 锁的安全释放:Local源中用defer释放写锁,即使GetProxies()出错,锁也能正确释放,避免死锁
  4. 缩小锁的范围:Remote源的GetProxies()操作不需要加锁,只有写入队列时才加锁,提升性能

内容的提问来源于stack exchange,提问作者o0omycomputero0o

火山引擎 最新活动