Skip to main content
 首页 » 编程设计

go中semaphore(信号量)源码解读

2022年07月19日147cloudgamer

运行时信号量机制 semaphore

前言

最近在看源码,发现好多地方用到了这个semaphore

本文是在go version go1.13.15 darwin/amd64上进行的

作用是什么

下面是官方的描述

// Semaphore implementation exposed to Go. 
// Intended use is provide a sleep and wakeup 
// primitive that can be used in the contended case 
// of other synchronization primitives. 
// Thus it targets the same goal as Linux's futex, 
// but it has much simpler semantics. 
// 
// That is, don't think of these as semaphores. 
// Think of them as a way to implement sleep and wakeup 
// such that every sleep is paired with a single wakeup, 
// even if, due to races, the wakeup happens before the sleep. 
 
// 具体的用法是提供 sleep 和 wakeup 原语 
// 以使其能够在其它同步原语中的竞争情况下使用 
// 因此这里的 semaphore 和 Linux 中的 futex 目标是一致的 
// 只不过语义上更简单一些 
// 
// 也就是说,不要认为这些是信号量 
// 把这里的东西看作 sleep 和 wakeup 实现的一种方式 
// 每一个 sleep 都会和一个 wakeup 配对 
// 即使在发生 race 时,wakeup 在 sleep 之前时也是如此   

上面提到了和futex作用一样,关于futex

futex(快速用户区互斥的简称)是一个在Linux上实现锁定和构建高级抽象锁如信号量和POSIX互斥的基本工具

Futex 由一块能够被多个进程共享的内存空间(一个对齐后的整型变量)组成;这个整型变量的值能够通过汇编语言调用CPU提供的原子操作指令来增加或减少,并且一个进程可以等待直到那个值变成正数。Futex 的操作几乎全部在用户空间完成;只有当操作结果不一致从而需要仲裁时,才需要进入操作系统内核空间执行。这种机制允许使用 futex 的锁定原语有非常高的执行效率:由于绝大多数的操作并不需要在多个进程之间进行仲裁,所以绝大多数操作都可以在应用程序空间执行,而不需要使用(相对高代价的)内核系统调用。

go中的semaphore作用和futex目标一样,提供sleepwakeup原语,使其能够在其它同步原语中的竞争情况下使用。当一个goroutine需要休眠时,将其进行集中存放,当需要wakeup时,再将其取出,重新放入调度器中。

例如在读写锁的实现中,读锁和写锁之前的相互阻塞唤醒,就是通过sleepwakeup实现,当有读锁存在的时候,新加入的写锁通过semaphore阻塞自己,当前面的读锁完成,在通过semaphore唤醒被阻塞的写锁。

写锁

// 获取互斥锁 
// 阻塞等待所有读操作结束(如果有的话) 
func (rw *RWMutex) Lock() { 
	... 
	// 原子的修改readerCount的值,直接将readerCount减去rwmutexMaxReaders 
	// 说明,有写锁进来了,这在上面的读锁中也有体现 
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders 
	// 当r不为0说明,当前写锁之前有读锁的存在 
	// 修改下readerWait,也就是当前写锁需要等待的读锁的个数   
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { 
		// 阻塞当前写锁 
		runtime_SemacquireMutex(&rw.writerSem, false, 0) 
	} 
	... 
} 

通过runtime_SemacquireMutex对当前写锁进行sleep

读锁释放

// 减少读操作计数,即readerCount-- 
// 唤醒等待写操作的协程(如果有的话) 
func (rw *RWMutex) RUnlock() { 
	... 
	// 首先通过atomic的原子性使readerCount-1 
	// 1.若readerCount大于0, 证明当前还有读锁, 直接结束本次操作 
	// 2.若readerCount小于0, 证明已经没有读锁, 但是还有因为读锁被阻塞的写锁存在 
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { 
		// 尝试唤醒被阻塞的写锁 
		rw.rUnlockSlow(r) 
	} 
	... 
} 
 
func (rw *RWMutex) rUnlockSlow(r int32) { 
	... 
	// readerWait--操作,如果readerWait--操作之后的值为0,说明,写锁之前,已经没有读锁了 
	// 通过writerSem信号量,唤醒队列中第一个阻塞的写锁 
	if atomic.AddInt32(&rw.readerWait, -1) == 0 { 
		// 唤醒一个写锁 
		runtime_Semrelease(&rw.writerSem, false, 1) 
	} 
} 

写锁处理完之后,调用runtime_Semrelease来唤醒sleep的写锁

几个主要的方法

