2.2. Channel 详解
2.2.1. Channel 基础
Channel 是 Go 语言中 Goroutine 之间通信的管道,是 CSP (Communicating Sequential Processes) 并发模型的核心。
Tip
Go 的并发哲学:不要通过共享内存来通信,而要通过通信来共享内存。
See also
延伸阅读:通过通信来共享内存 <https://www.fanyamin.com/journal/2025-03-26-tong-guo-tong-xin-lai-gong-xiang-nei-cun-er-bu-shi-tong-guo.html>_ — C++/Java/Go 三种语言实现事件循环的对比。
2.2.2. Channel 类型
2.2.2.1. 无缓冲 Channel (Unbuffered)
ch := make(chan int) // 无缓冲 channel
// 发送操作会阻塞,直到有接收者
go func() {
ch <- 42 // 阻塞,直到有人接收
}()
value := <-ch // 接收
2.2.2.2. 有缓冲 Channel (Buffered)
ch := make(chan int, 3) // 容量为 3 的 buffered channel
ch <- 1 // 不阻塞
ch <- 2 // 不阻塞
ch <- 3 // 不阻塞
ch <- 4 // 阻塞!缓冲区已满
2.2.2.3. 单向 Channel
// 只读 channel
func receive(ch <-chan int) {
value := <-ch
// ch <- 1 // 编译错误:不能发送
}
// 只写 channel
func send(ch chan<- int) {
ch <- 1
// <-ch // 编译错误:不能接收
}
2.2.3. ⚠️ Channel 常见陷阱
2.2.3.1. 陷阱 1:向 nil channel 发送/接收
// ❌ 永远阻塞
var ch chan int // nil channel
ch <- 1 // 永远阻塞
<-ch // 永远阻塞
// ✅ 正确初始化
ch := make(chan int)
2.2.3.2. 陷阱 2:向已关闭的 channel 发送数据
ch := make(chan int)
close(ch)
// ❌ panic: send on closed channel
ch <- 1
// ✅ 只有发送方应该关闭 channel
2.2.3.3. 陷阱 3:重复关闭 channel
ch := make(chan int)
close(ch)
// ❌ panic: close of closed channel
close(ch)
// ✅ 使用 sync.Once 确保只关闭一次
var once sync.Once
safeClose := func() {
once.Do(func() {
close(ch)
})
}
2.2.3.4. 陷阱 4:从已关闭的 channel 读取
ch := make(chan int, 2)
ch <- 1
ch <- 2
close(ch)
// 可以继续读取缓冲区中的数据
fmt.Println(<-ch) // 1
fmt.Println(<-ch) // 2
// 读取已关闭的空 channel 返回零值
fmt.Println(<-ch) // 0 (零值)
// ✅ 正确方式:检查是否关闭
value, ok := <-ch
if !ok {
fmt.Println("channel closed")
}
2.2.4. Channel 操作总结
操作 |
nil channel |
正常 channel |
已关闭 channel |
|---|---|---|---|
发送 |
永久阻塞 |
阻塞或成功 |
panic |
接收 |
永久阻塞 |
阻塞或成功 |
零值 + false |
关闭 |
panic |
成功 |
panic |
len |
0 |
缓冲区元素数 |
0 |
cap |
0 |
缓冲区容量 |
缓冲区容量 |
2.2.5. Select 语句
select 用于同时监听多个 channel 操作。
2.2.5.1. 基本用法
select {
case v := <-ch1:
fmt.Println("received from ch1:", v)
case v := <-ch2:
fmt.Println("received from ch2:", v)
case ch3 <- 42:
fmt.Println("sent to ch3")
default:
fmt.Println("no operation ready")
}
2.2.5.2. 超时处理
select {
case result := <-ch:
fmt.Println("got result:", result)
case <-time.After(5 * time.Second):
fmt.Println("timeout!")
}
2.2.5.3. 非阻塞操作
select {
case v := <-ch:
fmt.Println("received:", v)
default:
fmt.Println("channel empty, not blocking")
}
2.2.5.4. ⚠️ Select 陷阱:随机选择
当多个 case 同时就绪时,select 会随机选择一个执行:
ch1 := make(chan int, 1)
ch2 := make(chan int, 1)
ch1 <- 1
ch2 <- 2
// 结果是不确定的!
select {
case v := <-ch1:
fmt.Println("ch1:", v)
case v := <-ch2:
fmt.Println("ch2:", v)
}
2.2.6. 常用 Channel 模式
2.2.6.1. 模式 1:Done Channel (信号通知)
func worker(done chan struct{}) {
for {
select {
case <-done:
fmt.Println("worker stopping")
return
default:
// 工作逻辑
time.Sleep(100 * time.Millisecond)
}
}
}
func main() {
done := make(chan struct{})
go worker(done)
time.Sleep(time.Second)
close(done) // 发送停止信号
time.Sleep(100 * time.Millisecond)
}
2.2.6.2. 模式 2:Pipeline (管道)
func generator(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
func main() {
// 组装管道
nums := generator(1, 2, 3, 4, 5)
squares := square(nums)
for result := range squares {
fmt.Println(result) // 1, 4, 9, 16, 25
}
}
2.2.6.3. 模式 3:Fan-out, Fan-in
// Fan-out: 多个 goroutine 从同一个 channel 读取
func fanOut(in <-chan int, workers int) []<-chan int {
outs := make([]<-chan int, workers)
for i := 0; i < workers; i++ {
outs[i] = worker(in)
}
return outs
}
// Fan-in: 将多个 channel 合并为一个
func fanIn(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
out <- n
}
}
wg.Add(len(channels))
for _, c := range channels {
go output(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
2.2.6.4. 模式 4:Semaphore (信号量)
// 使用 buffered channel 实现信号量
type Semaphore chan struct{}
func NewSemaphore(n int) Semaphore {
return make(chan struct{}, n)
}
func (s Semaphore) Acquire() {
s <- struct{}{}
}
func (s Semaphore) Release() {
<-s
}
// 使用示例
func main() {
sem := NewSemaphore(3) // 最多 3 个并发
for i := 0; i < 10; i++ {
sem.Acquire()
go func(id int) {
defer sem.Release()
fmt.Printf("Worker %d running\n", id)
time.Sleep(time.Second)
}(i)
}
}
2.2.6.5. 模式 5:Or-Done Channel
// 当 done 关闭时,停止从 c 读取
func orDone(done <-chan struct{}, c <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for {
select {
case <-done:
return
case v, ok := <-c:
if !ok {
return
}
select {
case out <- v:
case <-done:
return
}
}
}
}()
return out
}
2.2.7. Channel 性能考虑
2.2.7.1. 缓冲区大小选择
// 无缓冲:强同步,适合信号通知
done := make(chan struct{})
// 小缓冲:减少阻塞,适合突发流量
ch := make(chan Task, 10)
// 大缓冲:解耦生产消费速度,注意内存占用
ch := make(chan LargeData, 1000) // 可能占用大量内存
2.2.7.2. 避免过度使用 Channel
// ❌ 不必要的 channel 使用
func add(a, b int) int {
result := make(chan int)
go func() {
result <- a + b
}()
return <-result
}
// ✅ 简单操作直接返回
func add(a, b int) int {
return a + b
}
2.2.8. 最佳实践
明确 Channel 所有权:通常由创建者负责关闭
使用单向 Channel:在函数签名中明确意图
避免在循环中创建 Channel:考虑复用
合理设置缓冲区大小:基于实际场景测试
使用 context 代替 done channel:更标准、功能更丰富