返回文章列表
rabbitmq

Exchange 類型詳解與實作

深入解析 RabbitMQ 四種 Exchange 類型(Direct、Fanout、Topic、Headers)的運作原理、適用場景與程式碼實作範例

Aaron

為什麼需要 Exchange

RabbitMQ 的設計原則是producer 永遠不應該知道有哪些 queue 存在。Producer 只描述「這則訊息是什麼」,consumer 只宣告「我感興趣什麼」。雙向解耦讓你可以自由增減 consumer 而不動 producer,反之亦然——這是事件驅動架構的核心。

Producer 發訊息時帶 routing key(訊息的標籤),queue bind 時帶 binding key(訂閱條件),exchange 的工作就是拿 routing key 比對 binding key 決定送到哪些 queue。比對規則不同就形成不同的 exchange 類型。

Routing Key vs Binding Key

Routing KeyBinding Key
誰設定Producer publishQueue bind
時機每次 publishBind 時一次,之後不變
語意訊息是什麼Queue 想收什麼
# Binding Key:queue 的訂閱宣告
await queue.bind(exchange, routing_key="order.created")
# aio_pika 的參數名叫 routing_key 但語意是 binding

# Routing Key:訊息的標籤
await exchange.publish(
    aio_pika.Message(body=b"order #123 created"),
    routing_key="order.created",
)

aio_pika 在 queue.bind() 的參數也叫 routing_key 是 AMQP 歷史遺留命名,看誰在呼叫就能分辨。

Direct:精確比對

規則:routing key 和 binding key 字串完全相等。O(1) hash lookup,效能好、debug 容易。

一個 exchange 可以有多個 queue 綁同一個 binding key(廣播),一個 queue 可以用不同 binding key 多次綁到同一 exchange(訂多類)。沒匹配就丟掉(除非設 mandatory)。

典型場景是依類別分流

exchange = await channel.declare_exchange("logs_direct", aio_pika.ExchangeType.DIRECT)

await exchange.publish(
    aio_pika.Message(body=b"disk almost full"),
    routing_key="error",
)

error_queue = await channel.declare_queue("error_logs")
await error_queue.bind(exchange, routing_key="error")

alert_queue = await channel.declare_queue("alerts")
await alert_queue.bind(exchange, routing_key="error")
await alert_queue.bind(exchange, routing_key="warning")

當路由條件是有限、固定的一組類別時 Direct 通常是最佳選擇。

Fanout:無差別廣播

完全忽略 routing key,廣播給所有綁定的 queue。沒有字串比對,效能最好,開銷比 Topic 低一個量級。

典型場景是多子系統消費同一事件(訂單成立 → 庫存、通知、統計、風控)、即時通知廣播(10 台 WebSocket server 收同一則公告)、快取失效廣播

exchange = await channel.declare_exchange("cache_invalidate", aio_pika.ExchangeType.FANOUT)

await exchange.publish(
    aio_pika.Message(body=b"user:42"),
    routing_key="",  # fanout 中無意義
)

queue = await channel.declare_queue("", exclusive=True)
await queue.bind(exchange)

匿名 + exclusive=True 的 queue 在 connection 關閉時自動刪除,是 fanout 場景的常見組合。

Topic:帶萬用字元的模糊比對

Direct 的加強版,routing key 必須是. 分隔的多段字串order.created.vip),bind 時可用萬用字元:

字元意義
*剛好一段
#零段或多段

例:binding order.*.vip 匹配 order.created.vip,不匹配 order.vip(少一段)或 order.created.paid.vip(多一段)。binding order.# 匹配 orderorder.createdorder.created.vip.asia 全部。

exchange = await channel.declare_exchange("events", aio_pika.ExchangeType.TOPIC)

vip_queue = await channel.declare_queue("vip_handler")
await vip_queue.bind(exchange, routing_key="#.vip")

order_queue = await channel.declare_queue("order_handler")
await order_queue.bind(exchange, routing_key="order.created.*")

await exchange.publish(
    aio_pika.Message(body=b"new order #123"),
    routing_key="order.created.vip",
)
# 會被 vip_handler 和 order_handler 同時收到

設計 routing key 格式的經驗:最常被過濾的維度放前面。電商常用 {domain}.{entity}.{action}.{variant},例如 retail.order.created.vip

Headers:用訊息屬性路由

不看 routing key,用訊息 headers 比對 binding 的 header 條件。x-match=all AND、x-match=any OR。

exchange = await channel.declare_exchange("headers_ex", aio_pika.ExchangeType.HEADERS)

queue = await channel.declare_queue("pdf_reports")
await queue.bind(exchange, arguments={
    "x-match": "all",
    "format": "pdf",
    "type": "report",
})

await exchange.publish(
    aio_pika.Message(body=b"monthly report", headers={"format": "pdf", "type": "report"}),
    routing_key="",
)

幾乎沒人用:比 Topic 慢一個量級、可讀性差(規則藏在 header map)、大多數需求都能串成 routing key 用 Topic 解決(pdf.report.monthly)。除非屬性是動態產生不適合嵌進 routing key,否則先想想能不能用 Topic。

選型

類型路由依據效能彈性場景
DirectRouting key 完全相符日誌分流、任務分派
Fanout無(廣播)最高最低通知廣播、快取失效
Topic萬用字元匹配多維度事件、微服務事件流
HeadersHeaders 屬性幾乎不用

三個問題決定選型:所有 binding 都要收到嗎? → Fanout。路由條件是有限固定類別嗎? → Direct。需要層次化訂閱或通配符嗎? → Topic。

中大型系統通常同時使用多個 exchange,核心事件用 Topic、廣播用 Fanout、簡單分流用 Direct。

Default Exchange

RabbitMQ 啟動時會建立幾個預設 exchange。amq.direct / amq.fanout / amq.topic / amq.headers 行為跟自己宣告的一樣,但 Default Exchange(空字串 "")有特殊行為:每個 queue 宣告時會自動以 queue 名稱作為 binding key 綁到它上面,不需要 queue.bind()

await channel.declare_queue("task_queue", durable=True)

await channel.default_exchange.publish(
    aio_pika.Message(body=b"process me"),
    routing_key="task_queue",  # routing key = queue 名稱
)