Request-Reply Pattern with RabbitMQ

Erlang で書かれたメッセージングミドルウェア RabbitMQ でリクエスト・リプライパターンを実現する方法を調べた。

Request-Reply の処理の流れ

メッセージキューは一般に producer から consumer への fire & forget な一方向通信を行う。
一方で、リクエスト・リプライパターンでは、メッセージを受信したあと、送信者へレスポンスを返信する。

メッセージングシステムは多数あれど、やっていることはだいたい同じ。

  1. requester-replier 間でチャンネルを用意
  2. 他の requester が返信を受け取れないように、返信用のプライベート(point-to-point)なキュー(テンポラリーキューなど)を用意
  3. requester がメッセージを replier に送信。メッセージには作成した返信先アドレスと送信リクエストを特定するための correlation id を含める。
  4. replier はプライベートキュー経由で requester にメッセージを返信。
  5. requester はプライベートキューからメッセージを受信し、 correlation id に応じて処理を変える。

RabbitMQ での実装上のポイント

RabbitMQ のサイトでは、(用語はともかくとして)チュートリアルの “6:Remote procedure call implementation” で request-reply パターンが扱われている。

テンポラリーキューの作成方法
queue_declare の時にキュー名を指定(queue=xxx)しなければ、RabbitMQ がランダムなキュー名を振ってくれる。

プライベートキューの作成方法
queue_declare の時にキューを排他宣言(exclusive=True)すれば、他の consumer はキューを受け取れなくなる。

返信先・correlation idの指定方法
requester がメッセージを送信する際のBasicPropertiesで reply_to と correlation_id を指定する。
リクエスト送信のたびにテンポラリーキューを作成するなどの一連の処理のオーバーヘッドが気にならないのであれば、correlation_id は不要。replier->requester の exclusive なキューを用意したあとは、correlation_id を変えて何度もリクエストすることを想定している。

返信方法
replier が返信する際は、exchange を空にし(=default exchange)、routing_key に requester が送信してきた reply_to を設定する。

Default Exchange
The default exchange is a direct exchange with no name (empty string) pre-declared by the broker. It has one special property that makes it very useful for simple applications: every queue that is created is automatically bound to it with a routing key which is the same as the queue name.
via RabbitMQ : AMQP 0-9-1 Model Explained 

when the RPC server publishes its reply message to RabbitMQ without an exchange specified, RabbitMQ knows that it’s targeted for a reply queue and that the routing key is the queue’s name.
via “RabbitMQ in Action: Distributed Messaging for Everyone” P. 82

RabbitMQ での実装例

チュートリアルの RPC をさらに単純化して echo サーバにすると、以下の様になる:

Server Code

#!/usr/bin/env python
# a la http://www.rabbitmq.com/tutorials/tutorial-six-python.html
import pika

connection = pika.BlockingConnection(
  pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')

def on_request(ch, method, props, body):
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = props.correlation_id),
                     body=body)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(on_request, queue='rpc_queue')

print " [x] Awaiting RPC requests"
channel.start_consuming()

Client Code

#!/usr/bin/env python
# a la http://www.rabbitmq.com/tutorials/tutorial-six-python.html
import pika
import uuid
import sys

class EchoRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, body):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=body)
        while self.response is None:
            self.connection.process_data_events()
        return self.response

echo_rpc = EchoRpcClient()

msg = sys.argv[1] if len(sys.argv) > 1 else 'dummy'
print " [x] Requesting %s" % msg
response = echo_rpc.call(msg)
print " [.] Got %s" % response

実行例

$ python rpc_server.py  &
[1] 10701
 [x] Awaiting RPC requests
$ python rpc_client.py
 [x] Requesting dummy
 [.] Got dummy
$ python rpc_client.py 'request-reply pattern'
 [x] Requesting request-reply pattern
 [.] Got request-reply pattern

References

Leave a comment