Skip to main content

Redis 深度剖析 - 从原理到高阶应用

·1449 words·3 mins

概述
#

Redis 是高性能的键值存储系统,本文深入讲解其原理和应用场景。

核心数据结构
#

String
#

# 基本操作
SET key value
GET key
DEL key

# 数值操作
INCR key        # 自增
DECR key        # 自减
INCRBY key 10   # 增加指定值

# 批量操作
MSET k1 v1 k2 v2
MGET k1 k2

Hash
#

# 设置字段
HSET user:1001 name "Alice"
HSET user:1001 age 25

# 获取字段
HGET user:1001 name
HGETALL user:1001

# 批量操作
HMSET user:1002 name "Bob" age 30 email "bob@example.com"
HMGET user:1002 name age

# 长度
HLEN user:1001

List
#

# 从右侧推入
RPUSH users "Alice" "Bob" "Charlie"

# 从左侧推入
LPUSH users "David"

# 范围获取
LRANGE users 0 2

# 弹出
RPOP users
LPOP users

# 长度
LLEN users

# 插入
LINSERT users BEFORE "Bob" "Eve"

Set
#

# 添加成员
SADD tags "golang" "redis" "database"

# 移除成员
SREM tags "redis"

# 成员数量
SCARD tags

# 判断成员是否存在
SISMEMBER tags "golang"

# 集合运算
SADD set1 "a" "b" "c"
SADD set2 "b" "c" "d"
SINTER set1 set2    # 交集: b, c
SUNION set1 set2    # 并集: a, b, c, d
SDIFF set1 set2     # 差集: a

Sorted Set
#

# 添加成员(带分数)
ZADD leaderboard 100 "Alice"
ZADD leaderboard 200 "Bob"
ZADD leaderboard 150 "Charlie"

# 范围查询
ZRANGE leaderboard 0 2
ZREVRANGE leaderboard 0 2

# 分数查询
ZSCORE leaderboard "Bob"
ZCOUNT leaderboard 100 200

# 排名
ZRANK leaderboard "Alice"
ZREVRANK leaderboard "Alice"

# 删除
ZREM leaderboard "Bob"

持久化机制
#

RDB (快照)
#

触发条件:
- implementation save
- save 900 1 (15分钟1次修改)
- save 300 10 (5分钟10次修改)
- save 60 10000 (1分钟10000次修改)

AOF (追加日志)
#

appendfsync always    # 每次写操作都同步 (最安全)
appendfsync everysec  # 每秒同步一次 (推荐)
appendfsync no        # 不同步,由操作系统决定

混合持久化
#

# Redis 4.0+ 支持 RDB + AOF 混合
aof-use-rdb-preamble yes

内存管理
#

内存回收策略
#

# 查看内存使用
INFO memory

# 最大内存限制
maxmemory 2gb

# 回收策略
maxmemory-policy allkeys-lru      # 删除最久未使用的键
maxmemory-policy volatile-lru     # 删除设置了过期的最久未使用的键
maxmemory-policy allkeys-random   # 随机删除
maxmemory-policy volatile-random  # 随机删除过期键
maxmemory-policy volatile-ttl     # 删除即将过期的键
maxmemory-policy noeviction       # 不删除,返回错误

过期键处理
#

# 设置过期时间
EXPIRE key 3600       # 1小时后过期
PEXPIRE key 3600000   # 1小时后过期 (毫秒)

# 查看过期时间
TTL key       # 秒
PTTL key      # 毫秒

# 移除过期时间
PERSIST key

内存优化
#

# 压缩列表 (ziplist)
# 小的 list/hash/sorted set 使用压缩存储

# 哈希表优化
hash-max-ziplist-entries 512
hash-max-ziplist-value 64

# 整数集合 (intset)
# 小的 set 全是整数时使用

高阶特性
#

事务
#

# MULTI/EXEC 事务
MULTI
SET key1 "value1"
SET key2 "value2"
EXEC

# WATCH 乐观锁
WATCH key
MULTI
SET key "newvalue"
EXEC

# 事务失败(key 被修改)
##Watch key
MULTI
SET key "value"
EXEC
# -> (nil) 事务回滚

Pipeline
#

# 批量执行,减少网络往返
redis-cli pipeline <<EOF
SET key1 "value1"
SET key2 "value2"
GET key1
GET key2
EOF

发布订阅
#

# 订阅
SUBSCRIBE channel1

# 发布
PUBLISH channel1 "Hello"

# 模式订阅
PSUBSCRIBE channel*

# 取消订阅
UNSUBSCRIBE channel1
PUNSUBSCRIBE channel*

Lua 脚本
#

# 执行 Lua 脚本
EVAL "return redis.call('GET', KEYS[1])" 1 key1

