Consumer Retry 策略
整理 Consumer 端訊息處理失敗時的重試策略,包含 Immediate Retry、Delayed Retry(搭配 DLX)與 Exponential Backoff

雲端環境裡 consumer 呼叫下游失敗是常態,所以 retry 不可避免。但沒想清楚的 retry 常常比不 retry 更糟——對下游狂打 retry 造成二次崩潰、poison message 燒 CPU 把其他訊息擋住,都是真實事故。好的 retry 要同時回答:什麼錯誤該重試?重試節奏怎麼控?
暫時性錯誤 vs 永久性錯誤
| 類別 | 例子 | 處理 |
|---|---|---|
| 暫時性錯誤 | 下游 timeout、網路抖動、DB 中斷、5xx | 有節奏地重試 |
| 永久性錯誤 | 格式錯誤、業務驗證失敗、資源已刪除 | 直接進 DLQ |
Consumer 程式碼要透過 exception 類型明確區分(例如 TransientError、PermanentError)。只有一個 except Exception 代表你還沒想清楚分類。
requeue=True 是陷阱
# 不要這樣寫
async with message.process(requeue=True):
await do_something(message.body)
看起來很直覺,但有四個問題:requeue 立刻發生,下游掛掉時 consumer 以每秒幾千次的速度狂打;Classic Queue 不追蹤 requeue 次數,沒有自然終止條件;CPU 與 channel 被吃滿讓其他訊息拿不到資源;訊息被放回 queue 頭部破壞順序,poison message 會永遠擋在最前面。
生產環境幾乎不該用 requeue=True,都改用 requeue=False 送進 DLX。
策略一:In-Memory Immediate Retry
Consumer 內部在 ack 之前本地重試幾次,適合 100ms 等級的瞬斷。
MAX_IN_MEMORY_ATTEMPTS = 3
async def handler(message: aio_pika.IncomingMessage):
for attempt in range(MAX_IN_MEMORY_ATTEMPTS):
try:
await do_work(message.body)
await message.ack()
return
except TransientError:
if attempt < MAX_IN_MEMORY_ATTEMPTS - 1:
await asyncio.sleep(0.2 * (2 ** attempt))
continue
await message.reject(requeue=False)
return
except PermanentError:
await message.reject(requeue=False)
return
優點是延遲極低、不走 queue round-trip。缺點是重試期間 consumer 阻塞、狀態無持久性(consumer 重啟計數歸零)。次數 2–3 次、總時間 < 1 秒為宜,作為第一層防線,更長的錯誤交給下一層。
策略二:Delayed Retry with DLX
生產最推薦的策略:把延遲放在 broker 端,用 TTL + DLX 讓訊息在 work queue 與 retry queue 之間循環,consumer 完全不需要等待。
work_ex = await channel.declare_exchange("work", aio_pika.ExchangeType.DIRECT)
work_queue = await channel.declare_queue(
"work.queue",
durable=True,
arguments={
"x-dead-letter-exchange": "retry",
"x-dead-letter-routing-key": "retry",
},
)
await work_queue.bind(work_ex, routing_key="task")
retry_ex = await channel.declare_exchange("retry", aio_pika.ExchangeType.DIRECT)
retry_queue = await channel.declare_queue(
"retry.queue",
durable=True,
arguments={
"x-message-ttl": 30000,
"x-dead-letter-exchange": "work",
"x-dead-letter-routing-key": "task",
},
)
await retry_queue.bind(retry_ex, routing_key="retry")
Consumer 失敗只要 reject(requeue=False),剩下交給 broker:
async def handler(message: aio_pika.IncomingMessage):
try:
await do_work(message.body)
await message.ack()
except TransientError:
await message.reject(requeue=False)
except PermanentError:
await send_to_parking_lot(message)
await message.ack()
完全非同步,consumer 不需要 sleep、不管理狀態,吞吐不受影響。訊息 persistent 下 broker 重啟也不丟。除非需要指數退避或 per-message 延遲,這套固定延遲 DLX retry 就能解決 90% 場景。
策略三:Exponential Backoff
固定延遲假設所有錯誤恢復時間差不多,但錯誤持續越久代表問題越嚴重,繼續用同樣間隔反而加重下游負擔。指數退避(1s/2s/4s/8s/16s/32s)讓延遲隨失敗次數成長。
實作一:多個固定 TTL 的 Retry Queue
建立 retry.1s / retry.2s / retry.4s … 各自 TTL 對應層級。Consumer 從 x-death.count 讀出已死幾次,送到對應層級。
async def handler(message: aio_pika.IncomingMessage):
try:
await do_work(message.body)
await message.ack()
return
except TransientError:
pass
except PermanentError:
await send_to_parking_lot(message)
await message.ack()
return
x_death = message.headers.get("x-death", [])
death_count = x_death[0]["count"] if x_death else 0
MAX_RETRY = 6
if death_count >= MAX_RETRY:
await send_to_parking_lot(message)
await message.ack()
return
delay_level = 2 ** death_count # 1, 2, 4, 8, 16, 32
await channel.default_exchange.publish(
aio_pika.Message(
body=message.body,
headers=message.headers,
delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
),
routing_key=f"retry.{delay_level}s",
)
await message.ack()
因為每個 retry queue 內部 TTL 一致,避開 head-of-line blocking。缺點是層級固定不易改。
實作二:延遲插件 + Per-Message TTL
用 rabbitmq_delayed_message_exchange 插件搭配 publish 時的 expiration,每筆訊息獨立計時。寫起來最優雅,但需要安裝插件且 pending 上限建議不超過 100 萬筆。
Parking Lot 與最大重試次數
無上限的重試等於沒有錯誤處理。超過上限的 poison message 應送進 parking lot queue(不自動重試、只給人工檢視)。處理流程是告警 → 看 x-death 歷史 → 判斷是 bug 還是髒資料 → 手動重送或刪除。
上限怎麼設?retry 總時間要小於「業務可接受的延遲上限」,又要夠長能吸收常見下游故障(通常 5 分鐘以上)。超過 10 次多半是浪費。
策略組合建議
| 錯誤特性 | 建議 |
|---|---|
| 極短暫(< 1 秒) | In-memory retry(2–3 次) |
| 中等持續(幾秒到幾分鐘) | DLX-based 固定延遲 retry |
| 不確定持續 | Exponential backoff + max retry |
| 下游可能長時間掛 | 指數退避 + 多次數 + 熔斷 |
| 永久性錯誤 | 不重試,直接進 DLQ |
典型完整流程:work queue → in-memory retry 2 次 → reject 進 retry queue → 6 層指數退避(1/2/4/8/16/32 秒)→ 超過送 parking lot → 告警。涵蓋毫秒到分鐘級,且有明確失敗邊界。