
概述 #
分布式系统设计模式是构建高可用、可扩展系统的必备知识。
核心模式 #
1. CQRS (Command Query Responsibility Segregation) #
命令查询职责分离:把写操作(Command)和读操作(Query)拆成独立的模型与处理路径,读写两侧可以分别建模、优化和扩展。
// Command side (writes).

// CreateOrderCommand carries everything the write side needs to create an
// order: who is buying, what, and how many.
type CreateOrderCommand struct {
	UserID, ProductID string
	Quantity          int
}
// CreateOrderHandler is the write-side handler for CreateOrderCommand; it
// persists new orders through orderRepo.
type CreateOrderHandler struct {
	orderRepo OrderRepository // write-side persistence
}
// Handle turns cmd into a new order and saves it, returning whatever error
// the repository's Save reports.
func (h *CreateOrderHandler) Handle(cmd CreateOrderCommand) error {
	return h.orderRepo.Save(NewOrder(cmd.UserID, cmd.ProductID, cmd.Quantity))
}
// Query side (reads).

// OrderQueryService answers read-only order queries through an OrderReader,
// kept separate from the write-side repository.
type OrderQueryService struct {
	orderReader OrderReader // read-side data source
}
func (s *OrderQueryService) GetOrders(userID string) ([]OrderDTO, error) {
return s.orderReader.FindByUserID(userID)
}2. Event Sourcing #
不直接保存当前状态,而是把每一次状态变更都记录为一个不可变的事件并按顺序追加;聚合的当前状态可以通过重放这些事件得到。
// OrderCreatedEvent records that an order was created for a user, and when.
type OrderCreatedEvent struct {
	OrderID, UserID string
	Timestamp       time.Time
}
// OrderShippedEvent records that an order was handed to a carrier, together
// with the carrier's tracking reference and the time it happened.
type OrderShippedEvent struct {
	OrderID, Carrier, Tracking string
	Timestamp                  time.Time
}
// Aggregate root.

// Order is the event-sourced aggregate root for an order.
type Order struct {
	ID      string
	Events  []Event     // events raised by this aggregate, in append order
	Version int         // bumped together with each appended event (see Ship)
	Status  OrderStatus // current state
}
func (o *Order) Ship(carrier, tracking string) error {
if o.Status != OrderStatusPayed {
return errors.New("Cannot ship unpaid order")
}
event := OrderShippedEvent{
OrderID: o.ID,
Carrier: carrier,
Tracking: tracking,
Timestamp: time.Now(),
}
o.Events = append(o.Events, event)
o.Version++
o.Status = OrderStatusShipped
return nil
}3. Saga Pattern #
分布式事务解决方案:把一个跨服务的业务操作拆成一系列本地事务,任一步失败时,按相反顺序对已完成的步骤执行补偿操作。
编排式 (Orchestration) #
// OrderService orchestrates the order saga: it owns the order repository and
// drives the remote payment and inventory services.
type OrderService struct {
	orderRepo    OrderRepository // local order persistence
	paymentCli   PaymentClient   // remote payment service
	inventoryCli InventoryClient // remote inventory service
}
func (s *OrderService) CreateOrder(req CreateOrderRequest) error {
// 1. 创建订单
order := NewOrder(req.UserID, req.ProductID, req.Quantity)
s.orderRepo.Save(order)
// 2. 扣减库存
if err := s.inventoryCli.Decrease(order.ProductID, order.Quantity); err != nil {
// 补偿: 删除订单
s.orderRepo.Delete(order.ID)
return err
}
// 3. 执行支付
if err := s.paymentCli.Process(order.UserID, order.Amount); err != nil {
// 补偿: 恢复库存 + 删除订单
s.inventoryCli.Increase(order.ProductID, order.Quantity)
s.orderRepo.Delete(order.ID)
return err
}
// 4. 完成订单
order.Status = OrderStatusPayed
s.orderRepo.Update(order)
return nil
}编排式 (Choreography) #
// Order service.
//
// HandleOrderCreated publishes an OrderCreated event (its fields are elided
// as {...} in this sketch) for the payment and inventory services to react to.
// NOTE(review): despite the "Handle" prefix this method emits the event
// rather than consuming it; a name like PublishOrderCreated would be clearer.
func (s *OrderService) HandleOrderCreated() {
	// Publish the order-created event.
	s.eventBus.Publish(OrderCreated{...})
}
// Payment service.
//
// HandleOrderCreated is the payment service's subscriber for OrderCreated:
// it runs the payment step (elided here) and then publishes PaymentProcessed.
// NOTE(review): no failure path is shown; a choreographed saga also needs a
// failure event so the other participants can compensate.
func (s *PaymentService) HandleOrderCreated(event OrderCreated) {
	// Subscribed to the order-created event.
	// Payment logic goes here.
	s.eventBus.Publish(PaymentProcessed{...})
}
// Inventory service.
//
// HandleOrderCreated reacts to OrderCreated by decrementing stock (elided
// here) and then publishing InventoryDecremented.
func (s *InventoryService) HandleOrderCreated(event OrderCreated) {
	// Decrement inventory.
	s.eventBus.Publish(InventoryDecremented{...})
}

4. Outbox Pattern #
可靠的消息传递:在同一个本地事务里同时写入业务数据和待发送的消息(outbox 表),再由后台流程异步投递,保证"数据已提交"与"消息一定会被发出"不会出现不一致(至少一次投递,消费者需要幂等)。
// Rows of this type are written to the outbox table inside the same
// transaction as the business data.

// OutboxMessage is one event row in the outbox table; Status tracks whether
// it has been published yet.
type OutboxMessage struct {
	ID, EventType, Payload string
	Status                 string // pending/processed
	CreatedAt              time.Time
}
func (s *OrderService) CreateOrder(req CreateOrderRequest) error {
// 开始事务
tx, _ := s.db.Begin()
// 1. 创建订单
order := NewOrder(...)
s.orderRepo.Save(tx, order)
// 2. 写入 outbox
msg := OutboxMessage{
EventType: "OrderCreated",
Payload: json.Marshal(order),
Status: "pending",
}
s.outboxRepo.Save(tx, msg)
// 提交事务
tx.Commit()
// 异步 process outbox
go s.processOutbox()
return nil
}
func (s *OrderService) processOutbox() {
messages := s.outboxRepo.FindPending()
for _, msg := range messages {
// 发送消息到事件总线
if err := s.eventBus.Publish(msg.EventType, msg.Payload); err != nil {
continue // 稍后重试
}
// 标记为已处理
s.outboxRepo.MarkProcessed(msg.ID)
}
}5. Bounded Context #
限界上下文(Bounded Context)是领域驱动设计(DDD)的核心概念:每个上下文拥有自己的模型和边界,跨上下文只通过 ID 或明确的接口相互引用。
// 用户上下文
package user
// User is the user context's model of a user. Other contexts refer to a
// user only by its ID.
type User struct {
	ID, Username, Email string
}
// UserService implements the user context's use cases on top of a
// UserRepository.
type UserService struct {
	repo UserRepository // backing store
}
// Register registers a new user from req.
//
// The registration steps are not written out in this example, so the body
// is a placeholder that reports success. The explicit return is required:
// a Go function with a result type must end in a terminating statement.
func (s *UserService) Register(req RegisterRequest) error {
	// TODO: implement user registration (validate req, persist via s.repo).
	return nil
}
// 订单上下文
package order
// Order is the order context's model of an order. It stores only the
// user's ID, never user details, so the two contexts stay decoupled.
//
// NOTE(review): Total is a float64. Floating-point money accumulates
// rounding error; integer minor units (e.g. cents) are the usual choice.
type Order struct {
	ID     string
	UserID string // reference into the user context; no user details copied
	Items  []OrderItem
	Total  float64
}
type OrderService struct {
repo OrderRepository
}通信模式 #
同步通信 #
Client → HTTP/REST → Service
Client → gRPC → Service
Client → GraphQL → Gateway

异步通信 #
Producer → Message Queue → Consumer
Producer → Kafka → Consumer
Producer → RabbitMQ → Consumer

容错模式 #
1. Circuit Breaker #
import "github.com/sony/gobreaker"
// CircuitBreaker wraps a gobreaker.CircuitBreaker behind an error-only API.
type CircuitBreaker struct {
	cb *gobreaker.CircuitBreaker
}

// NewCircuitBreaker returns a breaker named "external-service". It opens once
// more than 5 consecutive calls have failed, and stays open for 30s before
// letting trial requests through (half-open).
//
// gobreaker.Settings has no error-count field. The failure threshold is
// expressed only through ReadyToTrip.
func NewCircuitBreaker() *CircuitBreaker {
	cb := gobreaker.NewCircuitBreaker(gobreaker.Settings{
		Name:    "external-service",
		Timeout: 30 * time.Second, // open-state duration before half-open
		ReadyToTrip: func(counts gobreaker.Counts) bool {
			// Trip on the 6th consecutive failure, i.e. tolerate up to 5.
			return counts.ConsecutiveFailures > 5
		},
	})
	return &CircuitBreaker{cb: cb}
}
func (b *CircuitBreaker) Execute(fn func() error) error {
_, err := b.cb.Execute(func() (interface{}, error) {
return nil, fn()
})
return err
}2. Bulkhead #
// Bulkhead limits how many calls may run at the same time. Its buffered
// channel acts as a counting semaphore whose capacity is the limit.
type Bulkhead struct {
	sem chan struct{}
}

// NewBulkhead returns a Bulkhead that admits up to limit concurrent calls.
//
// A limit below 1 is raised to 1. With capacity 0, the send that Execute
// performs to acquire a slot would block forever, because nothing is
// receiving. A negative capacity would make make panic.
func NewBulkhead(limit int) *Bulkhead {
	if limit < 1 {
		limit = 1
	}
	return &Bulkhead{sem: make(chan struct{}, limit)}
}
func (b *Bulkhead) Execute(fn func() error) error {
b.sem <- struct{}{}
defer func() { <-b.sem }()
return fn()
}3. Retry with Backoff #
func ExecuteWithRetry(fn func() error, maxRetries int) error {
backoff := 100 * time.Millisecond
for i := 0; i < maxRetries; i++ {
err := fn()
if err == nil {
return nil
}
if i < maxRetries-1 {
time.Sleep(backoff)
backoff *= 2 // 指数退避
}
}
return fmt.Errorf("operation failed after %d retries", maxRetries)
}服务发现 #
客户端发现 #
Client → Service Registry → Service Instances
→ direct connection

服务端发现 #
Client → Proxy/Load Balancer → Service Instance

Consul 示例 #
import "github.com/hashicorp/consul/api"
func DiscoverService(serviceName string) ([]*api.ServiceEntry, error) {
client, _ := api.NewClient(api.DefaultConfig())
entries, _, err := client.Service().Get(serviceName, nil)
return entries, err
}Consistency 模式 #
1. Optimistic Locking #
// Account holds a balance protected by optimistic locking.
type Account struct {
	ID               string
	Balance, Version int // Version is the optimistic-lock version
}
func (s *AccountService) Transfer(from, to string, amount int) error {
// 查询版本
acc1 := s.repo.GetWithVersion(from)
acc2 := s.repo.GetWithVersion(to)
// 更新
acc1.Balance -= amount
acc2.Balance += amount
acc1.Version++
acc2.Version++
// 更新时检查版本
if !s.repo.UpdateIfVersionMatch(acc1) {
return errors.New("concurrent modification")
}
return s.repo.UpdateIfVersionMatch(acc2)
}2. Read Repair #
func (s *KeyValueService) Get(key string) (interface{}, error) {
// 从多个节点读取
v1 := s.node1.Get(key)
v2 := s.node2.Get(key)
v3 := s.node3.Get(key)
// 比较版本,修复不一致
if v1.version > v2.version && v1.version > v3.version {
s.node2.Put(key, v1.value, v1.version)
s.node3.Put(key, v1.value, v1.version)
return v1.value, nil
}
// 类似处理其他情况...
}可观测性 #
分布式追踪 #
// 使用 OpenTelemetry
import "go.opentelemetry.io/otel"
func (s *OrderService) GetOrder(id string) (*Order, error) {
ctx, span := otel.Tracer("order-service").Start(context.Background(), "GetOrder")
defer span.End()
order := s.repo.Get(id)
span.SetAttributes(
attribute.String("order.id", id),
attribute.Int("order.items_count", len(order.Items)),
)
return order, nil
}结构化日志 #
// Logger is a thin wrapper around a *zap.Logger.
type Logger struct {
	log *zap.Logger // underlying structured logger
}
// Info logs msg at info level with the given structured fields, forwarding
// them unchanged to the wrapped zap logger.
//
// The variadic parameter is zap.Field. That is the type zap.Logger.Info
// accepts and the type helpers such as zap.String return, so callers can
// pass those helpers directly without a separately declared Field type.
func (l *Logger) Info(msg string, fields ...zap.Field) {
	l.log.Info(msg, fields...)
}
// Usage: log an order-creation event with typed, structured fields.
l.Info("order created",
	zap.String("order_id", order.ID),
	zap.String("user_id", order.UserID),
	zap.Float64("amount", order.Total),
)

部署模式 #
Blue-Green Deployment #
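Production (Blue)   ← 当前承接全部线上流量
├── v1.0
Staging (Green)     ← 与 Blue 同规格的另一套生产环境,预先部署新版本
├── v2.0
切换:
用户流量 → Green (v2.0)(在负载均衡/路由层一次性整体切换)
Blue (v1.0) 保留待命:Green 出现问题时可立即把流量切回 Blue;Green 稳定后,Blue 作为下一次发布的目标环境(按比例逐步放量属于灰度/金丝雀发布,见下文)

Canary Release #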
100% v1.0
逐步流量切到 v2.0:
1% → 5% → 10% → 50% → 100%(每一步观察错误率、延迟等指标,异常时立即回滚到 v1.0)

最佳实践 #
- 从简单开始 - 单体优先,后期拆分
- 监控先行 - 无监控不部署
- 幂等设计 - 重试安全
- 超时设置 - 避免级联故障
- 人工干预点 - 重要操作需人工确认
总结 #
分布式系统设计需要权衡一致性和可用性,选择合适的模式至关重要。