Go协程池的实现
1. 前言
按理说Go的协程足够轻量了,为什么还要自己实现一套协程池呢?
正常情况下是不用的,但前不久我遇到了不正常情况,比如修复数据.....
那个excle表格,最后一行的行号的那几个0差点没给我看晕过去,虽说协程轻量,但也经不起我这么开呀。已经能想象到了,每个请求开一个协程,系统估计会被我玩炸掉哈哈。可是这么多请求串行执行,要跑到猴年马月呀,于是心一狠,写了一套属于自己的协程池,只能算丐版,大佬勿喷😂
2. 如何设计
先明白协程池的作用:
- 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
- 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
参考java的ThreadPoolExecutor类,看看他的运行机制:

我们可以看到实现协程池的组件至少需要:任务分发器、任务队列、核心协程、客户端
- 任务分发器:负责分发任务给协程或将任务阻塞在队列,将任务队列和协程池解耦
- 任务队列:将任务缓冲到阻塞队列中(存放任务的容器罢了)
- 核心协程:执行任务队列中的任务
- 客户端:负责声明任务内容,并调用协程池执行任务
所以也就可以画出我们阉割版的协程池的架构图了:

3. Go协程池
看以下内容前建议先了解一下channel的知识,【Let's Go!】 sync源码面前无秘密
3.1. 任务队列&任务池
// Job 任务接口
type Job interface {
Do() error
}
// JobChan Job的channel队列
type JobChan chan Job
// WorkerChan JobChan的channel队列
type WorkerChan chan JobChan
var (
jobQueue JobChan // 任务队列
workerPool WorkerChan // 任务池
)
var maxWorkerSize = 20 // 线程池大小
- 声明任务Job接口,用于客户端定义任务的行为
- JobChan作为任务队列
- 这里的WorkerChan作为处理任务队列的协程
这里的WorkerChan可能比较难理解,需要多理解几遍~
WorkerChan既是协程池也是任务池,任务队列和协程之间通过它交互,可以理解为上图的**「任务获取」**
3.2. 工作协程
type Worker struct {
jobChannel JobChan
quit chan bool
}
func (w *Worker) Start() {
go func() {
for {
// 把Worker的任务队列加入到任务池中,代表该协程处于空闲状态
workerPool <- w.jobChannel
select {
// 取出任务队列中的任务,并执行
case job := <-w.jobChannel:
if err := job.Do(); err != nil {
fmt.Println("do job failed!")
}
case <-w.quit:
return
}
}
}()
}
func NewWorker() *Worker {
return &Worker{
jobChannel: make(chan Job),
quit: make(chan bool),
}
}
3.3. 任务分发器&客户端
type Dispatcher struct {
Workers []*Worker // 工作协程
quit chan bool // 停止分发标志
}
func NewDispatcher(maxPoolSize, maxJobQueueSize int) (*Dispatcher, error) {
if maxWorkerSize <= 0 || maxJobQueueSize <= 0 {
return nil, errors.New("pool size must greater than zero")
}
maxWorkerSize = maxPoolSize
workerPool = make(WorkerChan, maxWorkerSize)
jobQueue = make(JobChan, maxJobQueueSize)
return &Dispatcher{}, nil
}
func (d *Dispatcher) doDispatcher() {
for {
select {
// 阻塞队列中是否有任务
case job := <-jobQueue:
// 工作协程是否可以处理阻塞队列
jobChan := <-workerPool
// 如果可以处理,就塞进去处理掉....
jobChan <- job
case <-d.quit:
return
}
}
}
func (d *Dispatcher) RegisterJob(job Job) {
jobQueue <- job
}
func (d *Dispatcher) Run() {
for i := 0; i < maxWorkerSize; i++ {
worker := NewWorker()
d.Workers = append(d.Workers, worker)
worker.Start()
}
go d.doDispatcher()
}
// DefaultGoroutinePoolExecutor 客户端默认实现
func DefaultGoroutinePoolExecutor(jobs ...Job) *Dispatcher {
dispatcher, _ := GoroutinePoolExecutor(20, 20, jobs...)
return dispatcher
}
func GoroutinePoolExecutor(maxPoolSize, maxJobQueueSize int, jobs ...Job) (*Dispatcher, error) {
dispatcher, err := NewDispatcher(maxPoolSize, maxJobQueueSize)
if err != nil {
return nil, err
}
dispatcher.Run()
for _, job := range jobs {
dispatcher.RegisterJob(job)
}
return dispatcher, err
}
3.4. 总结
总体来说,这个协程池还是很简单的,比对着ThreadPoolExecutor类以及我那张阉割版架构图,还是很容易理解其中的核心思想的,而且开箱即用,使用DefaultGoroutinePoolExecutor配置默认参数,再注册属于自己的任务,协程池就跑起来啦~
这个协程池的主要目的是为了控制流量、限制并发~
goroutine的设计在前面已经说过了,非常的轻量,几乎是即启即用的那种,所以协程池对于减少资源消耗,提高响应速度的帮助微乎其微,但确实可以起到管理协程以及限制并发的目的!!!
4. 性能测试
type CrawlerHandler struct {
wg *sync.WaitGroup
}
func (c CrawlerHandler) Do() error {
c.wg.Add(1)
defer c.wg.Done()
url := "https://www.baidu.com"
resp, err := http.Get(url)
if err != nil {
return err
}
_, err = ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
return nil
}
func BenchmarkDefaultGoroutinePoolExecutor(b *testing.B) {
wg := sync.WaitGroup{}
jobs := make([]Job, 0, b.N)
for i := 0; i < b.N; i++ {
jobs = append(jobs, CrawlerHandler{
wg: &wg,
})
}
// 20 20
DefaultGoroutinePoolExecutor(jobs...)
wg.Wait()
}
func BenchmarkGoroutinePoolExecutor(b *testing.B) {
wg := sync.WaitGroup{}
jobs := make([]Job, 0, b.N)
for i := 0; i < b.N; i++ {
jobs = append(jobs, CrawlerHandler{
wg: &wg,
})
}
_, _ = GoroutinePoolExecutor(100, 100, jobs...)
wg.Wait()
}
func BenchmarkSerialExecutor(b *testing.B) {
wg := sync.WaitGroup{}
for i := 0; i < b.N; i++ {
c := CrawlerHandler{
wg: &wg,
}
_ = c.Do()
}
wg.Wait()
}
这里我们声明了3组压测用例,Do里面的操作就是爬取网页
- 20 20的协程池
- 100 100的协程池
- 串行处理
goos: darwin
goarch: amd64
pkg: golearning/goskill/threadpool
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkDefaultGoroutinePoolExecutor
BenchmarkDefaultGoroutinePoolExecutor-12 100 70900068 ns/op
BenchmarkGoroutinePoolExecutor
BenchmarkGoroutinePoolExecutor-12 10000 2570963 ns/op
BenchmarkSerialExecutor
BenchmarkSerialExecutor-12 16 68145716 ns/op
PASS
如上便是压测结果,还是很可观的,2组的一次操作仅用了2570963ns,3组的一次操作则用了68145716ns
看了一些blog,发现这些参数的配置还是挺需要经验性......
至此协程池告一段落~
5. 参考文献
-
本文的协程池主要是参照很早以前看的一篇blog,现在找不到......不甚感激那位作者
-
Java线程池实现原理及其在美团业务中的实践
-
面试官:Java中的线程池用过吧?来说说你是怎么理解线程池吧?
-
Golang 开发需要协程池吗?
-
100 行写一个 go 的协程池 (任务池)