Skip to main content

Distributed Systems Pattern - 实战案例解析

·1342 words·3 mins

概述
#

分布式系统设计模式是构建高可用、可扩展系统的必备知识。

核心模式
#

1. CQRS (Command Query Responsibility Segregation)
#

命令查询职责分离,读写分离架构。

// Command side (write operations).

// CreateOrderCommand is the input of the "create order" write operation.
type CreateOrderCommand struct {
    UserID    string // user placing the order
    ProductID string // product being ordered
    Quantity  int    // number of units requested
}

// CreateOrderHandler executes CreateOrderCommand against the write model.
type CreateOrderHandler struct {
    orderRepo OrderRepository // write-side order store
}

// Handle builds a new order from the command's fields and persists it,
// returning whatever error the repository's Save reports.
func (h *CreateOrderHandler) Handle(cmd CreateOrderCommand) error {
    return h.orderRepo.Save(NewOrder(cmd.UserID, cmd.ProductID, cmd.Quantity))
}

// Query side (read operations).

// OrderQueryService serves read requests; it exposes no write operations.
type OrderQueryService struct {
    orderReader OrderReader // read-side data source
}

// GetOrders returns the given user's orders as DTOs, delegating straight to
// the read-side reader.
func (s *OrderQueryService) GetOrders(userID string) ([]OrderDTO, error) {
    dtos, err := s.orderReader.FindByUserID(userID)
    return dtos, err
}

2. Event Sourcing
#

所有状态变更通过事件记录。

// OrderCreatedEvent records that an order was created for a user.
type OrderCreatedEvent struct {
    OrderID   string
    UserID    string
    Timestamp time.Time // when the event was recorded
}

// OrderShippedEvent records that an order was handed to a carrier together
// with its tracking number.
type OrderShippedEvent struct {
    OrderID   string
    Carrier   string
    Tracking  string
    Timestamp time.Time // when the event was recorded
}

// Order is the aggregate root: state changes are appended to Events and
// counted by Version.
type Order struct {
    ID      string
    Events  []Event     // recorded domain events, in append order
    Version int         // bumped once per appended event
    Status  OrderStatus // current order status
}

// Ship records an OrderShippedEvent for the given carrier and tracking
// number, bumps Version and moves the order to OrderStatusShipped.
//
// It returns an error, leaving the order untouched, unless the order is
// currently in OrderStatusPayed.
func (o *Order) Ship(carrier, tracking string) error {
    // NOTE(review): "Payed" is a misspelling of "Paid"; the constant is
    // declared outside this snippet, so it is referenced unchanged.
    if o.Status != OrderStatusPayed {
        // Go error strings are lowercase and unpunctuated (staticcheck ST1005).
        return errors.New("cannot ship unpaid order")
    }

    event := OrderShippedEvent{
        OrderID:   o.ID,
        Carrier:   carrier,
        Tracking:  tracking,
        Timestamp: time.Now(),
    }

    // Record the event, then apply its effect to the aggregate state.
    o.Events = append(o.Events, event)
    o.Version++
    o.Status = OrderStatusShipped

    return nil
}

3. Saga Pattern
#

分布式事务解决方案。

编排式 (Orchestration)
#

// OrderService orchestrates the order-creation saga: it owns the local order
// store and calls out to the payment and inventory services.
type OrderService struct {
    orderRepo    OrderRepository // local order persistence
    paymentCli   PaymentClient   // client for the payment service
    inventoryCli InventoryClient // client for the inventory service
}

// CreateOrder runs the order-creation saga as an orchestrator.
//
// The steps are:
//  1. persist the order,
//  2. reserve inventory,
//  3. charge payment,
//  4. mark the order as paid.
//
// When steps 2 or 3 fail, the steps already completed are compensated in
// reverse order and the failing step's error is returned.
func (s *OrderService) CreateOrder(req CreateOrderRequest) error {
    // 1. Create the order. If this fails nothing has happened yet, so we stop
    // here instead of reserving stock and charging for a non-existent order.
    order := NewOrder(req.UserID, req.ProductID, req.Quantity)
    if err := s.orderRepo.Save(order); err != nil {
        return err
    }

    // 2. Reserve inventory.
    if err := s.inventoryCli.Decrease(order.ProductID, order.Quantity); err != nil {
        // Compensate: delete the order.
        // NOTE(review): compensation failures are not surfaced; a production
        // saga should log/retry them — confirm the return types of Delete/Increase.
        s.orderRepo.Delete(order.ID)
        return err
    }

    // 3. Charge payment.
    if err := s.paymentCli.Process(order.UserID, order.Amount); err != nil {
        // Compensate in reverse order: restore inventory, then delete the order.
        s.inventoryCli.Increase(order.ProductID, order.Quantity)
        s.orderRepo.Delete(order.ID)
        return err
    }

    // 4. Complete the order. A failed update must be reported; otherwise the
    // caller sees success while the stored order never reaches "paid".
    order.Status = OrderStatusPayed
    if err := s.orderRepo.Update(order); err != nil {
        // NOTE(review): payment and inventory are already committed and no
        // refund API is visible here, so no compensation is attempted.
        return err
    }

    return nil
}

协同式 (Choreography)
#

// Order service: publishes the OrderCreated event that the payment and
// inventory services below react to. `OrderCreated{...}` is placeholder
// pseudo-code, not valid Go.
// NOTE(review): despite its Handle* name, this method publishes the event
// rather than handling it.
func (s *OrderService) HandleOrderCreated() {
    // Publish the order-created event.
    s.eventBus.Publish(OrderCreated{...})
}

// Payment service: handles OrderCreated by running the payment logic and
// then publishing PaymentProcessed. The body is pseudo-code
// (`PaymentProcessed{...}` is a placeholder).
// NOTE(review): no failure/compensation event is shown for a failed payment.
func (s *PaymentService) HandleOrderCreated(event OrderCreated) {
    // Invoked for the order-created event.
    // Payment logic goes here.
    s.eventBus.Publish(PaymentProcessed{...})
}

// Inventory service: handles OrderCreated by decrementing stock and then
// publishing InventoryDecremented. The body is pseudo-code
// (`InventoryDecremented{...}` is a placeholder).
func (s *InventoryService) HandleOrderCreated(event OrderCreated) {
    // Decrement inventory.
    s.eventBus.Publish(InventoryDecremented{...})
}

4. Outbox Pattern
#

可靠的消息传递。

// OutboxMessage is a row of the outbox table. It is written in the same
// database transaction as the business data it announces; a separate step
// later publishes it and updates Status.
type OutboxMessage struct {
    ID        string
    EventType string    // event name, e.g. "OrderCreated"
    Payload   string    // serialized event body
    Status    string    // "pending" or "processed"
    CreatedAt time.Time // when the row was created
}

func (s *OrderService) CreateOrder(req CreateOrderRequest) error {
    // 开始事务
    tx, _ := s.db.Begin()
    
    // 1. 创建订单
    order := NewOrder(...)
    s.orderRepo.Save(tx, order)
    
    // 2. 写入 outbox
    msg := OutboxMessage{
        EventType: "OrderCreated",
        Payload:   json.Marshal(order),
        Status:    "pending",
    }
    s.outboxRepo.Save(tx, msg)
    
    // 提交事务
    tx.Commit()
    
    // 异步 process outbox
    go s.processOutbox()
    
    return nil
}

// processOutbox publishes each pending outbox message to the event bus and
// marks it processed only after a successful publish. A message whose
// publish fails is left pending for a later run. Because publishing happens
// before marking, a message can be delivered more than once, so consumers
// should be idempotent.
func (s *OrderService) processOutbox() {
    pending := s.outboxRepo.FindPending()

    for _, m := range pending {
        if s.eventBus.Publish(m.EventType, m.Payload) == nil {
            s.outboxRepo.MarkProcessed(m.ID)
        }
        // On failure the message simply stays pending (retried later).
    }
}

5. Bounded Context
#

限界上下文(Bounded Context)是领域驱动设计(DDD)的核心概念:每个上下文维护自己的模型,跨上下文只通过 ID 等标识相互引用。

// 用户上下文
package user

// User is the user bounded context's model of an account holder.
type User struct {
    ID       string // unique user identifier
    Username string // login name
    Email    string // contact address
}

// UserService is the user bounded context's application service.
type UserService struct {
    repo UserRepository
}

// Register creates a new user from req.
//
// The registration logic is elided in this example. The explicit return is
// required: a function with an error result and no return statement does not
// compile ("missing return").
func (s *UserService) Register(req RegisterRequest) error {
    // TODO: validate req and persist the new user via s.repo.
    return nil
}

// 订单上下文
package order

// Order is the order bounded context's own model. It refers to the buyer
// only by UserID rather than embedding user details, keeping the two
// contexts decoupled.
type Order struct {
    ID     string
    UserID string // reference only; user details live in the user context
    Items  []OrderItem
    Total  float64 // NOTE(review): float64 is imprecise for money; consider integer minor units
}

// OrderService is the order context's application service; it depends only
// on this context's own repository.
type OrderService struct {
    repo OrderRepository // order-context persistence
}

通信模式
#

同步通信
#

Client → HTTP/REST → Service
Client → gRPC → Service
Client → GraphQL → Gateway

异步通信
#

Producer → Message Queue → Consumer
Producer → Kafka → Consumer
Producer → RabbitMQ → Consumer

容错模式
#

1. Circuit Breaker
#

import "github.com/sony/gobreaker"

