【我想自己做一个】Go协程池

Go协程池的实现

1. 前言

按理说Go的协程足够轻量了,为什么还要自己实现一套协程池呢?

正常情况下是不用的,但前不久我遇到了不正常情况,比如修复数据.....

那个excle表格,最后一行的行号的那几个0差点没给我看晕过去,虽说协程轻量,但也经不起我这么开呀。已经能想象到了,每个请求开一个协程,系统估计会被我玩炸掉哈哈。可是这么多请求串行执行,要跑到猴年马月呀,于是心一狠,写了一套属于自己的协程池,只能算丐版,大佬勿喷😂

2. 如何设计

先明白协程池的作用:

  1. 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
  2. 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
  3. 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。

参考java的ThreadPoolExecutor类,看看他的运行机制:

图2 ThreadPoolExecutor运行流程

我们可以看到实现协程池的组件至少需要:任务分发器、任务队列、核心协程、客户端

  1. 任务分发器:负责分发任务给协程或将任务阻塞在队列,将任务队列和协程池解耦
  2. 任务队列:将任务缓冲到阻塞队列中(存放任务的容器罢了)
  3. 核心协程:执行任务队列中的任务
  4. 客户端:负责声明任务内容,并调用协程池执行任务

所以也就可以画出我们阉割版的协程池的架构图了:

协程池架构图

3. Go协程池

看以下内容前建议先了解一下channel的知识,【Let's Go!】 sync源码面前无秘密

3.1. 任务队列&任务池

// Job 任务接口
type Job interface {
	Do() error
}

// JobChan Job的channel队列
type JobChan chan Job

// WorkerChan JobChan的channel队列
type WorkerChan chan JobChan

var (
	jobQueue   JobChan    // 任务队列
	workerPool WorkerChan // 任务池
)

var maxWorkerSize = 20 // 线程池大小
  1. 声明任务Job接口,用于客户端定义任务的行为
  2. JobChan作为任务队列
  3. 这里的WorkerChan作为处理任务队列的协程

这里的WorkerChan可能比较难理解,需要多理解几遍~

WorkerChan既是协程池也是任务池,任务队列和协程之间通过它交互,可以理解为上图的**「任务获取」**

3.2. 工作协程

type Worker struct {
	jobChannel JobChan
	quit       chan bool
}

func (w *Worker) Start() {
	go func() {
		for {
			// 把Worker的任务队列加入到任务池中,代表该协程处于空闲状态
			workerPool <- w.jobChannel
			select {
			// 取出任务队列中的任务,并执行
			case job := <-w.jobChannel:
				if err := job.Do(); err != nil {
					fmt.Println("do job failed!")
				}
			case <-w.quit:
				return
			}
		}
	}()
}

func NewWorker() *Worker {
	return &Worker{
		jobChannel: make(chan Job),
		quit:       make(chan bool),
	}
}

3.3. 任务分发器&客户端

type Dispatcher struct {
	Workers []*Worker // 工作协程
	quit    chan bool // 停止分发标志
}

func NewDispatcher(maxPoolSize, maxJobQueueSize int) (*Dispatcher, error) {
	if maxWorkerSize <= 0 || maxJobQueueSize <= 0 {
		return nil, errors.New("pool size must greater than zero")
	}
	maxWorkerSize = maxPoolSize
	workerPool = make(WorkerChan, maxWorkerSize)
	jobQueue = make(JobChan, maxJobQueueSize)
	return &Dispatcher{}, nil
}

func (d *Dispatcher) doDispatcher() {
	for {
		select {
		// 阻塞队列中是否有任务
		case job := <-jobQueue:
			// 工作协程是否可以处理阻塞队列
			jobChan := <-workerPool

			// 如果可以处理,就塞进去处理掉....
			jobChan <- job
		case <-d.quit:
			return
		}
	}
}

func (d *Dispatcher) RegisterJob(job Job) {
	jobQueue <- job
}

func (d *Dispatcher) Run() {
	for i := 0; i < maxWorkerSize; i++ {
		worker := NewWorker()
		d.Workers = append(d.Workers, worker)
		worker.Start()
	}

	go d.doDispatcher()
}


