
概述 #
Redis 是高性能的键值存储系统,本文深入讲解其原理和应用场景。
核心数据结构 #
String #
# 基本操作
SET key value
GET key
DEL key
# 数值操作
INCR key # 自增
DECR key # 自减
INCRBY key 10 # 增加指定值
# 批量操作
MSET k1 v1 k2 v2
MGET k1 k2Hash #
# 设置字段
HSET user:1001 name "Alice"
HSET user:1001 age 25
# 获取字段
HGET user:1001 name
HGETALL user:1001
# 批量操作
HMSET user:1002 name "Bob" age 30 email "bob@example.com"
HMGET user:1002 name age
# 长度
HLEN user:1001List #
# 从右侧推入
RPUSH users "Alice" "Bob" "Charlie"
# 从左侧推入
LPUSH users "David"
# 范围获取
LRANGE users 0 2
# 弹出
RPOP users
LPOP users
# 长度
LLEN users
# 插入
LINSERT users BEFORE "Bob" "Eve"Set #
# 添加成员
SADD tags "golang" "redis" "database"
# 移除成员
SREM tags "redis"
# 成员数量
SCARD tags
# 判断成员是否存在
SISMEMBER tags "golang"
# 集合运算
SADD set1 "a" "b" "c"
SADD set2 "b" "c" "d"
SINTER set1 set2 # 交集: b, c
SUNION set1 set2 # 并集: a, b, c, d
SDIFF set1 set2 # 差集: aSorted Set #
# 添加成员(带分数)
ZADD leaderboard 100 "Alice"
ZADD leaderboard 200 "Bob"
ZADD leaderboard 150 "Charlie"
# 范围查询
ZRANGE leaderboard 0 2
ZREVRANGE leaderboard 0 2
# 分数查询
ZSCORE leaderboard "Bob"
ZCOUNT leaderboard 100 200
# 排名
ZRANK leaderboard "Alice"
ZREVRANK leaderboard "Alice"
# 删除
ZREM leaderboard "Bob"持久化机制 #
RDB (快照) #
触发条件:
- implementation save
- save 900 1 (15分钟1次修改)
- save 300 10 (5分钟10次修改)
- save 60 10000 (1分钟10000次修改)AOF (追加日志) #
appendfsync always # 每次写操作都同步 (最安全)
appendfsync everysec # 每秒同步一次 (推荐)
appendfsync no # 不同步,由操作系统决定混合持久化 #
# Redis 4.0+ 支持 RDB + AOF 混合
aof-use-rdb-preamble yes内存管理 #
内存回收策略 #
# 查看内存使用
INFO memory
# 最大内存限制
maxmemory 2gb
# 回收策略
maxmemory-policy allkeys-lru # 删除最久未使用的键
maxmemory-policy volatile-lru # 删除设置了过期的最久未使用的键
maxmemory-policy allkeys-random # 随机删除
maxmemory-policy volatile-random # 随机删除过期键
maxmemory-policy volatile-ttl # 删除即将过期的键
maxmemory-policy noeviction # 不删除,返回错误过期键处理 #
# 设置过期时间
EXPIRE key 3600 # 1小时后过期
PEXPIRE key 3600000 # 1小时后过期 (毫秒)
# 查看过期时间
TTL key # 秒
PTTL key # 毫秒
# 移除过期时间
PERSIST key内存优化 #
# 压缩列表 (ziplist)
# 小的 list/hash/sorted set 使用压缩存储
# 哈希表优化
hash-max-ziplist-entries 512
hash-max-ziplist-value 64
# 整数集合 (intset)
# 小的 set 全是整数时使用高阶特性 #
事务 #
# MULTI/EXEC 事务
MULTI
SET key1 "value1"
SET key2 "value2"
EXEC
# WATCH 乐观锁
WATCH key
MULTI
SET key "newvalue"
EXEC
# 事务失败(key 被修改)
##Watch key
MULTI
SET key "value"
EXEC
# -> (nil) 事务回滚Pipeline #
# 批量执行,减少网络往返
redis-cli pipeline <<EOF
SET key1 "value1"
SET key2 "value2"
GET key1
GET key2
EOF发布订阅 #
# 订阅
SUBSCRIBE channel1
# 发布
PUBLISH channel1 "Hello"
# 模式订阅
PSUBSCRIBE channel*
# 取消订阅
UNSUBSCRIBE channel1
PUNSUBSCRIBE channel*Lua 脚本 #
# 执行 Lua 脚本
EVAL "return redis.call('GET', KEYS[1])" 1 key1
# 复杂事务
EVAL "
local val = redis.call('GET', KEYS[1])
if val == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end
" 1 mykey "expected_value"Geo (地理位置) #
# 添加地理位置
GEOADD locations 116.40 39.90 "Beijing"
GEOADD locations 121.47 31.23 "Shanghai"
# 获取距离
GEOPOS locations "Beijing"
GEODIST locations "Beijing" "Shanghai" km
# 查询附近
GEOHASH locations "Beijing"Bitmaps #
# 用户活跃状态
SETBIT user:active:2026-03-02 1 1 # 用户1活跃
SETBIT user:active:2026-03-02 2 1 # 用户2活跃
# 统计活跃用户
BITCOUNT user:active:2026-03-02
# 交集并集
BITOP AND result user:active:2026-03-01 user:active:2026-03-02
BITCOUNT resultHyperLogLog #
# 去重计数
PFADD users "user1" "user2" "user3"
PFCOUNT users # 估算基数: 3
# 合并
PFADD users2 "user4" "user5"
PFMERGE allusers users users2
PFCOUNT allusers # 估算: 5集群方案 #
主从复制 #
# 从节点配置
replicaof 192.168.1.1 6379
masterauth "password"哨兵模式 #
# sentinel.conf
sentinel monitor mymaster 192.168.1.1 6379 2
sentinel down-after-milliseconds mymaster 30000
sentinel failover-timeout mymaster 180000集群模式 #
# 创建集群
redis-cli --cluster create 192.168.1.1:7000 \
192.168.1.2:7001 192.168.1.3:7002 \
192.168.1.4:7003 192.168.1.5:7004 \
192.168.1.6:7005 --cluster-replicas 1应用场景 #
缓存 #
# Python 示例
import redis
r = redis.Redis(host='localhost', port=6379, db=0)
def get_user(user_id):
cache_key = f"user:{user_id}"
cached = r.get(cache_key)
if cached:
return json.loads(cached)
# 查询数据库
user = db.query(User, user_id)
if user:
r.setex(cache_key, 3600, json.dumps(user))
return user分布式锁 #
import redis
import time
r = redis.Redis()
def acquire_lock(lock_name, acquire_timeout=10, lock_timeout=60):
identifier = str(time.time())
end = time.time() + acquire_timeout
while time.time() < end:
# 尝试获取锁
if r.setnx(f"lock:{lock_name}", identifier):
r.expire(f"lock:{lock_name}", lock_timeout)
return identifier
# 检查锁是否过期
current = r.get(f"lock:{lock_name}")
if current and float(current) < time.time() - lock_timeout:
old_value = r.getset(f"lock:{lock_name}", identifier)
if old_value == current:
return identifier
time.sleep(0.01)
return None
def release_lock(lock_name, identifier):
pipe = r.pipeline(True)
while True:
try:
pipe.watch(f"lock:{lock_name}")
if pipe.get(f"lock:{lock_name}") == identifier:
pipe.delete(f"lock:{lock_name}")
pipe.execute()
return True
pipe.unwatch()
break
except redis.WatchError:
continue
return False队列 #
# 使用 List 实现队列
class Queue:
def __init__(self, r, name):
self.r = r
self.name = name
def enqueue(self, item):
return self.r.rpush(f"queue:{self.name}", item)
def dequeue(self, timeout=0):
# BLPOP 返回 (队列名, 值) 或 None
result = self.r.blpop(f"queue:{self.name}", timeout)
if result:
return result[1]
return None性能优化 #
连接池 #
// Go 示例
import "github.com/go-redis/redis"
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
PoolSize: 10, // 连接池大小
MinIdleConns: 5,
MaxConnAge: time.Hour,
PoolTimeout: time.Second,
IdleTimeout: time.Minute,
})批量操作 #
// Pipeline
pipe := client.Pipeline()
for i := 0; i < 1000; i++ {
pipe.Set(fmt.Sprintf("key:%d", i), "value", 0)
}
_, err := pipe.Exec()高可用架构 #
架构图:
client
↓
sentinel (监控主从)
↓
master (写)
↑
replica (读)
故障转移:
1. Sentinel 检测 master 不可用
2. 选举新的 master
3. 其他 replica 重新复制最佳实践 #
- 合理设置过期时间
- 使用连接池
- 批量操作减少网络开销
- 监控内存使用
- 定期备份
- 使用集群分摊压力
总结 #
Redis 功能强大,适用场景广泛,需要深入理解其原理才能充分发挥优势。