前回の、このブログエントリ↓の続きです:


Mosquitto クライアントを使って、IBM IoTF(IoT Foundations) 内の QuickStart MQTT ブローカーにメッセージをパブリッシュする手順が分かりました。 今回は同じ手続きを自分がプログラミングするアプリケーションから実行するコードを紹介します。これができると、実際のデバイスやスマホ、PC などから取得した情報を自分なりに加工した上で QuickStart に送信して、Node-RED フローエディタでも使えるようになります。 なお、今回は Java でのアプリケーションコーディング例を紹介します。


ではその手順を紹介します。まず Java で MQTT プロトコルを扱うため、便利な Paho のライブラリをあらかじめダウンロードしておきます。

ちなみに Paho は MQTT のオープンソース実装を目的とした Eclipse プロジェクトの1つです。Java に限らず、非常に多くの言語向けに MQTT ライブラリが提供されています。

(2015/08/01 追記: 実際に利用可能な Paho のライブラリの入手場所が間違っていました)
では Paho のリリースディレクトリから mqtt-client-0.4.0.jar をダウンロードします:
https://repo.eclipse.org/content/repositories/paho-releases/org/eclipse/paho/mqtt-client/0.4.0/


では Paho のリリースディレクトリから最新の org.eclipse.paho.client.mqttv3-*.*.*.jar ファイルをダウンロードします:
https://repo.eclipse.org/content/repositories/paho-releases/org/eclipse/paho/org.eclipse.paho.client.mqttv3/


ダウンロードした org.eclipse.paho.client.mqttv3-*.*.*.jar を使って、以下の様なソースコードを作成します:
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;

public class SampleQuickstartPublisher implements MqttCallback {
  MqttClient myClient;
  MqttConnectOptions connOpt;

  static final String BROKER_URL = "tcp://quickstart.messaging.internetofthings.ibmcloud.com:1883";
  static final String M2MIO_THING = "91a19d112233"; //. DeviceId

  /**
   * connectionLost
   */
  @Override
  public void connectionLost(Throwable t) {
    System.out.println("Connection lost!");
  }

  /**
   * MAIN
   */
  public static void main(String[] args) {
    SampleQuickstartPublisher smc = new SampleQuickstartPublisher();
    smc.runClient();
  }

  /**
   * runClient
   */
  public void runClient() {
    // MQTT クライアントのセットアップ
    String clientID = "d:quickstart:MyDevice:" + M2MIO_THING; // クライアントID
    connOpt = new MqttConnectOptions();

    connOpt.setCleanSession(true);
    connOpt.setKeepAliveInterval(30);

    // ブローカーに接続
    try {
      myClient = new MqttClient(BROKER_URL, clientID);
      myClient.setCallback(this);
      myClient.connect(connOpt);
    } catch (MqttException e) {
      e.printStackTrace();
      System.exit(-1);
    }

    System.out.println("Connected to " + BROKER_URL);

    // トピックの指定
    String myTopic = "iot-2/evt/myeventtype/fmt/json";
    MqttTopic topic = myClient.getTopic(myTopic);

// 10回メッセージを送信する for (int i=1; i<=10; i++) { String pubMsg = "{\"d\":{\"intval\":" + i + "}}"; // 送信メッセージのJSON int pubQoS = 0; MqttMessage message = new MqttMessage(pubMsg.getBytes()); message.setQos(pubQoS); message.setRetained(false); // メッセージをブローカーにパブリッシュ System.out.println("Publishing to topic \"" + topic + "\" qos " + pubQoS); MqttDeliveryToken token = null; try { // パブリッシュ token = topic.publish(message); // ブローカーへの送信完了を待つ token.waitForCompletion(); Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } } // ブローカーから切断 try { myClient.disconnect(); } catch (Exception e) { e.printStackTrace(); } } @Override public void deliveryComplete(IMqttDeliveryToken token) { } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { } }

QuickStart の MQTT ブローカーホストは quickstart.messaging.internetofthings.ibmcloud.com です。ここに 1883 番ポートでアクセスします。またこの例ではデバイスID を "91a19d112233" という固定値にしていますが、実際にはここはユニークな値(MAC アドレスなど)を使ってください。

このプログラムの肝になっているのは runClient() 関数部分です。この runClient() の中でまず MQTT クライアントインスタンスを生成しています。そして
 d:quickstart:MyDevice:(デバイスID)
というクライアント ID を指定して、MQTT ブローカーに接続しています。

接続後は "iot-2/evt/myeventtype/fmt/json" というトピックを指定しながら for ループで10回メッセージを送信(パブリッシュ)する、という内容の処理を実行するプログラム構成になっています。


そして、Node-RED でもアプリケーションを用意します。Node-RED アプリケーションは前回 Mosquitto で動作確認した時と同じ内容で、"91a19d112233" (Java のプログラムで指定しているデバイスIDと同じもの)というデバイス ID を指定した IBM IoT ノードを用意し、デバッグノードに接続しただけのものです。このアプリを Deploy して、動作させておきます:
2015073101


ここで上記の Java プログラムを実行してみます。ソースコード内のデバッグライト(青文字部分)が実行され、コンソールに接続時と送信時のメッセージが表示され、処理が実行されたことが分かります:
2015073102


同時に Node-RED 側のデバッグタブを見ると、いままさにこの Java プログラムからパブリッシュされたメッセージを受け取って表示する様子が分かります。1秒毎にメッセージが送信されて、計10回のメッセージが送られた所で止まります:
2015073103


よく見ると、デバイスタイプが "MyDevice"、イベントタイプが "myeventtype" という文字列になっています。これは MQTT パブリッシュをする際のクライアント ID とトピックで指定しているものなので、これらの値を変更したい場合はクライアント ID やトピックを変更すればよい、ということもわかりました。

というわけで、Java のプログラムから IBM IoT Foundations の QuickStart MQTT ブローカーにメッセージを送信し、その内容を Node-RED で受け取る、というオペレーションが実現できることが確認できました! これを応用すると、とりあえず何らかのデータを MQTT QuickStart に送り、Node-RED で処理して例えば DB に格納するとか、リアルタイム処理を加えるといった Node-RED 得意の土俵で扱うことができるようになりますね。