The default Go implementation of sync.RWMutex does not scale well to multiple cores, as all readers contend on the same memory location when they all try to atomically increment it. This gist explores an n-way RWMutex, also known as a "big reader" lock, which gives each CPU core its own RWMutex. Readers take only a read lock local to their core, whereas writers must take all locks in order.
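To make the layout concrete, here is a minimal sketch of such a per-core lock in Go. It mirrors the RWMutex2 type in the main.go listing at the end of this page, but uses the DRWMutex name from the rest of the text; only the write side needs methods, since a reader simply calls RLock/RUnlock on its own slot:

// DRWMutex is an n-way RWMutex: one sync.RWMutex per CPU core.
// A reader on core c calls mx[c].RLock() / mx[c].RUnlock(); a writer
// must acquire every per-core lock, in order, which excludes all readers.
type DRWMutex []sync.RWMutex

// Lock takes the write lock by acquiring each per-core lock in turn.
func (mx DRWMutex) Lock() {
    for core := range mx {
        mx[core].Lock()
    }
}

// Unlock releases every per-core lock.
func (mx DRWMutex) Unlock() {
    for core := range mx {
        mx[core].Unlock()
    }
}

Allocating one slot per core (for example make(DRWMutex, numCores)) means readers on different cores no longer increment the same reader count, which is exactly the contention described above.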
To determine which lock to take, readers use the CPUID instruction, which gives the APIC ID of the currently active CPU without having to issue a system call or modify the runtime. This instruction is supported on both Intel and AMD processors; ARM CPUs should use the CPU ID register instead. For systems with more than 256 processors, x2APIC must be used, and the EDX register after CPUID with EAX=0xb should be used instead. A mapping from APIC ID to CPU index is constructed (using CPU affinity syscalls) when the program is started, as it is static for the lifetime of a process. Since the CPUID instruction can be fairly expensive, goroutines also only periodically update their estimate of what core they are running on. More frequent updates lead to less inter-core lock traffic, but also increase the time spent on CPUID instructions relative to the actual locking.
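The reader loop in the benchmark below shows this trade-off in miniature. A reduced sketch, with illustrative parameter names, relying on the cpu() helper from cpu_amd64.s, the cpus map built in main.go, and the DRWMutex sketch above:

// readLoop acquires the read lock `iterations` times, refreshing its
// estimate of the current core only every checkEvery iterations so the
// cost of CPUID is amortized over many lock acquisitions.
func readLoop(mx DRWMutex, cpus map[uint64]int, iterations, checkEvery uint64) {
    c := cpus[cpu()] // initial guess at which core we are running on
    for i := uint64(0); i < iterations; i++ {
        if checkEvery != 0 && i%checkEvery == 0 {
            c = cpus[cpu()] // periodically re-estimate the core
        }
        mx[c].RLock()
        // ... read-side critical section ...
        mx[c].RUnlock()
    }
}

Lowering checkEvery keeps c accurate (less cross-core lock traffic) at the price of more time spent in CPUID, which is the trade-off described above.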
Stale CPU information. The information about which CPU a goroutine is running on might be stale when we take the lock (the goroutine could have been moved to another core), but this only affects performance, not correctness, as long as the reader remembers which lock it took. Such moves are also unlikely, as the OS kernel tries to keep threads on the same core to improve cache hits.
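One way to make that requirement explicit (a hypothetical convenience method, not part of the benchmark code below) is to have the read-lock call return the per-core mutex it actually acquired, so the caller releases exactly that one even if it has since been migrated:

// RLock locks the read side of the lock for the given core index and
// returns the mutex it locked; unlocking through the returned value is
// correct regardless of which core the goroutine runs on afterwards.
func (mx DRWMutex) RLock(core int) *sync.RWMutex {
    l := &mx[core]
    l.RLock()
    return l
}

// Usage:
//    l := mx.RLock(c)
//    // ... read ...
//    l.RUnlock()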
There are many parameters that affect the performance characteristics of this scheme. In particular, the frequency of CPUID checking, the number of readers, the ratio of readers to writers, and the time readers hold their locks are all important. Since only a single writer is active at a time, the duration a writer holds a lock for does not affect the difference in performance between sync.RWMutex and DRWMutex.
Experiments show that DRWMutex performs better the more cores the system has, and in particular when the fraction of writers is <1%, and CPUID is called at most every 10 locks (this changes depending on the duration a lock is held for). Even on few cores, DRWMutex outperforms sync.RWMutex under these conditions, which are common for applications that elect to use sync.RWMutex over sync.Mutex.
The plot below shows mean performance across 10 runs as the number of cores increases, using:
drwmutex -i 5000 -p 0.0001 -w 1 -r 100 -c 100
Error bars denote the 25th and 75th percentiles. Note the drops at every 10th core; this is because 10 cores constitute a NUMA node on the machine the benchmarks were run on, so once a NUMA node is added, cross-core traffic becomes more expensive. Performance increases for DRWMutex as more readers can work in parallel compared to sync.RWMutex.
See the go-nuts thread for further discussion.
cpu_amd64.s
#include "textflag.h" // func cpu() uint64 TEXT 路cpu(SB),NOSPLIT,$0-8 MOVL $0x01, AX // version information MOVL $0x00, BX // any leaf will do MOVL $0x00, CX // any subleaf will do // call CPUID BYTE $0x0f BYTE $0xa2 SHRQ $24, BX // logical cpu id is put in EBX[31-24] MOVQ BX, ret+0(FP) RET
main.go
package main

import (
    "flag"
    "fmt"
    "math/rand"
    "os"
    "runtime"
    "runtime/pprof"
    "sync"
    "syscall"
    "time"
    "unsafe"
)

func cpu() uint64 // implemented in cpu_amd64.s

var cpus map[uint64]int

// determine mapping from APIC ID to CPU index by pinning the entire process to
// one core at a time, and seeing what its APIC ID is.
func init() {
    cpus = make(map[uint64]int)

    var aff uint64
    syscall.Syscall(syscall.SYS_SCHED_GETAFFINITY, uintptr(0), unsafe.Sizeof(aff), uintptr(unsafe.Pointer(&aff)))

    n := 0
    start := time.Now()
    var mask uint64 = 1
Outer:
    for {
        for (aff & mask) == 0 {
            mask <<= 1
            if mask == 0 || mask > aff {
                break Outer
            }
        }

        ret, _, err := syscall.Syscall(syscall.SYS_SCHED_SETAFFINITY, uintptr(0), unsafe.Sizeof(mask), uintptr(unsafe.Pointer(&mask)))
        if ret != 0 {
            panic(err.Error())
        }

        // what CPU do we have?
        <-time.After(1 * time.Millisecond)
        c := cpu()

        if oldn, ok := cpus[c]; ok {
            fmt.Println("cpu", n, "==", oldn, "-- both have CPUID", c)
        }

        cpus[c] = n

        mask <<= 1
        n++
    }

    fmt.Printf("%d/%d cpus found in %v: %v\n", len(cpus), runtime.NumCPU(), time.Now().Sub(start), cpus)

    // restore the original affinity mask
    ret, _, err := syscall.Syscall(syscall.SYS_SCHED_SETAFFINITY, uintptr(0), unsafe.Sizeof(aff), uintptr(unsafe.Pointer(&aff)))
    if ret != 0 {
        panic(err.Error())
    }
}

// RWMutex2 is the DRWMutex discussed in the text: one RWMutex per core.
type RWMutex2 []sync.RWMutex

func (mx RWMutex2) Lock() {
    for core := range mx {
        mx[core].Lock()
    }
}

func (mx RWMutex2) Unlock() {
    for core := range mx {
        mx[core].Unlock()
    }
}

func main() {
    cpuprofile := flag.Bool("cpuprofile", false, "enable CPU profiling")
    locks := flag.Uint64("i", 10000, "Number of iterations to perform")
    write := flag.Float64("p", 0.0001, "Probability of write locks")
    wwork := flag.Int("w", 1, "Amount of work for each writer")
    rwork := flag.Int("r", 100, "Amount of work for each reader")
    readers := flag.Int("n", runtime.GOMAXPROCS(0), "Total number of readers")
    checkcpu := flag.Uint64("c", 100, "Update CPU estimate every n iterations")
    flag.Parse()

    var o *os.File
    if *cpuprofile {
        o, _ = os.Create("rw.out") // assign (not :=) so the outer o is not shadowed
        pprof.StartCPUProfile(o)
    }

    readers_per_core := *readers / runtime.GOMAXPROCS(0)

    var wg sync.WaitGroup

    var mx1 sync.RWMutex

    start1 := time.Now()
    for n := 0; n < runtime.GOMAXPROCS(0); n++ {
        for r := 0; r < readers_per_core; r++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                r := rand.New(rand.NewSource(rand.Int63()))
                for n := uint64(0); n < *locks; n++ {
                    if r.Float64() < *write {
                        mx1.Lock()
                        x := 0
                        for i := 0; i < *wwork; i++ {
                            x++
                        }
                        _ = x
                        mx1.Unlock()
                    } else {
                        mx1.RLock()
                        x := 0
                        for i := 0; i < *rwork; i++ {
                            x++
                        }
                        _ = x
                        mx1.RUnlock()
                    }
                }
            }()
        }
    }
    wg.Wait()
    end1 := time.Now()

    t1 := end1.Sub(start1)
    fmt.Println("mx1", runtime.GOMAXPROCS(0), *readers, *locks, *write, *wwork, *rwork, *checkcpu, t1.Seconds(), t1)

    if *cpuprofile {
        pprof.StopCPUProfile()
        o.Close()
        o, _ = os.Create("rw2.out")
        pprof.StartCPUProfile(o)
    }

    mx2 := make(RWMutex2, len(cpus))

    start2 := time.Now()
    for n := 0; n < runtime.GOMAXPROCS(0); n++ {
        for r := 0; r < readers_per_core; r++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                c := cpus[cpu()]
                r := rand.New(rand.NewSource(rand.Int63()))
                for n := uint64(0); n < *locks; n++ {
                    if *checkcpu != 0 && n%*checkcpu == 0 {
                        c = cpus[cpu()]
                    }
                    if r.Float64() < *write {
                        mx2.Lock()
                        x := 0
                        for i := 0; i < *wwork; i++ {
                            x++
                        }
                        _ = x
                        mx2.Unlock()
                    } else {
                        mx2[c].RLock()
                        x := 0
                        for i := 0; i < *rwork; i++ {
                            x++
                        }
                        _ = x
                        mx2[c].RUnlock()
                    }
                }
            }()
        }
    }
    wg.Wait()
    end2 := time.Now()

    if *cpuprofile {
        pprof.StopCPUProfile()
        o.Close()
    }

    t2 := end2.Sub(start2)
    fmt.Println("mx2", runtime.GOMAXPROCS(0), *readers, *locks, *write, *wwork, *rwork, *checkcpu, t2.Seconds(), t2)
}