Advanced Message Queuing Protocol (AMQP) モデルでは Producer -> Exchange -> Binding(topic exchange の場合のみ) -> Queue とメッセージは流れる。ここで中心的な役割を果たす Exchange は主に fanout/direct/topic の3パターンがある。このうち Kestrel もサポートしている fanout を触ってみた。

AMQP の3つの Exchange

1. Fanout Exchange
fan-out exchange ではメッセージは exchange とつながっている各キューに配信される。

2. Direct Exchange
direct exchange ではメッセージの routing key に応じて配信先キューが振り分けられる。

3. Topic Exchange
topic exchange ではメッセージの routing key とブローカーの binding key に応じて配信先キューが振り分けられる。

# Exchange 画像は RedHat の Enterprise MRG のマニュアルから

Kestrel での fanout

kestrel ではキュー名の命名規則により、fanout として扱えるようにしている。

If a queue name has a + in it (like “orders+audit”), it’s treated as a fanout queue, using the format <parent>+<child>. These queues belong to a parent queue — in this example, the “orders” queue. Every item written into a parent queue will also be written into each of its children.

fanout の使用例

RabbitMQ のサイトの pub/sub のチュートリアルを Kestrel に移植してみる。

プロデューサーはキュー名 “logs” に対してメッセージを送信する。プロデューサーは fanout を一切意識しない。

import sys
import kestrel

client = kestrel.Client(servers=['%s:%s'%(host, port)])
message = ' '.join(sys.argv[1:]) or "info: Hello World!"

client.add('logs', message)
# client.add('logs+alice', message) # direct exchange

コンシューマーはキュー名”logs”のメッセージを受信。プログラム起動時の引数で、fanout 用の child name を指定するようにしている。

import sys
import kestrel
client = kestrel.Client(servers=['%s:%s'%(host, port)])

queue_name = 'logs'
if len(sys.argv) > 1:
    queue_name += '+%s' % sys.argv[1] # fanout queue

while True:
        body =, timeout=10)
        if body is not None:
            print " [%s] %r" % (queue_name, body) # キュー名全体[parent+child]を確認用に出力
    except KeyboardInterrupt, err:

まずは consumer を複数起動。

# terminal #1
$ python
 [logs] 'info: Hello World!'
 [logs] 'test'

# terminal #2
$ python alice
 [logs+alice] 'info: Hello World!'
 [logs+alice] 'test'

# terminal #3
$ python bob
 [logs+bob] 'info: Hello World!'
 [logs+bob] 'test'

あとは producer がメッセージを送信すれば consumer に届く。

$ python
$ python test


  • 子供のキュー名を直接指定して、キュー操作を直接行うことも可能。
  • 設定ファイルは親のものを利用。(queue_name=”foo+bar”のfanoutキューの場合、queue_name=”foo”のネームスペースで定義された設定が利用される)
  • 親とは独立したジャーナルファイルを持つ(デフォルトは /var/spool/kestrel/ 以下)

fanout に関連する設定項目
kestrel の設定ファイル(production.scala/development.scala)で fanout 用キューの QueueBuilder に “fanoutOnly = true” の設定を追加すると、子供にしかメッセージは配信されなくなる。上の例では キュー名に “+” が含まれる “logs+alice”, “logs+bob” には配信されるが、 “logs” には配信されない。この機能を利用したうまいユースケースは思い浮かばない、、、

  queues = new QueueBuilder {
    name = "logs"
    fanoutOnly = true

RabbitMQ in Action での fanout の使い
“RabbitMQ in Action” では exchange はCh.2 Understanding messaging で、fanout の実例は、ユーザの画像投稿を例に §4.2.2 Parallel processing で解説されている。

Tagged with: , , ,
Posted in middleware

Leave a Reply

Fill in your details below or click an icon to log in: Logo

You are commenting using your account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

  • RT @__apf__: How to write a research paper: a guide for software engineers & practitioners.… /cc @inwyrd 6 months ago
  • RT @HayatoChiba: 昔、自然と対話しながら数学に打ち込んだら何かを悟れるのではと思いたち、専門書1つだけ持ってパワースポットで名高い奈良の山奥に1週間籠ったことがある。しかし泊まった民宿にドカベンが全巻揃っていたため、水島新司と対話しただけで1週間過ぎた。 それ… 6 months ago
  • RT @googlecloud: Ever wonder what underwater fiber optic internet cables look like? Look no further than this deep dive w/ @NatAndLo: https… 6 months ago
  • @ijin UTC+01:00 な時間帯で生活しています、、、 1 year ago
  • RT @mattcutts: Google's world-class Site Reliability Engineering team wrote a new book:… It's about managing produc… 1 year ago
%d bloggers like this: