Skip to main content

Go 语言并发编程深入详解

·1304 words·3 mins

English Reference

概述
#

Go 语言的并发模型是其最大优势之一,采用了 CSP (Communication Sequential Processes) 模型。

并发模型
#

CSP 模型核心
#

Goroutine A  →  Channel  →  Goroutine B
  (生产者)      (通信)       (消费者)

goroutine 特性
#

特性 goroutine 线程
内存占用 ~2KB ~1-8MB
创建开销 极小 较大
切换开销 极小 较大
最大数量 几十万 几千
package main

import (
    "fmt"
    "time"
)

func main() {
    // 创建 100000 个 goroutine
    for i := 0; i < 100000; i++ {
        go func(n int) {
            fmt.Println("Goroutine:", n)
        }(i)
    }
    
    time.Sleep(time.Second) // 等待 goroutine 执行
}

Channel 基础
#

无缓冲 Channel
#

ch := make(chan int)

// 发送 (阻塞)
ch <- 42

// 接收 (阻塞)
val := <-ch

有缓冲 Channel
#

bufCh := make(chan int, 10)

// 非阻塞发送 (直到缓冲满)
for i := 0; i < 10; i++ {
    bufCh <- i
}

// 阻塞发送 (缓冲已满)
bufCh <- 11 // 阻塞直到有接收

单向 Channel
#

// 只发送
func sender(ch chan<- int) {
    ch <- 42
}

// 只接收
func receiver(ch <-chan int) {
    val := <-ch
}

// 双向
func worker(ch chan int) {
    ch <- 42
    val := <-ch
}

高级并发模式
#

Worker Pool 模式
#

package main

import (
    "fmt"
    "sync"
    "time"
)

type Job struct {
    id int
}

type Result struct {
    job   Job
    value string
}

func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()
    
    for job := range jobs {
        time.Sleep(time.Millisecond * 100) // 模拟任务
        
        result := Result{
            job:   job,
            value: fmt.Sprintf("Job %d completed", job.id),
        }
        
        results <- result
    }
}

func main() {
    const numWorkers = 3
    const numJobs = 10
    
    jobs := make(chan Job, numJobs)
    results := make(chan Result, numJobs)
    
    var wg sync.WaitGroup
    
    // 启动 workers
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, jobs, results, &wg)
    }
    
    // 发送任务
    for i := 1; i <= numJobs; i++ {
        jobs <- Job{id: i}
    }
    close(jobs)
    
    // 等待完成
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // 收集结果
    for result := range results {
        fmt.Println(result.value)
    }
}

Pipeline 模式
#

// 数据流处理管道
func generator(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

func squarer(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

func printer(in <-chan int) {
    for n := range in {
        fmt.Println(n)
    }
}

func main() {
    nums := generator(1, 2, 3, 4, 5)
    squares := squarer(nums)
    printer(squares)
}

生产者-消费者模式
#

type Task struct {
    ID   int
    Data string
}

type Result struct {
    TaskID int
    Output string
}

func producer(jobs chan<- Task) {
    for i := 1; i <= 100; i++ {
        jobs <- Task{ID: i, Data: fmt.Sprintf("data-%d", i)}
    }
    close(jobs)
}

func consumer(jobs <-chan Task, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()
    
    for job := range jobs {
        // 处理任务
        output := fmt.Sprintf("Processed: %s", job.Data)
        
        results <- Result{
            TaskID: job.ID,
            Output: output,
        }
    }
}

func main() {
    jobs := make(chan Task, 10)
    results := make(chan Result, 10)
    
    var wg sync.WaitGroup
    
    // 启动生产者
    go producer(jobs)
    
    // 启动多个消费者
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go consumer(jobs, results, &wg)
    }
    
    // 等待消费者完成
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // 收集结果
    for result := range results {
        fmt.Printf("Task %d: %s\n", result.TaskID, result.Output)
    }
}

并发安全
#

sync.Mutex
#

type Counter struct {
    mu    sync.Mutex
    value int
}

func (c *Counter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value++
}

func (c *Counter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

sync.RWMutex
#

type Cache struct {
    mu      sync.RWMutex
    data    map[string]string
}

func (c *Cache) Set(key, value string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.data[key] = value
}

func (c *Cache) Get(key string) string {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.data[key]
}

sync.Map
#

var m sync.Map

// 存储
m.Store("key", "value")

// 读取
if val, ok := m.Load("key"); ok {
    fmt.Println(val)
}

// 删除
m.Delete("key")

// 遍历
m.Range(func(key, value interface{}) bool {
    fmt.Println(key, value)
    return true
})

atomic 包
#

var counter int64

// 原子操作
atomic.AddInt64(&counter, 1)
atomic.LoadInt64(&counter)

// CAS (Compare And Swap)
old := atomic.LoadInt64(&counter)
if atomic.CompareAndSwapInt64(&counter, old, old+1) {
    fmt.Println("Successfully incremented")
}

Context 控制
#

func worker(ctx context.Context, id int) {
    for i := 0; i < 100; i++ {
        select {
        case <-ctx.Done():
            fmt.Printf("Worker %d cancelled\n", id)
            return
        default:
            fmt.Printf("Worker %d processing %d\n", id, i)
            time.Sleep(time.Millisecond * 100)
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()
    
    go worker(ctx, 1)
    go worker(ctx, 2)
    
    time.Sleep(time.Second)
}

性能优化
#

避免内存逃逸
#

// 逃逸到堆
func bad() {
    s := make([]int, 1000)
    process(s) // 可能逃逸
}

// 栈上分配
func good() {
    buf := make([]byte, 1024) // 小缓冲
    process(buf)              // 通常在栈上
}

sync.Pool
#

var pool = sync.Pool{
    New: func() interface{} {
        return new(bytes.Buffer)
    },
}

func process(data []byte) {
    buf := pool.Get().(*bytes.Buffer)
    defer pool.Put(buf)
    
    buf.Reset()
    buf.Write(data)
}

常见并发问题
#

死锁
#

// 错误示例
ch := make(chan int)
<-ch  // 死锁!没有 goroutine 发送

// 正确示例
ch := make(chan int)
go func() { ch <- 42 }()
<-ch

数据竞争
#

// 错误 - 多 goroutine 同时写
var count int
go func() { count++ }()
go func() { count++ }() // 竞争!

// 正确 - 使用互斥锁
var (
    count int
    mu    sync.Mutex
)
go func() {
    mu.Lock()
    count++
    mu.Unlock()
}()

内存泄漏
#

// 泄漏 - channel 未关闭
func leak() {
    ch := make(chan int)
    go func() {
        for i := 0; i < 100; i++ {
            ch <- i
        }
        // 忘记关闭
    }()
    // ch 一直持有引用
}

// 正确 - 使用 context
func good() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    
    ch := make(chan int)
    go func() {
        defer close(ch)
        for {
            select {
            case ch <- 42:
            case <-ctx.Done():
                return
            }
        }
    }()
}

最佳实践
#

  1. 使用 channel 进行通信 - 不共享内存
  2. ** contexto 控制生命周期** - 优雅退出
  3. 合理使用缓冲 - 平衡性能和内存
  4. 使用 sync.Map - 替代 map + mutex
  5. 测试并发代码 - 使用 race detector
# 运行竞态检测
go test -race ./...

总结
#

Go 的并发模型简单而强大,合理使用可以构建高性能服务。