基本介绍
Mutex(互斥锁)
属于悲观锁,是 Go 标准库中sync中提供的一种用于控制多个 goroutine 之间对共享资源的并发访问的机制,通常用于避免多个协程同时访问共享数据,以防止竞态条件和数据竞争问题的发生。
同时 sync.Mutex
是一个不可重入锁,不会记录哪个 goroutine 拥有这把锁,对外提供三个方法用于并发控制,分别是Mutex.TryLock()
,Mutex.Lock()
和 Mutex.UnLock()
。
首先,对互斥锁 Mutex
的工作流程做一个整体的介绍:
在刚开始的时候,是处于 正常模式(非公平锁)
,请求锁的 goroutine 会自旋去尝试获取锁;
当自旋超过4次还没有获取到锁的时候,该 goroutine 会按照 FIFO(先入先出)
的顺序加入等待队列的尾部;
当锁被释放之后,会依次唤醒请求队列头部的 goroutine,但此时该 goroutine 并不会直接获取锁,而是需要和当前自旋的 goroutine 进行竞争,因为当前可能有多个 goroutine 在CPU中运行并自旋请求锁,所以从等待队列中取出来的 goroutine 大概率获取不到锁,失败之后该 goroutine 会被重新放回等待队列头部;
当一个 goroutine 超过 1ms 时仍未获取到锁,这把锁就会被切换到饥饿模式(公平锁)
;
在饥饿模式下,会直接把锁交给等待队列中排在第一位的 goroutine;同时新进来的 goroutine 不会参与抢锁也不会进入自旋状态,而是直接放入等待队列的尾部,
如果一个 goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会切换回正常模式
源码分析
数据结构
type Mutex struct {
state int32 // 锁的状态
sema uint32 // 信号量
}
const (
mutexLocked = 1 >mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++ // 每次自旋结束后会累计次数,会影响下一轮runtime_canSpin(iter)的返回值
old = m.state // 更新状态
continue
}
new := old
// 计算新状态
if old&mutexStarving == 0 { // 如果不是饥饿并且原来锁是空闲的,就直接拿锁,标记mutexLocked
new |= mutexLocked
}
// 如果原来锁忙或者是饥饿状态,则让 waiterCount 加1,表示这个 goroutine 在等这个互斥锁
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 starvationThresholdNs
old = m.state
// 如果现在不是饥饿状态,直接进入下一次循环去尝试抢锁
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 这个位运算还是很骚的,一步完成了 置mutexLocked为1 和 waitCount - 1
delta := int32(mutexLocked - 1mutexWaiterShift == 1 {
// 如果除了当前 goroutine 以外没有其他请求锁的 goroutine,退出饥饿模式
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
// 自己尝试更新锁的状态没成功就说明此时有其他人更新了,获取一下新的锁状态
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
Unlook
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
快速解锁失败则调用 m.unlockSlow(new)
慢解锁
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
fatal("sync: unlock of unlocked mutex")
}
// 正常模式的处理方法
if new&mutexStarving == 0 {
old := new
for {
// 没有等待者不用唤醒
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 有等待者唤醒队头 goroutine 移交锁的所有权
new = (old - 1