2.2. Channel 详解

2.2.1. Channel 基础

Channel 是 Go 语言中 Goroutine 之间通信的管道,是 CSP (Communicating Sequential Processes) 并发模型的核心。

Tip

Go 的并发哲学:不要通过共享内存来通信,而要通过通信来共享内存

See also

延伸阅读:通过通信来共享内存 <https://www.fanyamin.com/journal/2025-03-26-tong-guo-tong-xin-lai-gong-xiang-nei-cun-er-bu-shi-tong-guo.html>_ — C++/Java/Go 三种语言实现事件循环的对比。

2.2.2. Channel 类型

2.2.2.1. 无缓冲 Channel (Unbuffered)

ch := make(chan int) // 无缓冲 channel

// 发送操作会阻塞,直到有接收者
go func() {
    ch <- 42 // 阻塞,直到有人接收
}()

value := <-ch // 接收

2.2.2.2. 有缓冲 Channel (Buffered)

ch := make(chan int, 3) // 容量为 3 的 buffered channel

ch <- 1 // 不阻塞
ch <- 2 // 不阻塞
ch <- 3 // 不阻塞
ch <- 4 // 阻塞!缓冲区已满

2.2.2.3. 单向 Channel

// 只读 channel
func receive(ch <-chan int) {
    value := <-ch
    // ch <- 1 // 编译错误:不能发送
}

// 只写 channel
func send(ch chan<- int) {
    ch <- 1
    // <-ch // 编译错误:不能接收
}

2.2.3. ⚠️ Channel 常见陷阱

2.2.3.1. 陷阱 1:向 nil channel 发送/接收

// ❌ 永远阻塞
var ch chan int // nil channel
ch <- 1         // 永远阻塞
<-ch            // 永远阻塞

// ✅ 正确初始化
ch := make(chan int)

2.2.3.2. 陷阱 2:向已关闭的 channel 发送数据

ch := make(chan int)
close(ch)

// ❌ panic: send on closed channel
ch <- 1

// ✅ 只有发送方应该关闭 channel

2.2.3.3. 陷阱 3:重复关闭 channel

ch := make(chan int)
close(ch)

// ❌ panic: close of closed channel
close(ch)

// ✅ 使用 sync.Once 确保只关闭一次
var once sync.Once
safeClose := func() {
    once.Do(func() {
        close(ch)
    })
}

2.2.3.4. 陷阱 4:从已关闭的 channel 读取

ch := make(chan int, 2)
ch <- 1
ch <- 2
close(ch)

// 可以继续读取缓冲区中的数据
fmt.Println(<-ch) // 1
fmt.Println(<-ch) // 2

// 读取已关闭的空 channel 返回零值
fmt.Println(<-ch) // 0 (零值)

// ✅ 正确方式:检查是否关闭
value, ok := <-ch
if !ok {
    fmt.Println("channel closed")
}

2.2.4. Channel 操作总结

操作

nil channel

正常 channel

已关闭 channel

发送

永久阻塞

阻塞或成功

panic

接收

永久阻塞

阻塞或成功

零值 + false

关闭

panic

成功

panic

len

0

缓冲区元素数

0

cap

0

缓冲区容量

缓冲区容量

2.2.5. Select 语句

select 用于同时监听多个 channel 操作。

2.2.5.1. 基本用法

select {
case v := <-ch1:
    fmt.Println("received from ch1:", v)
case v := <-ch2:
    fmt.Println("received from ch2:", v)
case ch3 <- 42:
    fmt.Println("sent to ch3")
default:
    fmt.Println("no operation ready")
}

2.2.5.2. 超时处理

select {
case result := <-ch:
    fmt.Println("got result:", result)
case <-time.After(5 * time.Second):
    fmt.Println("timeout!")
}

2.2.5.3. 非阻塞操作

select {
case v := <-ch:
    fmt.Println("received:", v)
default:
    fmt.Println("channel empty, not blocking")
}

