BlazeDSをメッセージブローカにして FlexでPub/Subメッセージングを行う
※このエントリは Publish/Subscribeモデルのメッセージ交換についての知識を前提とする。
Flexでは Publisherを Producer(プロデューサ)、Subscriberを Consumer(コンシューマ)、Topicを Destination(ディスティネーション)と呼ぶようである。
BlazeDSの組み込まれたWebアプリケーションは、これらを取り扱うメッセージブローカとして動作することが出来る。
しかしながら、Webアプリケーションは所詮 Webアプリケーションであり、通信プロトコルとしては HTTPしか使えない。Pub/SubメッセージングをHTTPで擬似的に実現する(よく疑似プッシュと呼ばれる)方法は基本的に Cometの考え方と根を同じにするが、Flex/BlazeDSでは利用形態や環境に応じていくつかのバリエーションが用意されている(その辺りについてはAkihiro Kamijo氏のブログが詳しい)。
ここでは最もシンプルな Long-pollingを取り扱う。なのでひとまず「俺のサーバは64bitでメモリが湯水のように使えて好きなだけスレッドも立てられるからC10K問題について考える必要がない」と思いこむこと。同時接続数が多くなると fastcgiがおかしな動作をするなと思ってソースを追いかけたら一部で select() が使われたままになって放置されてることを発見して 1024本(-α)までですか!と絶叫した思い出などは封印すること。
BlazeDSで最も簡単にLong-pollingなメッセージングを有効にするには下記のようにする。
1) polling-enabled = true な AMFチャンネル(mx.messaging.channels.AMFChannel)を定義する
2) 1を使って、メッセージング用のサービス(flex.messaging.services.MessageService)を定義する
3) 2の中に必要なだけ destinationを定義する(サーバサイドで動的にdestinationを生成することも出来る)
以前のエントリ BlazeDSを自分のWebアプリケーションに組み込む で最小限 RPCだけが可能なように作成した WEB-INF/flex/services-config.xmlをベースに、これらを追加分として例示してみることにする。

