// 分配一个逻辑处理器给调度器使用
runtime.GOMAXPROCS(1)
// wg 用来等待程序完成
// 计数加 2,表示要等待两个 goroutine
var wg sync.WaitGroup
wg.Add(2)
wg.Wait()
原子函数能够以很底层的加锁机制来同步访问整型变量和指针。
import "sync/atomic"
// 安全地对 counter 加 1
atomic.AddInt64(&counter, 1)
这个函数会同步整型值的加法,方法是强制同一时刻只能有一个 goroutine 运行并完成这个加法操作。当 goroutine 试图去调用任何原子函数时,这些 goroutine 都会自动根据所引用的变量做同步处理。 另外两个有用的原子函数是 LoadInt64 和 StoreInt64。这两个函数提供了一种安全地读 和写一个整型值的方式。
互斥锁用于在代码上创建一个临界区,保证同一时间只有一个 goroutine 可以 执行这个临界区代码。 Lock()和 Unlock()函数调用定义的临界 区里被保护起来
原子函数和互斥锁都能工作,但是依靠它们都不会让编写并发程序变得更简单,更不容易出 错,或者更有趣。在 Go 语言里,你不仅可以使用原子函数和互斥锁来保证对共享资源的安全访 问以及消除竞争状态,还可以使用通道,通过发送和接收需要共享的资源,在 goroutine 之间做 同步。
无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。这种类型的通 道要求发送 goroutine 和接收 goroutine 同时准备好,才能完成发送和接收操作。如果两个 goroutine 没有同时准备好,通道会导致先执行发送或接收操作的 goroutine 阻塞等待。这种对通道进行发送 和接收的交互行为本身就是同步的。其中任意一个操作都无法离开另一个操作单独存在
创建两个goroutine,然后一起监听,接受然后发送,会两个依次接受到?不会一起并发么?自己发送自己接收到?
具体例子参考:go语言实战
有缓冲的通道(buffered channel)是一种在被接收前能存储一个或者多个值的通道。这种类 型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。通道会阻塞发送和接收动作的 条件也会不同。只有在通道中没有要接收的值时,接收动作才会阻塞。只有在通道没有可用缓冲 区容纳被发送的值时,发送动作才会阻塞。这导致有缓冲的通道和无缓冲的通道之间的一个很大 的不同:无缓冲的通道保证进行发送和接收的 goroutine 会在同一时间进行数据交换;有缓冲的 通道没有这种保护。
程序和 Go 语言的调度器带有随机成分,这个程序每次执行得到的输出会不一样。
双重检测锁(DCL):
func GetInstance() *singleton {
if instance == nil {
mu.Lock()
defer mu.Unlock()
if instance == nil {
instance = &singleton{}
}
}
return instance
}
通过上面的Check-Lock-Check机制,的确可以解决锁竟争的问题,但是这种方法不管是否singleton实例对象是否已创建,每次都要执行两次check才是一个完整的判断,那有没有方法使得只要一次check就可以完成对singleton实例对象是否存在的检查呢? 有!通过golang的sync/atomic包提供的原子性操作可以更高效的完成这个检查
func GetInstance() *singleton {
if atomic.LoadUInt32(&initialized) == 1 {
return instance
}
mu.Lock()
defer mu.Unlock()
if initialized == 0 {
instance = &singleton{}
atomic.StoreUint32(&initialized, 1)
}
return instance
}
文章所说的两次检查改一次检查不是很理解。是说多个并发初始化的时候,其他非第一次的可以不用再次检查。可是
if initialized == 0 {
不也是第二次检查了么?还是因为原子操作阻塞了。而两次检查是卡在Mutex中?不是这样吧。见下Once
可以看到我们之前其实是借鉴了golang原生标准包sync中对Once实现对源码,那既然标准包中已经实现了这个Check-Lock-Check机制,那我们直接调用sync包提供once.Do()方法对某个方法只进行一次性调用:
package singleton
import (
"sync"
)
type singleton struct {
}
var instance *singleton
var once sync.Once
func GetInstance() *singleton {
once.Do(func() {
instance = &singleton{}
})
return instance
}
keyword: 可见性
这里有几个值得注意的点:
- Do 方法中的第一层检测atomic.LoadUint32(&o.done) == 0是否应该用 CAS 来实现?
- 如果不是,为什么不只用o.done == 0(就像 java 单例的双重检测锁那样)?
- doSlow 方法的临界区内,为什么第二层检测时直接读取 done,而写值却用了 atomic 操作?
因此这里并不能使用 CAS,因为在并发执行时,CAS 抢夺失败的那个 goroutine 会直接返回,而此时有可能抢夺成功的那个还没执行完成,所以不能达到设计目标。因此源代码中使用 atomic.LoadUint32(&o.done) == 0和o.done == 0结合互斥锁实现了双重检测锁,将同时执行的 goroutine 阻塞在 Mutex 处直到抢夺成功的函数执行完成。
但是 go 并没有给我们提供volatile这个关键字,那我们该如何保证可观测呢?我以前写过一片文章提到过这个问题为什么 golang 没有 volatile? 文中提到过,go 鼓励开发者"通过通信去共享内存,而不是通过共享内存去通信"。但是目前看来,这个观念的转换过程并不是那么顺利——至少在这里,1.13.4 的标准库源码中 Once.go 的源码中还在使用共享变量进行通信
if atomic.LoadUint32(&o.done) == 0 {
因此如果不做同步而直接用o.done == 0的话肯定无法观测到 doSlow 方法中设置的 done 值。
Once.done 是一个共享变量,那么怎么保证其并发可观测呢?办法有很多,加锁可以,但是基于原子操作的 atomic.Load/Store是一个更轻量且高效的选择
也就是跟dcl的区别?减少并发时候的并发o.done为0的次数。进而避免第二次检查。
临界区能否保证另一个 goroutine 的 atomic.Load*读取到这里设置的值呢?不能。
而如果在临界区外部对变量进行读写则,无法保证能被观测到,正如 Once 中的情形一样。这也就解释了为什么 doSlow 中写 done 时即使是在临界区内也要使用 atomic.StoreUint32 进行写操作。
至于 doSlow 中第二重检测时为什么是直接读取 done 而没有用 atomic 操作,因为这个读操作和临界区内的写操作均处于临界区内,已经保证了对写操作的可观测性,自然不需要 atomic 了。
也就是读写在同一临界区。也就是已经没有其他的goroutine能进行值的更改了?什么叫已经保证了对写操作的可观测性,自然不需要 atomic 了。
注意到,Store 的实现中使用了一个交换指令 XCHGL,在 x86 架构上它实际上包含了一个 LOCK 前缀。是不是比较眼熟?没错,他就是 java 的 volatile 变量读写操作 经过 JIT 编译后携带的指令前缀,volatile 就是通过它保证了可见性。因此到这里我们可以确定 atomic 操作的可观测性。
类似java并发变成的原子操作
为什么要把 done 这个成员变量放在结构体的第一位?
hot path: 泛指编译后的 go 程序中极度频繁运行的指令集合,其特点是所有的hot path中的调用都会被内联编译以保证运行速度。
什么是内联编译?就是每次调用都会生成一次完整的操作指令直接执行,而不是通过跳转指令跳转至定义处执行。这样做的好处是提高运行速度,但缺点也很明显:某个调用很频繁的话就会大量浪费空间,造成编译后的程序体积膨胀。
而为了保证其内联编译的体积,这里做的一个优化就是将 done 放置于结构体的第一位,好处是能直接通过结构体地址读取其值 (原理类似 c 语言中数组的地址等于首位元素的地址),节省了用于计算偏移量的指令,缓解了内联编译后的体积膨胀问题。
原子性的操作比锁性能好:Mutex由操作系统实现,而atomic包中的原子操作则由底层硬件直接提供支持。
哈希表在有并发的场景并不安全:同时读写一个哈希表的后果是不确定的。如果你需要使用goroutines同时对一个哈希表做读写,对哈希表的访问需要通过某种同步机制做协调。一个常用的方法是是使用 sync.RWMutex。
这个语句声明了一个counter变量,这是一个包含了一个map和sync.RWMutex的匿名结构体。
var counter = struct{
sync.RWMutex
m map[string]int
}{m: make(map[string]int)}
读counter前,获取读锁:
counter.RLock()
n := counter.m["some_key"]
counter.RUnlock()
fmt.Println("some_key:", n)
写counter前,获取写锁
counter.Lock()
counter.m["some_key"]++
通过引入两个map,将读写分离到不同的map,其中read map只提供读,而dirty map则负责写. 这样read map就可以在不加锁的情况下进行并发读取,当read map中没有读取到值时,再加锁进行后续读取,并累加未命中数,当未命中数到达一定数量后,将dirty map上升为read map. 另外,虽然引入了两个map,但是底层数据存储的是指针,指向的是同一份值.
从以上的源码可知,sync.map并不适合同时存在大量读写的场景,大量的写会导致read map读取不到数据从而加锁进行进一步读取,同时dirty map不断升级为read map.
从而导致整体性能较低,特别是针对cache场景.针对append-only以及大量读,少量写场景使用sync.map则相对比较合适.
对于map,还有一种基于hash的实现思路,具体就是对map加读写锁,但是分配n个map,根据对key做hash运算确定是分配到哪个map中.
这样锁的消耗就降到了1/n(理论值).具体实现可见:concurrent-map
相比之下, 基于hash的方式更容易理解,整体性能较稳定. sync.map在某些场景性能可能差一些,但某些场景却能取得更好的效果.
所以还是要根据具体的业务场景进行取舍.