メッセージングシステムで送信されたメッセージが正常に処理されなかった場合どうするのか?
RabbitMQ 2.8 以降では expire したメッセージや拒否されたメッセージを dead letter として専用の exchange に転送する事ができる。
dead letter になるユースケース
以下のケースで、メッセージは dead letter になる
- The message is rejected (basic.reject or basic.nack) with requeue=false
- The TTL for the message expires
Enterprise Integratin Patterns との対比
Gregor Hohpe & Bobby Woolf の “Enterprise Integratin Patterns” では
- 前者は Invalid Message Channel パターン(P.115)
- 後者は Dead Letter Channel パターン(P.119)
に対応
Invalid Message Channel
The receiver should move the improper message to an Invalid Message Channel, a special channel for messages that could not be processed by their receivers.
Dead Letter Channel
When a messaging system determines that it cannot or should not deliver a message, it may elect to move the message to a Dead Letter Channel.
Message Lifecycle
メッセージのライフサイクルは下図のようになる。
Dead Letter Exchange の使い方
キューの宣言で arguments
に x-dead-letter-exchange
を指定する。
channel.queue_declare(queue='task_queue', arguments={ 'x-message-ttl' : 1000, "x-dead-letter-exchange" : "dlx", "x-dead-letter-routing-key" : "dl", # if not specified, queue's routing-key is used } )
x-dead-letter-routing-key
を指定しなかった場合、 メッセージに設定された x-dead-letter-routing-key
が利用される。
Dead-Lettered Message の特徴
- confirm mode
- exchange 名が dead letter exchange のものに書き換えられる
- ルーティングキーが書き換えられる(
x-dead-letter-routing-key
を指定した場合) 'x-death'
ヘッダーが追加される。
ヘッダーで特に有益なのは dead letter の原因を表す reason
フィールド。以下のいずれかの値を取る。
- expired – the TTL of the message expired
- rejected – the message was rejected with requeue=false
注意点としては、 dead letter message を受け取るまでは invalid message で reject されたのか expire したのか判断がつかない。
x-death ヘッダーの詳細は Dead Letter Exchanges のドキュメントを参照のこと。
プログラミング例
“Work Queues” チュートリアルをベースにして、 キューに TTL を追加し、ワーカーで basic.reject を実装し、意図的に dead letter を発生させて動作を確認。
TTLを設定
メッセージに TTL(time to live)を設定するには、キューの宣言で arguments
に x-message-ttl
をミリ秒で指定する。
http://www.rabbitmq.com/ttl.html
channel.queue_declare(queue='task_queue', arguments={ 'x-message-ttl' : 1000, } )
メッセージを拒否
キューのコールバック関数で basic_reject
すれば OK.
コード全体
[gist https://gist.github.com/3844767 /]プログラムの実行
producer
メッセージを5つ送信
$ python new_task.py a [x] Sent 'a' $ python new_task.py b [x] Sent 'b' $ python new_task.py c [x] Sent 'c' $ python new_task.py d [x] Sent 'd' $ python new_task.py e [x] Sent 'e'
worker
1/2 の確率で sleep または reject する
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'a' [x] Done [x] Received 'b' [x] Rejected [x] Received 'e' [x] Done
dead-letter worker
reject されたメッセージ または TTL が過ぎたメッセージを処理する。
$ python receive_dl.py [*] Waiting for dead-letters. To exit press CTRL+C [x] <BasicProperties(["headers={'x-death': [{'queue': 'task_queue', 'reason': 'expired', 'exchange': '', 'routing-keys': ['task_queue'], 'time': datetime.datetime(2012, 10, 6, 11, 53, 22)}]}"])> [reason] : expired : 'c' [x] <BasicProperties(["headers={'x-death': [{'queue': 'task_queue', 'reason': 'expired', 'exchange': '', 'routing-keys': ['task_queue'], 'time': datetime.datetime(2012, 10, 6, 11, 53, 24)}]}"])> [reason] : expired : 'd' [x] <BasicProperties(["headers={'x-death': [{'queue': 'task_queue', 'reason': 'rejected', 'exchange': '', 'routing-keys': ['task_queue'], 'time': datetime.datetime(2012, 10, 6, 11, 53, 24)}]}"])> [reason] : rejected : 'b'
References
- RabbitMQ Docs : Dead Letter Exchanges
http://www.rabbitmq.com/dlx.html - Apache Camel : dead letter channel pattern
http://camel.apache.org/dead-letter-channel.html - Gregor Hohpe & Bobby Woolf : “Enterprise Integratin Patterns”, Addison-Wesley, 2003.
http://www.eaipatterns.com/