まだプログラマーですが何か?

プログラマーネタとアスリートネタ中心。たまに作成したウェブサービス関連の話も http://twitter.com/dotnsf

タグ:mqtt

IBM IoT Foundation が無料提供している Quickstart MQTT ブローカーを使ったアプリのサンプルを紹介します。今回紹介するのは IBM Bluemix各サービスの中で止まっているサービスがあったらそれを知らせる、という機能を Node-RED で作成するような内容にします。

このアプリを作成する上で必要なサービスの稼働状況確認は estado (エスタド)を使って調べます:
http://estado.ng.bluemix.net/  
(※この URL は北米データセンターのサービス状況確認用です。英国データセンター用は URL 内文字列の ng の代わりに eu-gb を指定してください)

2015092001


estado は Bluemix ランタイムやサービスの単位での稼働状況を教えてくれるサービスサイトです。問題なく稼働しているランタイム/サービスについては "up" と表示されますが、稼働が止まってたり、なんらかの障害を抱えていると判断されたサービスに関しては赤背景で "down" と表示されます。
2015092002


Bluemix の各機能の稼動状態は上記 URL にアクセスすることで確認することはできます。ただそれでは「上記サイトを見に行った人が稼働状況を確認できる」というだけです。情報そのものは便利なのですが、稼働ステータスをもっとインタラクティブにプッシュしたり、ユーザーフレンドリーな形で提供することを考えてみましょう。

具体的にはこのようなシステムを作ります。この図の緑の丸で描かれた「Java アプリ」部分を実装します:
2015092004


この Java(スタンドアロン)アプリケーションは定期的(この例では1分に1度)に北米データセンターの estado に HTTP GET リクエストを出して Bluemix サービスの稼動状態を確認します。確認した結果、停止中のサービスの一覧を IBM IoT Foundation の Quickstart MQTT ブローカー(quickstart.messaging.internetofthings.ibmcloud.com:1883)へランダムに生成した deviceId を指定して JSON でパブリッシュする、というものです。この時の deviceId はアプリケーション起動時に動的に作成し、コンソールへ出力します。英国データセンター用に作り変えたり、実行感覚を変更したい場合はソースコードの該当箇所を適宜変更してください。
 

estado を参照した結果は Quickstart MQTT ブローカーに格納されるので、パブリッシュ時と同じ deviceId を指定すれば MQTT サブスクライバーを使って取り出せます。特に IBM Bluemix 上の IoT Starter や NodeRED Starter ビルドパックからプロジェクトを作成した場合であれば、IBM IoT Input ノードを使えばこの deviceId だけで簡単にメッセージを取り出すことができるので、その結果をデバッグタブに出力したり、メールで通知したり、といった管理機能が簡単に実現できます。この辺りは Bluemix で NodeRED を使ったことがある方であれば、あまり難しくないと思っています。この辺りの詳しい説明はこちらのエントリを参照ください:
Bluemix の Node-RED サービスで IoT アプリを作る(1/2)


では話を Java アプリケーションの実装に戻します。今回は MQTT のオープンソース Java ライブラリである Paho と、Jakarta CommonsHTTP Client V3 を使ったサンプルを作成してみました。ソースコードのコンパイルや実行時には Paho の Java ライブラリ V3 の最新版(org.eclipse.paho.client.mqttv3-X.X.X.jar)が必要です。こちらからダウンロードしてください:
https://www.eclipse.org/paho/clients/java/


また HTTP Client の JAR ファイルも必要です。別途ダウンロードしておいてください。

なお、今回作成した Java アプリのソースコードは github で公開しました。必要であればこちらからダウンロードしておいてください:


公開したソースコードの内容は以下の様なものです:
import java.util.Calendar;
import java.util.TimeZone;

import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.GetMethod;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;

public class EstadoQuickstart implements MqttCallback {
  MqttClient myClient;
  MqttConnectOptions connOpt;

  static final String BROKER_URL = "tcp://quickstart.messaging.internetofthings.ibmcloud.com:1883"; //. IBM IoT Quickstart MQTT ブローカー	
  static final String dc = "ng"; //. 監視対象データセンター

