
Redis Stream 做消息队列那些坑和填坑方法一、先说结论Redis Stream 能用的但别当 RabbitMQ 用用 Redis 做消息队列入门确实简单——XADD生产XREAD消费两条命令搞定。但生产环境上线后问题才真正开始。我们有个 RAG 系统用 Redis Stream 做文档入库管道每天 50 万条消息。上线第一周就挂了两次第一次是消费者进程 OOM 重启正在处理但没 ACK 的 2000 条消息直接丢了。第二次是 Redis 主从切换消费者组的偏移量回退3 万条消息被重复处理向量数据库里全是重复向量检索结果乱成一团。问题根源就一句话Redis Stream 的至少一次语义不是默认保证的。ACK 机制和 RabbitMQ 完全不同——Redis 的 PELPending Entry List只记录未确认消息不会自动重投递。你需要自己写 XCLAIM 逻辑自己处理幂等自己管死信。很多人以为 ACK 完就没事了其实那只是开始。二、Redis Stream 消费者组到底在管什么2.1 消息的生命周期一条消息从进入 Stream 到最终被处理会经历这些状态XADD → XREADGROUP → 处理中 → XACK → 完成 ↓ 超时未ACK → XPENDING 检测 → XCLAIM 转移 → 重处理 ↓ 重试耗尽 → 死信队列几个关键概念PELPending Entry List每个消费者组维护一个未确认消息列表。消息被XREADGROUP读取后进入 PELXACK后移出。PEL 里的消息不会自动重投递——这点和 RabbitMQ 区别很大。XCLAIM把 PEL 中属于某个消费者的消息转移给另一个消费者。这是实现重投递的核心命令。XPENDING查看 PEL 的统计信息包括未确认消息数量、每个消费者的待处理数量、最长等待时间。2.2 可靠消费要自己搭三层第一层至少一次投递 XADD → XREADGROUP → 处理 → XACK 第二层故障恢复 超时 → XPENDING 检测 → XCLAIM 转移 → 重新处理 第三层幂等 死信 幂等检查 → 未处理则执行业务 → 成功则 XACK ↓ 重试耗尽 → 写入死信队列三、一个能用的可靠消费框架 Redis Stream 可靠消费框架 import asyncio import json import time import uuid from dataclasses import dataclass from enum import Enum from typing import Any, Callable, Coroutine, Optional import redis.asyncio as aioredis class MessageStatus(Enum): PENDING pending PROCESSING processing ACKED acked DEAD dead dataclass class StreamMessage: msg_id: str stream: str data: dict[str, Any] consumer: str status: MessageStatus MessageStatus.PENDING delivered_count: int 0 class ReliableConsumer: 可靠消费者自动管理消费确认、重投递和死信处理 def __init__( self, redis: aioredis.Redis, stream: str, group: str, consumer: Optional[str] None, *, batch_size: int 10, block_ms: int 2000, ack_timeout_ms: int 30000, max_retries: int 3, dead_letter_stream: Optional[str] None, idempotency_ttl: int 3600, ): self._redis redis self._stream stream self._group group self._consumer consumer or fconsumer-{uuid.uuid4().hex[:8]} self._batch_size batch_size self._block_ms block_ms self._ack_timeout_ms ack_timeout_ms self._max_retries max_retries self._dead_letter_stream dead_letter_stream or f{stream}:dead self._idempotency_ttl idempotency_ttl self._running False self._handler: Optional[Callable[[StreamMessage], Coroutine]] None self._idempotency_prefix fidempotency:{stream}: async def _ensure_group(self): 确保消费者组存在 try: await self._redis.xgroup_create( self._stream, self._group, id0, mkstreamTrue ) except aioredis.ResponseError as e: if BUSYGROUP not in str(e): raise def on_message(self, handler: Callable[[StreamMessage], Coroutine]): 注册消息处理函数 self._handler handler async def start(self): 启动消费者 await self._ensure_group() self._running True await asyncio.gather( self._consume_loop(), self._claim_loop(), ) async def stop(self): 优雅停止 self._running False async def _consume_loop(self): 消费新消息 while self._running: try: messages await self._redis.xreadgroup( groupnameself._group, consumernameself._consumer, streams{self._stream: }, countself._batch_size, blockself._block_ms, ) if not messages: continue for stream_name, msg_list in messages: for msg_id, fields in msg_list: msg StreamMessage( msg_idmsg_id.decode() if isinstance(msg_id, bytes) else msg_id, streamstream_name.decode() if isinstance(stream_name, bytes) else stream_name, data{k.decode(): v.decode() for k, v in fields.items()} if fields else {}, statusMessageStatus.PROCESSING, delivered_count1, ) await self._process_message(msg) except aioredis.ConnectionError: await asyncio.sleep(1) except asyncio.CancelledError: break async def _claim_loop(self): 回收超时消息 while self._running: try: await asyncio.sleep(self._ack_timeout_ms / 1000) pending await self._redis.xpending_range( self._stream, self._group, min-, max, countself._batch_size, idleself._ack_timeout_ms, ) if not pending: continue pending_ids [ p[message_id].decode() if isinstance(p[message_id], bytes) else p[message_id] for p in pending ] claimed await self._redis.xclaim( self._stream, self._group, self._consumer, min_idle_timeself._ack_timeout_ms, message_idspending_ids, ) for msg_id, fields in claimed: msg StreamMessage( msg_idmsg_id.decode() if isinstance(msg_id, bytes) else msg_id, streamself._stream, data{k.decode(): v.decode() for k, v in fields.items()} if fields else {}, statusMessageStatus.PROCESSING, delivered_count2, ) await self._process_message(msg) except aioredis.ConnectionError: await asyncio.sleep(1) except asyncio.CancelledError: break async def _process_message(self, msg: StreamMessage): 处理单条消息幂等检查 → 执行 → 确认/死信 idempotency_key f{self._idempotency_prefix}{msg.msg_id} already_processed await self._redis.get(idempotency_key) if already_processed: await self._redis.xack(self._stream, self._group, msg.msg_id) return if self._handler is None: return try: await self._handler(msg) await self._redis.xack(self._stream, self._group, msg.msg_id) await self._redis.setex( idempotency_key, self._idempotency_ttl, 1 ) except Exception as e: msg.delivered_count 1 if msg.delivered_count self._max_retries: await self._send_to_dead_letter(msg, str(e)) await self._redis.xack(self._stream, self._group, msg.msg_id) async def _send_to_dead_letter(self, msg: StreamMessage, error: str): 写入死信队列 dead_letter_data { original_id: msg.msg_id, original_stream: msg.stream, original_data: json.dumps(msg.data, ensure_asciiFalse), error: error, delivered_count: str(msg.delivered_count), dead_at: str(int(time.time() * 1000)), } await self._redis.xadd(self._dead_letter_stream, dead_letter_data) # 使用示例 async def handle_document(msg: StreamMessage): 处理文档入库消息 doc_id msg.data.get(doc_id) content msg.data.get(content, ) print(f处理文档: {doc_id}, 内容长度: {len(content)}) await asyncio.sleep(0.1) async def main(): redis aioredis.from_url(redis://localhost:6379) consumer ReliableConsumer( redisredis, streamrag:document:ingest, groupingest-workers, batch_size10, block_ms2000, ack_timeout_ms30000, max_retries3, ) consumer.on_message(handle_document) try: await consumer.start() except KeyboardInterrupt: await consumer.stop() finally: await redis.aclose() if __name__ __main__: asyncio.run(main())框架的关键点双循环_consume_loop消费新消息_claim_loop回收超时消息。两者并发运行互不阻塞。幂等用 Redis String 存储已处理消息 ID设置 TTL 防止无限增长。幂等检查在处理之前执行避免重复处理。死信重试耗尽的消息写入独立的死信 Stream保留原始数据和错误信息方便后续排查和重放。四、什么时候该换掉 Redis Stream消息堆积Redis Stream 的消息存在内存里。消费跟不上生产时消息持续堆积最终 Redis 内存溢出。Redis 7.0 的MAXLEN近似裁剪XADD stream MAXLEN ~ 10000是不可逆的——被裁剪的消息直接丢失。做法设置合理的MAXLEN配合监控告警。PEL 深度超过阈值时触发限流或扩容。消息量不可控时换 Kafka 或 RabbitMQ。没有延迟消息Redis Stream 不支持延迟投递。如果需要30 分钟后执行得自己实现——通常用 Sorted Set以执行时间戳作为 score定时轮询到期消息。适用边界消息量超过内存Redis 是内存数据库TB 级别的消息量应该用 Kafka。严格顺序消费者组不保证全局顺序只保证同一分区内有序。需要全局顺序的话只能单消费者吞吐量会大幅下降。事务性消费ACK 和业务处理不在同一个事务中。业务处理成功但 ACK 失败会导致重复消费必须用幂等消费兜底。五、小结Redis Stream 适合中小规模的消息管道。可靠性机制需要自己搭XCLAIM 做重投递幂等键防重复死信队列管异常。双循环消费 回收是核心模式。但如果消息量超过内存、需要延迟投递或严格顺序还是选专业的消息中间件更省心。