2.4. sync 包详解

2.4.1. sync 包概述

sync 包提供了基本的同步原语,用于低级别的内存访问同步。

Warning

sync 包中的类型在使用后不能被复制!

2.4.2. sync.Mutex (互斥锁)

2.4.2.1. 基本用法

type SafeCounter struct {
    mu    sync.Mutex
    count int
}

func (c *SafeCounter) Inc() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.count++
}

func (c *SafeCounter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.count
}

2.4.2.2. ⚠️ 常见陷阱

2.4.2.2.1. 陷阱 1:忘记解锁

// ❌ 如果中间 return,锁不会释放
func (c *SafeCounter) Inc() {
    c.mu.Lock()
    if someCondition {
        return // 锁没有释放!
    }
    c.count++
    c.mu.Unlock()
}

// ✅ 使用 defer
func (c *SafeCounter) Inc() {
    c.mu.Lock()
    defer c.mu.Unlock()
    
    if someCondition {
        return // defer 会确保解锁
    }
    c.count++
}

2.4.2.2.2. 陷阱 2:锁的复制

type Counter struct {
    mu    sync.Mutex
    count int
}

// ❌ 值传递会复制锁
func (c Counter) Inc() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.count++ // 修改的是副本
}

// ✅ 使用指针接收者
func (c *Counter) Inc() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.count++
}

2.4.2.2.3. 陷阱 3:死锁

// ❌ 同一个 goroutine 重复加锁
func (c *Counter) Inc() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.addOne() // 如果 addOne 也加锁,死锁!
}

func (c *Counter) addOne() {
    c.mu.Lock() // 死锁!
    defer c.mu.Unlock()
    c.count++
}

// ✅ 内部方法不加锁,或使用读写锁
func (c *Counter) Inc() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.addOneInternal()
}

func (c *Counter) addOneInternal() {
    c.count++ // 假设调用者已持有锁
}

2.4.3. sync.RWMutex (读写锁)

适用于读多写少的场景。

type Cache struct {
    mu    sync.RWMutex
    items map[string]string
}

// 读操作使用 RLock
func (c *Cache) Get(key string) (string, bool) {
    c.mu.RLock()
    defer c.mu.RUnlock()
    val, ok := c.items[key]
    return val, ok
}

// 写操作使用 Lock
func (c *Cache) Set(key, value string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.items[key] = value
}

2.4.3.1. 读写锁规则

操作

条件

RLock

可以多个 goroutine 同时持有读锁

Lock

必须等待所有读锁和写锁释放

RUnlock

释放一个读锁

Unlock

释放写锁

2.4.4. sync.WaitGroup

等待一组 Goroutine 完成。

func main() {
    var wg sync.WaitGroup
    
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            fmt.Printf("Worker %d done\n", id)
        }(i)
    }
    
    wg.Wait() // 等待所有 goroutine 完成
    fmt.Println("All workers done")
}

2.4.4.1. ⚠️ WaitGroup 陷阱

2.4.4.1.1. 陷阱 1:Add 在 goroutine 外调用

// ❌ 可能出现 race condition
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
    go func(id int) {
        wg.Add(1) // 太晚了!Wait 可能已经返回
        defer wg.Done()
        // ...
    }(i)
}
wg.Wait()

// ✅ Add 在启动 goroutine 前调用
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
    wg.Add(1)
    go func(id int) {
        defer wg.Done()
        // ...
    }(i)
}
wg.Wait()

2.4.4.1.2. 陷阱 2:Done 调用次数不匹配

// ❌ Done 调用太多次会 panic
var wg sync.WaitGroup
wg.Add(1)
go func() {
    defer wg.Done()
    defer wg.Done() // panic: negative WaitGroup counter
}()

2.4.5. sync.Once

确保某个操作只执行一次,常用于单例模式。

var (
    instance *Database
    once     sync.Once
)

func GetDatabase() *Database {
    once.Do(func() {
        instance = &Database{}
        instance.Connect()
    })
    return instance
}