  static String deviceId = "net.bluemix." + dc + ".estado.mqtt.publish.";
  static int interval = 60;

  @Override
  public void connectionLost(Throwable t) {
  }

  @Override
  public void deliveryComplete(IMqttDeliveryToken token) {
  }

  @Override
  public void messageArrived(String topic, MqttMessage message) throws Exception {
    // TODO Auto-generated method stub
    System.out.println("-------------------------------------------------");
    System.out.println("| Topic:" + topic);
    System.out.println("| Message: " + new String(message.getPayload()));
    System.out.println("-------------------------------------------------");
  }

  public static void main(String[] args) {
    try{
      if( args.length >; 0 ){
        interval = Integer.parseInt( args[0] );
      }

//. ランダムな deviceId の生成 String hex = "0123456789abcdef"; for( int i = 0; i < 8; i ++ ){ char c = hex.charAt( ( int )( Math.floor( Math.random() * 16 ) ) ); deviceId += c; } }catch( Exception e ){ } EstadoQuickstart eq = new EstadoQuickstart(); eq.runClient(); } public void runClient() { // TODO Auto-generated method stub String clientID = "d:quickstart:MyDevice:" + deviceId; System.out.println( "deviceId=" + deviceId ); connOpt = new MqttConnectOptions(); connOpt.setCleanSession( true ); connOpt.setKeepAliveInterval( 30 ); // Connect to Broker try{ myClient = new MqttClient( BROKER_URL, clientID ); myClient.setCallback( this ); myClient.connect( connOpt ); }catch( MqttException e ){ e.printStackTrace(); System.exit( -1 ); } String myTopic = "iot-2/evt/estado_" + dc + "/fmt/json"; //. MQTT トピック文字列を生成 MqttTopic topic = myClient.getTopic( myTopic ); while( true ){ try{
//. 日付文字列の生成 Calendar c = Calendar.getInstance(); TimeZone tz = TimeZone.getTimeZone( "Asia/Tokyo" ); c.setTimeZone( tz );; int y = c.get( Calendar.YEAR ); int m = c.get( Calendar.MONTH ) + 1; int d = c.get( Calendar.DAY_OF_MONTH ); int h = c.get( Calendar.HOUR_OF_DAY ); int n = c.get( Calendar.MINUTE ); int s = c.get( Calendar.SECOND ); String dt = y + "/" + ( m < 10 ? "0" : "" ) + m + "/" + ( d < 10 ? "0" : "" ) + d + "T" + ( h < 10 ? "0" : "" ) + h + ":" + ( n < 10 ? "0" : "" ) + n + ":" + ( s < 10 ? "0" : "" ) + s + "+09:00";
//. estado の HTML を取得 String out = ""; HttpClient client = new HttpClient(); GetMethod get = new GetMethod( "http://estado." + dc + ".bluemix.net/" ); int sc = client.executeMethod( get ); String html = get.getResponseBodyAsString();

//. HTML 内部を見て、サービスのステータスを確認 int x = html.indexOf( "<table" ); while( x >; -1 ){ int td1 = html.indexOf( "<td>;", x + 1 ); int td2 = html.indexOf( "</td>;", td1 + 4 ); int td3 = html.indexOf( "<td>;", td2 + 1 ); int td4 = html.indexOf( "</td>;", td3 + 4 ); if( td1 >; -1 && td2 >; -1 && td3 >; -1 && td4 >; -1 ){ String name = html.substring( td1 + 4, td2 ); String svalue = html.substring( td3 + 4, td4 ); if( svalue.equals( "down" ) ){ //. 止まっているサービスを探す String line = "\"" + name + "\""; //. 止まっているサービスの配列を返す if( out.length() >; 0 ){ line = "," + line; } out += line; } x = td4; } x = html.indexOf( "<tr ", x + 1 ); } out = "{\"datetime\":\"" + dt + "\",\"error_services\":[" + out + "]}"; //. MQTT Publish int pubQoS = 0; MqttMessage message = new MqttMessage( out.getBytes() ); message.setQos( pubQoS ); message.setRetained( false ); // Publish the message MqttDeliveryToken token = null; try{ // publish message to broker token = topic.publish( message ); // Wait until the message has been delivered to the broker token.waitForCompletion(); Thread.sleep( 1000 ); //. パブリッシュ後、とりあえず1秒待つ }catch( Exception e ){ e.printStackTrace(); } //. 次の実行タイミングを待つ Calendar c0 = Calendar.getInstance(); int s0 = ( c0.get( Calendar.SECOND ) % interval ); int w = 1000 * ( interval - s0 ); Thread.sleep( w ); }catch( Exception e ){ } } } }



Paho の Java ライブラリとソースコード(EstadoQuickstart.java)をダウンロードしてコンパイル&実行します。実行が成功すると、コンソール画面には次のように Quickstart MQTT ブローカーにパブリッシュした際の deviceId が表示されます。そして Ctrl+C などで終了するまでの間は(デフォルト状態であれば)60秒おきに estado をチェックしてその結果をパブリッシュする、という処理を繰り返します:
2015092101
 

IBM Bluemix のユーザーであれば、この結果を簡単に取り出すことができます。Bluemix 上に IoT Foundation Starter または NodeRED Starter のボイラープレートアプリケーションを作成して NodeRED 環境にアクセスします。そして IBM IoT Input ノードを1つ作成して、ノードの属性値に "Quickstart" と DeviceId に上述の Java アプリ実行時にコンソールに表示された deviceId 値と同じものを指定します:
2015092102


そしてとりあえず debug ノードを追加して IBM IoT ノードと線で結んでデプロイするだけで、このノードに60秒おきに停止している Bluemix サービスの情報が送られてくるようになります。
2015092003


このサンプルでは以下のような JSON フォーマットでメッセージを取得します。後はこのメッセージを Node-RED 側でハンドリングしてメールで通知したり、データベースに格納する、といった処理を実装することになります:
{
 "datetime": "2015/09/19T22:53:00+09:00",
 "error_services": [
  "account-usage", "meteringbackgroundprocess", "meteringdatabase", "runtime-usage-calculator", "dotnet", "alchemy_api [Free]", ・・・(止まっているサービス名の配列)・・・
 ] 
}



というわけで、後はこの Java アプリをどこかのクラウド上か、または(24時間電源ONが保証できる)ローカル環境で稼働させておけば、MQTT にステータスを送り続けてくれる環境が出来上がるので、このような Node-RED アプリを作って動かすことも可能になる、ということになります。

MQTT はデバイスを意識して作成された軽量なプロトコルですが、必ずしもデバイスでしか使えない、というものでもありません。今回のようなサーバーの死活情報を扱ってはいけない、というものではないし、現に某有名ソーシャルサイトのメッセージング機能で使われていたりもします。何をパブリッシュして、どこからサブスクライブするか、そしてサブスクライブした情報をどう扱うか(Node-RED はここが簡単にできる、というサービスです)、が重要だと思っています。みなさんも色んなアイデアを是非形にしてみてください。同様のアルゴリズムが実装できれば Java 以外の言語でも実現できると思っていますので、興味のある方は是非移植にも挑戦してみてください。



 

IoT アプリ開発をする上で避けて通れないのが MQTT(Message Queuing Telemetry Transport) プロトコルです。デバイスやセンサーなどのマシン間接続(いわゆる "M2M")用に IBM を中心に提唱された軽量プロトコルで、Node-RED や IBM IoT Foundation Quickstart などでも使われています。

そして、この MQTT をアプリケーション実装する上で便利なライブラリPaho です。これも元々は IBM が開発したものをオープンソースとして Eclipse Foundation に寄贈し、現在は Eclipse プロジェクトの1つとして公開されています:

http://www.eclipse.org/paho/
paho_logo_400


この Paho ライブラリを使うことで MQTT の詳しいプロトコル仕様を理解することなく、MQTT アプリケーションを開発することが可能になります。プログラミング言語としても C/Java/Java Script/Python など、多くの言語に対応しており、PC やスマホ、組込機器など、多くのデバイスで利用することができるようになっています。

そして、この Paho の Java ライブラリを使って、実際に IBM IoT Foundation Quickstart の MQTT ブローカー( quickstart.messaging.internetofthings.ibmcloud.com:1883 )に接続して、MQTT パブリッシュ処理を行うサンプルを作ってみた時の様子をこちらで紹介しています:
QuickStart MQTT ブローカーに Java からパブリッシュする

QuickStart は IBM が公開している MQTT ブローカー(サーバー)で、認証なしであれば誰でも使うことができます。また上記リンク先でも紹介していますが、ここにパブリッシュされたメッセージは IBM BluemixNodeRED アプリケーションを使って簡単に取得したり、データベースに格納したり、データフローを定義することができるようになります。


IoT アプリ開発者にとって、この Paho は慣れておくことが必須のライブラリだと思いました。


Node-RED フローエディタの出現などもあって MQTT プロトコルに新たな可能性を感じています。

MQTT といえば「IoT のプロトコル」というイメージを持っている人もいると思います。必ずしも間違いではないと思っていますが、こんな軽量で便利なプロトコルを IoT のためだけに使うのはもったいないかな、、、とも感じるようになりました。それを実感するためにちょっとしたアプリを作ってみました、という内容です。


今回紹介するための例として選んだのが投資相場情報です。個人的にもたしなむ程度には投資をやってますが、デイトレードするようなタイプの投資家からすると、リアルタイムにより近い情報が提供されるのであれば有用なはずです。ただ IoT と投資を結びつけて考えることはあまりないですよね。でも IBM IoT Foundation や Node-RED と組み合わせて投資を考えるとどうなるでしょうか?

まず、ためしに外国為替の価格情報を表示するような、こんなウェブサービスを作ってみました:
http://fx.mybluemix.net/

↑上記 URL にアクセスすると、そのタイミングでの20種類の為替情報を JSON フォーマットで返してくれます。週末とかでない限り、為替相場は24時間変化し続けます。これ自体はごく一般的な REST のウェブサービスです。(ドメインを見ればわかりますが)これは Bluemix 上で動いていますが、このサービスがどこで動いているかは関係ありません:
2015090701


この仕組をこのまま(必要な時にこの URL にアクセスして取得する形で)使うこともできますが、MQTT を使うとプッシュのような仕組みを実現することもできるようになります。別途 MQTT パブリッシャーのアプリケーションを用意して、例えば1分おきに最新の為替状況を取り出して、MQTT ブローカーへパブリッシュする、という方法です。特にパブリッシュ先の MQTT ブローカーを IBM IoT Foundation の QuickStart(quickstart.messaging.internetofthings.ibmcloud.com:1883) に指定すると、Bluemix 上の Node-RED の QuickStart からも参照できるようになるので、Node-RED を使って簡単に為替情報を取り出すことができるようになります。また取り出したデータをデータベースに格納したり、取り出した数値を元にデータフローを記述して実行することは Node-RED の機能を使って簡単に実現することができるようになります:
2015090702


実際に QuickStart の MQTT ブローカーに対してメッセージをパブリッシュするアプリケーションを Java で開発する場合の詳細はこちらを参照ください:
QuickStart MQTT ブローカーに Java からパブリッシュする


今回のアプリでは ClientID(Node-RED の QuickStart で言うところの deviceId)に "net.mybluemix.fx.mqtt.publish" という文字列を指定して quickstart.messaging.internetofthings.ibmcloud.com:1883 にパブリッシュしています。なので、Node-RED 側でも同じ deviceId を指定すれば取り出せるようになる、というものです:
2015090703


実際にこのアプリを作って、ローカルホストで動かしてみました(MQTT パブリッシャーはローカルホストだろうが、プライベートネットワーク内だろうが、MQTT ブローカーに接続できる環境下であれば動きます)。同時に Bluemix 上の Node-RED では MQTT パブリッシャーと同じ deviceId を指定して QuickStart からデータを取り出してみます:
2015090704


とりあえずは普通に Debug アウトプットノードだけを足してデプロイすると・・・
2015090705


為替情報が(この例では)30秒おきに Debug タブに表示されるようになりました!期待通りに動いてます。
2015090706


この結果を WebSocket にも送るように改良してみます:
2015090707


WebSocket に送られてくるデータを視覚化するような HTML を用意するとこんな感じ。MQTT から送られてくる為替データを元にリアルタイム為替チャートが簡単にできちゃいました:
2015090708


後は為替を売買する(業者の)API があれば、これと組み合わせて実際に売買するシステムが作れちゃいそうです。もちろん為替である必要はなく、株式でも同様のデータが取得できて、同様の売買 API があれば同じような仕組みを作ることもできそうですね。

なお、WebSocket 経由で取得したデータの視覚化方法についてはこのページの情報を参考にしています。具体的には Google Visualization API を使っています:
Node-REDを使ってセンサーデータをWebSocketで出力する


MQTT はデバイスデータやセンサーデータを扱うだけでなく、色んな応用ができそうです。アイデア勝負の世界になりそうな感じですね。


(2015/Sep/11 追記)
この中で紹介している Quickstart MQTT ブローカーに為替情報をパブリッシュする Java アプリケーションのソースコードを GitHub で公開します:
https://github.com/dotnsf/FxQuickstart



最近は多くのクラウド業者が IoTMQTT との連携をアピールしています。IBM を含めて各社が「いかに簡単に IoT データを扱う/応用することができるか」を競っている感じです。

この点で IBM の強みの1つが Node-RED フローエディタだと思っています。Bluemix に統合されたこの GUI エディタは MQTT サーバー(ブローカー)との接続を簡単に行い、データを集め、保存する仕組みを簡単に提供することができます。MQTT ブローカー/クライアントの Mosquitto といい、この Node-RED といい、IoT にもオープンソース製品が多くの場面で使われるようになってきて、更なる広まりを見せているように感じます:
2015082602


IBM の場合、この Node-RED をうまく Bluemix に合わせてカスタマイズして提供しており、Bluemix 内ではすぐに使えるようになっており、また Bluemix の各サービスと簡単に組み合わせて使えるような形で提供されています。


ただ Node-RED そのものはオープンソースで提供されているものです。他のクラウドインスタンスやオンプレミスサーバー環境でも(Bluemix 用の拡張がされていない素のエディションを)動かすことが可能です。つまり IBM 以外の業者のクラウド環境内で Node-RED を使った IoT アプリ開発環境を構築することだってできるわけです。


というわけで、今回のブログエントリでは SoftLayer や AWS、IDCF、オンプレミスなどの(クラウド)サーバー環境内で Node-RED を使って IoT アプリ開発環境を構築する手順を紹介します。


まず最初に Bluemix と IBM IoT Foundation を使った場合の IoT アプリ開発環境のシステムトポロジーを確認しておきます。必要になるサーバーとしては各種センサーデバイスからの情報を集約する MQTT ブローカーと、Node-RED が稼働する Node.js アプリケーションサーバーです。Bluemix 環境の場合、前者は IBM IoT Foundation が提供する quickstart サーバーを無料で使うことができます(つまり構築不要です)。後者は Bluemix のボイラープレートを使うことで Node-RED が動く状態まで含めて簡単に構築できてしまいます。なお収集したデータを保存しようとすると別途データベースサーバーを用意する必要がありますが、Bluemix であればデータベースも簡単に追加してバインドすることが可能です(今回はデータベースを使わない環境を前提とします):
2015082601


これと同じ環境を Bluemix を使わずに構築することを考えると、(物理的には1台のマシンでも構いませんが)上記の MQTT ブローカーと Node.js の2サーバーを手動で用意することになります:
2015082602


具体的には MQTT と Node-RED それぞれの環境を用意する必要があります。1台または2台のサーバーを用意し、それぞれのサーバーインスタンスに MQTT ブローカーおよび Node-RED 環境を構築します。それぞれの手順は(CentOS サーバーの場合であれば)以下の記事を参照ください:
ラズベリーパイにオープンソース MQTT の Mosquitto をインストールする (←ラズベリーパイだけでなく CentOS の場合のインストール方法も記載しています)
CentOS に Node-RED をインストールする


まずは MQTT ブローカーを起動しておく必要があります。MQTT ブローカー(Mosquitto サーバー)を導入したマシン上で以下のコマンドを実行するなどして Mosquitto サービスを動かしておきます:
# service mosquitto start

次に Node-RED を導入したサーバーで Node-RED を起動します:
2015082603


起動した Node-RED にブラウザでアクセスします。Node-RED はデフォルトでは 1880 番ポートで起動するので、 http://(Node-RED サーバー):1880/ でアクセスするとフローエディタの画面が開きます:
2015082604


そして下図のような、MQTT インプットと Debug アウトプットを繋げただけのシンプルなフローを記述します:
2015082608


MQTT インプットの属性は以下のようにします。Broker には Mosquitto を導入したサーバーの 1883 番ポートを指定します。Topic はなんでもいいのですが、ここでは "top/001" と指定しています(後で実行するコマンドで指定することになる文字列です)。準備ができたらデプロイして実行状態にします:
2015082605


これで環境は出来上がりました。では正しく動いているかどうかを確認してみましょう。
別途 Mosquitto クライアントを導入したマシン(これも同一マシンでもかまいません)から以下のコマンドを実行して、MQTT ブローカーにメッセージをパブリッシュします:
$ mosquitto_pub -h (MQTT ブローカーサーバー) -t "top/001(上記 Node-RED で指定した Topic 属性と同じ文字列)" -m "Hello."

2015082606


すると、Node-RED 画面の debug タブには -m オプションで指定された文字列が表示されるはずです。つまり MQTT ブローカーの top/001 トピックに送信されたメッセージを、Node-RED から正しく取得することができたことになります:
20150824_nodered



ちゃんと動きました。Node-RED は Bluemix や IBM IoT Foundation がなくても、普通の(?) MQTT 環境の中でも動くことが確認できました。


・・・とはいえ、Bluemix + IBM IoT Foundation 環境で Node-RED を使ったことのある者として言わせていただくと、ここまでの環境を整えないと使えないわけです。また現実的には取得したデータをデータベースに格納しようとすると、Bluemix 環境のように簡単にはいきません。動くか動かないかでいうと動きますが、その準備のためのハードルはまだまだ高いように感じています。

IoT アプリ開発環境を検討する上で IBM Bluemix + IBM IoT Foundation + Node-RED がいかに簡単で便利なのか、を改めて再確認するような実験になったとも感じました。




 

前回の、このブログエントリ↓の続きです:


Mosquitto クライアントを使って、IBM IoTF(IoT Foundations) 内の QuickStart MQTT ブローカーにメッセージをパブリッシュする手順が分かりました。 今回は同じ手続きを自分がプログラミングするアプリケーションから実行するコードを紹介します。これができると、実際のデバイスやスマホ、PC などから取得した情報を自分なりに加工した上で QuickStart に送信して、Node-RED フローエディタでも使えるようになります。 なお、今回は Java でのアプリケーションコーディング例を紹介します。


ではその手順を紹介します。まず Java で MQTT プロトコルを扱うため、便利な Paho のライブラリをあらかじめダウンロードしておきます。

ちなみに Paho は MQTT のオープンソース実装を目的とした Eclipse プロジェクトの1つです。Java に限らず、非常に多くの言語向けに MQTT ライブラリが提供されています。

(2015/08/01 追記: 実際に利用可能な Paho のライブラリの入手場所が間違っていました)
では Paho のリリースディレクトリから mqtt-client-0.4.0.jar をダウンロードします:
https://repo.eclipse.org/content/repositories/paho-releases/org/eclipse/paho/mqtt-client/0.4.0/


では Paho のリリースディレクトリから最新の org.eclipse.paho.client.mqttv3-*.*.*.jar ファイルをダウンロードします:
https://repo.eclipse.org/content/repositories/paho-releases/org/eclipse/paho/org.eclipse.paho.client.mqttv3/


ダウンロードした org.eclipse.paho.client.mqttv3-*.*.*.jar を使って、以下の様なソースコードを作成します:
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;

public class SampleQuickstartPublisher implements MqttCallback {
  MqttClient myClient;
  MqttConnectOptions connOpt;

  static final String BROKER_URL = "tcp://quickstart.messaging.internetofthings.ibmcloud.com:1883";
  static final String M2MIO_THING = "91a19d112233"; //. DeviceId

  /**
   * connectionLost
   */
  @Override
  public void connectionLost(Throwable t) {
    System.out.println("Connection lost!");
  }

  /**
   * MAIN
   */
  public static void main(String[] args) {
    SampleQuickstartPublisher smc = new SampleQuickstartPublisher();
    smc.runClient();
  }

  /**
   * runClient
   */
  public void runClient() {
    // MQTT クライアントのセットアップ
    String clientID = "d:quickstart:MyDevice:" + M2MIO_THING; // クライアントID
    connOpt = new MqttConnectOptions();

    connOpt.setCleanSession(true);
    connOpt.setKeepAliveInterval(30);

    // ブローカーに接続
    try {
      myClient = new MqttClient(BROKER_URL, clientID);
      myClient.setCallback(this);
      myClient.connect(connOpt);
    } catch (MqttException e) {
      e.printStackTrace();
      System.exit(-1);
    }

    System.out.println("Connected to " + BROKER_URL);

    // トピックの指定
    String myTopic = "iot-2/evt/myeventtype/fmt/json";
    MqttTopic topic = myClient.getTopic(myTopic);

// 10回メッセージを送信する for (int i=1; i<=10; i++) { String pubMsg = "{\"d\":{\"intval\":" + i + "}}"; // 送信メッセージのJSON int pubQoS = 0; MqttMessage message = new MqttMessage(pubMsg.getBytes()); message.setQos(pubQoS); message.setRetained(false); // メッセージをブローカーにパブリッシュ System.out.println("Publishing to topic \"" + topic + "\" qos " + pubQoS); MqttDeliveryToken token = null; try { // パブリッシュ token = topic.publish(message); // ブローカーへの送信完了を待つ token.waitForCompletion(); Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } } // ブローカーから切断 try { myClient.disconnect(); } catch (Exception e) { e.printStackTrace(); } } @Override public void deliveryComplete(IMqttDeliveryToken token) { } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { } }

QuickStart の MQTT ブローカーホストは quickstart.messaging.internetofthings.ibmcloud.com です。ここに 1883 番ポートでアクセスします。またこの例ではデバイスID を "91a19d112233" という固定値にしていますが、実際にはここはユニークな値(MAC アドレスなど)を使ってください。

このプログラムの肝になっているのは runClient() 関数部分です。この runClient() の中でまず MQTT クライアントインスタンスを生成しています。そして
 d:quickstart:MyDevice:(デバイスID)
というクライアント ID を指定して、MQTT ブローカーに接続しています。

接続後は "iot-2/evt/myeventtype/fmt/json" というトピックを指定しながら for ループで10回メッセージを送信(パブリッシュ)する、という内容の処理を実行するプログラム構成になっています。


そして、Node-RED でもアプリケーションを用意します。Node-RED アプリケーションは前回 Mosquitto で動作確認した時と同じ内容で、"91a19d112233" (Java のプログラムで指定しているデバイスIDと同じもの)というデバイス ID を指定した IBM IoT ノードを用意し、デバッグノードに接続しただけのものです。このアプリを Deploy して、動作させておきます:
2015073101


ここで上記の Java プログラムを実行してみます。ソースコード内のデバッグライト(青文字部分)が実行され、コンソールに接続時と送信時のメッセージが表示され、処理が実行されたことが分かります:
2015073102


同時に Node-RED 側のデバッグタブを見ると、いままさにこの Java プログラムからパブリッシュされたメッセージを受け取って表示する様子が分かります。1秒毎にメッセージが送信されて、計10回のメッセージが送られた所で止まります:
2015073103


よく見ると、デバイスタイプが "MyDevice"、イベントタイプが "myeventtype" という文字列になっています。これは MQTT パブリッシュをする際のクライアント ID とトピックで指定しているものなので、これらの値を変更したい場合はクライアント ID やトピックを変更すればよい、ということもわかりました。

というわけで、Java のプログラムから IBM IoT Foundations の QuickStart MQTT ブローカーにメッセージを送信し、その内容を Node-RED で受け取る、というオペレーションが実現できることが確認できました! これを応用すると、とりあえず何らかのデータを MQTT QuickStart に送り、Node-RED で処理して例えば DB に格納するとか、リアルタイム処理を加えるといった Node-RED 得意の土俵で扱うことができるようになりますね。



このページのトップヘ