前回の、このブログエントリ↓の続きです:
Mosquitto クライアントを使って、IBM IoTF(IoT Foundations) 内の QuickStart MQTT ブローカーにメッセージをパブリッシュする手順が分かりました。 今回は同じ手続きを自分がプログラミングするアプリケーションから実行するコードを紹介します。これができると、実際のデバイスやスマホ、PC などから取得した情報を自分なりに加工した上で QuickStart に送信して、Node-RED フローエディタでも使えるようになります。 なお、今回は Java でのアプリケーションコーディング例を紹介します。
ではその手順を紹介します。まず Java で MQTT プロトコルを扱うため、便利な Paho のライブラリをあらかじめダウンロードしておきます。
ちなみに Paho は MQTT のオープンソース実装を目的とした Eclipse プロジェクトの1つです。Java に限らず、非常に多くの言語向けに MQTT ライブラリが提供されています。
(2015/08/01 追記: 実際に利用可能な Paho のライブラリの入手場所が間違っていました)
では Paho のリリースディレクトリから mqtt-client-0.4.0.jar をダウンロードします:
https://repo.eclipse.org/content/repositories/paho-releases/org/eclipse/paho/mqtt-client/0.4.0/
では Paho のリリースディレクトリから最新の org.eclipse.paho.client.mqttv3-*.*.*.jar ファイルをダウンロードします:
https://repo.eclipse.org/content/repositories/paho-releases/org/eclipse/paho/org.eclipse.paho.client.mqttv3/
ダウンロードした org.eclipse.paho.client.mqttv3-*.*.*.jar を使って、以下の様なソースコードを作成します:
QuickStart の MQTT ブローカーホストは quickstart.messaging.internetofthings.ibmcloud.com です。ここに 1883 番ポートでアクセスします。またこの例ではデバイスID を "91a19d112233" という固定値にしていますが、実際にはここはユニークな値(MAC アドレスなど)を使ってください。
このプログラムの肝になっているのは runClient() 関数部分です。この runClient() の中でまず MQTT クライアントインスタンスを生成しています。そして
d:quickstart:MyDevice:(デバイスID)
というクライアント ID を指定して、MQTT ブローカーに接続しています。
接続後は "iot-2/evt/myeventtype/fmt/json" というトピックを指定しながら for ループで10回メッセージを送信(パブリッシュ)する、という内容の処理を実行するプログラム構成になっています。
そして、Node-RED でもアプリケーションを用意します。Node-RED アプリケーションは前回 Mosquitto で動作確認した時と同じ内容で、"91a19d112233" (Java のプログラムで指定しているデバイスIDと同じもの)というデバイス ID を指定した IBM IoT ノードを用意し、デバッグノードに接続しただけのものです。このアプリを Deploy して、動作させておきます:

ここで上記の Java プログラムを実行してみます。ソースコード内のデバッグライト(青文字部分)が実行され、コンソールに接続時と送信時のメッセージが表示され、処理が実行されたことが分かります:

同時に Node-RED 側のデバッグタブを見ると、いままさにこの Java プログラムからパブリッシュされたメッセージを受け取って表示する様子が分かります。1秒毎にメッセージが送信されて、計10回のメッセージが送られた所で止まります:

よく見ると、デバイスタイプが "MyDevice"、イベントタイプが "myeventtype" という文字列になっています。これは MQTT パブリッシュをする際のクライアント ID とトピックで指定しているものなので、これらの値を変更したい場合はクライアント ID やトピックを変更すればよい、ということもわかりました。
というわけで、Java のプログラムから IBM IoT Foundations の QuickStart MQTT ブローカーにメッセージを送信し、その内容を Node-RED で受け取る、というオペレーションが実現できることが確認できました! これを応用すると、とりあえず何らかのデータを MQTT QuickStart に送り、Node-RED で処理して例えば DB に格納するとか、リアルタイム処理を加えるといった Node-RED 得意の土俵で扱うことができるようになりますね。
Mosquitto クライアントを使って、IBM IoTF(IoT Foundations) 内の QuickStart MQTT ブローカーにメッセージをパブリッシュする手順が分かりました。 今回は同じ手続きを自分がプログラミングするアプリケーションから実行するコードを紹介します。これができると、実際のデバイスやスマホ、PC などから取得した情報を自分なりに加工した上で QuickStart に送信して、Node-RED フローエディタでも使えるようになります。 なお、今回は Java でのアプリケーションコーディング例を紹介します。
ではその手順を紹介します。まず Java で MQTT プロトコルを扱うため、便利な Paho のライブラリをあらかじめダウンロードしておきます。
ちなみに Paho は MQTT のオープンソース実装を目的とした Eclipse プロジェクトの1つです。Java に限らず、非常に多くの言語向けに MQTT ライブラリが提供されています。
(2015/08/01 追記: 実際に利用可能な Paho のライブラリの入手場所が間違っていました)
https://repo.eclipse.org/content/repositories/paho-releases/org/eclipse/paho/mqtt-client/0.4.0/
では Paho のリリースディレクトリから最新の org.eclipse.paho.client.mqttv3-*.*.*.jar ファイルをダウンロードします:
https://repo.eclipse.org/content/repositories/paho-releases/org/eclipse/paho/org.eclipse.paho.client.mqttv3/
ダウンロードした org.eclipse.paho.client.mqttv3-*.*.*.jar を使って、以下の様なソースコードを作成します:
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttTopic; public class SampleQuickstartPublisher implements MqttCallback { MqttClient myClient; MqttConnectOptions connOpt; static final String BROKER_URL = "tcp://quickstart.messaging.internetofthings.ibmcloud.com:1883"; static final String M2MIO_THING = "91a19d112233"; //. DeviceId /** * connectionLost */ @Override public void connectionLost(Throwable t) { System.out.println("Connection lost!"); } /** * MAIN */ public static void main(String[] args) { SampleQuickstartPublisher smc = new SampleQuickstartPublisher(); smc.runClient(); } /** * runClient */ public void runClient() { // MQTT クライアントのセットアップ String clientID = "d:quickstart:MyDevice:" + M2MIO_THING; // クライアントID connOpt = new MqttConnectOptions(); connOpt.setCleanSession(true); connOpt.setKeepAliveInterval(30); // ブローカーに接続 try { myClient = new MqttClient(BROKER_URL, clientID); myClient.setCallback(this); myClient.connect(connOpt); } catch (MqttException e) { e.printStackTrace(); System.exit(-1); } System.out.println("Connected to " + BROKER_URL); // トピックの指定 String myTopic = "iot-2/evt/myeventtype/fmt/json"; MqttTopic topic = myClient.getTopic(myTopic);
// 10回メッセージを送信する for (int i=1; i<=10; i++) { String pubMsg = "{\"d\":{\"intval\":" + i + "}}"; // 送信メッセージのJSON int pubQoS = 0; MqttMessage message = new MqttMessage(pubMsg.getBytes()); message.setQos(pubQoS); message.setRetained(false); // メッセージをブローカーにパブリッシュ System.out.println("Publishing to topic \"" + topic + "\" qos " + pubQoS); MqttDeliveryToken token = null; try { // パブリッシュ token = topic.publish(message); // ブローカーへの送信完了を待つ token.waitForCompletion(); Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } } // ブローカーから切断 try { myClient.disconnect(); } catch (Exception e) { e.printStackTrace(); } } @Override public void deliveryComplete(IMqttDeliveryToken token) { } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { } }
QuickStart の MQTT ブローカーホストは quickstart.messaging.internetofthings.ibmcloud.com です。ここに 1883 番ポートでアクセスします。またこの例ではデバイスID を "91a19d112233" という固定値にしていますが、実際にはここはユニークな値(MAC アドレスなど)を使ってください。
このプログラムの肝になっているのは runClient() 関数部分です。この runClient() の中でまず MQTT クライアントインスタンスを生成しています。そして
d:quickstart:MyDevice:(デバイスID)
というクライアント ID を指定して、MQTT ブローカーに接続しています。
接続後は "iot-2/evt/myeventtype/fmt/json" というトピックを指定しながら for ループで10回メッセージを送信(パブリッシュ)する、という内容の処理を実行するプログラム構成になっています。
そして、Node-RED でもアプリケーションを用意します。Node-RED アプリケーションは前回 Mosquitto で動作確認した時と同じ内容で、"91a19d112233" (Java のプログラムで指定しているデバイスIDと同じもの)というデバイス ID を指定した IBM IoT ノードを用意し、デバッグノードに接続しただけのものです。このアプリを Deploy して、動作させておきます:

ここで上記の Java プログラムを実行してみます。ソースコード内のデバッグライト(青文字部分)が実行され、コンソールに接続時と送信時のメッセージが表示され、処理が実行されたことが分かります:

同時に Node-RED 側のデバッグタブを見ると、いままさにこの Java プログラムからパブリッシュされたメッセージを受け取って表示する様子が分かります。1秒毎にメッセージが送信されて、計10回のメッセージが送られた所で止まります:

よく見ると、デバイスタイプが "MyDevice"、イベントタイプが "myeventtype" という文字列になっています。これは MQTT パブリッシュをする際のクライアント ID とトピックで指定しているものなので、これらの値を変更したい場合はクライアント ID やトピックを変更すればよい、ということもわかりました。
というわけで、Java のプログラムから IBM IoT Foundations の QuickStart MQTT ブローカーにメッセージを送信し、その内容を Node-RED で受け取る、というオペレーションが実現できることが確認できました! これを応用すると、とりあえず何らかのデータを MQTT QuickStart に送り、Node-RED で処理して例えば DB に格納するとか、リアルタイム処理を加えるといった Node-RED 得意の土俵で扱うことができるようになりますね。