go/src/sync/runtime.go中,定义了这几个方法

// Semacquire等待*s > 0,然后原子递减它。 
// 它是一个简单的睡眠原语,用于同步 
// library and不应该直接使用。 
func runtime_Semacquire(s *uint32) 
 
// SemacquireMutex类似于Semacquire,用来阻塞互斥的对象 
// 如果lifo为true,waiter将会被插入到队列的头部 
// skipframes是跟踪过程中要省略的帧数,从这里开始计算 
// runtime_SemacquireMutex's caller. 
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int) 
 
// Semrelease会自动增加*s并通知一个被Semacquire阻塞的等待的goroutine 
// 它是一个简单的唤醒原语,用于同步 
// library and不应该直接使用。 
// 如果handoff为true, 传递信号到队列头部的waiter 
// skipframes是跟踪过程中要省略的帧数,从这里开始计算 
// runtime_Semrelease's caller. 
func runtime_Semrelease(s *uint32, handoff bool, skipframes int) 

具体的实现是在go/src/runtime/sema.go

//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire 
func sync_runtime_Semacquire(addr *uint32) { 
	semacquire1(addr, false, semaBlockProfile, 0) 
} 
 
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease 
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) { 
	semrelease1(addr, handoff, skipframes) 
} 
 
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex 
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) { 
	semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes) 
} 

如何实现

sudog 缓存

semaphore的实现使用到了sudog,我们先来看下

sudog 是运行时用来存放处于阻塞状态的goroutine的一个上层抽象,是用来实现用户态信号量的主要机制之一。 例如当一个goroutine因为等待channel的数据需要进行阻塞时,sudog会将goroutine及其用于等待数据的位置进行记录, 并进而串联成一个等待队列,或二叉平衡树。

// sudogs are allocated from a special pool. Use acquireSudog and 
// releaseSudog to allocate and free them. 
type sudog struct { 
	// 以下字段受hchan保护 
	g *g 
 
	// isSelect 表示 g 正在参与一个 select, so 
	// 因此 g.selectDone 必须以 CAS 的方式来获取wake-up race. 
	isSelect bool 
	next     *sudog 
	prev     *sudog 
	elem     unsafe.Pointer // 数据元素(可能指向栈) 
 
	// 以下字段不会并发访问。 
	// 对于通道,waitlink只被g访问。 
	// 对于信号量,所有字段(包括上面的字段) 
	// 只有当持有一个semroot锁时才被访问。 
	acquiretime int64 
	releasetime int64 
	ticket      uint32 
	parent      *sudog //semaRoot 二叉树 
	waitlink    *sudog // g.waiting 列表或 semaRoot 
	waittail    *sudog // semaRoot 
	c           *hchan // channel 
} 

sudog的获取和归还,遵循以下策略:

1、获取,首先从per-P缓存获取,对于per-P缓存,如果per-P缓存为空,则从全局池抓取一半,然后取出per-P缓存中的最后一个;

2、归还,归还到per-P缓存,如果per-P缓存满了,就把per-P缓存的一半归还到全局缓存中,然后归还sudogper-P缓存中。

acquireSudog

1、如果per-P缓存的内容没达到长度的一般,则会从全局额缓存中抓取一半;

2、然后返回把per-P缓存中最后一个sudog返回,并且置空;

// go/src/runtime/proc.go 
//go:nosplit 
func acquireSudog() *sudog { 
	// Delicate dance: 信号量的实现调用acquireSudog,然后acquireSudog调用new(sudog) 
	// new调用malloc, malloc调用垃圾收集器,垃圾收集器在stopTheWorld调用信号量 
	// 通过在new(sudog)周围执行acquirem/releasem来打破循环 
	// acquirem/releasem在new(sudog)期间增加m.locks,防止垃圾收集器被调用。 
 
	// 获取当前 g 所在的 m 
	mp := acquirem() 
	// 获取p的指针 
	pp := mp.p.ptr() 
	if len(pp.sudogcache) == 0 { 
		lock(&sched.sudoglock) 
		// 首先,尝试从中央缓存获取一批数据。 
		for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil { 
			s := sched.sudogcache 
			sched.sudogcache = s.next 
			s.next = nil 
			pp.sudogcache = append(pp.sudogcache, s) 
		} 
		unlock(&sched.sudoglock) 
		// 如果中央缓存中没有,新分配 
		if len(pp.sudogcache) == 0 { 
			pp.sudogcache = append(pp.sudogcache, new(sudog)) 
		} 
	} 
	// 取缓存中最后一个 
	n := len(pp.sudogcache) 
	s := pp.sudogcache[n-1] 
	pp.sudogcache[n-1] = nil 
	// 将刚取出的在缓存中移除 
	pp.sudogcache = pp.sudogcache[:n-1] 
	if s.elem != nil { 
		throw("acquireSudog: found s.elem != nil in cache") 
	} 
	releasem(mp) 
	return s 
} 

