3. sync: No Secrets in the Face of Source Code
3.1. context
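3.1.1. Background & Purpose
Background: in Go server-side development, each incoming request is typically handled in its own goroutine, and that handler may start further goroutines for the same request. This raises a problem: if the request times out or is canceled, how can all the goroutines working on it exit quickly and release their resources so the system can reclaim them?
Purpose: context exists to solve exactly this problem. It carries shared values, cancellation signals and deadlines across a group of goroutines.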
3.1.2. Source Code Analysis
Context source
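// A Context is safe for concurrent use and its methods are idempotent
type Context interface {
// Returns the time at which the context will be canceled automatically (its deadline); ok is false if no deadline is set
Deadline() (deadline time.Time, ok bool)
// Returns a channel that is closed when the context is canceled or its deadline passes (nil if it can never be canceled)
Done() <-chan struct{}
// After Done is closed, returns the reason the context was canceled
Err() error
// Returns the value associated with key
Value(key interface{}) interface{}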
}
Four kinds of Context
emptyCtx source
// An emptyCtx is never canceled, has no values, and has no deadline. It is not
// struct{}, since vars of this type must have distinct addresses.
type emptyCtx int
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
func Background() Context {
return background
}
func TODO() Context {
return todo
}
cancelCtx source
Used for manual cancellation and for timeout-based cancellation
// A canceler is a context type that can be canceled directly. The
// implementations are *cancelCtx and *timerCtx.
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}
// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
Context
mu sync.Mutex // protects following fields
done chan struct{} // created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
// Cancels recursively: closes done, cancels every child, then optionally detaches from the parent
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled
}
c.err = err
if c.done == nil {
c.done = closedchan
} else {
close(c.done)
}
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent {
removeChild(c.Context, c)
}
}
timerCtx source
A complement to cancelCtx, used for cancellation at a fixed time (deadline or timeout)
// A timerCtx carries a timer and a deadline. It embeds a cancelCtx to
// implement Done and Err. It implements cancel by stopping its timer then
// delegating to cancelCtx.cancel.
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
valueCtx source
type valueCtx struct {
Context
key, val interface{}
}
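How is a value looked up? Context does not keep values in a map. Instead, Value checks the current node and then recursively walks up through the parent contexts toward the root until it finds a matching key (the root returns nil).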
func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}
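Because `Value` only walks from a node toward the root, a child can shadow a parent's key, but a parent never sees values added below it. A sketch assuming an unexported key type named `key` and a hypothetical helper `lookup`:

```go
package main

import (
	"context"
	"fmt"
)

type key string // unexported key type avoids collisions between packages

// lookup builds root -> mid -> leaf and queries values at different levels.
func lookup() (string, interface{}, interface{}) {
	root := context.WithValue(context.Background(), key("a"), "root-a")
	mid := context.WithValue(root, key("b"), "mid-b")
	leaf := context.WithValue(mid, key("a"), "leaf-a") // shadows root's "a"

	// leaf.Value(key("b")) misses at leaf, then checks mid and finds it there.
	// root.Value(key("b")) walks up to Background and returns nil.
	return leaf.Value(key("a")).(string), leaf.Value(key("b")), root.Value(key("b"))
}

func main() {
	fmt.Println(lookup()) // leaf-a mid-b <nil>
}
```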
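Propagating contexts
The constructors are WithValue, WithDeadline (WithTimeout simply calls WithDeadline), WithCancel, Background and TODO.
Linking a cancelable child to its parent is done mainly by propagateCancel.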
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}
// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
done := parent.Done()
if done == nil {
return // parent is never canceled
}
select {
case <-done:
// parent is already canceled
child.cancel(false, parent.Err())
return
default:
}
// parent is a cancelCtx type that the context package can recognize internally
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
// parent has already been canceled
child.cancel(false, p.err)
} else {
if p.children == nil {
p.children = make(map[canceler]struct{})
}
// Record the parent-child relationship
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
atomic.AddInt32(&goroutines, +1)
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
Canceling a context
The cancelCtx type
// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled
}
c.err = err
if c.done == nil {
c.done = closedchan
} else {
close(c.done)
}
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent {
removeChild(c.Context, c)
}
}
The timerCtx type
func (c *timerCtx) cancel(removeFromParent bool, err error) {
c.cancelCtx.cancel(false, err)
if removeFromParent {
// Remove this timerCtx from its parent cancelCtx's children.
removeChild(c.cancelCtx.Context, c)
}
c.mu.Lock()
if c.timer != nil {
c.timer.Stop()
c.timer = nil
}
c.mu.Unlock()
}
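3.1.3. Best Practices
Tip: When designing an API with context, remember the advice: pass context.Context in as an argument; don't store it in structs.
Reason: when a context is stored as a struct field, callers become confused about the lifetime of the context being used. Some callers may want WithCancel and others WithDeadline, and it is hard to tell whether the context handed down is meant for cancellation or for a deadline. In effect, the lifetime of every caller's context is bound to one shared context.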
Original: Contexts and structs
Chinese translation (official blog post): 进一步阐明关于 context 的最佳实践 (Further Clarifying Best Practices for context)
Watching whether a task has finished or been canceled
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	select {
	case <-ctx.Done():
		fmt.Println(ctx.Err())
	case <-time.After(3 * time.Second):
		fmt.Println("task finished")
	}
}
// context deadline exceeded
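3.1.4. References
一起聊聊 Go Context 的正确使用姿势 (Let's Talk About the Right Way to Use Go Context)
深度解密Go语言之context (A Deep Dive into Go's context)
Go语言设计与实现 上下文 Context (Go Language Design and Implementation: Context)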
3.2. sync.Cond
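3.2.1. Use Cases
A rendezvous point for goroutines waiting for, or announcing, the occurrence of an event.
Here an event is any arbitrary signal between two or more goroutines.
Without sync.Cond, you might wait in a for loop that polls the condition, which burns all the cycles of one CPU core. You could add a sleep to the loop, but that is still inefficient because you cannot know how long to sleep. What you want is a mechanism that notifies the waiting goroutine the moment the event happens, and sync.Cond provides exactly that.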
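3.2.2. Best Practices
Once the queue holds 2 items, the adding goroutine waits. Each removal calls Signal, which wakes the waiting goroutine so it can recheck the length (now below 2) and continue adding.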
func doCond() {
c := sync.NewCond(&sync.Mutex{})
queue := make([]interface{}, 0, 10)
removeFromQueue := func(dur time.Duration) {
time.Sleep(dur)
c.L.Lock()
queue = queue[1:]
fmt.Println("Remove success!")
c.L.Unlock()
c.Signal()
}
for i := 0; i < 10; i++ {
c.L.Lock()
for len(queue) == 2 {
c.Wait()
}
queue = append(queue, struct{}{})
fmt.Println("Add success!")
go removeFromQueue(1 * time.Second)
c.L.Unlock()
}
}
//Add success!
//Add success!
//Remove success!
//Remove success!
//Add success!
//Add success!
//Remove success!
//Add success!
//Remove success!
//Add success!
//Remove success!
//Add success!
//Remove success!
//Add success!
//Remove success!
//Remove success!
//Add success!
//Add success!
A, B and C all subscribe to button; as soon as the button is clicked, Broadcast notifies A, B and C at once.
func doBroadcastCond() {
type Button struct {
Clicked *sync.Cond
}
button := Button{Clicked: sync.NewCond(&sync.Mutex{})}
subscribe := func(c *sync.Cond, fn func()) {
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
wg.Done()
c.L.Lock()
defer c.L.Unlock()
c.Wait()
fn()
}()
wg.Wait()
}
wg := sync.WaitGroup{}
wg.Add(3)
subscribe(button.Clicked, func() {
fmt.Println("A subscribe button")
wg.Done()
})
subscribe(button.Clicked, func() {
fmt.Println("B subscribe button")
wg.Done()
})
subscribe(button.Clicked, func() {
fmt.Println("C subscribe button")
wg.Done()
})
time.Sleep(2 * time.Second)
// Without this Broadcast no subscriber is ever woken, and wg.Wait deadlocks
button.Clicked.Broadcast()
wg.Wait()
}
//A subscribe button
//B subscribe button
//C subscribe button
3.2.3. References
Concurrency in Go
3.3. sync.Once
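3.3.1. Use Cases
Operations that must run only once, such as initializing connections to a database, a message queue (MQ) or Elasticsearch (ES)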
func doOnce() {
once := &sync.Once{}
var num int
for i := 0; i < 10; i++ {
once.Do(func() {
num++
})
}
fmt.Println(num)
}
// 1
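3.3.2. Source Code Analysis
The sync.Once source is actually simple: a mutex plus a done flag ensures that a function runs only once.
The comment in the source also explains why a bare CAS is not used: Do guarantees that when it returns, f has already finished, and a CAS-only implementation cannot provide that guarantee.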
// Once is an object that will perform exactly one action.
//
// A Once must not be copied after first use.
type Once struct {
// done indicates whether the action has been performed.
// It is first in the struct because it is used in the hot path.
// The hot path is inlined at every call site.
// Placing done first allows more compact instructions on some architectures (amd64/386),
// and fewer instructions (to calculate offset) on other architectures.
done uint32
m Mutex
}
// Do calls the function f if and only if Do is being called for the
// first time for this instance of Once. In other words, given
// var once Once
// if once.Do(f) is called multiple times, only the first call will invoke f,
// even if f has a different value in each invocation. A new instance of
// Once is required for each function to execute.
//
// Do is intended for initialization that must be run exactly once. Since f
// is niladic, it may be necessary to use a function literal to capture the
// arguments to a function to be invoked by Do:
// config.once.Do(func() { config.init(filename) })
//
// Because no call to Do returns until the one call to f returns, if f causes
// Do to be called, it will deadlock.
//
// If f panics, Do considers it to have returned; future calls of Do return
// without calling f.
//
func (o *Once) Do(f func()) {
// Note: Here is an incorrect implementation of Do:
//
// if atomic.CompareAndSwapUint32(&o.done, 0, 1) {
// f()
// }
//
// Do guarantees that when it returns, f has finished.
// This implementation would not implement that guarantee:
// given two simultaneous calls, the winner of the cas would
// call f, and the second would return immediately, without
// waiting for the first's call to f to complete.
// This is why the slow path falls back to a mutex, and why
// the atomic.StoreUint32 must be delayed until after f returns.
if atomic.LoadUint32(&o.done) == 0 {
// Outlined slow-path to allow inlining of the fast-path.
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
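Two consequences of doSlow can be checked directly: callers that lose the race block on `o.m` until `f` returns, so they never observe a half-initialized value; and because `StoreUint32` is deferred, a panicking `f` still marks the Once as done. A sketch with a hypothetical `config` type and made-up DSN value:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// config is a hypothetical lazily initialized resource.
type config struct {
	once sync.Once
	dsn  string
}

// get may be called from many goroutines. Callers that lose the race block
// in doSlow until init returns, so none of them sees an empty dsn.
func (c *config) get() string {
	c.once.Do(func() {
		time.Sleep(20 * time.Millisecond) // slow initialization
		c.dsn = "db://example"            // hypothetical value
	})
	return c.dsn
}

// concurrentGets counts how many of n concurrent callers saw the initialized value.
func concurrentGets(n int) int {
	c := &config{}
	var wg sync.WaitGroup
	var mu sync.Mutex
	ok := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if c.get() == "db://example" {
				mu.Lock()
				ok++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return ok
}

// panicOnce shows that a panicking f still counts as done.
func panicOnce() (calls int) {
	var once sync.Once
	f := func() { calls++; panic("init failed") }
	for i := 0; i < 3; i++ {
		func() {
			defer func() { recover() }() // swallow the panic raised by f
			once.Do(f)
		}()
	}
	return calls
}

func main() {
	fmt.Println(concurrentGets(10)) // 10
	fmt.Println(panicOnce())        // 1
}
```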
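3.3.3. References
Go并发编程 - sync.Once 单实例模式的思考 (Go Concurrent Programming: Thoughts on the Singleton Pattern with sync.Once)
Go 语言高性能编程 sync.Once如何提升性能 (High-Performance Go Programming: How sync.Once Improves Performance)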
3.4. sync.WaitGroup
3.4.1. Use Cases
One goroutine waits for a group of goroutines to finish
// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
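3.4.2. A Real-World Scenario
An API limits each query to 1000 records. When more than 1000 records are needed, the query can be split into batches that run concurrently, with sync.WaitGroup waiting for all of them.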
func WaitGroupTest() {
argv := []string{
"a", "b", "c",
}
wg := &sync.WaitGroup{}
for _, v := range argv {
v := v
wg.Add(1)
go func() {
defer func() {
wg.Done()
}()
fmt.Println(v)
time.Sleep(2 * time.Second)
}()
}
wg.Wait()
fmt.Println("Done!!!")
}
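The example above only prints; a sketch of the batching itself could look like the following. `queryBatch` is a hypothetical stand-in for the real API call, and the limit of 1000 comes from the scenario described above. Each goroutine writes only its own slot of `results`, so no mutex is needed.

```go
package main

import (
	"fmt"
	"sync"
)

const batchLimit = 1000 // per-query limit of the hypothetical API

// queryBatch stands in for the real API call; it is hypothetical and simply
// echoes the ids back.
func queryBatch(ids []int) []int {
	out := make([]int, len(ids))
	copy(out, ids)
	return out
}

// queryAll splits ids into batches of at most batchLimit, queries every batch
// in its own goroutine and waits for all of them with a WaitGroup.
func queryAll(ids []int) []int {
	var batches [][]int
	for start := 0; start < len(ids); start += batchLimit {
		end := start + batchLimit
		if end > len(ids) {
			end = len(ids)
		}
		batches = append(batches, ids[start:end])
	}

	results := make([][]int, len(batches)) // one slot per batch, no lock needed
	var wg sync.WaitGroup
	for i, b := range batches {
		wg.Add(1) // Add before starting the goroutine, never inside it
		go func(i int, b []int) {
			defer wg.Done()
			results[i] = queryBatch(b)
		}(i, b)
	}
	wg.Wait()

	var all []int
	for _, r := range results {
		all = append(all, r...) // keeps the original order
	}
	return all
}

func main() {
	ids := make([]int, 2500)
	for i := range ids {
		ids[i] = i
	}
	got := queryAll(ids)
	fmt.Println(len(got), got[0], got[len(got)-1]) // 2500 0 2499
}
```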
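3.4.3. Source Code Analysis
Go中看似简单的WaitGroup源码设计,竟然暗含这么多知识? (How Much Knowledge Hides in Go's Seemingly Simple WaitGroup Source?)
WaitGroup uses different strategies for 32-bit and 64-bit compilers; see the figure below.
Why are the counter and the waiter count read together?
- For concurrency safety: Add and Wait need a consistent snapshot of both values. If they were stored separately, concurrent operations could interleave and act on the two values in the wrong order; packed into one 64-bit word, both are read or updated by a single atomic operation.
- 64-bit atomic operations require the caller to guarantee 64-bit alignment (on 32-bit platforms)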
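A sketch of the packing arithmetic, assuming a hypothetical `packed` type that copies the shifts used by Add and Wait. It shows why a negative delta works: `uint64(delta)` wraps around, so adding it after the shift subtracts from the high 32 bits. Placing `state` as the first field of an allocated struct keeps it 64-bit aligned even on 32-bit platforms, per the sync/atomic documentation.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// packed is a hypothetical copy of WaitGroup's state word:
// high 32 bits = counter, low 32 bits = waiter count.
// As the first field of an allocated struct it is 64-bit aligned.
type packed struct {
	state uint64
}

// add mirrors WaitGroup.Add: delta goes into the high half. For a negative
// delta, uint64(delta) wraps around, so the addition subtracts.
func (p *packed) add(delta int) (counter int32, waiters uint32) {
	s := atomic.AddUint64(&p.state, uint64(delta)<<32)
	return int32(s >> 32), uint32(s)
}

// addWaiter mirrors Wait: CAS state to state+1, which bumps the low half.
func (p *packed) addWaiter() {
	for {
		s := atomic.LoadUint64(&p.state)
		if atomic.CompareAndSwapUint64(&p.state, s, s+1) {
			return
		}
	}
}

func main() {
	p := &packed{}
	fmt.Println(p.add(3)) // 3 0
	p.addWaiter()
	fmt.Println(p.add(-1)) // 2 1
	fmt.Println(p.add(-2)) // 0 1: counter reached 0 while a waiter exists
}
```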
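Go中由WaitGroup引发对内存对齐思考 (Thoughts on Memory Alignment Prompted by Go's WaitGroup)
Golang WaitGroup 原理深度剖析 (An In-Depth Analysis of How Golang WaitGroup Works)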
// A WaitGroup must not be copied after first use.
type WaitGroup struct {
noCopy noCopy
// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
// 64-bit atomic operations require 64-bit alignment, but 32-bit
// compilers do not ensure it. So we allocate 12 bytes and then use
// the aligned 8 bytes in them as state, and the other 4 as storage
// for the sema.
state1 [3]uint32
}
// state returns pointers to the state and sema fields stored within wg.state1.
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
// If state1 starts on an 8-byte boundary, its first 8 bytes hold state and state1[2] holds sema
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}

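Add
- Add first atomically adds delta to the counter, then reads the counter and the waiter count from the new state
- It checks for misuse: a negative counter, or Add racing with Wait
- If counter == 0 and waiter > 0, it performs a V operation for each waiter, waking the goroutines blocked in Wait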
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32)
w := uint32(state)
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
// This goroutine has set counter to 0 when waiters > 0.
// Now there can't be concurrent mutations of state:
// - Adds must not happen concurrently with Wait,
// - Wait does not increment waiters if it sees counter == 0.
// Still do a cheap sanity check to detect WaitGroup misuse.
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// Reset waiters count to 0.
*statep = 0
// Wake up all waiters
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
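The first check in Add is easy to trigger. A sketch (`negativeCounter` is an illustrative name) that calls Done once too often and recovers the panic:

```go
package main

import (
	"fmt"
	"sync"
)

// negativeCounter calls Done once more than Add, which drives the counter
// below zero and triggers the "negative WaitGroup counter" panic.
func negativeCounter() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	var wg sync.WaitGroup
	wg.Add(1)
	wg.Done()
	wg.Done() // one Done too many: Add(-1) makes v < 0
	return "no panic"
}

func main() {
	fmt.Println(negativeCounter()) // sync: negative WaitGroup counter
}
```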

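Done is simply Add(-1); the name Done just makes the intent clearer.
Wait
- Atomically increment the waiter count by 1
- Perform a P operation, blocking until Add's V operation wakes it up
- Check whether the WaitGroup was reused before the previous Wait returned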
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
for {
// Atomically load the state
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
// Counter is 0, no need to wait.
return
}
// Increment waiters count.
// CAS adds 1 to the low 32 bits (the waiter count)
if atomic.CompareAndSwapUint64(statep, state, state+1) {
// P operation: block until Add performs the V operation
runtime_Semacquire(semap)
// A non-zero state means the WaitGroup was reused
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}
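3.5. channel & goroutine
3.5.1. What a Channel Is
In the runtime source, a channel is represented by hchan, a lock-protected queue used for communication and synchronization.
Channels use a mutex to handle concurrency. Lock-free queue implementations already exist in the community, but so far they have performed only modestly and do not provide FIFO ordering, so that approach has been shelved for the time being.
Lock-based and lock-free approaches to concurrency correspond to pessimistic and optimistic locking, which are quite different. Optimistic locking is not actually a lock but a concurrency-control idea: in essence it is a validation-based protocol, most commonly implemented with a version number.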
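A sketch of the version idea using sync/atomic, assuming a hypothetical `versioned` record whose version and value are packed into one 64-bit word. Each update reads a snapshot, computes the new state without holding a lock, and commits with a CAS that succeeds only if nothing changed in between; on failure it retries. Validating a version rather than only the value also guards against the ABA problem.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// versioned is a hypothetical record guarded optimistically: version and
// value share one word so a single CAS validates and updates both.
type versioned struct {
	word uint64 // high 32 bits: version, low 32 bits: value
}

// increment reads a snapshot, computes the new state, and commits only if the
// word (and therefore the version) is unchanged; otherwise it retries.
func (v *versioned) increment() {
	for {
		old := atomic.LoadUint64(&v.word)
		ver, val := old>>32, uint32(old)
		next := (ver+1)<<32 | uint64(val+1)
		if atomic.CompareAndSwapUint64(&v.word, old, next) {
			return // validation passed: nobody committed in between
		}
		// validation failed: another goroutine won, retry with fresh data
	}
}

func (v *versioned) load() (version, value uint32) {
	w := atomic.LoadUint64(&v.word)
	return uint32(w >> 32), uint32(w)
}

func main() {
	v := &versioned{}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			v.increment()
		}()
	}
	wg.Wait()
	fmt.Println(v.load()) // 100 100
}
```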
3.5.2. Channel Data Structure
type hchan struct {
// number of elements currently in the circular queue
qcount uint // total data in the queue
// capacity of the circular queue
dataqsiz uint // size of the circular queue
// element buffer
buf unsafe.Pointer // points to an array of dataqsiz elements
// size of each element
elemsize uint16
// whether the channel has been closed
closed uint32
// element type
elemtype *_type // element type
// buf index of the next send
sendx uint // send index
// buf index of the next receive
recvx uint // receive index
// queue of goroutines blocked on receive
recvq waitq // list of recv waiters
// queue of goroutines blocked on send
sendq waitq // list of send waiters
// lock
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
// doubly linked list of waiting goroutines
type waitq struct {
first *sudog
last *sudog
}
// node of the doubly linked list, representing one blocked goroutine
type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.
acquiretime int64
releasetime int64
ticket uint32
// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool
// success indicates whether communication over channel c
// succeeded. It is true if the goroutine was awoken because a
// value was delivered over channel c, and false if awoken
// because c was closed.
success bool
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
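Some of these fields are visible from ordinary Go code: `len(ch)` reports qcount and `cap(ch)` reports dataqsiz. A sketch (helper names are illustrative) that also shows a closed channel still delivering buffered values before receives start returning `ok == false`:

```go
package main

import "fmt"

// bufferState shows how len and cap of a buffered channel correspond to
// hchan.qcount and hchan.dataqsiz.
func bufferState() (length, capacity int) {
	ch := make(chan int, 3) // dataqsiz = 3, buf holds 3 ints
	ch <- 1                 // qcount = 1
	ch <- 2                 // qcount = 2
	<-ch                    // qcount = 1, recvx advances
	return len(ch), cap(ch)
}

// drainClosed shows that buffered values can still be received after close;
// ok becomes false only once qcount is 0 and closed is set.
func drainClosed() []bool {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	close(ch)
	var oks []bool
	for i := 0; i < 3; i++ {
		_, ok := <-ch
		oks = append(oks, ok)
	}
	return oks
}

func main() {
	fmt.Println(bufferState()) // 1 3
	fmt.Println(drainClosed()) // [true true false]
}
```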
3.5.3. The Receive and Blocking Process
func main() {
ch := make(chan int, 3)
checkChannelDataStructure(ch)
}
func checkChannelDataStructure(ch chan int) {
// 6 senders
go func() { ch <- 1 }()
go func() { ch <- 2 }()
go func() { ch <- 3 }()
go func() { ch <- 4 }()
go func() { ch <- 5 }()
go func() { ch <- 6 }()
// 8 receivers
go func() { <-ch }()
go func() { <-ch }()
go func() { <-ch }()
go func() { <-ch }()
go func() { <-ch }()
go func() { <-ch }()
go func() { <-ch }()
go func() { <-ch }()
}
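- After make, a buf array of length 3 is allocated; elemtype shows kind 2, i.e. int; the head and tail of both the send and receive wait queues are nil
- While buf has room, a sending goroutine's value goes straight into buf
- Once more goroutines send than buf can hold, the extra senders are placed in the send wait queue (sendq)
- After some receives, 1 and 2 have been received and 4 and 5 are stored in buf
- Receivers then block: two goroutines are placed in the receive wait queue (recvq), waiting for values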
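The same behaviour can be reproduced deterministically without a debugger. This sketch uses `select` with `default` to test whether a send would block once buf is full, then parks a sender in sendq and shows that values still come out in FIFO order. The helper names are made up, and the Sleep only gives the sender time to park; the printed order does not depend on it.

```go
package main

import (
	"fmt"
	"time"
)

// trySend reports whether a send would complete without blocking.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default: // buf is full and no receiver is waiting in recvq
		return false
	}
}

// fillAndOverflow fills a buffer of 3, shows the 4th send would block, then
// lets a parked sender (in sendq) hand its value over once space frees up.
func fillAndOverflow() (sent []bool, order []int) {
	ch := make(chan int, 3)
	for i := 1; i <= 4; i++ {
		sent = append(sent, trySend(ch, i))
	}
	go func() { ch <- 4 }()           // this sender blocks in sendq
	time.Sleep(10 * time.Millisecond) // give it time to park (demo only)
	for i := 0; i < 4; i++ {
		order = append(order, <-ch) // FIFO: 1 2 3, then the parked 4
	}
	return sent, order
}

func main() {
	fmt.Println(fillAndOverflow()) // [true true true false] [1 2 3 4]
}
```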
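3.6. The GMP Model (TODO)
G---goroutine, a user-level thread
M---machine, an OS (kernel) thread
P---processor, the scheduling context: it holds a local run queue of Gs, and an M must hold a P to execute Go code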
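The number of Ps is what GOMAXPROCS controls. A tiny sketch (`procs` is an illustrative name) that queries it together with the current number of goroutines; the printed numbers depend on the machine.

```go
package main

import (
	"fmt"
	"runtime"
)

// procs returns the number of Ps and the number of live Gs.
// GOMAXPROCS(0) only queries the setting; it does not change it.
func procs() (ps, gs int) {
	return runtime.GOMAXPROCS(0), runtime.NumGoroutine()
}

func main() {
	ps, gs := procs()
	fmt.Println("P:", ps, "G:", gs, "CPUs:", runtime.NumCPU())
}
```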
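A well-written article on the GMP scheduling model:
[典藏版] Golang 调度器 GMP 原理与调度全分析 ([Collector's Edition] A Complete Analysis of the Golang GMP Scheduler)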