常見 Message Queue Pattern
整理 RabbitMQ 常見的使用模式,包含 Work Queue、Publish/Subscribe、Routing、Topic、RPC 與 Priority Queue
Pattern 是把 RabbitMQ 各種原料(exchange、queue、confirm、DLX、prefetch)組裝成解決方案的配方。以下是幾個最常用的。
Work Queue

一群 worker 從同一個 queue 搶任務做,每則訊息只會被其中一個拿到(competing consumers)。適合圖片處理、影片轉檔、PDF 生成、email 發送等可水平擴展的慢工。
# Producer
await channel.default_exchange.publish(
aio_pika.Message(
body=b"resize image #42",
delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
),
routing_key="image_tasks",
)
# Worker
await channel.set_qos(prefetch_count=1)
queue = await channel.declare_queue("image_tasks", durable=True)
async with queue.iterator() as iter:
async for msg in iter:
async with msg.process():
await do_resize(msg.body)
四個關鍵設定:durable queue、persistent message、prefetch_count=1、manual ACK。少任何一個都會在某處丟訊息或分配不均。
Publish / Subscribe

一則訊息要送給所有訂閱者。例如「訂單成立」要同時觸發庫存、通知、統計。核心是 fanout exchange 加上每個 consumer 自己一個 queue——共用 queue 會退化成 work queue 的競爭模式。
# Producer
exchange = await channel.declare_exchange("events", aio_pika.ExchangeType.FANOUT)
await exchange.publish(aio_pika.Message(body=b"order.created"), routing_key="")
# Consumer
queue = await channel.declare_queue("inventory_handler", durable=True)
await queue.bind(exchange)
Routing

按規則送到特定的幾個 queue,對應 direct exchange。典型例子是日誌分流——error 送告警與歸檔、warning/info 只歸檔。
exchange = await channel.declare_exchange("logs", aio_pika.ExchangeType.DIRECT)
error_q = await channel.declare_queue("errors")
await error_q.bind(exchange, routing_key="error")
log_q = await channel.declare_queue("all_logs")
for level in ["error", "warning", "info"]:
await log_q.bind(exchange, routing_key=level)
一個 queue 可以綁多個 routing key,達成「或」邏輯的訂閱。
Topic
事件有多個維度(例如「模組.動作.屬性」)時用 topic exchange,* 匹配一層、# 匹配任意層。Producer 送 order.created.vip,order.*.* 收所有訂單、#.vip 收所有 VIP、order.created.# 收所有新訂單。詳見 Exchange Types 那篇。
RPC

Client 發請求、server 處理、server 回傳。Client 用 reply_to 指定回覆 queue,用 correlation_id 配對請求與回應。
callback_queue = await channel.declare_queue("", exclusive=True)
correlation_id = str(uuid.uuid4())
await channel.default_exchange.publish(
aio_pika.Message(
body=b"compute 2+2",
correlation_id=correlation_id,
reply_to=callback_queue.name,
),
routing_key="rpc_queue",
)
async with callback_queue.iterator() as iter:
async for msg in iter:
if msg.correlation_id == correlation_id:
print("result:", msg.body)
await msg.ack()
break
RPC over MQ 不一定是好主意。多一層 broker 多一層延遲與失敗模式,一般 API 用 HTTP/gRPC 更直接。只有在跨網路區段、需要天然負載平衡、或已全面使用 RabbitMQ 時才值得。
Priority Queue
queue = await channel.declare_queue(
"tasks",
arguments={"x-max-priority": 10},
)
await channel.default_exchange.publish(
aio_pika.Message(body=b"urgent!", priority=9),
routing_key="tasks",
)
限制:優先級數字不建議超過 10(內部為每個優先級維護索引)、只在單一 queue 內生效、吞吐量會略降。適合「大部分同級、少數需插隊」,不是細粒度排程器。
Delay Queue
兩種實作:
- TTL + DLX:訊息進無 consumer 的 queue 設 TTL,到期被 dead letter 轉到處理 queue。純配置,但同 queue 不同 TTL 會 head-of-line blocking。
rabbitmq_delayed_message_exchange插件:真正的 per-message delay,不互相阻塞。需要安裝插件,某些託管服務可能不可用。
固定 TTL 用 DLX,不同 TTL 用 plugin。
Consumer Group(Pub/Sub + Work Queue)
Kafka consumer group 的語義在 RabbitMQ 靠組合模式達成:topic/fanout exchange 把訊息複製給多個 group queue,每個 group queue 內跑 work queue。
→ Group A Queue → [A1, A2, A3]
Topic Exchange →
→ Group B Queue → [B1, B2]
每個 group 可獨立擴縮容、獨立訂閱、獨立設 DLX。缺點是新增 group 要手動宣告 queue 與 binding,不如 Kafka 動態。
選型速查表
| 需求 | 推薦 Pattern |
|---|---|
| 多個 worker 分攤任務 | Work Queue |
| 所有訂閱者都要收到 | Pub/Sub (Fanout) |
| 固定分類分流 | Routing (Direct) |
| 彈性訂閱多維度事件 | Topic |
| 請求回應 | RPC(需斟酌) |
| 某些訊息要插隊 | Priority Queue |
| 延遲執行 | Delay Queue |
| 多組訂閱者各自消費 | Pub/Sub + Work Queue |