2.4.5.1. ⚠️ Once 陷阱

// ❌ 如果 Do 中的函数 panic,Once 仍然被标记为已执行
var once sync.Once
once.Do(func() {
    panic("oops") // panic 后,once 被标记为已执行
})
once.Do(func() {
    fmt.Println("这行不会执行")
})

2.4.6. sync.Cond (条件变量)

用于等待或通知条件状态变化。

type Queue struct {
    items []int
    cond  *sync.Cond
}

func NewQueue() *Queue {
    q := &Queue{}
    q.cond = sync.NewCond(&sync.Mutex{})
    return q
}

func (q *Queue) Push(item int) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    
    q.items = append(q.items, item)
    q.cond.Signal() // 通知一个等待者
}

func (q *Queue) Pop() int {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    
    // 等待队列非空
    for len(q.items) == 0 {
        q.cond.Wait() // 释放锁并等待信号
    }
    
    item := q.items[0]
    q.items = q.items[1:]
    return item
}

2.4.6.1. Signal vs Broadcast

  • Signal(): 唤醒一个等待的 goroutine

  • Broadcast(): 唤醒所有等待的 goroutine

2.4.7. sync.Map

并发安全的 map,适用于以下场景:

  1. key 只会写入一次,但读取多次

  2. 多个 goroutine 读写不相交的 key 集合

var m sync.Map

// 存储
m.Store("key", "value")

// 读取
value, ok := m.Load("key")

// 读取或存储
actual, loaded := m.LoadOrStore("key", "default")

// 删除
m.Delete("key")

// 遍历
m.Range(func(key, value interface{}) bool {
    fmt.Printf("%v: %v\n", key, value)
    return true // 返回 false 停止遍历
})

2.4.7.1. ⚠️ sync.Map 的适用场景

// ❌ 不适合:频繁写入的场景
// sync.Map 写入性能比 map + RWMutex 差

// ✅ 适合:读多写少,或 key 分布不均匀
// 例如:缓存、配置、连接池

2.4.8. sync.Pool

对象池,用于复用临时对象,减少内存分配。

var bufferPool = sync.Pool{
    New: func() interface{} {
        return new(bytes.Buffer)
    },
}

func GetBuffer() *bytes.Buffer {
    return bufferPool.Get().(*bytes.Buffer)
}

func PutBuffer(buf *bytes.Buffer) {
    buf.Reset()
    bufferPool.Put(buf)
}

// 使用示例
func ProcessData(data []byte) string {
    buf := GetBuffer()
    defer PutBuffer(buf)
    
    buf.Write(data)
    // 处理数据...
    return buf.String()
}

2.4.8.1. ⚠️ sync.Pool 注意事项

  1. Pool 中的对象可能随时被 GC 回收

  2. 不能用于连接池(连接有状态)

  3. 放回前要重置对象状态

// ❌ 危险:放回的 buffer 还有旧数据
bufferPool.Put(buf)

// ✅ 正确:重置后再放回
buf.Reset()
bufferPool.Put(buf)

2.4.9. atomic 包

提供原子操作,比锁更轻量。

import "sync/atomic"

var counter int64

// 原子增加
atomic.AddInt64(&counter, 1)

// 原子读取
value := atomic.LoadInt64(&counter)

// 原子存储
atomic.StoreInt64(&counter, 100)

// CAS (Compare And Swap)
swapped := atomic.CompareAndSwapInt64(&counter, 100, 200)

2.4.9.1. Go 1.19+ 新增的 atomic 类型

var counter atomic.Int64

counter.Add(1)
counter.Load()
counter.Store(100)
counter.CompareAndSwap(100, 200)

2.4.10. 选择合适的同步原语

场景

推荐方案

简单计数器

atomic

读多写少

sync.RWMutex

临时对象复用

sync.Pool

单例初始化

sync.Once

等待多个 goroutine

sync.WaitGroup

并发安全 map

sync.Map 或 map + RWMutex

goroutine 间通信

channel (首选)

2.4.11. 参考资源