Dead Letter Channel Pattern with RabbitMQ

メッセージングシステムで送信されたメッセージが正常に処理されなかった場合どうするのか?
RabbitMQ 2.8 以降では expire したメッセージや拒否されたメッセージを dead letter として専用の exchange に転送する事ができる。

dead letter になるユースケース
以下のケースで、メッセージは dead letter になる

  • The message is rejected (basic.reject or basic.nack) with requeue=false
  • The TTL for the message expires

Enterprise Integratin Patterns との対比

Gregor Hohpe & Bobby Woolf の “Enterprise Integratin Patterns” では

に対応

Invalid Message Channel

The receiver should move the improper message to an Invalid Message Channel, a special channel for messages that could not be processed by their receivers.

Dead Letter Channel

What will the messaging system do with a message it cannot deliver?

When a messaging system determines that it cannot or should not deliver a message, it may elect to move the message to a Dead Letter Channel.

Message Lifecycle
メッセージのライフサイクルは下図のようになる。

Dead Letter Exchange の使い方

キューの宣言で arguments x-dead-letter-exchange を指定する。

channel.queue_declare(queue='task_queue',
  arguments={
  'x-message-ttl' : 1000,
  "x-dead-letter-exchange" : "dlx",
  "x-dead-letter-routing-key" : "dl", # if not specified, queue's routing-key is used
  }
)

x-dead-letter-routing-key を指定しなかった場合、 メッセージに設定された x-dead-letter-routing-key が利用される。

Dead-Lettered Message の特徴

  • confirm mode
  • exchange 名が dead letter exchange のものに書き換えられる
  • ルーティングキーが書き換えられる(x-dead-letter-routing-key を指定した場合)
  • 'x-death' ヘッダーが追加される。

ヘッダーで特に有益なのは dead letter の原因を表す reason フィールド。以下のいずれかの値を取る。

  • expired – the TTL of the message expired
  • rejected – the message was rejected with requeue=false

注意点としては、 dead letter message を受け取るまでは invalid message で reject されたのか expire したのか判断がつかない。

x-death ヘッダーの詳細は Dead Letter Exchanges のドキュメントを参照のこと。

プログラミング例

“Work Queues” チュートリアルをベースにして、 キューに TTL を追加し、ワーカーで basic.reject を実装し、意図的に dead letter を発生させて動作を確認。

TTLを設定

メッセージに TTL(time to live)を設定するには、キューの宣言で arguments x-message-ttl をミリ秒で指定する。
http://www.rabbitmq.com/ttl.html

channel.queue_declare(queue='task_queue',
  arguments={
  'x-message-ttl' : 1000,
  }
)

メッセージを拒否

キューのコールバック関数で basic_reject すれば OK.

コード全体

[gist https://gist.github.com/3844767 /]

プログラムの実行

producer

メッセージを5つ送信

$ python new_task.py a
 [x] Sent 'a'
$ python new_task.py b
 [x] Sent 'b'
$ python new_task.py c
 [x] Sent 'c'
$ python new_task.py d
 [x] Sent 'd'
$ python new_task.py e
 [x] Sent 'e'

worker
1/2 の確率で sleep または reject する

$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'a'
 [x] Done
 [x] Received 'b'
 [x] Rejected
 [x] Received 'e'
 [x] Done

dead-letter worker

reject されたメッセージ または TTL が過ぎたメッセージを処理する。

$ python receive_dl.py
 [*] Waiting for dead-letters. To exit press CTRL+C
 [x] <BasicProperties(["headers={'x-death': [{'queue': 'task_queue', 'reason': 'expired', 'exchange': '', 'routing-keys': ['task_queue'], 'time': datetime.datetime(2012, 10, 6, 11, 53, 22)}]}"])>
 [reason] : expired : 'c'
 [x] <BasicProperties(["headers={'x-death': [{'queue': 'task_queue', 'reason': 'expired', 'exchange': '', 'routing-keys': ['task_queue'], 'time': datetime.datetime(2012, 10, 6, 11, 53, 24)}]}"])>
 [reason] : expired : 'd'
 [x] <BasicProperties(["headers={'x-death': [{'queue': 'task_queue', 'reason': 'rejected', 'exchange': '', 'routing-keys': ['task_queue'], 'time': datetime.datetime(2012, 10, 6, 11, 53, 24)}]}"])>
 [reason] : rejected : 'b'

References

Leave a comment