# 复杂事务
EVAL "
local val = redis.call('GET', KEYS[1])
if val == ARGV[1] then
    return redis.call('DEL', KEYS[1])
else
    return 0
end
" 1 mykey "expected_value"

Geo (地理位置)
#

# 添加地理位置
GEOADD locations 116.40 39.90 "Beijing"
GEOADD locations 121.47 31.23 "Shanghai"

# 获取距离
GEOPOS locations "Beijing"
GEODIST locations "Beijing" "Shanghai" km

# 查询附近
GEOHASH locations "Beijing"

Bitmaps
#

# 用户活跃状态
SETBIT user:active:2026-03-02 1 1  # 用户1活跃
SETBIT user:active:2026-03-02 2 1  # 用户2活跃

# 统计活跃用户
BITCOUNT user:active:2026-03-02

# 交集并集
BITOP AND result user:active:2026-03-01 user:active:2026-03-02
BITCOUNT result

HyperLogLog
#

# 去重计数
PFADD users "user1" "user2" "user3"
PFCOUNT users  # 估算基数: 3

# 合并
PFADD users2 "user4" "user5"
PFMERGE allusers users users2
PFCOUNT allusers  # 估算: 5

集群方案
#

主从复制
#

# 从节点配置
replicaof 192.168.1.1 6379
masterauth "password"

哨兵模式
#

# sentinel.conf
sentinel monitor mymaster 192.168.1.1 6379 2
sentinel down-after-milliseconds mymaster 30000
sentinel failover-timeout mymaster 180000

集群模式
#

# 创建集群
redis-cli --cluster create 192.168.1.1:7000 \
  192.168.1.2:7001 192.168.1.3:7002 \
  192.168.1.4:7003 192.168.1.5:7004 \
  192.168.1.6:7005 --cluster-replicas 1

应用场景
#

缓存
#

# Python 示例
import redis

r = redis.Redis(host='localhost', port=6379, db=0)

def get_user(user_id):
    cache_key = f"user:{user_id}"
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)
    
    # 查询数据库
    user = db.query(User, user_id)
    if user:
        r.setex(cache_key, 3600, json.dumps(user))
    return user

分布式锁
#

import redis
import time

r = redis.Redis()

def acquire_lock(lock_name, acquire_timeout=10, lock_timeout=60):
    identifier = str(time.time())
    end = time.time() + acquire_timeout
    
    while time.time() < end:
        # 尝试获取锁
        if r.setnx(f"lock:{lock_name}", identifier):
            r.expire(f"lock:{lock_name}", lock_timeout)
            return identifier
        
        # 检查锁是否过期
        current = r.get(f"lock:{lock_name}")
        if current and float(current) < time.time() - lock_timeout:
            old_value = r.getset(f"lock:{lock_name}", identifier)
            if old_value == current:
                return identifier
        
        time.sleep(0.01)
    
    return None

def release_lock(lock_name, identifier):
    pipe = r.pipeline(True)
    while True:
        try:
            pipe.watch(f"lock:{lock_name}")
            if pipe.get(f"lock:{lock_name}") == identifier:
                pipe.delete(f"lock:{lock_name}")
                pipe.execute()
                return True
            pipe.unwatch()
            break
        except redis.WatchError:
            continue
    return False

队列
#

# 使用 List 实现队列
class Queue:
    def __init__(self, r, name):
        self.r = r
        self.name = name
    
    def enqueue(self, item):
        return self.r.rpush(f"queue:{self.name}", item)
    
    def dequeue(self, timeout=0):
        # BLPOP 返回 (队列名, 值) 或 None
        result = self.r.blpop(f"queue:{self.name}", timeout)
        if result:
            return result[1]
        return None

性能优化
#

连接池
#

// Go 示例
import "github.com/go-redis/redis"

client := redis.NewClient(&redis.Options{
    Addr:         "localhost:6379",
    PoolSize:     10,           // 连接池大小
    MinIdleConns: 5,
    MaxConnAge:   time.Hour,
    PoolTimeout:  time.Second,
    IdleTimeout:  time.Minute,
})

批量操作
#

// Pipeline
pipe := client.Pipeline()
for i := 0; i < 1000; i++ {
    pipe.Set(fmt.Sprintf("key:%d", i), "value", 0)
}
_, err := pipe.Exec()

高可用架构
#

架构图:

client
sentinel (监控主从)
master (写)
replica (读)

故障转移:
1. Sentinel 检测 master 不可用
2. 选举新的 master
3. 其他 replica 重新复制

最佳实践
#

  1. 合理设置过期时间
  2. 使用连接池
  3. 批量操作减少网络开销
  4. 监控内存使用
  5. 定期备份
  6. 使用集群分摊压力

总结
#

Redis 功能强大,适用场景广泛,需要深入理解其原理才能充分发挥优势。