実用的な用途のうえでは、Flexアプリケーション同士でダイレクトに Pub/Subメッセージ交換を行うケースは実際にはそれほどないと思われる。多くの場合 Producerとしての処理はサーバ側で行われ、Flexは Consumer専門になるのではないだろうか。
設定によりJMSとの相互運用レイヤを有効に出来るらしい?ので、既存のESB (Enterprise Service Bus)に Flexアプリケーションを取って付けるといった用途も考えられる。日本の企業で ESBなんて格好いい物が整備されている所がどれほどあるか知らないが。
JMSにない概念として Subtopicというものがあるようだ。
Flexでは Publisherを Producer(プロデューサ)、Subscriberを Consumer(コンシューマ)、Topicを Destination(ディスティネーション)と呼ぶようである。
BlazeDSの組み込まれたWebアプリケーションは、これらを取り扱うメッセージブローカとして動作することが出来る。
しかしながら、Webアプリケーションは所詮 Webアプリケーションであり、通信プロトコルとしては HTTPしか使えない。Pub/SubメッセージングをHTTPで擬似的に実現する(よく疑似プッシュと呼ばれる)方法は基本的に Cometの考え方と根を同じにするが、Flex/BlazeDSでは利用形態や環境に応じていくつかのバリエーションが用意されている(その辺りについてはAkihiro Kamijo氏のブログが詳しい)。
ここでは最もシンプルな Long-pollingを取り扱う。なのでひとまず「俺のサーバは64bitでメモリが湯水のように使えて好きなだけスレッドも立てられるからC10K問題について考える必要がない」と思いこむこと。同時接続数が多くなると fastcgiがおかしな動作をするなと思ってソースを追いかけたら一部で select() が使われたままになって放置されてることを発見して 1024本(-α)までですか!と絶叫した思い出などは封印すること。
BlazeDSで最も簡単にLong-pollingなメッセージングを有効にするには下記のようにする。
1) polling-enabled = true な AMFチャンネル(mx.messaging.channels.AMFChannel)を定義する
2) 1を使って、メッセージング用のサービス(flex.messaging.services.MessageService)を定義する
3) 2の中に必要なだけ destinationを定義する(サーバサイドで動的にdestinationを生成することも出来る)
以前のエントリ BlazeDSを自分のWebアプリケーションに組み込む で最小限 RPCだけが可能なように作成した WEB-INF/flex/services-config.xmlをベースに、これらを追加分として例示してみることにする。
channels要素内
<!--
チャンネルの定義。
RPC用のチャンネルとは別のidとエンドポイントURLを与えている。
また、polling-enabledプロパティに trueをセットしている。
-->
<channel-definition id="my-polling-amf"
class="mx.messaging.channels.AMFChannel">
<endpoint
url="http://{server.name}:{server.port}/{context.root}/messagebroker/amfpolling"
class="flex.messaging.endpoints.AMFEndpoint"/>
<properties>
<polling-enabled>true</polling-enabled>
<polling-interval-millis>0</polling-interval-millis>
<wait-interval-millis>-1</wait-interval-millis>
<max-waiting-poll-requests>300</max-waiting-poll-requests>
</properties>
</channel-definition>
services要素内
<!-- メッセージング用サービスと destinationの定義。-->上記の設定により、http://host:port/contextroot/messagebroker/amfpolling を通信先として Flexから Publisher/Subscriberメッセージングを行うことができるようになる。
<service id="messaging-service" class="flex.messaging.services.MessageService">
<adapters>
<adapter-definition id="actionscript"
class="flex.messaging.services.messaging.adapters.ActionScriptAdapter"
default="true"/>
</adapters>
<default-channels>
<!-- channel-definitionで定義したidをここから参照 -->
<channel ref="my-polling-amf"/>
</default-channels>
<!--
destination(JMSで言うところのTopic)は必要なだけ作成しておく。
また、動的に作成することもできる。動的生成については
BlazeDSサンプルコード内 ChatRoomService.javaを参照
-->
<destination id="hoge"/>
<destination id="honya"/>
<destination id="foobar"/>
</service>
Flexアプリケーション Publisher(Producer) 側
<?xml version="1.0" encoding="utf-8"?>
<mx:Application xmlns:mx="http://www.adobe.com/2006/mxml">
<mx:Script>
<![CDATA[
import mx.messaging.messages.AsyncMessage;
]]>
</mx:Script>
<mx:ChannelSet id="channelSet">
<mx:AMFChannel
uri="http://localhost:8080/demo/messagebroker/amfpolling"/>
</mx:ChannelSet>
<mx:Producer id="producer" channelSet="{channelSet}"
destination="hoge"/>
<mx:HBox width="100%">
<mx:Label text="送信するメッセージ"/>
<mx:TextInput id="message"/>
<mx:Button label="送信">
<mx:click>
<![CDATA[
var msg:AsyncMessage = new AsyncMessage;
msg.body = message.text;
producer.send(msg);
]]>
</mx:click>
</mx:Button>
</mx:HBox>
</mx:Application>
Flexアプリケーション Subscriber(Consumer) 側
<?xml version="1.0" encoding="utf-8"?>
<mx:Application xmlns:mx="http://www.adobe.com/2006/mxml">
<mx:Script>
<![CDATA[
import mx.controls.Alert;
]]>
</mx:Script>
<mx:ChannelSet id="channelSet">
<!--
requestTimeout=-1 HTTPリクエストをタイムアウトさせずメッセージを待ち続ける
pollingInterval=0 HTTPリクエストが終了したら即座に次のリクエストを発生させる
-->
<mx:AMFChannel
uri="http://localhost:8080/demo/messagebroker/amfpolling"
requestTimeout="-1" pollingInterval="0"/>
</mx:ChannelSet>
<mx:Consumer id="consumer"
channelSet="{channelSet}" destination="hoge">
<mx:message>
<![CDATA[
// メッセージが飛んできた時の処理
message.text =
(event as MessageEvent).message.body.toString();
]]>
</mx:message>
</mx:Consumer>
<mx:creationComplete>
<![CDATA[
// subscribeをしないとメッセージは飛んでこない
consumer.subscribe(null);
]]>
</mx:creationComplete>
<mx:HBox width="100%">
<mx:Label text="受信したメッセージ"/>
<mx:Text id="message" width="100%"/>
</mx:HBox>
</mx:Application>

補足事項
ラベル: Flex

0 件のコメント:
コメントを投稿
<< ホーム