【Let's Go!】 sync: No Secrets in Front of the Source Code

3. sync: No Secrets in Front of the Source Code

3.1. context

3.1.1. Background & Purpose

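Background: in Go server-side development, almost every request is handled by its own goroutine by default. This raises a problem. If the request times out or is canceled, how can all the goroutines working on that request exit quickly and release their resources so the system can reclaim them?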

Purpose: context exists to solve exactly this problem. It carries shared values, cancellation signals and deadlines across a group of goroutines.
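Here is a minimal sketch of the idea. It is not from the original source. A hypothetical handleRequest function starts n worker goroutines that all watch the same ctx.Done(). Canceling the context makes every worker return, and a sync.WaitGroup confirms that all of them exited. The sleep durations are arbitrary stand-ins for real work.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// handleRequest is a hypothetical request handler: it starts n workers that
// share one context and returns how many of them observed the cancellation.
func handleRequest(n int) int64 {
	ctx, cancel := context.WithCancel(context.Background())

	var wg sync.WaitGroup
	var exited int64
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					// The request was canceled: stop and release resources.
					atomic.AddInt64(&exited, 1)
					return
				default:
					time.Sleep(10 * time.Millisecond) // simulate a unit of work
				}
			}
		}()
	}

	time.Sleep(50 * time.Millisecond) // let the workers run for a while
	cancel()                          // e.g. the client disconnected
	wg.Wait()                         // every worker has returned
	return atomic.LoadInt64(&exited)
}

func main() {
	fmt.Println(handleRequest(5)) // 5
}
```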

3.1.2. Source code analysis

Context source

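// Context is safe for concurrent use, and its methods are idempotent
type Context interface {
    // Deadline returns the time at which the context is canceled automatically (its deadline); ok reports whether a deadline is set
    Deadline() (deadline time.Time, ok bool)

    // Done returns a channel that is closed when the context is canceled or its deadline passes
    Done() <-chan struct{}

    // After Done is closed, Err returns the reason the context was canceled
    Err() error

    // Value returns the value associated with key
    Value(key interface{}) interface{}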
}

The four Context types

emptyCtx source

// An emptyCtx is never canceled, has no values, and has no deadline. It is not
// struct{}, since vars of this type must have distinct addresses.
type emptyCtx int

var (
    background = new(emptyCtx)
    todo       = new(emptyCtx)
)

func Background() Context {
    return background
}

func TODO() Context {
    return todo
}

cancelCtx source

Used for manual cancellation and, through timerCtx, which embeds it, for timeout cancellation

// A canceler is a context type that can be canceled directly. The
// implementations are *cancelCtx and *timerCtx.
type canceler interface {
    cancel(removeFromParent bool, err error)
    Done() <-chan struct{}
}

// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
    Context

    mu       sync.Mutex            // protects following fields
    done     chan struct{}         // created lazily, closed by first cancel call
    children map[canceler]struct{} // set to nil by the first cancel call
    err      error                 // set to non-nil by the first cancel call
}

// cancel closes done, recursively cancels every child, then optionally removes c from its parent
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
    if err == nil {
      panic("context: internal error: missing cancel error")
    }
    c.mu.Lock()
    if c.err != nil {
      c.mu.Unlock()
      return // already canceled
    }
    c.err = err
    if c.done == nil {
      c.done = closedchan
    } else {
      close(c.done)
    }
    for child := range c.children {
      // NOTE: acquiring the child's lock while holding parent's lock.
      child.cancel(false, err)
    }
    c.children = nil
    c.mu.Unlock()

    if removeFromParent {
      removeChild(c.Context, c)
    }
}
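The recursive loop over c.children is what makes cancellation flow downward only. The sketch below uses only the public API to show both directions. Canceling a parent cancels its child. Canceling a child leaves the parent untouched, because the child is merely removed from the parent's children map. The helper names are illustrative.

```go
package main

import (
	"context"
	"fmt"
)

// isDone reports, without blocking, whether ctx has been canceled.
func isDone(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}

// cancelParentFirst cancels the parent and reports what both contexts see.
func cancelParentFirst() (parentDone, childDone bool) {
	parent, cancelParent := context.WithCancel(context.Background())
	child, cancelChild := context.WithCancel(parent)
	defer cancelChild()
	cancelParent() // walks parent's children map and cancels child synchronously
	return isDone(parent), isDone(child)
}

// cancelChildFirst cancels only the child; the parent must be unaffected.
func cancelChildFirst() (parentDone, childDone bool) {
	parent, cancelParent := context.WithCancel(context.Background())
	defer cancelParent()
	child, cancelChild := context.WithCancel(parent)
	cancelChild() // removeFromParent == true: child leaves parent's children map
	return isDone(parent), isDone(child)
}

func main() {
	fmt.Println(cancelParentFirst()) // true true
	fmt.Println(cancelChildFirst())  // false true
}
```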

timerCtx source

A complement to cancelCtx, used to cancel automatically at a given time (a deadline or a timeout)

// A timerCtx carries a timer and a deadline. It embeds a cancelCtx to
// implement Done and Err. It implements cancel by stopping its timer then
// delegating to cancelCtx.cancel.
type timerCtx struct {
    cancelCtx
    timer *time.Timer // Under cancelCtx.mu.

    deadline time.Time
}

valueCtx source

type valueCtx struct {
    Context
    key, val interface{}
}

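How is a value looked up? Context does not store values in a map. Instead, Value walks recursively from the current node up through its parents toward the root until it finds a matching key, or returns nil at the root.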

func (c *valueCtx) Value(key interface{}) interface{} {
    if c.key == key {
      return c.val
    }
    return c.Context.Value(key)
}
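The recursive walk has two visible consequences: a lookup can find a key set several levels up, and the nearest valueCtx shadows an outer one with the same key. A parent can never see values attached to its descendants. This sketch uses a hypothetical unexported key type ctxKey, the usual way to avoid key collisions.

```go
package main

import (
	"context"
	"fmt"
)

// ctxKey is an unexported key type, so keys from other packages cannot collide.
type ctxKey string

// buildChain builds Background -> a(user=alice) -> b(trace=t-1) -> c(user=bob).
func buildChain() (a, b, c context.Context) {
	a = context.WithValue(context.Background(), ctxKey("user"), "alice")
	b = context.WithValue(a, ctxKey("trace"), "t-1")
	c = context.WithValue(b, ctxKey("user"), "bob") // shadows "user" from a
	return a, b, c
}

func main() {
	a, b, c := buildChain()
	fmt.Println(b.Value(ctxKey("user")))  // alice: b has no "user", so the lookup falls back to a
	fmt.Println(c.Value(ctxKey("user")))  // bob: the nearest valueCtx wins
	fmt.Println(c.Value(ctxKey("trace"))) // t-1: found one level up, in b
	fmt.Println(a.Value(ctxKey("trace"))) // <nil>: lookups only walk toward the root
}
```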

Propagating context

The functions for creating and deriving contexts are WithValue, WithDeadline (WithTimeout simply calls WithDeadline), WithCancel, Background and TODO.

Cancellation is propagated from parent to child mainly through propagateCancel:

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
    if parent == nil {
      panic("cannot create context from nil parent")
    }
    c := newCancelCtx(parent)
    propagateCancel(parent, &c)
    return &c, func() { c.cancel(true, Canceled) }
}

// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
    done := parent.Done()
    if done == nil {
      return // parent is never canceled
    }

    select {
    case <-done:
      // parent is already canceled
      child.cancel(false, parent.Err())
      return
    default:
    }

    // parent is one of context's own cancelable types, so the child can be registered in its children map
    if p, ok := parentCancelCtx(parent); ok {
      p.mu.Lock()
      if p.err != nil {
        // parent has already been canceled
        child.cancel(false, p.err)
      } else {
        if p.children == nil {
          p.children = make(map[canceler]struct{})
        }
        // record the parent-child relationship
        p.children[child] = struct{}{}
      }
      p.mu.Unlock()
    } else {
      atomic.AddInt32(&goroutines, +1)
      go func() {
        select {
        case <-parent.Done():
          child.cancel(false, parent.Err())
        case <-child.Done():
        }
      }()
    }
}

Canceling a context

The cancelCtx type

// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
    if err == nil {
      panic("context: internal error: missing cancel error")
    }
    c.mu.Lock()
    if c.err != nil {
      c.mu.Unlock()
      return // already canceled
    }
    c.err = err
    if c.done == nil {
      c.done = closedchan
    } else {
      close(c.done)
    }
    for child := range c.children {
      // NOTE: acquiring the child's lock while holding parent's lock.
      child.cancel(false, err)
    }
    c.children = nil
    c.mu.Unlock()

    if removeFromParent {
      removeChild(c.Context, c)
    }
}

The timerCtx type

func (c *timerCtx) cancel(removeFromParent bool, err error) {
    c.cancelCtx.cancel(false, err)
    if removeFromParent {
      // Remove this timerCtx from its parent cancelCtx's children.
      removeChild(c.cancelCtx.Context, c)
    }
    c.mu.Lock()
    if c.timer != nil {
      c.timer.Stop()
      c.timer = nil
    }
    c.mu.Unlock()
}

3.1.3. Best practices

Tip: When designing an API with context, remember the advice: pass context.Context in as an argument; don't store it in structs.

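Reason: storing a context as a struct field confuses callers about the lifetime of the context in use. Some callers want WithCancel and others want WithDeadline, and it becomes hard to tell whether the context passed in is meant for cancellation or for a deadline. The lifetime of every caller's context ends up bound to a single shared context.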

Original article: Contexts and structs

Chinese translation: Official blog post: further clarifying best practices for context

Listening for whether a task has finished or been canceled

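package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    // the context is canceled automatically after 2 seconds
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    select {
    case <-ctx.Done():
      fmt.Println(ctx.Err())
    case <-time.After(3 * time.Second):
      fmt.Println("task finished")
    }
}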

// context deadline exceeded

3.2. sync.Cond

3.2.1. Use cases

A rendezvous point for goroutines that are waiting for an event or announcing one

Here an event is any signal between two or more goroutines

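Without sync.Cond, you might poll in a for loop, which burns every cycle of one CPU core. You could sleep between checks instead, but that is still inefficient because you don't know how long to sleep. It is better to have a mechanism that notifies the waiters as soon as the event happens, and sync.Cond provides exactly that.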

3.2.2. Best practices

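When the queue length reaches 2, the adding goroutine calls Wait. Once removeFromQueue takes an element out (so the length drops below 2), it calls Signal to wake the waiting goroutine so it can continue.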

func doCond() {
    c := sync.NewCond(&sync.Mutex{})
    queue := make([]interface{}, 0, 10)

    removeFromQueue := func(dur time.Duration) {
      time.Sleep(dur)
      c.L.Lock()
      queue = queue[1:]
      fmt.Println("Remove success!")
      c.L.Unlock()
      c.Signal()
    }

    for i := 0; i < 10; i++ {
      c.L.Lock()
      for len(queue) == 2 {
        c.Wait()
      }
      queue = append(queue, struct{}{})
      fmt.Println("Add success!")
      go removeFromQueue(1 * time.Second)
      c.L.Unlock()
    }
}
//Add success!
//Add success!
//Remove success!
//Remove success!
//Add success!
//Add success!
//Remove success!
//Add success!
//Remove success!
//Add success!
//Remove success!
//Add success!
//Remove success!
//Add success!
//Remove success!
//Remove success!
//Add success!
//Add success!

A, B and C all subscribe to the button. As soon as the button is clicked, Broadcast notifies A, B and C.

func doBroadcastCond() {
    type Button struct {
      Clicked *sync.Cond
    }

    button := Button{Clicked: sync.NewCond(&sync.Mutex{})}
    subscribe := func(c *sync.Cond, fn func()) {
      wg := sync.WaitGroup{}
      wg.Add(1)
      go func() {
        wg.Done()
        c.L.Lock()
        defer c.L.Unlock()
        c.Wait()
        fn()
      }()
      wg.Wait()
    }

    wg := sync.WaitGroup{}
    wg.Add(3)
    subscribe(button.Clicked, func() {
      fmt.Println("A subscribe button")
      wg.Done()
    })
    subscribe(button.Clicked, func() {
      fmt.Println("B subscribe button")
      wg.Done()
    })
    subscribe(button.Clicked, func() {
      fmt.Println("C subscribe button")
      wg.Done()
    })
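    // give the subscribers time to reach c.Wait(); a Broadcast sent before they wait would be missed
    time.Sleep(2 * time.Second)

    // without this Broadcast, wg.Wait() below would deadlock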
    wg.Wait()
}
// (A, B and C may print in any order)
//A subscribe button
//B subscribe button
//C subscribe button

3.3. sync.Once

3.3.1. Use cases

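Operations that need to be initialized only once, such as connections to a database, a message queue (MQ) or Elasticsearch (ES)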

func doOnce() {
    once := &sync.Once{}
    var num int
    for i := 0; i < 10; i++ {
      once.Do(func() {
        num++
      })
    }
    fmt.Println(num)
}
// 1

3.3.2. Source code analysis

The sync.Once source is actually very simple: a lock plus a flag is enough to make a function run only once.

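The source comments also point out that Do guarantees f() has finished by the time Do returns. A plain CAS does not provide that guarantee, which is why it is not used.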

// Once is an object that will perform exactly one action.
//
// A Once must not be copied after first use.
type Once struct {
    // done indicates whether the action has been performed.
    // It is first in the struct because it is used in the hot path.
    // The hot path is inlined at every call site.
    // Placing done first allows more compact instructions on some architectures (amd64/386),
    // and fewer instructions (to calculate offset) on other architectures.
    done uint32
    m    Mutex
}

// Do calls the function f if and only if Do is being called for the
// first time for this instance of Once. In other words, given
// 	var once Once
// if once.Do(f) is called multiple times, only the first call will invoke f,
// even if f has a different value in each invocation. A new instance of
// Once is required for each function to execute.
//
// Do is intended for initialization that must be run exactly once. Since f
// is niladic, it may be necessary to use a function literal to capture the
// arguments to a function to be invoked by Do:
// 	config.once.Do(func() { config.init(filename) })
//
// Because no call to Do returns until the one call to f returns, if f causes
// Do to be called, it will deadlock.
//
// If f panics, Do considers it to have returned; future calls of Do return
// without calling f.
//
func (o *Once) Do(f func()) {
    // Note: Here is an incorrect implementation of Do:
    //
    //	if atomic.CompareAndSwapUint32(&o.done, 0, 1) {
    //		f()
    //	}
    //
    // Do guarantees that when it returns, f has finished.
    // This implementation would not implement that guarantee:
    // given two simultaneous calls, the winner of the cas would
    // call f, and the second would return immediately, without
    // waiting for the first's call to f to complete.
    // This is why the slow path falls back to a mutex, and why
    // the atomic.StoreUint32 must be delayed until after f returns.

    if atomic.LoadUint32(&o.done) == 0 {
      // Outlined slow-path to allow inlining of the fast-path.
      o.doSlow(f)
    }
}

func (o *Once) doSlow(f func()) {
    o.m.Lock()
    defer o.m.Unlock()
    if o.done == 0 {
      defer atomic.StoreUint32(&o.done, 1)
      f()
    }
}
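The guarantee described in that comment is observable: every concurrent caller blocks until the single call to f returns. The sketch below simulates a slow one-time initialization (standing in for opening a DB connection) hit by many goroutines at once. The helper initOnce and the 20 ms sleep are illustrative.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// initOnce runs a slow one-time init from many goroutines and returns how
// many times init ran and how many callers saw it finished after Do returned.
func initOnce(callers int) (inits, sawReady int) {
	var (
		once  sync.Once
		ready bool
		mu    sync.Mutex // protects sawReady
		wg    sync.WaitGroup
	)
	for i := 0; i < callers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			once.Do(func() {
				time.Sleep(20 * time.Millisecond) // slow initialization
				inits++
				ready = true
			})
			// Do returns only after f has finished, so ready is always true here.
			mu.Lock()
			if ready {
				sawReady++
			}
			mu.Unlock()
		}()
	}
	wg.Wait()
	return inits, sawReady
}

func main() {
	fmt.Println(initOnce(10)) // 1 10
}
```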

3.4. sync.WaitGroup

3.4.1. Use cases

One goroutine waits for a group of goroutines to finish

// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.

3.4.2. A real-world scenario

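An API limits each query to 1000 records. When a lookup needs more than 1000 records, you can split it into several queries, run them concurrently, and use sync.WaitGroup to wait for all of them.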

func WaitGroupTest() {
    argv := []string{
      "a", "b", "c",
    }
    wg := &sync.WaitGroup{}
    for _, v := range argv {
      v := v
      wg.Add(1)
      go func() {
        defer func() {
          wg.Done()
        }()
        fmt.Println(v)
        time.Sleep(2 * time.Second)
      }()
    }
    wg.Wait()
    fmt.Println("Done!!!")
}
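The example above only shows the WaitGroup pattern. Here is a sketch of the 1000-record batching itself. queryBatch is a hypothetical stand-in for the rate-limited API (it just echoes its input), and each goroutine writes to its own slot in results, so no extra lock is needed.

```go
package main

import (
	"fmt"
	"sync"
)

const batchSize = 1000 // the per-query limit described above

// queryBatch is a hypothetical stand-in for the limited API call; it echoes
// the IDs back so the example stays self-contained.
func queryBatch(ids []int) []int {
	out := make([]int, len(ids))
	copy(out, ids)
	return out
}

// queryAll splits ids into chunks of at most batchSize, queries each chunk in
// its own goroutine, and waits for all of them with a WaitGroup.
func queryAll(ids []int) []int {
	var chunks [][]int
	for start := 0; start < len(ids); start += batchSize {
		end := start + batchSize
		if end > len(ids) {
			end = len(ids)
		}
		chunks = append(chunks, ids[start:end])
	}

	results := make([][]int, len(chunks)) // one slot per goroutine
	var wg sync.WaitGroup
	for i, chunk := range chunks {
		wg.Add(1)
		go func(i int, chunk []int) {
			defer wg.Done()
			results[i] = queryBatch(chunk)
		}(i, chunk)
	}
	wg.Wait()

	// Concatenate in chunk order so the output order matches the input.
	var all []int
	for _, r := range results {
		all = append(all, r...)
	}
	return all
}

func main() {
	ids := make([]int, 2500)
	for i := range ids {
		ids[i] = i
	}
	fmt.Println(len(queryAll(ids))) // 2500 (3 batches: 1000 + 1000 + 500)
}
```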

3.4.3. Source code analysis

The seemingly simple WaitGroup source in Go hides this much knowledge?

WaitGroup uses a different strategy depending on whether the compiler targets a 32-bit or a 64-bit platform; see the figure below.

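Why are the counter and the waiter count read together?

  1. For concurrency safety: if they were stored separately, concurrent operations could interleave and act on the values in the wrong order
  2. 64-bit atomic operations require the caller to guarantee 64-bit alignment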

Thoughts on memory alignment prompted by Go's WaitGroup

An in-depth analysis of how Golang WaitGroup works

// A WaitGroup must not be copied after first use.
type WaitGroup struct {
    noCopy noCopy

    // 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
    // 64-bit atomic operations require 64-bit alignment, but 32-bit
    // compilers do not ensure it. So we allocate 12 bytes and then use
    // the aligned 8 bytes in them as state, and the other 4 as storage
    // for the sema.
    state1 [3]uint32
}

// state returns pointers to the state and sema fields stored within wg.state1.
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
    // if state1 is 64-bit aligned, its first 8 bytes hold the state; otherwise the state starts at state1[1]
    if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
      return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
    } else {
      return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
    }
}

[Figure: layout of state1 on 32-bit and 64-bit platforms]
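The point of packing both numbers into one uint64 is that a single atomic operation updates or reads them consistently. The sketch below reproduces the arithmetic with plain (non-atomic) integers. The helpers pack and unpack are illustrative, and the real code uses atomic.AddUint64 and atomic.CompareAndSwapUint64 on statep. Note that Add(-1) works by adding uint64(delta)<<32, which wraps around and decrements only the high 32 bits.

```go
package main

import "fmt"

// pack and unpack mimic WaitGroup's layout: counter in the high 32 bits,
// waiter count in the low 32 bits of one uint64.
func pack(counter int32, waiters uint32) uint64 {
	return uint64(uint32(counter))<<32 | uint64(waiters)
}

func unpack(state uint64) (counter int32, waiters uint32) {
	return int32(state >> 32), uint32(state)
}

func main() {
	state := pack(0, 0) // a fresh WaitGroup

	delta := 3
	state += uint64(delta) << 32 // Add(3) touches only the counter
	state++                      // Wait registers one waiter (state+1 in the CAS)
	fmt.Println(unpack(state))   // 3 1

	delta = -1
	state += uint64(delta) << 32 // Done() == Add(-1): wraps around, counter - 1
	fmt.Println(unpack(state))   // 2 1
}
```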

The Add method

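  1. Add first adds delta to the counter (the high 32 bits), then reads back the counter and the waiter count
  2. It checks for misuse: a negative counter, or Add called concurrently with Wait
  3. If counter == 0 and waiter > 0, it performs a V operation for each waiter, waking the goroutines blocked in Wait
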
func (wg *WaitGroup) Add(delta int) {
    statep, semap := wg.state()

    state := atomic.AddUint64(statep, uint64(delta)<<32)
    v := int32(state >> 32)
    w := uint32(state)

    if v < 0 {
      panic("sync: negative WaitGroup counter")
    }
    if w != 0 && delta > 0 && v == int32(delta) {
      panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    if v > 0 || w == 0 {
      return
    }
    // This goroutine has set counter to 0 when waiters > 0.
    // Now there can't be concurrent mutations of state:
    // - Adds must not happen concurrently with Wait,
    // - Wait does not increment waiters if it sees counter == 0.
    // Still do a cheap sanity check to detect WaitGroup misuse.
    if *statep != state {
      panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    // Reset waiters count to 0.
    *statep = 0
  
    // wake up all waiters
    for ; w != 0; w-- {
      runtime_Semrelease(semap, false, 0)
    }
}

Done is really just Add(-1); calling Done simply states the intent more clearly.

The Wait method

  1. waiter + 1
  2. P operation: acquire the semaphore and wait for a V operation to wake it up
  3. Check whether the WaitGroup has been reused

func (wg *WaitGroup) Wait() {
    statep, semap := wg.state()
   
    for {
      // atomically load the current state
      state := atomic.LoadUint64(statep)
      v := int32(state >> 32)
      w := uint32(state)
      if v == 0 {
        // Counter is 0, no need to wait.
        return
      }
      // Increment waiters count.
      // CAS: state+1 adds 1 to the low 32 bits, i.e. the waiter count
      if atomic.CompareAndSwapUint64(statep, state, state+1) {
        
        // P operation: block on the semaphore until Add performs the V operation
        runtime_Semacquire(semap)
        
        // a non-zero state here means the WaitGroup was reused before this Wait returned
        if *statep != 0 {
          panic("sync: WaitGroup is reused before previous Wait has returned")
        }
       
        return
      }
    }
}

3.5. channel & goroutine

3.5.1. What a channel really is

In the runtime source a channel is represented by hchan, a lock-protected queue used for communication and synchronization.

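A channel uses a mutex to handle concurrency. The community has proposed lock-free queue implementations, but so far they perform only moderately well and do not provide FIFO ordering, so that approach has been shelved for now.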

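The locking and lock-free approaches to concurrency control correspond to pessimistic and optimistic locking, which are quite different. Optimistic locking is not actually a lock but a concurrency-control idea. At its core it is a validation-based protocol, most commonly implemented with a version number.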

3.5.2. channel data structures

type hchan struct {
    // number of elements currently in the circular queue (len(ch))
    qcount   uint           // total data in the queue

    // capacity of the circular queue (cap(ch))
    dataqsiz uint           // size of the circular queue

    // element buffer
    buf      unsafe.Pointer // points to an array of dataqsiz elements

    // size of each element in bytes
    elemsize uint16

    // non-zero once the channel has been closed
    closed   uint32

    // type descriptor of the elements
    elemtype *_type // element type

    // position in buf where the next sent value is written
    sendx    uint   // send index

    // position in buf from which the next value is received
    recvx    uint   // receive index

    // goroutines blocked waiting to receive
    recvq    waitq  // list of recv waiters

    // goroutines blocked waiting to send
    sendq    waitq  // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

// doubly linked list of waiting goroutines
type waitq struct {
    first *sudog
    last  *sudog
}

// node of the doubly linked list
type sudog struct {
    // The following fields are protected by the hchan.lock of the
    // channel this sudog is blocking on. shrinkstack depends on
    // this for sudogs involved in channel ops.

    g *g

    next *sudog
    prev *sudog
    elem unsafe.Pointer // data element (may point to stack)

    // The following fields are never accessed concurrently.
    // For channels, waitlink is only accessed by g.
    // For semaphores, all fields (including the ones above)
    // are only accessed when holding a semaRoot lock.

    acquiretime int64
    releasetime int64
    ticket      uint32

    // isSelect indicates g is participating in a select, so
    // g.selectDone must be CAS'd to win the wake-up race.
    isSelect bool

    // success indicates whether communication over channel c
    // succeeded. It is true if the goroutine was awoken because a
    // value was delivered over channel c, and false if awoken
    // because c was closed.
    success bool

    parent   *sudog // semaRoot binary tree
    waitlink *sudog // g.waiting list or semaRoot
    waittail *sudog // semaRoot
    c        *hchan // channel
}
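Some of these fields can be observed indirectly from ordinary Go code: len on a channel reads qcount and cap reads dataqsiz. The sketch also shows that closing a channel (setting closed) still lets buffered values be received.

```go
package main

import "fmt"

func main() {
	ch := make(chan int, 3) // dataqsiz = 3, buf holds 3 ints
	ch <- 1
	ch <- 2
	fmt.Println(len(ch), cap(ch)) // 2 3 (len reads qcount, cap reads dataqsiz)

	<-ch // recvx advances, qcount drops to 1
	fmt.Println(len(ch), cap(ch)) // 1 3

	close(ch) // closed is set; values already in buf can still be received
	v, ok := <-ch
	fmt.Println(v, ok) // 2 true
	v, ok = <-ch
	fmt.Println(v, ok) // 0 false: buffer empty and channel closed
}
```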

3.5.3. The receive and blocking process

func main() {
	ch := make(chan int, 3)
	checkChannelDataStructure(ch)
}

func checkChannelDataStructure(ch chan int) {
	// 6 senders
	go func() { ch <- 1 }()
	go func() { ch <- 2 }()
	go func() { ch <- 3 }()
	go func() { ch <- 4 }()
	go func() { ch <- 5 }()
	go func() { ch <- 6 }()

	// 8 receivers
	go func() { <-ch }()
	go func() { <-ch }()
	go func() { <-ch }()
	go func() { <-ch }()
	go func() { <-ch }()
	go func() { <-ch }()
	go func() { <-ch }()
	go func() { <-ch }()
}
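
  1. After make, a buf array of length 3 is created and elemtype's kind is 2, i.e. int. The heads and tails of both the send and the receive wait queues are empty.
  2. While buf has room, a sending goroutine's value goes directly into buf.
  3. Once more goroutines send than buf can hold, the extra senders are parked on the send wait queue (sendq).
  4. Elements are received: 1 and 2 have been received, and 4 and 5 are stored in buf.
  5. The receive side blocks: two goroutines are parked on the receive wait queue (recvq), waiting for messages.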
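The "buf full, so the sender parks on sendq" step can be made visible with a non-blocking send. In the sketch below, the helper trySend returns false exactly when a plain send would have blocked. The sleeps only give the extra sender goroutine time to park. The values come out of buf in FIFO order.

```go
package main

import (
	"fmt"
	"time"
)

// trySend attempts a non-blocking send; false means a plain send would have
// blocked, i.e. the goroutine would have been parked on sendq.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan int, 3)
	for i := 1; i <= 4; i++ {
		fmt.Println(i, trySend(ch, i)) // 1-3 true (into buf), 4 false (buf full)
	}

	// A blocking sender now waits on sendq until a receiver makes room.
	go func() { ch <- 4 }()
	time.Sleep(10 * time.Millisecond)
	fmt.Println(<-ch, <-ch) // 1 2: FIFO order from buf
	time.Sleep(10 * time.Millisecond)
	fmt.Println(len(ch)) // 2: values 3 and 4 (4 came from the parked sender)
}
```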

3.6. The GMP model (TODO)

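G: goroutine, a user-level thread

M: machine, a kernel (OS) thread

P: processor, the scheduling context an M must hold to run Go code; each P keeps a local run queue of Gs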

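A well-written article on the GMP scheduling model:

[Collector's Edition] The Golang scheduler: GMP principles and a complete analysis of scheduling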
