5.2 原子操作与CAS算法¶
学习目标¶
- 理解原子操作的硬件支持机制
- 掌握sync/atomic包的使用
- 深入学习Compare-And-Swap算法
- 了解无锁数据结构设计原理
学习内容¶
原子操作的硬件支持¶
原子操作是指不会被线程调度机制打断的操作,这种操作一旦开始,就会一直运行到结束,中间不会有任何上下文切换。在多线程环境中,原子操作是实现同步的基础机制之一。
CPU级别的原子指令¶
现代CPU都提供了原子操作的指令支持,如x86架构中的LOCK前缀指令,能够确保指令在执行过程中不会被其他处理器或核心干扰。这些指令为高级语言提供了底层的原子操作能力。
内存屏障与缓存一致性¶
为了保证多处理器系统中内存操作的顺序性和可见性,CPU实现了内存屏障(Memory Barrier)机制: - 写屏障(Store Barrier):确保所有在屏障之前的存储操作都在屏障之后的存储操作之前完成 - 读屏障(Load Barrier):确保所有在屏障之后的加载操作都在屏障之前的加载操作之后完成 - 全屏障(Full Barrier):同时具有读屏障和写屏障的功能
缓存一致性协议(如MESI协议)则保证了多个CPU核心对同一内存位置的访问能够保持一致性。
硬件层面的同步保证¶
硬件通过以下机制保证同步: - 总线锁定:通过锁定系统总线,阻止其他CPU访问内存 - 缓存锁定:利用缓存一致性协议,只对缓存中的数据进行锁定,性能优于总线锁定 - 原子指令:如cmpxchg(比较并交换)、xadd(交换并加)等指令
sync/atomic包详解¶
Go语言的sync/atomic包提供了一系列原子操作函数,用于实现低级别同步。这些函数可以在不使用互斥锁的情况下安全地访问共享变量。
基本原子操作函数¶
sync/atomic包提供了针对不同类型的原子操作:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var counter int32 = 0
var wg sync.WaitGroup
// 启动100个goroutine进行累加操作
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// 原子地将counter加1
atomic.AddInt32(&counter, 1)
}()
}
wg.Wait()
// 原子地加载counter的值
fmt.Println("Final counter value:", atomic.LoadInt32(&counter))
}
常用的基本原子操作函数包括: - AddT:原子地加上一个值(支持int32, int64, uint32, uint64, uintptr) - LoadT:原子地加载一个值 - StoreT:原子地存储一个值 - SwapT:原子地交换值,返回旧值
原子指针操作¶
sync/atomic包也支持指针类型的原子操作:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
type Data struct {
Value int
}
func main() {
var data *Data
var wg sync.WaitGroup
// 写入数据的goroutine
wg.Add(1)
go func() {
defer wg.Done()
newData := &Data{Value: 42}
// 原子地存储指针
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&data)), unsafe.Pointer(newData))
}()
// 读取数据的goroutine
wg.Add(1)
go func() {
defer wg.Done()
// 原子地加载指针
for {
d := (*Data)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&data))))
if d != nil {
fmt.Println("Data value:", d.Value)
break
}
}
}()
wg.Wait()
}
注意:使用指针原子操作需要导入unsafe包,将普通指针转换为unsafe.Pointer。
Value类型的使用¶
atomic.Value类型提供了一种安全的方式来原子地存储和加载任意类型的值:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var config atomic.Value
var wg sync.WaitGroup
// 写入配置
wg.Add(1)
go func() {
defer wg.Done()
conf := map[string]string{
"server": "localhost",
"port": "8080",
}
config.Store(conf)
fmt.Println("Config stored")
}()
// 读取配置
wg.Add(1)
go func() {
defer wg.Done()
for {
if conf, ok := config.Load().(map[string]string); ok {
fmt.Printf("Loaded config: server=%s, port=%s\n", conf["server"], conf["port"])
break
}
}
}()
wg.Wait()
}
atomic.Value的使用规则: 1. 首次存储的值不能为nil 2. 不能存储不同类型的值 3. 存储的值如果是指针类型,需要确保指向的数据不会被并发修改
性能特点分析¶
原子操作相比互斥锁有以下性能特点: - 开销更小:原子操作在用户态完成,不需要上下文切换 - 粒度更细:可以精确到变量级别,而不是代码块 - 无阻塞:原子操作不会导致goroutine阻塞 - 适用场景有限:只适用于简单的操作,复杂逻辑仍需使用互斥锁
Compare-And-Swap算法¶
Compare-And-Swap(CAS)是一种乐观锁技术,它尝试原子地更新一个值,只有当该值与预期值相同时才会执行更新。
CAS算法原理¶
CAS操作包含三个操作数: - 内存位置(V) - 预期原值(A) - 新值(B)
CAS的执行过程:如果内存位置V的值等于预期原值A,则将该位置更新为新值B,否则不做任何操作。无论是否更新,都返回该位置的旧值。
Go语言中sync/atomic包的CompareAndSwapT系列函数实现了CAS操作:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var value int32 = 10
var wg sync.WaitGroup
// 尝试将value从10更新为20
wg.Add(1)
go func() {
defer wg.Done()
success := atomic.CompareAndSwapInt32(&value, 10, 20)
if success {
fmt.Println("Goroutine 1: Successfully updated value to 20")
} else {
fmt.Println("Goroutine 1: Failed to update value")
}
}()
// 尝试将value从10更新为30
wg.Add(1)
go func() {
defer wg.Done()
success := atomic.CompareAndSwapInt32(&value, 10, 30)
if success {
fmt.Println("Goroutine 2: Successfully updated value to 30")
} else {
fmt.Println("Goroutine 2: Failed to update value")
}
}()
wg.Wait()
fmt.Println("Final value:", value)
}
在这个例子中,只有一个goroutine能成功更新value的值。
乐观锁的实现¶
基于CAS可以实现乐观锁,乐观锁假设并发冲突的概率很低,只有在提交操作时才检查是否有冲突:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
// 计数器结构体,使用乐观锁
type OptimisticCounter struct {
count int32
}
// 增加计数器的值
func (c *OptimisticCounter) Increment() {
for {
// 读取当前值
old := atomic.LoadInt32(&c.count)
// 计算新值
new := old + 1
// 尝试更新,如果失败则重试
if atomic.CompareAndSwapInt32(&c.count, old, new) {
return
}
// 失败后循环重试
}
}
// 获取当前计数器值
func (c *OptimisticCounter) Get() int32 {
return atomic.LoadInt32(&c.count)
}
func main() {
var counter OptimisticCounter
var wg sync.WaitGroup
// 启动1000个goroutine进行累加
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}
wg.Wait()
fmt.Println("Final count:", counter.Get()) // 应该输出1000
}
自旋锁的构建¶
自旋锁是一种在获取锁之前不断循环检查的锁机制,可以用CAS实现:
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
// 自旋锁实现
type SpinLock struct {
locked int32 // 0表示未锁定,1表示已锁定
}
// 获取锁
func (sl *SpinLock) Lock() {
// 循环直到获取到锁
for !atomic.CompareAndSwapInt32(&sl.locked, 0, 1) {
// 可以在这里添加短暂的延迟,减少CPU占用
// runtime.Gosched()
}
}
// 释放锁
func (sl *SpinLock) Unlock() {
atomic.StoreInt32(&sl.locked, 0)
}
func main() {
var lock SpinLock
var count int
var wg sync.WaitGroup
// 启动1000个goroutine
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
lock.Lock()
defer lock.Unlock()
count++
}()
}
wg.Wait()
fmt.Println("Final count:", count) // 应该输出1000
}
与传统锁的对比¶
| 特性 | CAS/原子操作 | 传统互斥锁 |
|---|---|---|
| 阻塞性 | 非阻塞(自旋本质上是忙等) | 阻塞 |
| 开销 | 低(用户态操作) | 高(可能导致上下文切换) |
| 适用场景 | 简单操作,冲突少 | 复杂操作,冲突多 |
| 公平性 | 不公平 | 通常更公平 |
| 扩展性 | 更好 | 较差 |
ABA问题与解决方案¶
ABA问题是CAS操作的一个潜在缺陷,当一个值从A变成B,又变回A时,CAS操作会误认为这个值没有被修改过。
ABA问题的产生原因¶
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var value int32 = 10 // A值
var wg sync.WaitGroup
// 线程1: 读取值,准备更新
wg.Add(1)
go func() {
defer wg.Done()
old := atomic.LoadInt32(&value)
fmt.Println("Thread 1: Read value as", old)
// 模拟一些操作,让其他线程有机会修改值
time.Sleep(100 * time.Millisecond)
// 尝试CAS操作,期望old是10
success := atomic.CompareAndSwapInt32(&value, old, 20)
if success {
fmt.Println("Thread 1: CAS succeeded, new value is 20")
} else {
fmt.Println("Thread 1: CAS failed")
}
}()
// 线程2: 修改值为B,再改回A
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(50 * time.Millisecond) // 等待线程1读取值
// 将值从10改为15(B值)
atomic.StoreInt32(&value, 15)
fmt.Println("Thread 2: Updated value to 15")
// 做一些操作
time.Sleep(10 * time.Millisecond)
// 将值从15改回10(A值)
atomic.StoreInt32(&value, 10)
fmt.Println("Thread 2: Updated value back to 10")
}()
wg.Wait()
fmt.Println("Final value:", value)
}
在这个例子中,线程1的CAS操作会成功,尽管value的值在中间被修改过,这就是ABA问题。
版本号解决方案¶
解决ABA问题的常用方法是引入版本号,每次更新值时同时更新版本号:
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
// 带版本号的值
type VersionedValue struct {
value int32
version int32
}
// CAS操作,同时检查值和版本号
func casWithVersion(vv *VersionedValue, oldValue, newValue, oldVersion int32) bool {
for {
// 读取当前值和版本号
currentValue := atomic.LoadInt32(&vv.value)
currentVersion := atomic.LoadInt32(&vv.version)
// 检查是否与预期值和版本号匹配
if currentValue != oldValue || currentVersion != oldVersion {
return false
}
// 尝试更新值和版本号
// 注意:这里为了简化示例使用了锁,实际实现可能需要更复杂的原子操作
// 或者使用atomic.Value存储整个结构体
if atomic.CompareAndSwapInt32(&vv.value, oldValue, newValue) {
atomic.AddInt32(&vv.version, 1)
return true
}
}
}
func main() {
var vv VersionedValue = VersionedValue{value: 10, version: 1}
var wg sync.WaitGroup
// 线程1: 读取值和版本号,准备更新
wg.Add(1)
go func() {
defer wg.Done()
oldValue := atomic.LoadInt32(&vv.value)
oldVersion := atomic.LoadInt32(&vv.version)
fmt.Printf("Thread 1: Read value=%d, version=%d\n", oldValue, oldVersion)
// 模拟一些操作
time.Sleep(100 * time.Millisecond)
// 尝试带版本号的CAS操作
success := casWithVersion(&vv, oldValue, 20, oldVersion)
if success {
fmt.Printf("Thread 1: CAS succeeded, new value=%d, new version=%d\n",
atomic.LoadInt32(&vv.value), atomic.LoadInt32(&vv.version))
} else {
fmt.Println("Thread 1: CAS failed")
}
}()
// 线程2: 修改值为B,再改回A
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(50 * time.Millisecond) // 等待线程1读取值
// 将值从10改为15(B值),并更新版本号
oldValue := atomic.LoadInt32(&vv.value)
oldVersion := atomic.LoadInt32(&vv.version)
casWithVersion(&vv, oldValue, 15, oldVersion)
fmt.Printf("Thread 2: Updated value to 15, new version=%d\n",
atomic.LoadInt32(&vv.version))
// 做一些操作
time.Sleep(10 * time.Millisecond)
// 将值从15改回10(A值),并更新版本号
oldValue = atomic.LoadInt32(&vv.value)
oldVersion = atomic.LoadInt32(&vv.version)
casWithVersion(&vv, oldValue, 10, oldVersion)
fmt.Printf("Thread 2: Updated value back to 10, new version=%d\n",
atomic.LoadInt32(&vv.version))
}()
wg.Wait()
fmt.Printf("Final value=%d, version=%d\n",
atomic.LoadInt32(&vv.value), atomic.LoadInt32(&vv.version))
}
使用版本号后,即使值从A变为B再变回A,版本号也会递增,从而避免ABA问题。
指针标记技术¶
对于指针类型,可以使用指针的最低几位作为标记(因为指针通常是对齐的,最低几位为0),每次修改指针时更新标记:
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
"unsafe"
)
// 使用指针最低位作为版本标记
func getValue(ptr uintptr) uintptr {
return ptr &^ 1 // 清除最低位
}
func getTag(ptr uintptr) int {
return int(ptr & 1) // 获取最低位
}
func newTaggedPtr(value uintptr, tag int) uintptr {
return value | uintptr(tag&1) // 设置最低位
}
func main() {
var data int = 10 // A值
var taggedPtr uintptr = newTaggedPtr(uintptr(unsafe.Pointer(&data)), 0)
var wg sync.WaitGroup
// 线程1: 读取指针和标记,准备更新
wg.Add(1)
go func() {
defer wg.Done()
oldPtr := atomic.LoadUintptr(&taggedPtr)
oldValuePtr := getValue(oldPtr)
oldTag := getTag(oldPtr)
oldValue := *(*int)(unsafe.Pointer(oldValuePtr))
fmt.Printf("Thread 1: Read value=%d, tag=%d\n", oldValue, oldTag)
// 模拟一些操作
time.Sleep(100 * time.Millisecond)
// 新值
newData := 20
newValuePtr := uintptr(unsafe.Pointer(&newData))
newTag := 1 - oldTag // 翻转标记
// 尝试CAS操作
success := atomic.CompareAndSwapUintptr(
&taggedPtr,
oldPtr,
newTaggedPtr(newValuePtr, newTag),
)
if success {
fmt.Println("Thread 1: CAS succeeded")
} else {
fmt.Println("Thread 1: CAS failed")
}
}()
// 线程2: 修改指针为B,再改回A
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(50 * time.Millisecond) // 等待线程1读取值
// 将值从10改为15(B值),并更新标记
oldPtr := atomic.LoadUintptr(&taggedPtr)
newData := 15
newValuePtr := uintptr(unsafe.Pointer(&newData))
newTag := 1 - getTag(oldPtr)
atomic.CompareAndSwapUintptr(&taggedPtr, oldPtr, newTaggedPtr(newValuePtr, newTag))
fmt.Println("Thread 2: Updated value to 15")
// 做一些操作
time.Sleep(10 * time.Millisecond)
// 将值从15改回10(A值),并更新标记
oldPtr = atomic.LoadUintptr(&taggedPtr)
originalValuePtr := uintptr(unsafe.Pointer(&data))
newTag = 1 - getTag(oldPtr)
atomic.CompareAndSwapUintptr(&taggedPtr, oldPtr, newTaggedPtr(originalValuePtr, newTag))
fmt.Println("Thread 2: Updated value back to 10")
}()
wg.Wait()
finalPtr := atomic.LoadUintptr(&taggedPtr)
finalValue := *(*int)(unsafe.Pointer(getValue(finalPtr)))
fmt.Printf("Final value=%d\n", finalValue)
}
实际案例分析¶
在实际开发中,ABA问题可能导致严重的错误,例如在链表操作中:
// 简化的链表节点
type Node struct {
value int
next *Node
}
// 错误的无锁链表删除操作,存在ABA问题
func (n *Node) unsafeDeleteNext() bool {
next := n.next
if next == nil {
return false
}
// 如果next在读取后被删除并重新插入,CAS仍会成功
return atomic.CompareAndSwapPointer(
(*unsafe.Pointer)(unsafe.Pointer(&n.next)),
unsafe.Pointer(next),
unsafe.Pointer(next.next),
)
}
解决方法是为每个节点添加版本号,或者使用Go 1.19引入的sync/atomic包中的Pointer类型配合版本控制。
无锁数据结构设计¶
无锁(Lock-Free)数据结构是指不使用互斥锁,而是通过原子操作和CAS来实现并发访问的安全数据结构。
Lock-Free队列设计¶
无锁队列通常使用链表实现,通过CAS操作来保证入队和出队的原子性:
package main
import (
"fmt"
"sync"
"sync/atomic"
"unsafe"
)
// 无锁队列节点
type Node struct {
value interface{}
next *Node
}
// 无锁队列
type LockFreeQueue struct {
head *Node // 头节点,哨兵节点
tail *Node // 尾节点
}
// 创建新的无锁队列
func NewLockFreeQueue() *LockFreeQueue {
node := &Node{}
return &LockFreeQueue{
head: node,
tail: node,
}
}
// 入队操作
func (q *LockFreeQueue) Enqueue(value interface{}) {
newNode := &Node{value: value}
for {
// 获取当前尾节点
tail := (*Node)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&q.tail))))
// 获取尾节点的下一个节点
next := (*Node)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&tail.next))))
// 再次检查尾节点是否仍然是当前尾节点(可能被其他线程修改)
if tail == (*Node)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&q.tail)))) {
// 如果尾节点的下一个节点是nil,表示可以插入新节点
if next == nil {
// 尝试将新节点设置为尾节点的下一个节点
if atomic.CompareAndSwapPointer(
(*unsafe.Pointer)(unsafe.Pointer(&tail.next)),
unsafe.Pointer(next),
unsafe.Pointer(newNode),
) {
// 成功将新节点添加到尾部,尝试更新尾节点
atomic.CompareAndSwapPointer(
(*unsafe.Pointer)(unsafe.Pointer(&q.tail)),
unsafe.Pointer(tail),
unsafe.Pointer(newNode),
)
return
}
} else {
// 尾节点的下一个节点不为nil,说明尾节点需要更新
atomic.CompareAndSwapPointer(
(*unsafe.Pointer)(unsafe.Pointer(&q.tail)),
unsafe.Pointer(tail),
unsafe.Pointer(next),
)
}
}
}
}
// 出队操作
func (q *LockFreeQueue) Dequeue() (interface{}, bool) {
for {
// 获取当前头节点、尾节点和头节点的下一个节点
head := (*Node)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&q.head))))
tail := (*Node)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&q.tail))))
next := (*Node)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&head.next))))
// 再次检查头节点是否仍然是当前头节点
if head == (*Node)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&q.head)))) {
// 队列为空
if head == tail {
if next == nil {
return nil, false // 队列空
}
// 尾节点需要更新
atomic.CompareAndSwapPointer(
(*unsafe.Pointer)(unsafe.Pointer(&q.tail)),
unsafe.Pointer(tail),
unsafe.Pointer(next),
)
} else {
// 尝试获取下一个节点的值
value := next.value
// 尝试更新头节点
if atomic.CompareAndSwapPointer(
(*unsafe.Pointer)(unsafe.Pointer(&q.head)),
unsafe.Pointer(head),
unsafe.Pointer(next),
) {
return value, true
}
}
}
}
}
func main() {
queue := NewLockFreeQueue()
var wg sync.WaitGroup
var enqueued, dequeued int32
// 启动10个生产者goroutine
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 100; j++ {
value := id*1000 + j
queue.Enqueue(value)
atomic.AddInt32(&enqueued, 1)
}
}(i)
}
// 启动10个消费者goroutine
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
if value, ok := queue.Dequeue(); ok {
atomic.AddInt32(&dequeued, 1)
// 处理值,这里只是简单打印
// fmt.Println("Dequeued:", value)
} else if atomic.LoadInt32(&enqueued) == atomic.LoadInt32(&dequeued) {
// 所有元素都已处理完毕
break
}
}
}()
}
wg.Wait()
fmt.Printf("Enqueued: %d, Dequeued: %d\n", enqueued, dequeued)
}
无锁栈的实现¶
无锁栈的实现相对简单,只需要维护一个栈顶指针,通过CAS操作来压栈和弹栈:
package main
import (
"fmt"
"sync"
"sync/atomic"
"unsafe"
)
// 栈节点
type StackNode struct {
value interface{}
next *StackNode
}
// 无锁栈
type LockFreeStack struct {
top *StackNode
}
// 创建新的无锁栈
func NewLockFreeStack() *LockFreeStack {
return &LockFreeStack{}
}
// 压栈操作
func (s *LockFreeStack) Push(value interface{}) {
newNode := &StackNode{value: value}
for {
// 获取当前栈顶
top := (*StackNode)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&s.top))))
// 新节点的next指向当前栈顶
newNode.next = top
// 尝试将新节点设置为新的栈顶
if atomic.CompareAndSwapPointer(
(*unsafe.Pointer)(unsafe.Pointer(&s.top)),
unsafe.Pointer(top),
unsafe.Pointer(newNode),
) {
return
}
// 如果失败则重试
}
}
// 弹栈操作
func (s *LockFreeStack) Pop() (interface{}, bool) {
for {
// 获取当前栈顶
top := (*StackNode)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&s.top))))
// 栈为空
if top == nil {
return nil, false
}
// 获取栈顶的下一个节点
next := top.next
// 尝试将栈顶的下一个节点设置为新的栈顶
if atomic.CompareAndSwapPointer(
(*unsafe.Pointer)(unsafe.Pointer(&s.top)),
unsafe.Pointer(top),
unsafe.Pointer(next),
) {
return top.value, true
}
// 如果失败则重试
}
}
func main() {
stack := NewLockFreeStack()
var wg sync.WaitGroup
var pushed, popped int32
// 启动5个goroutine进行压栈操作
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 200; j++ {
value := id*1000 + j
stack.Push(value)
atomic.AddInt32(&pushed, 1)
}
}(i)
}
// 启动5个goroutine进行弹栈操作
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
if _, ok := stack.Pop(); ok {
atomic.AddInt32(&popped, 1)
} else if atomic.LoadInt32(&pushed) == atomic.LoadInt32(&popped) {
// 所有元素都已处理完毕
break
}
}
}()
}
wg.Wait()
fmt.Printf("Pushed: %d, Popped: %d\n", pushed, popped)
}
内存回收问题¶
无锁数据结构面临的一个重要问题是内存回收。在并发环境中,当一个节点被删除时,可能还有其他线程正在访问该节点,直接释放会导致错误。
Go语言的垃圾回收机制缓解了这个问题,但在高性能场景下,仍需谨慎处理:
- 延迟回收:保留已删除的节点一段时间,确保没有线程再访问它们
- ** hazard pointers**:跟踪正在被访问的节点,确保不会回收仍在使用的节点
- epoch-based reclamation:基于时间 epoch 标记和回收节点
性能权衡考虑¶
无锁数据结构并非在所有场景下都优于带锁的数据结构:
- 优点:
- 更高的并发性能,特别是在低冲突场景
- 避免了死锁和优先级反转问题
-
更细粒度的同步控制
-
缺点:
- 实现复杂,容易出错
- 在高冲突场景下,CAS重试会导致性能下降
- 内存管理复杂
- 调试困难
选择无锁数据结构还是带锁数据结构,应基于具体的应用场景和性能测试结果。
高级主题¶
无锁编程技术¶
Lock-Free数据结构¶
Lock-Free数据结构是无锁编程的核心,它保证至少有一个线程能够继续执行,不会出现所有线程都阻塞的情况。
实现Lock-Free数据结构的关键原则: - 使用原子操作代替锁 - 设计时考虑所有可能的并发场景 - 确保操作的可线性化(linearizable) - 处理好内存回收问题
内存序列的理解¶
内存序列(Memory Ordering)定义了多线程环境中内存操作的可见性和顺序性。Go语言的sync/atomic包在1.19版本引入了更精细的内存顺序控制:
atomic.OrderRelaxed:无顺序保证,只保证操作本身的原子性atomic.OrderAcquire:在加载操作中,保证后续的加载操作不会重排到该操作之前atomic.OrderRelease:在存储操作中,保证之前的存储操作不会重排到该操作之后atomic.OrderAcqRel:同时具有Acquire和Release的语义atomic.OrderSeqCst:顺序一致性,最严格的内存顺序
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var data int32
var ready uint32
var wg sync.WaitGroup
// 写入数据的goroutine
wg.Add(1)
go func() {
defer wg.Done()
data = 42 // 写入数据
// 发布操作,使用Release顺序
atomic.StoreUint32(&ready, 1, atomic.OrderRelease)
}()
// 读取数据的goroutine
wg.Add(1)
go func() {
defer wg.Done()
// 等待数据准备好,使用Acquire顺序
for atomic.LoadUint32(&ready, atomic.OrderAcquire) == 0 {
// 等待
}
// 此时可以安全地读取data
fmt.Println("Data:", data)
}()
wg.Wait()
}
正确的内存顺序控制可以提高性能,同时保证程序的正确性。
ABA问题的解决¶
除了前面介绍的版本号和指针标记技术,还有其他解决ABA问题的方法:
- 双重CAS:检查两次值是否相同,减少ABA发生的概率
- 间接引用:通过指针的指针来访问对象,每次修改都创建新对象
- 使用专用内存回收机制:如前面提到的hazard pointers
在Go 1.19及以上版本,可以使用sync/atomic包中的CompareAndSwapPointer配合内存顺序控制来更安全地处理ABA问题。
性能权衡分析¶
无锁编程的性能权衡需要考虑以下因素:
- 冲突率:低冲突场景下,无锁结构性能优势明显;高冲突场景下,CAS重试成本高
- CPU缓存:原子操作可能导致缓存失效,影响性能
- 代码复杂性:无锁代码更难编写和维护
- 垃圾回收:Go的GC可能会影响无锁结构的性能
- 可移植性:不同硬件架构对原子操作的支持和性能不同
在实际应用中,建议先使用简单的锁机制实现,只有在性能瓶颈明确存在时,再考虑使用无锁编程技术,并进行充分的测试和验证。
无锁编程是并发编程中的高级技术,需要深入理解硬件特性、内存模型和并发理论,才能正确有效地应用。
下一节:5.3 Context包的设计与应用 - 掌握Go语言并发控制的核心工具