// CircuitBreaker wraps a gobreaker breaker guarding calls to an external
// service.
type CircuitBreaker struct {
    cb *gobreaker.CircuitBreaker
}

// NewCircuitBreaker returns a breaker named "external-service" that opens
// once more than maxConsecutiveFailures calls fail in a row. It stays open
// for Timeout (30s) before moving to half-open.
func NewCircuitBreaker() *CircuitBreaker {
    const maxConsecutiveFailures = 5

    cb := gobreaker.NewCircuitBreaker(gobreaker.Settings{
        // gobreaker.Settings has no MaxErrors field (that line did not
        // compile); the trip threshold is expressed only via ReadyToTrip.
        Name:    "external-service",
        Timeout: 30 * time.Second, // duration of the open state
        ReadyToTrip: func(counts gobreaker.Counts) bool {
            return counts.ConsecutiveFailures > maxConsecutiveFailures
        },
    })

    return &CircuitBreaker{cb: cb}
}

// Execute runs fn through the breaker and returns fn's error, or the
// breaker's own error when the call is rejected.
func (b *CircuitBreaker) Execute(fn func() error) error {
    _, err := b.cb.Execute(func() (interface{}, error) {
        return nil, fn()
    })
    return err
}

2. Bulkhead
#

// Bulkhead caps how many calls may run concurrently through Execute. It uses
// a buffered channel as a counting semaphore with one slot per in-flight call.
type Bulkhead struct {
    sem chan struct{}
}

// NewBulkhead returns a Bulkhead that allows at most limit concurrent calls.
// A limit below 1 is treated as 1: a zero-capacity channel would make every
// Execute block forever, and a negative capacity makes make() panic.
func NewBulkhead(limit int) *Bulkhead {
    if limit < 1 {
        limit = 1
    }
    return &Bulkhead{
        sem: make(chan struct{}, limit),
    }
}

// Execute waits for a free slot, runs fn and returns its error. The slot is
// released via defer, so it is freed even if fn panics. Execute blocks for
// as long as the bulkhead is full.
func (b *Bulkhead) Execute(fn func() error) error {
    b.sem <- struct{}{}        // acquire a slot
    defer func() { <-b.sem }() // release it when fn returns

    return fn()
}

3. Retry with Backoff
#

// ExecuteWithRetry calls fn up to maxRetries times and returns nil on the
// first success. maxRetries is the total number of attempts, not the number
// of retries after the first call.
//
// Between attempts it sleeps with exponential backoff, starting at 100ms and
// doubling each time. If every attempt fails, the returned error wraps fn's
// last error, so callers can inspect it with errors.Is/As. If
// maxRetries <= 0, fn is never called and an unwrapped error is returned.
func ExecuteWithRetry(fn func() error, maxRetries int) error {
    backoff := 100 * time.Millisecond

    var lastErr error
    for attempt := 0; attempt < maxRetries; attempt++ {
        if lastErr = fn(); lastErr == nil {
            return nil
        }

        // Don't sleep after the final attempt.
        if attempt < maxRetries-1 {
            time.Sleep(backoff)
            backoff *= 2 // exponential backoff
        }
    }

    if lastErr == nil {
        // maxRetries <= 0: fn never ran, so there is no cause to wrap.
        return fmt.Errorf("operation failed after %d retries", maxRetries)
    }
    return fmt.Errorf("operation failed after %d retries: %w", maxRetries, lastErr)
}

服务发现
#

客户端发现
#

Client → Service Registry → Service Instances
     → direct connection

服务端发现
#

Client → Proxy/Load Balancer → Service Instance

Consul 示例
#

import "github.com/hashicorp/consul/api"

// DiscoverService returns the instances of serviceName registered in Consul
// whose health checks are passing, using the default client configuration.
//
// Consul's *api.Client has no Service() accessor; []*api.ServiceEntry comes
// from the health endpoint. It is queried with passingOnly=true so callers
// only receive healthy instances.
//
// NOTE(review): a new client is built on every call; long-lived callers
// should create one client and reuse it.
func DiscoverService(serviceName string) ([]*api.ServiceEntry, error) {
    client, err := api.NewClient(api.DefaultConfig())
    if err != nil {
        // The client is unusable when construction fails; don't dereference it.
        return nil, err
    }

    entries, _, err := client.Health().Service(serviceName, "", true, nil)
    if err != nil {
        return nil, err
    }
    return entries, nil
}

Consistency 模式
#

1. Optimistic Locking
#

// Account is a balance record guarded by optimistic locking: Version is the
// value a writer must still observe for its update to be accepted.
type Account struct {
    ID      string
    Balance int
    Version int // optimistic-lock version
}

