2.4. sync 包详解
2.4.1. sync 包概述
sync 包提供了基本的同步原语,用于低级别的内存访问同步。
Warning
sync 包中的类型在使用后不能被复制!
2.4.2. sync.Mutex (互斥锁)
2.4.2.1. 基本用法
type SafeCounter struct {
mu sync.Mutex
count int
}
func (c *SafeCounter) Inc() {
c.mu.Lock()
defer c.mu.Unlock()
c.count++
}
func (c *SafeCounter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.count
}
2.4.2.2. ⚠️ 常见陷阱
2.4.2.2.1. 陷阱 1:忘记解锁
// ❌ 如果中间 return,锁不会释放
func (c *SafeCounter) Inc() {
c.mu.Lock()
if someCondition {
return // 锁没有释放!
}
c.count++
c.mu.Unlock()
}
// ✅ 使用 defer
func (c *SafeCounter) Inc() {
c.mu.Lock()
defer c.mu.Unlock()
if someCondition {
return // defer 会确保解锁
}
c.count++
}
2.4.2.2.2. 陷阱 2:锁的复制
type Counter struct {
mu sync.Mutex
count int
}
// ❌ 值传递会复制锁
func (c Counter) Inc() {
c.mu.Lock()
defer c.mu.Unlock()
c.count++ // 修改的是副本
}
// ✅ 使用指针接收者
func (c *Counter) Inc() {
c.mu.Lock()
defer c.mu.Unlock()
c.count++
}
2.4.2.2.3. 陷阱 3:死锁
// ❌ 同一个 goroutine 重复加锁
func (c *Counter) Inc() {
c.mu.Lock()
defer c.mu.Unlock()
c.addOne() // 如果 addOne 也加锁,死锁!
}
func (c *Counter) addOne() {
c.mu.Lock() // 死锁!
defer c.mu.Unlock()
c.count++
}
// ✅ 内部方法不加锁,或使用读写锁
func (c *Counter) Inc() {
c.mu.Lock()
defer c.mu.Unlock()
c.addOneInternal()
}
func (c *Counter) addOneInternal() {
c.count++ // 假设调用者已持有锁
}
2.4.3. sync.RWMutex (读写锁)
适用于读多写少的场景。
type Cache struct {
mu sync.RWMutex
items map[string]string
}
// 读操作使用 RLock
func (c *Cache) Get(key string) (string, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
val, ok := c.items[key]
return val, ok
}
// 写操作使用 Lock
func (c *Cache) Set(key, value string) {
c.mu.Lock()
defer c.mu.Unlock()
c.items[key] = value
}
2.4.3.1. 读写锁规则
操作 |
条件 |
|---|---|
RLock |
可以多个 goroutine 同时持有读锁 |
Lock |
必须等待所有读锁和写锁释放 |
RUnlock |
释放一个读锁 |
Unlock |
释放写锁 |
2.4.4. sync.WaitGroup
等待一组 Goroutine 完成。
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf("Worker %d done\n", id)
}(i)
}
wg.Wait() // 等待所有 goroutine 完成
fmt.Println("All workers done")
}
2.4.4.1. ⚠️ WaitGroup 陷阱
2.4.4.1.1. 陷阱 1:Add 在 goroutine 外调用
// ❌ 可能出现 race condition
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
go func(id int) {
wg.Add(1) // 太晚了!Wait 可能已经返回
defer wg.Done()
// ...
}(i)
}
wg.Wait()
// ✅ Add 在启动 goroutine 前调用
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// ...
}(i)
}
wg.Wait()
2.4.4.1.2. 陷阱 2:Done 调用次数不匹配
// ❌ Done 调用太多次会 panic
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
defer wg.Done() // panic: negative WaitGroup counter
}()
2.4.5. sync.Once
确保某个操作只执行一次,常用于单例模式。
var (
instance *Database
once sync.Once
)
func GetDatabase() *Database {
once.Do(func() {
instance = &Database{}
instance.Connect()
})
return instance
}
2.4.5.1. ⚠️ Once 陷阱
// ❌ 如果 Do 中的函数 panic,Once 仍然被标记为已执行
var once sync.Once
once.Do(func() {
panic("oops") // panic 后,once 被标记为已执行
})
once.Do(func() {
fmt.Println("这行不会执行")
})
2.4.6. sync.Cond (条件变量)
用于等待或通知条件状态变化。
type Queue struct {
items []int
cond *sync.Cond
}
func NewQueue() *Queue {
q := &Queue{}
q.cond = sync.NewCond(&sync.Mutex{})
return q
}
func (q *Queue) Push(item int) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.items = append(q.items, item)
q.cond.Signal() // 通知一个等待者
}
func (q *Queue) Pop() int {
q.cond.L.Lock()
defer q.cond.L.Unlock()
// 等待队列非空
for len(q.items) == 0 {
q.cond.Wait() // 释放锁并等待信号
}
item := q.items[0]
q.items = q.items[1:]
return item
}
2.4.6.1. Signal vs Broadcast
Signal(): 唤醒一个等待的 goroutineBroadcast(): 唤醒所有等待的 goroutine
2.4.7. sync.Map
并发安全的 map,适用于以下场景:
key 只会写入一次,但读取多次
多个 goroutine 读写不相交的 key 集合
var m sync.Map
// 存储
m.Store("key", "value")
// 读取
value, ok := m.Load("key")
// 读取或存储
actual, loaded := m.LoadOrStore("key", "default")
// 删除
m.Delete("key")
// 遍历
m.Range(func(key, value interface{}) bool {
fmt.Printf("%v: %v\n", key, value)
return true // 返回 false 停止遍历
})
2.4.7.1. ⚠️ sync.Map 的适用场景
// ❌ 不适合:频繁写入的场景
// sync.Map 写入性能比 map + RWMutex 差
// ✅ 适合:读多写少,或 key 分布不均匀
// 例如:缓存、配置、连接池
2.4.8. sync.Pool
对象池,用于复用临时对象,减少内存分配。
var bufferPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
func GetBuffer() *bytes.Buffer {
return bufferPool.Get().(*bytes.Buffer)
}
func PutBuffer(buf *bytes.Buffer) {
buf.Reset()
bufferPool.Put(buf)
}
// 使用示例
func ProcessData(data []byte) string {
buf := GetBuffer()
defer PutBuffer(buf)
buf.Write(data)
// 处理数据...
return buf.String()
}
2.4.8.1. ⚠️ sync.Pool 注意事项
Pool 中的对象可能随时被 GC 回收
不能用于连接池(连接有状态)
放回前要重置对象状态
// ❌ 危险:放回的 buffer 还有旧数据
bufferPool.Put(buf)
// ✅ 正确:重置后再放回
buf.Reset()
bufferPool.Put(buf)
2.4.9. atomic 包
提供原子操作,比锁更轻量。
import "sync/atomic"
var counter int64
// 原子增加
atomic.AddInt64(&counter, 1)
// 原子读取
value := atomic.LoadInt64(&counter)
// 原子存储
atomic.StoreInt64(&counter, 100)
// CAS (Compare And Swap)
swapped := atomic.CompareAndSwapInt64(&counter, 100, 200)
2.4.9.1. Go 1.19+ 新增的 atomic 类型
var counter atomic.Int64
counter.Add(1)
counter.Load()
counter.Store(100)
counter.CompareAndSwap(100, 200)
2.4.10. 选择合适的同步原语
场景 |
推荐方案 |
|---|---|
简单计数器 |
atomic |
读多写少 |
sync.RWMutex |
临时对象复用 |
sync.Pool |
单例初始化 |
sync.Once |
等待多个 goroutine |
sync.WaitGroup |
并发安全 map |
sync.Map 或 map + RWMutex |
goroutine 间通信 |
channel (首选) |