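sync.Cond is a concurrency primitive provided by Go's standard library, yet you may never have heard of it, which shows how rarely it is needed. It is still worth knowing: only knowledge you already have can be put to use when the need finally arises.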

If, like me, you have worked with Java before, this primitive should feel familiar: it is the classic wait/notify mechanism, i.e. the wait and notify methods.

Usage

Let's start from the usage side and see how cond is used.

Three methods

First, a plain-language description of cond's three methods:

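  • Wait: the caller stops and waits until it is woken up; the caller must hold the lock (c.L) when calling it
  • Signal: wakes one waiting caller, if there is one
  • Broadcast: wakes all waiting callers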

One lock, one queue

Creating a cond requires passing in a lock (any sync.Locker) for concurrency control; you must hold that lock when calling Wait.

Internally, cond maintains a queue in which waiting callers line up.

Example

We create two goroutines that use cond to wait before doing their work, then try waking them with Signal:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	cond := sync.NewCond(&sync.Mutex{})
	go func() {
		cond.L.Lock()
		fmt.Println("a is waiting...")
		cond.Wait()
		fmt.Println("a was awakened")
		cond.L.Unlock()
	}()
	go func() {
		cond.L.Lock()
		fmt.Println("b is waiting...")
		cond.Wait()
		fmt.Println("b was awakened")
		cond.L.Unlock()
	}()
	time.Sleep(time.Second) // give both goroutines time to reach Wait
	cond.Signal()           // wake one waiter
	time.Sleep(time.Second)
	cond.Signal() // wake the other waiter
	time.Sleep(time.Second) // let the second goroutine finish printing
}
output (a typical run; whichever goroutine reaches Wait first is also woken first, so a and b may swap):
a is waiting...
b is waiting...
a was awakened
b was awakened

Of course, you can also use Broadcast to wake them all at once; the output is essentially the same.

Let me plant a hook here: both goroutines are blocked in Wait, and neither has unlocked the mutex, yet both "waiting" lines were printed. How is that possible?

What impression does this usage give you? When I first saw cond, it struck me as the reverse of WaitGroup.

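A WaitGroup can be described as splitting a large task into smaller ones: Add once for each subtask, Done once each time a subtask finishes, while someone calls Wait until all subtasks are complete. In other words, one goroutine waits for many events. Cond seems to flip this around: many goroutines wait for an event and are woken to run. Yet the two are not quite mirror images.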

Source code analysis

Before reading the source, let's keep a few questions in mind:

  1. Why must we lock before calling Wait?
  2. Is there a problem if Signal is called more times than there are waiters?
  3. Can we still call Wait after Broadcast?

Structure

type Cond struct {
	noCopy noCopy

	// L is held while observing or changing the condition
	L Locker

	notify  notifyList
	checker copyChecker
}

type notifyList struct {
	// wait is the ticket number of the next waiter. It is atomically
	// incremented outside the lock.
	wait uint32

	// notify is the ticket number of the next waiter to be notified. It can
	// be read outside the lock, but is only written to with lock held.
	//
	// Both wait & notify can wrap around, and such cases will be correctly
	// handled as long as their "unwrapped" difference is bounded by 2^31.
	// For this not to be the case, we'd need to have 2^31+ goroutines
	// blocked on the same condvar, which is currently not possible.
	notify uint32

	// List of parked waiters.
	lock mutex
	head *sudog
	tail *sudog
}

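The structure is very simple. noCopy and checker guard against copying a Cond: noCopy has no runtime effect but lets go vet's copylocks check flag copies, while checker (a copyChecker) is a runtime check that makes Wait, Signal and Broadcast panic with "sync.Cond is copied" if a Cond is used after being copied. (The notifyList shown above is the runtime's definition from runtime/sema.go; the sync package declares a layout-compatible copy of it.)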

What remains is exactly one lock (L) and one queue (notify).

Methods

// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
	return &Cond{L: l}
}

There is not much to say about creation: it just stores the lock that was passed in.

func (c *Cond) Signal() {
	c.checker.check()
	runtime_notifyListNotifyOne(&c.notify)
}
func (c *Cond) Broadcast() {
	c.checker.check()
	runtime_notifyListNotifyAll(&c.notify)
}

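Both Signal and Broadcast first check that the cond has not been copied, then pass the notify list straight to the runtime's notify functions (implemented in runtime/sema.go and linked into sync via go:linkname), which wake the waiters.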

func (c *Cond) Wait() {
	c.checker.check()
	t := runtime_notifyListAdd(&c.notify)
	c.L.Unlock()
	runtime_notifyListWait(&c.notify, t)
	c.L.Lock()
}

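Wait is similar, with one detail to note: after taking a ticket it calls Unlock, and only then starts waiting; when it is woken it locks again before returning. This resolves the earlier hook: each goroutine releases the lock inside Wait, so the other goroutine can acquire it and print its line. It also explains why you must lock before Wait: otherwise c.L.Unlock() would unlock a mutex that is not locked, which for sync.Mutex is a fatal error ("sync: unlock of unlocked mutex") that cannot be recovered. More fundamentally, the lock is what protects the condition you check before deciding to wait.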

runtime_notifyListAdd

Let's start with runtime_notifyListAdd.

// notifyListAdd adds the caller to a notify list such that it can receive
// notifications. The caller must eventually call notifyListWait to wait for
// such a notification, passing the returned ticket number.
//go:linkname notifyListAdd sync.runtime_notifyListAdd
func notifyListAdd(l *notifyList) uint32 {
	// This may be called concurrently, for example, when called from
	// sync.Cond.Wait while holding a RWMutex in read mode.
	return atomic.Xadd(&l.wait, 1) - 1
}

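Very simple: it atomically increments wait in the notifyList and returns the value from before the increment, which becomes the caller's ticket number.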

runtime_notifyListWait

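Next comes runtime_notifyListWait. Its second argument t is the ticket returned by notifyListAdd, i.e. the result of Xadd minus 1 (the value of wait before the increment).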

// notifyListWait waits for a notification. If one has been sent since
// notifyListAdd was called, it returns immediately. Otherwise, it blocks.
//go:linkname notifyListWait sync.runtime_notifyListWait
func notifyListWait(l *notifyList, t uint32) {
	lockWithRank(&l.lock, lockRankNotifyList)

	// Return right away if this ticket has already been notified.
	if less(t, l.notify) {
		unlock(&l.lock)
		return
	}

	// Enqueue itself.
	s := acquireSudog()
	s.g = getg()
	s.ticket = t
	s.releasetime = 0
	t0 := int64(0)
	if blockprofilerate > 0 {
		t0 = cputicks()
		s.releasetime = -1
	}
	if l.tail == nil {
		l.head = s
	} else {
		l.tail.next = s
	}
	l.tail = s
	goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
	if t0 != 0 {
		blockevent(s.releasetime-t0, 2)
	}
	releaseSudog(s)
}

Not hard; a few key points:

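  • If the ticket t is already less than notify (less compares the counters in a wraparound-safe way), this waiter has already been notified, so it unlocks and returns immediately
  • Acquire a sudog to park the goroutine on
  • s.ticket = t: remember this, it is used later; the sudog records the waiter's ticket number (not the queue length) so that notifyOne can find it
  • If tail is nil the queue is empty, so head is set to s; otherwise s is linked after the current tail and becomes the new tail
  • Then goparkunlock releases the list lock and parks the goroutine until it is woken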
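The "t < notify" check cannot be a plain < because both counters are uint32 and may wrap around. The runtime's helper less does the comparison on the signed difference, which is correct as long as the real distance between the counters stays below 2^31 (the bound stated in the notifyList comment). This is a standalone copy of that idea:

```go
package main

import "fmt"

// less reports whether a comes before b for two running uint32 counters
// that may have wrapped around, assuming their true distance is < 2^31.
func less(a, b uint32) bool {
	return int32(a-b) < 0
}

func main() {
	fmt.Println(less(3, 5)) // true
	fmt.Println(less(5, 3)) // false

	// notify has wrapped past 2^32-1 to 1; ticket 0xFFFFFFFE is still older.
	t, notify := uint32(0xFFFFFFFE), uint32(1)
	fmt.Println(less(t, notify)) // true
	fmt.Println(t < notify)      // false: a naive comparison gets it wrong
}
```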

runtime_notifyListNotifyOne

// notifyListNotifyOne notifies one entry in the list.
//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
	// Fast-path: if there are no new waiters since the last notification
	// we don't need to acquire the lock at all.
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}

	lockWithRank(&l.lock, lockRankNotifyList)

	// Re-check under the lock if we need to do anything.
	t := l.notify
	if t == atomic.Load(&l.wait) {
		unlock(&l.lock)
		return
	}

	// Update the next notify ticket number.
	atomic.Store(&l.notify, t+1)

	// Try to find the g that needs to be notified.
	// If it hasn't made it to the list yet we won't find it,
	// but it won't park itself once it sees the new notify number.
	//
	// This scan looks linear but essentially always stops quickly.
	// Because g's queue separately from taking numbers,
	// there may be minor reorderings in the list, but we
	// expect the g we're looking for to be near the front.
	// The g has others in front of it on the list only to the
	// extent that it lost the race, so the iteration will not
	// be too long. This applies even when the g is missing:
	// it hasn't yet gotten to sleep and has lost the race to
	// the (few) other g's that we find on the list.
	for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
		if s.ticket == t {
			n := s.next
			if p != nil {
				p.next = n
			} else {
				l.head = n
			}
			if n == nil {
				l.tail = p
			}
			unlock(&l.lock)
			s.next = nil
			readyWithTime(s, 4)
			return
		}
	}
	unlock(&l.lock)
}
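
  • If wait equals notify, nobody is waiting, so it returns immediately (this is also why calling Signal more often than there are waiters is harmless: the extra calls simply return and are not saved up)
  • After taking the lock it checks again, the usual double check in concurrent code
  • notify is incremented by 1, because this call wakes one more waiter
  • Only the sudog whose ticket equals t is the next one to wake (the comment above explains why this is a loop: in most cases head is already the sudog to wake)
  • Then come the usual steps for removing a node from a linked list
  • Finally readyWithTime calls goready to make the corresponding goroutine runnable again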

runtime_notifyListNotifyAll

// notifyListNotifyAll notifies all entries in the list.
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
	// Fast-path: if there are no new waiters since the last notification
	// we don't need to acquire the lock.
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}

	// Pull the list out into a local variable, waiters will be readied
	// outside the lock.
	lockWithRank(&l.lock, lockRankNotifyList)
	s := l.head
	l.head = nil
	l.tail = nil

	// Update the next ticket to be notified. We can set it to the current
	// value of wait because any previous waiters are already in the list
	// or will notice that they have already been notified when trying to
	// add themselves to the list.
	atomic.Store(&l.notify, atomic.Load(&l.wait))
	unlock(&l.lock)

	// Go through the local list and ready all waiters.
	for s != nil {
		next := s.next
		s.next = nil
		readyWithTime(s, 4)
		s = next
	}
}

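After notifyOne, notifyAll is easy to follow: it detaches the whole queue, sets notify to the current value of wait, and then readies every sudog in turn. Because notify catches up with wait, any goroutine that calls Wait afterwards receives a ticket that has not been notified yet and blocks normally, so Wait after Broadcast works just as usual.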

Summary

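Overall, cond's implementation is easy to understand and not as complicated as one might expect. The main thing to watch when using it: hold the lock before calling Wait, and re-check your condition in a loop after Wait returns.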

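Compared with Java: as I recall, when I first learned it, notify was described as waking an arbitrary waiter, and the actual behavior depended on the JVM; the HotSpot implementation uses a queue.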

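Finally, about usage: why did I write about cond so late? I had been putting it off for a long time because I had never used it in practice; only recently, while handling a concurrency scenario, did I happen to need it, which prompted me to fill this gap. So in practice you may never need it, but it is worth knowing as a reserve for the day you do.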