Back to Blogs
rabbitmq

Publisher Confirms

介紹 Producer 端如何透過 Publisher Confirms 機制確認訊息已成功送達 Broker,以及 Client 端處理 ack 的三種策略

Aaron

publish() 成功不代表 broker 收到

AMQP 預設不告訴 producer 訊息有沒有送達——這不是 bug,是讓你自己在吞吐量與可靠性之間選擇。publish() return 只代表「訊息已寫進 TCP buffer」,接下來幾個環節都可能出事:資料還在 buffer 時網路斷線、訊息在網路路徑上 broker crash、broker 收到但還沒落盤、或訊息抵達 exchange 卻找不到任何 binding 被默默丟掉。

對於使用者點擊紀錄這類可丟資料沒差,但付款通知、訂單、庫存扣減丟一筆就是事故,必須靠 Publisher Confirms 讓 broker 主動回報「我收到了、也處理好了」。

為什麼不用 AMQP Transaction

AMQP 原生的 tx.select/tx.commit 看起來像 DB transaction 很親切,但幾乎沒人用——完全同步阻塞,channel 其他操作全被序列化,吞吐比 fire-and-forget 慢 2–10 倍。Publisher Confirms 聰明在於沒有強制同步:broker 處理完後主動送 basic.ack frame,producer 可以選擇同步等、批次收、或 callback 處理。對本來就要做 fsync 的 broker 來說多送一個 frame 幾乎沒成本。

機制可靠性效能推薦
AMQP Transaction比 fire-forget 慢 2–10x不推薦
Publisher Confirms幾乎無損耗推薦
Fire-and-Forget基準僅限可丟資料

開啟 Confirm 模式

aio_pika 預設就開啟,大多數時候什麼都不用做:

connection = await aio_pika.connect_robust("amqp://localhost/")
channel = await connection.channel(publisher_confirms=True)  # 預設 True
exchange = await channel.declare_exchange("orders", aio_pika.ExchangeType.DIRECT)

開啟後每次 publish() 等 broker ack 才 return,broker nack 會拋例外。

Client 端處理 ack 的三種策略

Broker 端只有一種機制:channel 進入 confirm mode 後,每筆訊息處理完回 basic.ackbasic.nack。以下三種「模式」的差異完全在 client 端怎麼收這些 ack,不是 broker 提供的不同功能。

單筆同步

aio_pika 預設行為:每發一筆 await 到 ack 再發下一筆。

await exchange.publish(
    aio_pika.Message(body=b"order #123"),
    routing_key="order.created",
)

語意最清楚,publish return 就代表送達成功,debug 容易。但慢——同機房 0.5ms RTT 單執行緒上限約 2000 msg/s,跨 AZ 5ms RTT 只剩 200 msg/s。低頻關鍵訊息(財務、審計)唯一合理選擇。

批次同步

把多筆 publish 平行發出再集中等 ack,讓訊息在網路上並行傳輸隱藏 RTT。

messages = [aio_pika.Message(body=f"msg-{i}".encode()) for i in range(100)]
tasks = [exchange.publish(m, routing_key="batch") for m in messages]
await asyncio.gather(*tasks)

吞吐通常是單筆同步的 10–50 倍,複雜度只多一個 gather。缺點是 batch 內有 nack 時 gather 會整批失敗,只能整批重發(需消費端冪等)或用 return_exceptions=True 逐筆檢查。

異步 Callback

完全不 await 持續發送,broker ack/nack 透過 callback 通知,producer 自己維護 pending set。

pending: dict[int, aio_pika.Message] = {}

async def publish_async(message: aio_pika.Message):
    result = await exchange.publish(message, routing_key="events")
    pending[result.delivery_tag] = message

def on_confirm(delivery_tag: int, multiple: bool, is_ack: bool):
    if multiple:
        to_remove = [t for t in pending if t <= delivery_tag]
    else:
        to_remove = [delivery_tag]
    for t in to_remove:
        msg = pending.pop(t)
        if not is_ack:
            handle_nack(msg)