2.2.5.4. ⚠️ Select 陷阱:随机选择

当多个 case 同时就绪时,select 会随机选择一个执行:

ch1 := make(chan int, 1)
ch2 := make(chan int, 1)
ch1 <- 1
ch2 <- 2

// 结果是不确定的!
select {
case v := <-ch1:
    fmt.Println("ch1:", v)
case v := <-ch2:
    fmt.Println("ch2:", v)
}

2.2.6. 常用 Channel 模式

2.2.6.1. 模式 1:Done Channel (信号通知)

func worker(done chan struct{}) {
    for {
        select {
        case <-done:
            fmt.Println("worker stopping")
            return
        default:
            // 工作逻辑
            time.Sleep(100 * time.Millisecond)
        }
    }
}

func main() {
    done := make(chan struct{})
    go worker(done)
    
    time.Sleep(time.Second)
    close(done) // 发送停止信号
    time.Sleep(100 * time.Millisecond)
}

2.2.6.2. 模式 2:Pipeline (管道)

func generator(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

func main() {
    // 组装管道
    nums := generator(1, 2, 3, 4, 5)
    squares := square(nums)
    
    for result := range squares {
        fmt.Println(result) // 1, 4, 9, 16, 25
    }
}

2.2.6.3. 模式 3:Fan-out, Fan-in

// Fan-out: 多个 goroutine 从同一个 channel 读取
func fanOut(in <-chan int, workers int) []<-chan int {
    outs := make([]<-chan int, workers)
    for i := 0; i < workers; i++ {
        outs[i] = worker(in)
    }
    return outs
}

// Fan-in: 将多个 channel 合并为一个
func fanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)
    
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            out <- n
        }
    }
    
    wg.Add(len(channels))
    for _, c := range channels {
        go output(c)
    }
    
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

2.2.6.4. 模式 4:Semaphore (信号量)

// 使用 buffered channel 实现信号量
type Semaphore chan struct{}

func NewSemaphore(n int) Semaphore {
    return make(chan struct{}, n)
}

func (s Semaphore) Acquire() {
    s <- struct{}{}
}

func (s Semaphore) Release() {
    <-s
}

// 使用示例
func main() {
    sem := NewSemaphore(3) // 最多 3 个并发
    
    for i := 0; i < 10; i++ {
        sem.Acquire()
        go func(id int) {
            defer sem.Release()
            fmt.Printf("Worker %d running\n", id)
            time.Sleep(time.Second)
        }(i)
    }
}

2.2.6.5. 模式 5:Or-Done Channel

// 当 done 关闭时,停止从 c 读取
func orDone(done <-chan struct{}, c <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for {
            select {
            case <-done:
                return
            case v, ok := <-c:
                if !ok {
                    return
                }
                select {
                case out <- v:
                case <-done:
                    return
                }
            }
        }
    }()
    return out
}

2.2.7. Channel 性能考虑

2.2.7.1. 缓冲区大小选择

// 无缓冲:强同步,适合信号通知
done := make(chan struct{})

// 小缓冲:减少阻塞,适合突发流量
ch := make(chan Task, 10)

// 大缓冲:解耦生产消费速度,注意内存占用
ch := make(chan LargeData, 1000) // 可能占用大量内存

2.2.7.2. 避免过度使用 Channel

// ❌ 不必要的 channel 使用
func add(a, b int) int {
    result := make(chan int)
    go func() {
        result <- a + b
    }()
    return <-result
}

// ✅ 简单操作直接返回
func add(a, b int) int {
    return a + b
}

2.2.8. 最佳实践

  1. 明确 Channel 所有权:通常由创建者负责关闭

  2. 使用单向 Channel:在函数签名中明确意图

  3. 避免在循环中创建 Channel:考虑复用

  4. 合理设置缓冲区大小:基于实际场景测试

  5. 使用 context 代替 done channel:更标准、功能更丰富

2.2.9. 参考资源