Publisher Confirms
介紹 Producer 端如何透過 Publisher Confirms 機制確認訊息已成功送達 Broker,以及 Client 端處理 ack 的三種策略

publish() 成功不代表 broker 收到
AMQP 預設不告訴 producer 訊息有沒有送達——這不是 bug,是讓你自己在吞吐量與可靠性之間選擇。publish() return 只代表「訊息已寫進 TCP buffer」,接下來幾個環節都可能出事:資料還在 buffer 時網路斷線、訊息在網路路徑上 broker crash、broker 收到但還沒落盤、或訊息抵達 exchange 卻找不到任何 binding 被默默丟掉。
對於使用者點擊紀錄這類可丟資料沒差,但付款通知、訂單、庫存扣減丟一筆就是事故,必須靠 Publisher Confirms 讓 broker 主動回報「我收到了、也處理好了」。
為什麼不用 AMQP Transaction
AMQP 原生的 tx.select/tx.commit 看起來像 DB transaction 很親切,但幾乎沒人用——完全同步阻塞,channel 其他操作全被序列化,吞吐比 fire-and-forget 慢 2–10 倍。Publisher Confirms 聰明在於沒有強制同步:broker 處理完後主動送 basic.ack frame,producer 可以選擇同步等、批次收、或 callback 處理。對本來就要做 fsync 的 broker 來說多送一個 frame 幾乎沒成本。
| 機制 | 可靠性 | 效能 | 推薦 |
|---|---|---|---|
| AMQP Transaction | 強 | 比 fire-forget 慢 2–10x | 不推薦 |
| Publisher Confirms | 強 | 幾乎無損耗 | 推薦 |
| Fire-and-Forget | 無 | 基準 | 僅限可丟資料 |
開啟 Confirm 模式
aio_pika 預設就開啟,大多數時候什麼都不用做:
connection = await aio_pika.connect_robust("amqp://localhost/")
channel = await connection.channel(publisher_confirms=True) # 預設 True
exchange = await channel.declare_exchange("orders", aio_pika.ExchangeType.DIRECT)
開啟後每次 publish() 等 broker ack 才 return,broker nack 會拋例外。
Client 端處理 ack 的三種策略
Broker 端只有一種機制:channel 進入 confirm mode 後,每筆訊息處理完回 basic.ack 或 basic.nack。以下三種「模式」的差異完全在 client 端怎麼收這些 ack,不是 broker 提供的不同功能。
單筆同步
aio_pika 預設行為:每發一筆 await 到 ack 再發下一筆。
await exchange.publish(
aio_pika.Message(body=b"order #123"),
routing_key="order.created",
)
語意最清楚,publish return 就代表送達成功,debug 容易。但慢——同機房 0.5ms RTT 單執行緒上限約 2000 msg/s,跨 AZ 5ms RTT 只剩 200 msg/s。低頻關鍵訊息(財務、審計)唯一合理選擇。
批次同步
把多筆 publish 平行發出再集中等 ack,讓訊息在網路上並行傳輸隱藏 RTT。
messages = [aio_pika.Message(body=f"msg-{i}".encode()) for i in range(100)]
tasks = [exchange.publish(m, routing_key="batch") for m in messages]
await asyncio.gather(*tasks)
吞吐通常是單筆同步的 10–50 倍,複雜度只多一個 gather。缺點是 batch 內有 nack 時 gather 會整批失敗,只能整批重發(需消費端冪等)或用 return_exceptions=True 逐筆檢查。
異步 Callback
完全不 await 持續發送,broker ack/nack 透過 callback 通知,producer 自己維護 pending set。
pending: dict[int, aio_pika.Message] = {}
async def publish_async(message: aio_pika.Message):
result = await exchange.publish(message, routing_key="events")
pending[result.delivery_tag] = message
def on_confirm(delivery_tag: int, multiple: bool, is_ack: bool):
if multiple:
to_remove = [t for t in pending if t <= delivery_tag]
else:
to_remove = [delivery_tag]
for t in to_remove:
msg = pending.pop(t)
if not is_ack:
handle_nack(msg)
注意 multiple=True——broker 會把多筆連續 ack 合併成一個 frame,代表「到這個 tag 為止全部確認」。沒處理好會讓訊息永遠留在 pending set 變記憶體洩漏。
吞吐可以接近網路頻寬上限、每秒幾十萬筆,但要自己處理 pending 記憶體管理、nack 重試、逾時判斷、程序重啟狀態恢復,只建議高頻 event stream / metrics pipeline 場景用,最好用現成 library 而不是從零寫。
| 模式 | 吞吐 | 複雜度 | 場景 |
|---|---|---|---|
| 單筆同步 | 低(數百~數千) | 最低 | 低頻關鍵訊息 |
| 批次同步 | 中高(數萬) | 低 | 批次匯入、日誌批次 |
| 異步 Callback | 極高(數十萬) | 高 | 高頻 stream / metrics |
預設用單筆同步,成為瓶頸才升級。
ACK vs NACK
basic.ack 代表 broker 已完整處理。對 persistent message 來說 ack 代表訊息已寫入磁碟(fsync 完成),queue durable 的話 broker 重啟也不丟。
basic.nack 代表 broker 收到但無法處理——通常是 broker 內部問題(磁碟寫入失敗、資源耗盡、異常狀態)。正確反應是 log、告警、重試;多次 nack 就是基礎設施問題要立刻介入。
注意:收到 ack 不代表訊息被 consumer 處理了,甚至不代表被路由到任何 queue。Ack 只到「broker 做完 broker 能做的事」。
Mandatory + Return Listener
Publisher Confirms 只保證送到 broker,不保證路由到 queue。Publish 到沒有任何 binding 符合的 exchange 時,broker 會默默丟掉訊息並且回 ack——從 producer 看一切正常,但訊息已消失。
mandatory=True 要求「至少一個 queue 收到,否則退回」:
try:
await exchange.publish(
Message(body=b"order #123"),
routing_key="order.created",
mandatory=True,
)
except aio_pika.exceptions.DeliveryError as e:
log.error("no route for message", exc_info=e)
底層機制是 AMQP 的 basic.return frame——broker 把無法路由的訊息原封不動退回 producer。在 pika 這類底層 client 裡你需要自己註冊一個 return listener(channel.add_on_return_callback)來接收退回訊息;aio_pika 把這一步包成了 DeliveryError exception,所以上面的寫法看不到 callback,但背後就是 return listener 在運作。
收到 return 通常是系統設定錯誤(routing key 拼錯、queue 還沒建立),要當成嚴重錯誤告警,不要悄悄忽略。
可靠性是 producer + broker 合作
RabbitMQ broker 自己不無條件保證持久性——它提供一組機制,producer 必須主動驗證每個環節。這個心態決定了你寫 producer 的方式:每則重要訊息都要檢查 broker 有沒有 ack、路由有沒有成功、訊息是不是 persistent。broker 不會猜你要什麼保證,只按 flag 做事。
衍生:RabbitMQ 內部的訊息移動(DLX 轉送、主 queue 到 retry queue)沒有 producer,所以這些環節沒有 publisher confirm 可依賴,只能靠事前配置測試與監控告警補強,詳見 DLX 篇。
可靠發送的完整組合
保證訊息送達 queue 並不遺失需要三個條件同時成立:
- Publisher Confirms:broker 真的收到並處理
- Mandatory + Return Listener:至少被路由到一個 queue
- Durable Queue + Persistent Message:進 queue 後重啟不失(詳見 Message Persistence & Durability)
一個參考實作:
async def reliable_publish(exchange, routing_key, body):
message = aio_pika.Message(
body=body,
delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
message_id=str(uuid.uuid4()),
)
try:
await exchange.publish(message, routing_key=routing_key, mandatory=True)
except aio_pika.exceptions.DeliveryError:
log.error("routing failed", routing_key=routing_key)
raise
except Exception:
log.exception("publish failed", routing_key=routing_key)
raise