1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
|
// GoroutineManager 用于管理协程的结构体 type GoroutineManager struct { maxGoroutines int taskQueue chan TaskFunc semaphore chan struct{} } type TaskFunc struct { Func func(a string) error Msg string } // NewGoroutineManager 创建一个GoroutineManager实例 func NewGoroutineManager(maxGoroutines int) *GoroutineManager { gm := &GoroutineManager{ maxGoroutines: maxGoroutines, semaphore: make(chan struct{}, maxGoroutines), taskQueue: make(chan TaskFunc, maxGoroutines), } go gm.Worker() return gm } func (gm *GoroutineManager) Limit() error { gm.semaphore <- struct{}{} return nil } func (gm *GoroutineManager) Release() { <-gm.semaphore } // RunGoroutine 开始协程 func (gm *GoroutineManager) RunGoroutine(task func(a string) error, msg string) error { taskFunc := TaskFunc{ Func: task, Msg: msg, } gm.taskQueue <- taskFunc return nil } func (gm *GoroutineManager) Worker() { for v := range gm.taskQueue { _ = gm.Limit() go func() { defer gm.Release() v.Func(v.Msg) }() } } |
调用逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
|
type Cve5AllUpdateQueue struct { GoroutineManager *utils.GoroutineManager } func (c *Cve5AllUpdateQueue) Listen() { service.Cve5Log("Cve5AllUpdateQueue Queue listen start") maxGoroutines := 30 c.GoroutineManager = utils.NewGoroutineManager(maxGoroutines) for { msg, err := utils.GetRedisQueueOne(service.Cve5AllUpdateRedisKey) if err != nil || msg == nil { time.Sleep(time.Millisecond * 100) continue } msgStr, ok := msg.([]byte) if !ok { service.Cve5Log("Cve5AllUpdateQueue Listen msg to string type error") continue } if string(msgStr) == "" { service.Cve5Log("Cve5AllUpdateQueue Listen msg empty") continue } res := c.Handle(string(msgStr)) if res != nil { service.Cve5Log("Cve5AllUpdateQueue Handle" + res.Error()) } } } func (c *Cve5AllUpdateQueue) Handle(msg string) error { _ = c.GoroutineManager.RunGoroutine(service.HandleCveAllUpdate, msg) return nil } |
需要关注的地方在于
1, semaphore chan struct{} 仅仅控制worker()新开协程的数量,所以gm.Limit()函数必须有个返回值,以便于适当的阻塞worker()
2, taskQueue chan TaskFunc仅仅控制chan传入func方法的数量。
3,taskQueue + semaphore 才能控制传入func数量和执行func数量,新开的协程不能超过maxGoroutines
4,调用RunGoroutine时,也要接收返回值。
「三年博客,如果觉得我的文章对您有用,请帮助本站成长」
共有 0 - golang 实现简易协程池