返回文章列表
rabbitmq

常見 Message Queue Pattern

整理 RabbitMQ 常見的使用模式,包含 Work Queue、Publish/Subscribe、Routing、Topic、RPC 與 Priority Queue

Aaron

Pattern 是把 RabbitMQ 各種原料(exchange、queue、confirm、DLX、prefetch)組裝成解決方案的配方。以下是幾個最常用的。

Work Queue

一群 worker 從同一個 queue 搶任務做,每則訊息只會被其中一個拿到(competing consumers)。適合圖片處理、影片轉檔、PDF 生成、email 發送等可水平擴展的慢工。

# Producer
await channel.default_exchange.publish(
    aio_pika.Message(
        body=b"resize image #42",
        delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
    ),
    routing_key="image_tasks",
)

# Worker
await channel.set_qos(prefetch_count=1)
queue = await channel.declare_queue("image_tasks", durable=True)
async with queue.iterator() as iter:
    async for msg in iter:
        async with msg.process():
            await do_resize(msg.body)

四個關鍵設定:durable queue、persistent message、prefetch_count=1、manual ACK。少任何一個都會在某處丟訊息或分配不均。

Publish / Subscribe

一則訊息要送給所有訂閱者。例如「訂單成立」要同時觸發庫存、通知、統計。核心是 fanout exchange 加上每個 consumer 自己一個 queue——共用 queue 會退化成 work queue 的競爭模式。

# Producer
exchange = await channel.declare_exchange("events", aio_pika.ExchangeType.FANOUT)
await exchange.publish(aio_pika.Message(body=b"order.created"), routing_key="")

# Consumer
queue = await channel.declare_queue("inventory_handler", durable=True)
await queue.bind(exchange)

Routing

按規則送到特定的幾個 queue,對應 direct exchange。典型例子是日誌分流——error 送告警與歸檔、warning/info 只歸檔。

exchange = await channel.declare_exchange("logs", aio_pika.ExchangeType.DIRECT)

error_q = await channel.declare_queue("errors")
await error_q.bind(exchange, routing_key="error")

log_q = await channel.declare_queue("all_logs")
for level in ["error", "warning", "info"]:
    await log_q.bind(exchange, routing_key=level)

一個 queue 可以綁多個 routing key,達成「或」邏輯的訂閱。

Topic

事件有多個維度(例如「模組.動作.屬性」)時用 topic exchange,* 匹配一層、# 匹配任意層。Producer 送 order.created.viporder.*.* 收所有訂單、#.vip 收所有 VIP、order.created.# 收所有新訂單。詳見 Exchange Types 那篇。

RPC

Client 發請求、server 處理、server 回傳。Client 用 reply_to 指定回覆 queue,用 correlation_id 配對請求與回應。

callback_queue = await channel.declare_queue("", exclusive=True)
correlation_id = str(uuid.uuid4())
await channel.default_exchange.publish(
    aio_pika.Message(
        body=b"compute 2+2",
        correlation_id=correlation_id,
        reply_to=callback_queue.name,
    ),
    routing_key="rpc_queue",
)

async with callback_queue.iterator() as iter:
    async for msg in iter:
        if msg.correlation_id == correlation_id:
            print("result:", msg.body)
            await msg.ack()
            break

RPC over MQ 不一定是好主意。多一層 broker 多一層延遲與失敗模式,一般 API 用 HTTP/gRPC 更直接。只有在跨網路區段、需要天然負載平衡、或已全面使用 RabbitMQ 時才值得。

Priority Queue

queue = await channel.declare_queue(
    "tasks",
    arguments={"x-max-priority": 10},
)
await channel.default_exchange.publish(
    aio_pika.Message(body=b"urgent!", priority=9),
    routing_key="tasks",
)

限制:優先級數字不建議超過 10(內部為每個優先級維護索引)、只在單一 queue 內生效吞吐量會略降。適合「大部分同級、少數需插隊」,不是細粒度排程器。

Delay Queue

兩種實作:

  • TTL + DLX:訊息進無 consumer 的 queue 設 TTL,到期被 dead letter 轉到處理 queue。純配置,但同 queue 不同 TTL 會 head-of-line blocking。
  • rabbitmq_delayed_message_exchange 插件:真正的 per-message delay,不互相阻塞。需要安裝插件,某些託管服務可能不可用。

固定 TTL 用 DLX,不同 TTL 用 plugin。

Consumer Group(Pub/Sub + Work Queue)

Kafka consumer group 的語義在 RabbitMQ 靠組合模式達成:topic/fanout exchange 把訊息複製給多個 group queue,每個 group queue 內跑 work queue。

                      → Group A Queue → [A1, A2, A3]
Topic Exchange →
                      → Group B Queue → [B1, B2]

每個 group 可獨立擴縮容、獨立訂閱、獨立設 DLX。缺點是新增 group 要手動宣告 queue 與 binding,不如 Kafka 動態。

選型速查表

需求推薦 Pattern
多個 worker 分攤任務Work Queue
所有訂閱者都要收到Pub/Sub (Fanout)
固定分類分流Routing (Direct)
彈性訂閱多維度事件Topic
請求回應RPC(需斟酌)
某些訊息要插隊Priority Queue
延遲執行Delay Queue
多組訂閱者各自消費Pub/Sub + Work Queue