注意 multiple=True——broker 會把多筆連續 ack 合併成一個 frame,代表「到這個 tag 為止全部確認」。沒處理好會讓訊息永遠留在 pending set 變記憶體洩漏。

吞吐可以接近網路頻寬上限、每秒幾十萬筆,但要自己處理 pending 記憶體管理、nack 重試、逾時判斷、程序重啟狀態恢復,只建議高頻 event stream / metrics pipeline 場景用,最好用現成 library 而不是從零寫。

模式吞吐複雜度場景
單筆同步低(數百~數千)最低低頻關鍵訊息
批次同步中高(數萬)批次匯入、日誌批次
異步 Callback極高(數十萬)高頻 stream / metrics

預設用單筆同步,成為瓶頸才升級。

ACK vs NACK

basic.ack 代表 broker 已完整處理。對 persistent message 來說 ack 代表訊息已寫入磁碟(fsync 完成),queue durable 的話 broker 重啟也不丟。

basic.nack 代表 broker 收到但無法處理——通常是 broker 內部問題(磁碟寫入失敗、資源耗盡、異常狀態)。正確反應是 log、告警、重試;多次 nack 就是基礎設施問題要立刻介入。

注意:收到 ack 不代表訊息被 consumer 處理了,甚至不代表被路由到任何 queue。Ack 只到「broker 做完 broker 能做的事」。

Mandatory + Return Listener

Publisher Confirms 只保證送到 broker,不保證路由到 queue。Publish 到沒有任何 binding 符合的 exchange 時,broker 會默默丟掉訊息並且回 ack——從 producer 看一切正常,但訊息已消失。

mandatory=True 要求「至少一個 queue 收到,否則退回」:

try:
    await exchange.publish(
        Message(body=b"order #123"),
        routing_key="order.created",
        mandatory=True,
    )
except aio_pika.exceptions.DeliveryError as e:
    log.error("no route for message", exc_info=e)

底層機制是 AMQP 的 basic.return frame——broker 把無法路由的訊息原封不動退回 producer。在 pika 這類底層 client 裡你需要自己註冊一個 return listenerchannel.add_on_return_callback)來接收退回訊息;aio_pika 把這一步包成了 DeliveryError exception,所以上面的寫法看不到 callback,但背後就是 return listener 在運作。

收到 return 通常是系統設定錯誤(routing key 拼錯、queue 還沒建立),要當成嚴重錯誤告警,不要悄悄忽略。

可靠性是 producer + broker 合作

RabbitMQ broker 自己不無條件保證持久性——它提供一組機制,producer 必須主動驗證每個環節。這個心態決定了你寫 producer 的方式:每則重要訊息都要檢查 broker 有沒有 ack、路由有沒有成功、訊息是不是 persistent。broker 不會猜你要什麼保證,只按 flag 做事。

衍生:RabbitMQ 內部的訊息移動(DLX 轉送、主 queue 到 retry queue)沒有 producer,所以這些環節沒有 publisher confirm 可依賴,只能靠事前配置測試與監控告警補強,詳見 DLX 篇

可靠發送的完整組合

保證訊息送達 queue 並不遺失需要三個條件同時成立:

  1. Publisher Confirms:broker 真的收到並處理
  2. Mandatory + Return Listener:至少被路由到一個 queue
  3. Durable Queue + Persistent Message:進 queue 後重啟不失(詳見 Message Persistence & Durability

一個參考實作:

async def reliable_publish(exchange, routing_key, body):
    message = aio_pika.Message(
        body=body,
        delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
        message_id=str(uuid.uuid4()),
    )
    try:
        await exchange.publish(message, routing_key=routing_key, mandatory=True)
    except aio_pika.exceptions.DeliveryError:
        log.error("routing failed", routing_key=routing_key)
        raise
    except Exception:
        log.exception("publish failed", routing_key=routing_key)
        raise