// DefaultGoroutinePoolExecutor 客户端默认实现
func DefaultGoroutinePoolExecutor(jobs ...Job) *Dispatcher {
	dispatcher, _ := GoroutinePoolExecutor(20, 20, jobs...)
	return dispatcher
}

func GoroutinePoolExecutor(maxPoolSize, maxJobQueueSize int, jobs ...Job) (*Dispatcher, error) {
	dispatcher, err := NewDispatcher(maxPoolSize, maxJobQueueSize)
	if err != nil {
		return nil, err
	}
	dispatcher.Run()
	for _, job := range jobs {
		dispatcher.RegisterJob(job)
	}
	return dispatcher, err
}

3.4. 总结

总体来说,这个协程池还是很简单的,比对着ThreadPoolExecutor类以及我那张阉割版架构图,还是很容易理解其中的核心思想的,而且开箱即用,使用DefaultGoroutinePoolExecutor配置默认参数,再注册属于自己的任务,协程池就跑起来啦~

这个协程池的主要目的是为了控制流量、限制并发~

goroutine的设计在前面已经说过了,非常的轻量,几乎是即启即用的那种,所以协程池对于减少资源消耗,提高响应速度的帮助微乎其微,但确实可以起到管理协程以及限制并发的目的!!!

4. 性能测试

type CrawlerHandler struct {
	wg *sync.WaitGroup
}

func (c CrawlerHandler) Do() error {
	c.wg.Add(1)
	defer c.wg.Done()
	url := "https://www.baidu.com"
	resp, err := http.Get(url)
	if err != nil {
		return err
	}
	_, err = ioutil.ReadAll(resp.Body)
	if err != nil {
		return err
	}
	return nil
}

func BenchmarkDefaultGoroutinePoolExecutor(b *testing.B) {
	wg := sync.WaitGroup{}
	jobs := make([]Job, 0, b.N)
	for i := 0; i < b.N; i++ {
		jobs = append(jobs, CrawlerHandler{
			wg: &wg,
		})
	}
	// 20 20
	DefaultGoroutinePoolExecutor(jobs...)
	wg.Wait()
}

func BenchmarkGoroutinePoolExecutor(b *testing.B) {
	wg := sync.WaitGroup{}
	jobs := make([]Job, 0, b.N)
	for i := 0; i < b.N; i++ {
		jobs = append(jobs, CrawlerHandler{
			wg: &wg,
		})
	}
	_, _ = GoroutinePoolExecutor(100, 100, jobs...)
	wg.Wait()
}

func BenchmarkSerialExecutor(b *testing.B) {
	wg := sync.WaitGroup{}
	for i := 0; i < b.N; i++ {
		c := CrawlerHandler{
			wg: &wg,
		}
		_ = c.Do()
	}
	wg.Wait()
}

这里我们声明了3组压测用例,Do里面的操作就是爬取网页

  1. 20 20的协程池
  2. 100 100的协程池
  3. 串行处理
goos: darwin
goarch: amd64
pkg: golearning/goskill/threadpool
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkDefaultGoroutinePoolExecutor
BenchmarkDefaultGoroutinePoolExecutor-12    	     100	  70900068 ns/op
BenchmarkGoroutinePoolExecutor
BenchmarkGoroutinePoolExecutor-12           	   10000	   2570963 ns/op
BenchmarkSerialExecutor
BenchmarkSerialExecutor-12                	      	16	  68145716 ns/op
PASS

如上便是压测结果,还是很可观的,2组的一次操作仅用了2570963ns,3组的一次操作则用了68145716ns

看了一些blog,发现这些参数的配置还是挺需要经验性......

至此协程池告一段落~

5. 参考文献

  1. 本文的协程池主要是参照很早以前看的一篇blog,现在找不到......不甚感激那位作者

  2. Java线程池实现原理及其在美团业务中的实践

  3. 面试官:Java中的线程池用过吧?来说说你是怎么理解线程池吧?

  4. Golang 开发需要协程池吗?

  5. 100 行写一个 go 的协程池 (任务池)

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×