1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
|
package utils import "sync" // GoroutineManager 用于管理协程的结构体 type GoroutineManager struct { maxGoroutines int taskQueue chan TaskFunc semaphore chan struct{} lock sync.Mutex } type TaskFunc struct { Func func(a string) error Msg string } // NewGoroutineManager 创建一个GoroutineManager实例 func NewGoroutineManager(maxGoroutines int) *GoroutineManager { gm := &GoroutineManager{ maxGoroutines: maxGoroutines, semaphore: make(chan struct{}, maxGoroutines), taskQueue: make(chan TaskFunc, maxGoroutines), } go gm.Worker() return gm } func (gm *GoroutineManager) Limit() error { gm.semaphore <- struct{}{} return nil } func (gm *GoroutineManager) Release() { <-gm.semaphore } // RunGoroutine 开始协程 func (gm *GoroutineManager) RunGoroutine(task func(a string) error, msg string) error { taskFunc := TaskFunc{ Func: task, Msg: msg, } gm.taskQueue <- taskFunc return nil } func (gm *GoroutineManager) Worker() { for v := range gm.taskQueue { gm.lock.Lock() _ = gm.Limit() gm.lock.Unlock() go func(task TaskFunc) { defer gm.Release() err := task.Func(task.Msg) if err != nil { return } }(v) } } |
尤其注意:在并发情况下,对一个channel进行多协程消费,并不能保证每个元素只被消费一次。(通常是每个与元素至少一次)
所以在Worker方法中加锁,保证并发安全。加锁虽然是在 for range 循环内部,但在每次循环中都会先获取互斥锁 lock,然后再执行任务相关的操作,最后释放互斥锁。这样的确可以视作在通道操作上加了互斥锁。
调用逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
package queue import ( "../utils" "github.com/astaxie/beego" ) type Queue interface { Listen() } func InitQueues() { ip := utils.GetLocalIp() if ip != "" && ip == beego.AppConfig.String("targetIp") { RegisterQueue(new(Cve5AllUpdateOnceQueue)) } } func RegisterQueue(queues ...Queue) { for _, q := range queues { go func(q Queue) { q.Listen() }(q) } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
|
package queue import ( "../service" "../utils" "fmt" "time" ) // 一次性全量更新 type Cve5AllUpdateOnceQueue struct { GoroutineManager *utils.GoroutineManager } func (c *Cve5AllUpdateOnceQueue) Listen() { maxGoroutines := 30 fmt.Println("Cve5AllUpdateOnceQueue Listen") c.GoroutineManager = utils.NewGoroutineManager(maxGoroutines) count := 0 for { msg, err := utils.GetRedisQueueOne(service.Cve5AllUpdateOnceRedisKey) if err != nil || msg == nil { time.Sleep(time.Millisecond * 100) continue } msgStr, ok := msg.([]byte) if !ok { continue } if string(msgStr) == "" { continue } count++ fmt.Println("Cve5AllUpdateOnceQueue handle msg", count) fmt.Println(string(msgStr)) res := c.Handle(string(msgStr)) if res != nil { service.Cve5Log("Cve5AllUpdateOnceQueue Handle" + res.Error()) } } } func (c *Cve5AllUpdateOnceQueue) Handle(msg string) error { _ = c.GoroutineManager.RunGoroutine(service.HandleCveAllUpdateOnce, msg) return nil } |
2025.02.14:
无法复现出channel的线程安全问题。
deepSeek: 加不加锁的区别
| 场景 |
不加锁 |
加锁 |
| 线程安全 |
是(通道操作是线程安全的) |
是(通道操作是线程安全的) |
| 性能 |
可能更好,因为没有锁的开销 |
可能稍差,因为锁会增加额外的开销 |
| 协程竞争 |
多个协程可能同时阻塞在 semaphore <- struct{}{},增加等待时间 |
每次只有一个协程尝试向 semaphore 发送数据,减少竞争 |
| 适用场景 |
并发量较小,或者对性能要求较高 |
并发量较大,或者需要严格控制协程的启动顺序 |
「三年博客,如果觉得我的文章对您有用,请帮助本站成长」
共有 0 - go 协程池