2.5. 并发模式
2.5.1. 概述
Go 语言中有许多经典的并发模式,掌握这些模式可以帮助你编写高效、可维护的并发代码。
2.5.2. 模式 1:Worker Pool (工作池)
适用于需要限制并发数量的场景。
type Job struct {
ID int
Data interface{}
}
type Result struct {
JobID int
Value interface{}
Err error
}
func WorkerPool(numWorkers int, jobs <-chan Job) <-chan Result {
results := make(chan Result)
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for job := range jobs {
result := processJob(job)
results <- result
}
}(i)
}
go func() {
wg.Wait()
close(results)
}()
return results
}
func processJob(job Job) Result {
// 处理任务
return Result{JobID: job.ID, Value: "done"}
}
// 使用示例
func main() {
jobs := make(chan Job, 100)
results := WorkerPool(5, jobs)
// 提交任务
go func() {
for i := 0; i < 100; i++ {
jobs <- Job{ID: i, Data: i}
}
close(jobs)
}()
// 收集结果
for result := range results {
fmt.Printf("Job %d: %v\n", result.JobID, result.Value)
}
}
2.5.3. 模式 2:Rate Limiter (速率限制器)
2.5.3.1. 使用 time.Ticker
func rateLimiter(requests <-chan Request, rate time.Duration) <-chan Request {
limited := make(chan Request)
go func() {
ticker := time.NewTicker(rate)
defer ticker.Stop()
defer close(limited)
for req := range requests {
<-ticker.C // 等待下一个 tick
limited <- req
}
}()
return limited
}
2.5.3.2. 使用 golang.org/x/time/rate
import "golang.org/x/time/rate"
func main() {
// 每秒 10 个请求,最多突发 5 个
limiter := rate.NewLimiter(10, 5)
for i := 0; i < 20; i++ {
if err := limiter.Wait(context.Background()); err != nil {
log.Fatal(err)
}
fmt.Printf("Request %d at %v\n", i, time.Now())
}
}
2.5.4. 模式 3:Circuit Breaker (熔断器)
防止级联故障的模式。
type State int
const (
StateClosed State = iota
StateOpen
StateHalfOpen
)
type CircuitBreaker struct {
mu sync.Mutex
state State
failures int
threshold int
timeout time.Duration
lastFailure time.Time
}
func NewCircuitBreaker(threshold int, timeout time.Duration) *CircuitBreaker {
return &CircuitBreaker{
state: StateClosed,
threshold: threshold,
timeout: timeout,
}
}
func (cb *CircuitBreaker) Call(fn func() error) error {
cb.mu.Lock()
switch cb.state {
case StateOpen:
if time.Since(cb.lastFailure) > cb.timeout {
cb.state = StateHalfOpen
} else {
cb.mu.Unlock()
return errors.New("circuit breaker is open")
}
}
cb.mu.Unlock()
err := fn()
cb.mu.Lock()
defer cb.mu.Unlock()
if err != nil {
cb.failures++
cb.lastFailure = time.Now()
if cb.failures >= cb.threshold {
cb.state = StateOpen
}
return err
}
cb.failures = 0
cb.state = StateClosed
return nil
}
2.5.5. 模式 4:Pub/Sub (发布订阅)
type PubSub struct {
mu sync.RWMutex
subs map[string][]chan interface{}
closed bool
}
func NewPubSub() *PubSub {
return &PubSub{
subs: make(map[string][]chan interface{}),
}
}
func (ps *PubSub) Subscribe(topic string) <-chan interface{} {
ps.mu.Lock()
defer ps.mu.Unlock()
ch := make(chan interface{}, 1)
ps.subs[topic] = append(ps.subs[topic], ch)
return ch
}
func (ps *PubSub) Publish(topic string, msg interface{}) {
ps.mu.RLock()
defer ps.mu.RUnlock()
if ps.closed {
return
}
for _, ch := range ps.subs[topic] {
select {
case ch <- msg:
default:
// 如果订阅者处理太慢,跳过
}
}
}
func (ps *PubSub) Close() {
ps.mu.Lock()
defer ps.mu.Unlock()
if ps.closed {
return
}
ps.closed = true
for _, subs := range ps.subs {
for _, ch := range subs {
close(ch)
}
}
}
2.5.6. 模式 5:Context Cancellation (上下文取消)
func longRunningTask(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
// 执行一小步工作
if done := doOneStep(); done {
return nil
}
}
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := longRunningTask(ctx); err != nil {
if errors.Is(err, context.DeadlineExceeded) {
fmt.Println("Task timed out")
} else if errors.Is(err, context.Canceled) {
fmt.Println("Task was canceled")
}
}
}
2.5.7. 模式 6:Graceful Shutdown (优雅关闭)
func main() {
server := &http.Server{Addr: ":8080"}
// 启动服务器
go func() {
if err := server.ListenAndServe(); err != http.ErrServerClosed {
log.Fatalf("HTTP server error: %v", err)
}
}()
// 等待中断信号
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
// 优雅关闭
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := server.Shutdown(ctx); err != nil {
log.Fatalf("Server shutdown error: %v", err)
}
fmt.Println("Server gracefully stopped")
}
2.5.8. 模式 7:Barrier (屏障)
等待多个 goroutine 到达同一点后再继续。
type Barrier struct {
total int
count int
mu sync.Mutex
cond *sync.Cond
}
func NewBarrier(n int) *Barrier {
b := &Barrier{total: n}
b.cond = sync.NewCond(&b.mu)
return b
}
func (b *Barrier) Wait() {
b.mu.Lock()
defer b.mu.Unlock()
b.count++
if b.count == b.total {
b.count = 0 // 重置
b.cond.Broadcast()
} else {
b.cond.Wait()
}
}
// 使用示例
func main() {
barrier := NewBarrier(3)
for i := 0; i < 3; i++ {
go func(id int) {
fmt.Printf("Worker %d preparing\n", id)
time.Sleep(time.Duration(id) * time.Second)
barrier.Wait() // 等待所有 worker 准备好
fmt.Printf("Worker %d running\n", id)
}(i)
}
time.Sleep(5 * time.Second)
}
2.5.9. 模式 8:Tee Channel (T 形分流)
将一个 channel 的数据分发到多个 channel。
func tee(done <-chan struct{}, in <-chan interface{}) (<-chan interface{}, <-chan interface{}) {
out1 := make(chan interface{})
out2 := make(chan interface{})
go func() {
defer close(out1)
defer close(out2)
for val := range orDone(done, in) {
var o1, o2 = out1, out2
for i := 0; i < 2; i++ {
select {
case <-done:
return
case o1 <- val:
o1 = nil // 已发送,置为 nil
case o2 <- val:
o2 = nil
}
}
}
}()
return out1, out2
}
2.5.10. 模式 9:Retry with Backoff (指数退避重试)
type BackoffConfig struct {
InitialInterval time.Duration
MaxInterval time.Duration
MaxRetries int
Multiplier float64
}
func RetryWithBackoff(ctx context.Context, cfg BackoffConfig, fn func() error) error {
interval := cfg.InitialInterval
for i := 0; i < cfg.MaxRetries; i++ {
err := fn()
if err == nil {
return nil
}
if i == cfg.MaxRetries-1 {
return fmt.Errorf("max retries exceeded: %w", err)
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(interval):
// 计算下一次重试间隔
interval = time.Duration(float64(interval) * cfg.Multiplier)
if interval > cfg.MaxInterval {
interval = cfg.MaxInterval
}
}
}
return nil
}
// 使用示例
func main() {
cfg := BackoffConfig{
InitialInterval: 100 * time.Millisecond,
MaxInterval: 5 * time.Second,
MaxRetries: 5,
Multiplier: 2.0,
}
err := RetryWithBackoff(context.Background(), cfg, func() error {
// 可能失败的操作
return callExternalService()
})
if err != nil {
log.Fatal(err)
}
}
2.5.11. 模式选择指南
场景 |
推荐模式 |
|---|---|
限制并发请求数 |
Worker Pool |
限制请求频率 |
Rate Limiter |
防止服务雪崩 |
Circuit Breaker |
事件驱动解耦 |
Pub/Sub |
任务超时控制 |
Context Cancellation |
服务停止 |
Graceful Shutdown |
等待多个任务到达 |
Barrier |
重试失败操作 |
Retry with Backoff |