Erlang で書かれたメッセージングミドルウェア RabbitMQ でリクエスト・リプライパターンを実現する方法を調べた。
Request-Reply の処理の流れ
メッセージキューは一般に producer から consumer への fire & forget な一方向通信を行う。
一方で、リクエスト・リプライパターンでは、メッセージを受信したあと、送信者へレスポンスを返信する。
メッセージングシステムは多数あれど、やっていることはだいたい同じ。
- requester-replier 間でチャンネルを用意
- 他の requester が返信を受け取れないように、返信用のプライベート(point-to-point)なキュー(テンポラリーキューなど)を用意
- requester がメッセージを replier に送信。メッセージには作成した返信先アドレスと送信リクエストを特定するための correlation id を含める。
- replier はプライベートキュー経由で requester にメッセージを返信。
- requester はプライベートキューからメッセージを受信し、 correlation id に応じて処理を変える。
RabbitMQ での実装上のポイント
RabbitMQ のサイトでは、(用語はともかくとして)チュートリアルの “6:Remote procedure call implementation” で request-reply パターンが扱われている。
テンポラリーキューの作成方法
queue_declare の時にキュー名を指定(queue=xxx)しなければ、RabbitMQ がランダムなキュー名を振ってくれる。
プライベートキューの作成方法
queue_declare の時にキューを排他宣言(exclusive=True)すれば、他の consumer はキューを受け取れなくなる。
返信先・correlation idの指定方法
requester がメッセージを送信する際のBasicPropertiesで reply_to と correlation_id を指定する。
リクエスト送信のたびにテンポラリーキューを作成するなどの一連の処理のオーバーヘッドが気にならないのであれば、correlation_id は不要。replier->requester の exclusive なキューを用意したあとは、correlation_id を変えて何度もリクエストすることを想定している。
返信方法
replier が返信する際は、exchange を空にし(=default exchange)、routing_key に requester が送信してきた reply_to を設定する。
Default Exchange
The default exchange is a direct exchange with no name (empty string) pre-declared by the broker. It has one special property that makes it very useful for simple applications: every queue that is created is automatically bound to it with a routing key which is the same as the queue name.
via RabbitMQ : AMQP 0-9-1 Model Explainedwhen the RPC server publishes its reply message to RabbitMQ without an exchange specified, RabbitMQ knows that it’s targeted for a reply queue and that the routing key is the queue’s name.
via “RabbitMQ in Action: Distributed Messaging for Everyone” P. 82
RabbitMQ での実装例
チュートリアルの RPC をさらに単純化して echo サーバにすると、以下の様になる:
Server Code
#!/usr/bin/env python # a la http://www.rabbitmq.com/tutorials/tutorial-six-python.html import pika connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='rpc_queue') def on_request(ch, method, props, body): ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = props.correlation_id), body=body) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(on_request, queue='rpc_queue') print " [x] Awaiting RPC requests" channel.start_consuming()
Client Code
#!/usr/bin/env python # a la http://www.rabbitmq.com/tutorials/tutorial-six-python.html import pika import uuid import sys class EchoRpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, body): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = self.corr_id, ), body=body) while self.response is None: self.connection.process_data_events() return self.response echo_rpc = EchoRpcClient() msg = sys.argv[1] if len(sys.argv) > 1 else 'dummy' print " [x] Requesting %s" % msg response = echo_rpc.call(msg) print " [.] Got %s" % response
実行例
$ python rpc_server.py & [1] 10701 [x] Awaiting RPC requests $ python rpc_client.py [x] Requesting dummy [.] Got dummy $ python rpc_client.py 'request-reply pattern' [x] Requesting request-reply pattern [.] Got request-reply pattern
References
- Alvaro Videla and Jason J.W. Williams : RabbitMQ in Action
§4.3 Remember me: RPC over RabbitMQ and waiting for answers - JMS Request/Reply Example
http://www.eaipatterns.com/RequestReplyJmsExample.html - Oracle GlassFish Server Message Queue
- Apache Qpid Request/Reply Pattern
- Apache Camel Request/Reply Pattern
- Michael Kerrisk : The Linux Programming Interface.
§46.7 Client-Server Programming with Message Queues
System V Message Queue Server-Client Pattern