releaseSudog

1、如果per-P缓存满了,就归还per-P缓存一般的内容到全局缓存;

2、然后将回收的sudog放到per-P缓存中。

// go/src/runtime/proc.go 
//go:nosplit 
func releaseSudog(s *sudog) { 
	if s.elem != nil { 
		throw("runtime: sudog with non-nil elem") 
	} 
	if s.isSelect { 
		throw("runtime: sudog with non-false isSelect") 
	} 
	if s.next != nil { 
		throw("runtime: sudog with non-nil next") 
	} 
	if s.prev != nil { 
		throw("runtime: sudog with non-nil prev") 
	} 
	if s.waitlink != nil { 
		throw("runtime: sudog with non-nil waitlink") 
	} 
	if s.c != nil { 
		throw("runtime: sudog with non-nil c") 
	} 
	gp := getg() 
	if gp.param != nil { 
		throw("runtime: releaseSudog with non-nil gp.param") 
	} 
	// 避免重新安排到另一个P 
	mp := acquirem() // avoid rescheduling to another P 
	pp := mp.p.ptr() 
	// 如果缓存满了 
	if len(pp.sudogcache) == cap(pp.sudogcache) { 
		// 将本地高速缓存的一半传输到中央高速缓存 
		var first, last *sudog 
		for len(pp.sudogcache) > cap(pp.sudogcache)/2 { 
			n := len(pp.sudogcache) 
			p := pp.sudogcache[n-1] 
			pp.sudogcache[n-1] = nil 
			pp.sudogcache = pp.sudogcache[:n-1] 
			if first == nil { 
				first = p 
			} else { 
				last.next = p 
			} 
			last = p 
		} 
		lock(&sched.sudoglock) 
		last.next = sched.sudogcache 
		sched.sudogcache = first 
		unlock(&sched.sudoglock) 
	} 
	// 归还sudog到`per-P`缓存中 
	pp.sudogcache = append(pp.sudogcache, s) 
	releasem(mp) 
} 

semaphore

// go/src/runtime/sema.go 
// 用于sync.Mutex的异步信号量。 
 
// semaRoot拥有一个具有不同地址(s.elem)的sudog平衡树。 
// 每个sudog都可以依次(通过s.waitlink)指向一个列表,在相同地址上等待的其他sudog。 
// 对具有相同地址的sudog内部列表进行的操作全部为O(1)。顶层semaRoot列表的扫描为O(log n), 
// 其中,n是阻止goroutines的不同地址的数量,通过他们散列到给定的semaRoot。 
type semaRoot struct { 
	lock  mutex 
	// waiters的平衡树的根节点 
	treap *sudog 
	// waiters的数量,读取的时候无所 
	nwait uint32 
} 
 
// Prime to not correlate with any user patterns. 
const semTabSize = 251 
 
var semtable [semTabSize]struct { 
	root semaRoot 
	pad  [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte 
} 

poll_runtime_Semacquire/sync_runtime_SemacquireMutex

// go/src/runtime/sema.go 
//go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire 
func poll_runtime_Semacquire(addr *uint32) { 
	semacquire1(addr, false, semaBlockProfile, 0) 
} 
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex 
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) { 
	semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes) 
} 
 
 
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) { 
	// 判断这个goroutine,是否是m上正在运行的那个 
	gp := getg() 
	if gp != gp.m.curg { 
		throw("semacquire not on the G stack") 
	} 
 
	// *addr -= 1 
	if cansemacquire(addr) { 
		return 
	} 
 
	// 增加等待计数 
	// 再试一次 cansemacquire 如果成功则直接返回 
	// 将自己作为等待者入队 
	// 休眠 
	// (等待器描述符由出队信号产生出队行为) 
 
	// 获取一个sudog 
	s := acquireSudog() 
	root := semroot(addr) 
	t0 := int64(0) 
	s.releasetime = 0 
	s.acquiretime = 0 
	s.ticket = 0 
	if profile&semaBlockProfile != 0 && blockprofilerate > 0 { 
		t0 = cputicks() 
		s.releasetime = -1 
	} 
	if profile&semaMutexProfile != 0 && mutexprofilerate > 0 { 
		if t0 == 0 { 
			t0 = cputicks() 
		} 
		s.acquiretime = t0 
	} 
	for { 
		lock(&root.lock) 
		// 添加我们自己到nwait来禁用semrelease中的"easy case" 
		atomic.Xadd(&root.nwait, 1) 
		// 检查cansemacquire避免错过唤醒 
		if cansemacquire(addr) { 
			atomic.Xadd(&root.nwait, -1) 
			unlock(&root.lock) 
			break 
		} 
		// 任何在 cansemacquire 之后的 semrelease 都知道我们在等待(因为设置了 nwait),因此休眠 
 
		// 队列将s添加到semaRoot中被阻止的goroutine中 
		root.queue(addr, s, lifo) 
		// 将当前goroutine置于等待状态并解锁锁。 
		// 通过调用goready(gp),可以使goroutine再次可运行。 
		goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes) 
		if s.ticket != 0 || cansemacquire(addr) { 
			break 
		} 
	} 
	if s.releasetime > 0 { 
		blockevent(s.releasetime-t0, 3+skipframes) 
	} 
 
	// 归还sudog 
	releaseSudog(s) 
} 
 