// Transfer moves amount from account `from` to account `to` using optimistic
// locking. Each account is read with its version, modified, and written back
// only if UpdateIfVersionMatch accepts it. A rejected write yields a
// "concurrent modification" error so the caller can retry.
//
// NOTE(review): the two writes are not atomic. If the second write is
// rejected, the debit on `from` has already been persisted; production code
// should wrap both updates in one transaction or compensate. Whether
// UpdateIfVersionMatch expects the incremented version passed here is
// defined by the repository — confirm. No insufficient-funds check is done.
func (s *AccountService) Transfer(from, to string, amount int) error {
    // A self-transfer reads the same row twice. One write would then either
    // fail its version check after the other was saved, or overwrite it.
    if from == to {
        return errors.New("cannot transfer to the same account")
    }

    // Read both accounts together with their versions.
    src := s.repo.GetWithVersion(from)
    dst := s.repo.GetWithVersion(to)

    src.Balance -= amount
    dst.Balance += amount
    src.Version++
    dst.Version++

    // Each write is applied only if its version check passes.
    if !s.repo.UpdateIfVersionMatch(src) {
        return errors.New("concurrent modification")
    }
    // The result is a bool, not an error: report a rejected second write
    // the same way as the first.
    if !s.repo.UpdateIfVersionMatch(dst) {
        return errors.New("concurrent modification")
    }
    return nil
}

2. Read Repair
#

// Get reads key from all three replicas, returns the value carrying the
// highest version, and writes that value back to every replica whose version
// is older (read repair).
//
// NOTE(review): this assumes all three node reads return the same type with
// comparable `version` and a `value` field, and that equal versions mean the
// replicas agree — confirm. Node read/write errors are not visible here and
// are not handled.
func (s *KeyValueService) Get(key string) (interface{}, error) {
    // Read from every replica.
    v1 := s.node1.Get(key)
    v2 := s.node2.Get(key)
    v3 := s.node3.Get(key)

    // Select the newest copy; on ties the lower-numbered node wins.
    latest := v1
    if v2.version > latest.version {
        latest = v2
    }
    if v3.version > latest.version {
        latest = v3
    }

    // Repair only replicas that are behind the newest copy.
    if v1.version < latest.version {
        s.node1.Put(key, latest.value, latest.version)
    }
    if v2.version < latest.version {
        s.node2.Put(key, latest.value, latest.version)
    }
    if v3.version < latest.version {
        s.node3.Put(key, latest.value, latest.version)
    }

    return latest.value, nil
}

可观测性
#

分布式追踪
#

// 使用 OpenTelemetry
import "go.opentelemetry.io/otel"

// GetOrder loads an order by id inside a "GetOrder" span of the
// "order-service" tracer. The span is tagged with the order id and, when an
// order was returned, its item count.
//
// NOTE(review): the span starts from context.Background(), so it is a new
// root span not linked to the caller's trace; accepting a ctx parameter
// would fix that but changes the signature.
func (s *OrderService) GetOrder(id string) (*Order, error) {
    // The span's context is discarded because s.repo.Get takes no ctx; an
    // unused named variable would not compile.
    _, span := otel.Tracer("order-service").Start(context.Background(), "GetOrder")
    defer span.End()

    span.SetAttributes(attribute.String("order.id", id))

    order := s.repo.Get(id)
    // order is a *Order; guard nil so a missing order doesn't panic on
    // order.Items.
    if order != nil {
        span.SetAttributes(attribute.Int("order.items_count", len(order.Items)))
    }

    return order, nil
}

结构化日志
#

// Logger is a thin wrapper around a *zap.Logger.
type Logger struct {
    log *zap.Logger
}

// Info logs msg at info level with the given structured fields.
//
// The variadic type is zap.Field because it is spread directly into
// zap.Logger.Info(string, ...zap.Field); the bare, undeclared `Field` type
// did not compile.
func (l *Logger) Info(msg string, fields ...zap.Field) {
    l.log.Info(msg, fields...)
}

// Usage (illustrative; in real code this call sits inside a function where
// `l` is a *Logger and `order` is in scope):
l.Info("order created",
    zap.String("order_id", order.ID),
    zap.String("user_id", order.UserID),
    zap.Float64("amount", order.Total),
)

部署模式
#

Blue-Green Deployment
#

Production (Blue)
  ├── v1.0

Staging (Green)
  ├── v2.0

切换:
用户流量 → Green (v2.0)(一次性整体切换,而非逐步放量)
待稳定后 → Blue (v1.0) 保留为回滚/备用环境,下一次发布时在 Blue 上部署新版本

Canary Release
#

100% v1.0

逐步流量切到 v2.0:
1% → 5% → 10% → 50% → 100%

最佳实践
#

  1. 从简单开始 - 单体优先,后期拆分
  2. 监控先行 - 无监控不部署
  3. 幂等设计 - 重试安全
  4. 超时设置 - 避免级联故障
  5. 人工干预点 - 重要操作需人工确认

总结
#

分布式系统设计需要权衡一致性和可用性,选择合适的模式至关重要。