
概述 #
Go 语言的并发模型是其最大优势之一,采用了 CSP (Communication Sequential Processes) 模型。
并发模型 #
CSP 模型核心 #
Goroutine A → Channel → Goroutine B
(生产者) (通信) (消费者)goroutine 特性 #
| 特性 | goroutine | 线程 |
|---|---|---|
| 内存占用 | ~2KB | ~1-8MB |
| 创建开销 | 极小 | 较大 |
| 切换开销 | 极小 | 较大 |
| 最大数量 | 几十万 | 几千 |
package main
import (
"fmt"
"time"
)
func main() {
// 创建 100000 个 goroutine
for i := 0; i < 100000; i++ {
go func(n int) {
fmt.Println("Goroutine:", n)
}(i)
}
time.Sleep(time.Second) // 等待 goroutine 执行
}Channel 基础 #
无缓冲 Channel #
ch := make(chan int)
// 发送 (阻塞)
ch <- 42
// 接收 (阻塞)
val := <-ch有缓冲 Channel #
bufCh := make(chan int, 10)
// 非阻塞发送 (直到缓冲满)
for i := 0; i < 10; i++ {
bufCh <- i
}
// 阻塞发送 (缓冲已满)
bufCh <- 11 // 阻塞直到有接收单向 Channel #
// 只发送
func sender(ch chan<- int) {
ch <- 42
}
// 只接收
func receiver(ch <-chan int) {
val := <-ch
}
// 双向
func worker(ch chan int) {
ch <- 42
val := <-ch
}高级并发模式 #
Worker Pool 模式 #
package main
import (
"fmt"
"sync"
"time"
)
type Job struct {
id int
}
type Result struct {
job Job
value string
}
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
time.Sleep(time.Millisecond * 100) // 模拟任务
result := Result{
job: job,
value: fmt.Sprintf("Job %d completed", job.id),
}
results <- result
}
}
func main() {
const numWorkers = 3
const numJobs = 10
jobs := make(chan Job, numJobs)
results := make(chan Result, numJobs)
var wg sync.WaitGroup
// 启动 workers
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, jobs, results, &wg)
}
// 发送任务
for i := 1; i <= numJobs; i++ {
jobs <- Job{id: i}
}
close(jobs)
// 等待完成
go func() {
wg.Wait()
close(results)
}()
// 收集结果
for result := range results {
fmt.Println(result.value)
}
}Pipeline 模式 #
// 数据流处理管道
func generator(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
func squarer(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
func printer(in <-chan int) {
for n := range in {
fmt.Println(n)
}
}
func main() {
nums := generator(1, 2, 3, 4, 5)
squares := squarer(nums)
printer(squares)
}生产者-消费者模式 #
type Task struct {
ID int
Data string
}
type Result struct {
TaskID int
Output string
}
func producer(jobs chan<- Task) {
for i := 1; i <= 100; i++ {
jobs <- Task{ID: i, Data: fmt.Sprintf("data-%d", i)}
}
close(jobs)
}
func consumer(jobs <-chan Task, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
// 处理任务
output := fmt.Sprintf("Processed: %s", job.Data)
results <- Result{
TaskID: job.ID,
Output: output,
}
}
}
func main() {
jobs := make(chan Task, 10)
results := make(chan Result, 10)
var wg sync.WaitGroup
// 启动生产者
go producer(jobs)
// 启动多个消费者
for i := 0; i < 5; i++ {
wg.Add(1)
go consumer(jobs, results, &wg)
}
// 等待消费者完成
go func() {
wg.Wait()
close(results)
}()
// 收集结果
for result := range results {
fmt.Printf("Task %d: %s\n", result.TaskID, result.Output)
}
}并发安全 #
sync.Mutex #
type Counter struct {
mu sync.Mutex
value int
}
func (c *Counter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}sync.RWMutex #
type Cache struct {
mu sync.RWMutex
data map[string]string
}
func (c *Cache) Set(key, value string) {
c.mu.Lock()
defer c.mu.Unlock()
c.data[key] = value
}
func (c *Cache) Get(key string) string {
c.mu.RLock()
defer c.mu.RUnlock()
return c.data[key]
}sync.Map #
var m sync.Map
// 存储
m.Store("key", "value")
// 读取
if val, ok := m.Load("key"); ok {
fmt.Println(val)
}
// 删除
m.Delete("key")
// 遍历
m.Range(func(key, value interface{}) bool {
fmt.Println(key, value)
return true
})atomic 包 #
var counter int64
// 原子操作
atomic.AddInt64(&counter, 1)
atomic.LoadInt64(&counter)
// CAS (Compare And Swap)
old := atomic.LoadInt64(&counter)
if atomic.CompareAndSwapInt64(&counter, old, old+1) {
fmt.Println("Successfully incremented")
}Context 控制 #
func worker(ctx context.Context, id int) {
for i := 0; i < 100; i++ {
select {
case <-ctx.Done():
fmt.Printf("Worker %d cancelled\n", id)
return
default:
fmt.Printf("Worker %d processing %d\n", id, i)
time.Sleep(time.Millisecond * 100)
}
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
go worker(ctx, 1)
go worker(ctx, 2)
time.Sleep(time.Second)
}性能优化 #
避免内存逃逸 #
// 逃逸到堆
func bad() {
s := make([]int, 1000)
process(s) // 可能逃逸
}
// 栈上分配
func good() {
buf := make([]byte, 1024) // 小缓冲
process(buf) // 通常在栈上
}sync.Pool #
var pool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
func process(data []byte) {
buf := pool.Get().(*bytes.Buffer)
defer pool.Put(buf)
buf.Reset()
buf.Write(data)
}常见并发问题 #
死锁 #
// 错误示例
ch := make(chan int)
<-ch // 死锁!没有 goroutine 发送
// 正确示例
ch := make(chan int)
go func() { ch <- 42 }()
<-ch数据竞争 #
// 错误 - 多 goroutine 同时写
var count int
go func() { count++ }()
go func() { count++ }() // 竞争!
// 正确 - 使用互斥锁
var (
count int
mu sync.Mutex
)
go func() {
mu.Lock()
count++
mu.Unlock()
}()内存泄漏 #
// 泄漏 - channel 未关闭
func leak() {
ch := make(chan int)
go func() {
for i := 0; i < 100; i++ {
ch <- i
}
// 忘记关闭
}()
// ch 一直持有引用
}
// 正确 - 使用 context
func good() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
ch := make(chan int)
go func() {
defer close(ch)
for {
select {
case ch <- 42:
case <-ctx.Done():
return
}
}
}()
}最佳实践 #
- 使用 channel 进行通信 - 不共享内存
- ** contexto 控制生命周期** - 优雅退出
- 合理使用缓冲 - 平衡性能和内存
- 使用 sync.Map - 替代 map + mutex
- 测试并发代码 - 使用 race detector
# 运行竞态检测
go test -race ./...总结 #
Go 的并发模型简单而强大,合理使用可以构建高性能服务。