func cansemacquire(addr *uint32) bool { 
	for { 
		v := atomic.Load(addr) 
		if v == 0 { 
			return false 
		} 
		if atomic.Cas(addr, v, v-1) { 
			return true 
		} 
	} 
} 

sync_runtime_Semrelease

// go/src/runtime/sema.go 
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease 
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) { 
	semrelease1(addr, handoff, skipframes) 
} 
 
func semrelease1(addr *uint32, handoff bool, skipframes int) { 
	root := semroot(addr) 
	atomic.Xadd(addr, 1) 
 
	// Easy case:没有等待者 
	// 这个检查必须发生在xadd之后,以避免错过唤醒 
	if atomic.Load(&root.nwait) == 0 { 
		return 
	} 
 
	// Harder case: 找到等待者,并且唤醒 
	lock(&root.lock) 
	if atomic.Load(&root.nwait) == 0 { 
		// 该计数已被另一个goroutine占用, 
		// 因此无需唤醒其他goroutine。 
		unlock(&root.lock) 
		return 
	} 
 
	// 搜索一个等待着然后将其唤醒 
	s, t0 := root.dequeue(addr) 
	if s != nil { 
		atomic.Xadd(&root.nwait, -1) 
	} 
	unlock(&root.lock) 
	if s != nil { // 可能会很慢,因此先解锁 
		acquiretime := s.acquiretime 
		if acquiretime != 0 { 
			mutexevent(t0-acquiretime, 3+skipframes) 
		} 
		if s.ticket != 0 { 
			throw("corrupted semaphore ticket") 
		} 
		if handoff && cansemacquire(addr) { 
			s.ticket = 1 
		} 
		// goready(s.g, 5)  
		// 标记 runnable,等待被重新调度 
		readyWithTime(s, 5+skipframes) 
	} 
} 

摘自"同步原语"的一段总结

这一对 semacquire 和 semrelease 理解上可能不太直观。 首先,我们必须意识到这两个函数一定是在两个不同的 M(线程)上得到执行,否则不会出现并发,我们不妨设为 M1 和 M2。 当 M1 上的 G1 执行到 semacquire1 时,如果快速路径成功,则说明 G1 抢到锁,能够继续执行。但一旦失败且在慢速路径下 依然抢不到锁,则会进入 goparkunlock,将当前的 G1 放到等待队列中,进而让 M1 切换并执行其他 G。 当 M2 上的 G2 开始调用 semrelease1 时,只是单纯的将等待队列的 G1 重新放到调度队列中,而当 G1 重新被调度时(假设运气好又在 M1 上被调度),代码仍然会从 goparkunlock 之后开始执行,并再次尝试竞争信号量,如果成功,则会归还 sudog。

参考

【同步原语】https://golang.design/under-the-hood/zh-cn/part2runtime/ch06sched/sync/
【Go并发编程实战--信号量的使用方法和其实现原理】https://juejin.cn/post/6906677772479889422
【Semaphore】https://github.com/cch123/golang-notes/blob/master/semaphore.md
【进程同步之信号量机制(pv操作)及三个经典同步问题】https://blog.csdn.net/SpeedMe/article/details/17597373

本文作者:liz
本文链接https://boilingfrog.github.io/2021/04/02/semaphore/
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。


本文参考链接:https://www.cnblogs.com/ricklz/p/14610213.html
阅读延展