まだプログラマーですが何か?

プログラマーネタ中心。たまに作成したウェブサービス関連の話も https://twitter.com/dotnsf

タグ:iotf

前回の、このブログエントリ↓の続きです:


Mosquitto クライアントを使って、IBM IoTF(IoT Foundations) 内の QuickStart MQTT ブローカーにメッセージをパブリッシュする手順が分かりました。 今回は同じ手続きを自分がプログラミングするアプリケーションから実行するコードを紹介します。これができると、実際のデバイスやスマホ、PC などから取得した情報を自分なりに加工した上で QuickStart に送信して、Node-RED フローエディタでも使えるようになります。 なお、今回は Java でのアプリケーションコーディング例を紹介します。


ではその手順を紹介します。まず Java で MQTT プロトコルを扱うため、便利な Paho のライブラリをあらかじめダウンロードしておきます。

ちなみに Paho は MQTT のオープンソース実装を目的とした Eclipse プロジェクトの1つです。Java に限らず、非常に多くの言語向けに MQTT ライブラリが提供されています。

(2015/08/01 追記: 実際に利用可能な Paho のライブラリの入手場所が間違っていました)
では Paho のリリースディレクトリから mqtt-client-0.4.0.jar をダウンロードします:
https://repo.eclipse.org/content/repositories/paho-releases/org/eclipse/paho/mqtt-client/0.4.0/


では Paho のリリースディレクトリから最新の org.eclipse.paho.client.mqttv3-*.*.*.jar ファイルをダウンロードします:
https://repo.eclipse.org/content/repositories/paho-releases/org/eclipse/paho/org.eclipse.paho.client.mqttv3/


ダウンロードした org.eclipse.paho.client.mqttv3-*.*.*.jar を使って、以下の様なソースコードを作成します:
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;

public class SampleQuickstartPublisher implements MqttCallback {
  MqttClient myClient;
  MqttConnectOptions connOpt;

  static final String BROKER_URL = "tcp://quickstart.messaging.internetofthings.ibmcloud.com:1883";
  static final String M2MIO_THING = "91a19d112233"; //. DeviceId

  /**
   * connectionLost
   */
  @Override
  public void connectionLost(Throwable t) {
    System.out.println("Connection lost!");
  }

  /**
   * MAIN
   */
  public static void main(String[] args) {
    SampleQuickstartPublisher smc = new SampleQuickstartPublisher();
    smc.runClient();
  }

  /**
   * runClient
   */
  public void runClient() {
    // MQTT クライアントのセットアップ
    String clientID = "d:quickstart:MyDevice:" + M2MIO_THING; // クライアントID
    connOpt = new MqttConnectOptions();

    connOpt.setCleanSession(true);
    connOpt.setKeepAliveInterval(30);

    // ブローカーに接続
    try {
      myClient = new MqttClient(BROKER_URL, clientID);
      myClient.setCallback(this);
      myClient.connect(connOpt);
    } catch (MqttException e) {
      e.printStackTrace();
      System.exit(-1);
    }

    System.out.println("Connected to " + BROKER_URL);

    // トピックの指定
    String myTopic = "iot-2/evt/myeventtype/fmt/json";
    MqttTopic topic = myClient.getTopic(myTopic);

// 10回メッセージを送信する for (int i=1; i<=10; i++) { String pubMsg = "{\"d\":{\"intval\":" + i + "}}"; // 送信メッセージのJSON int pubQoS = 0; MqttMessage message = new MqttMessage(pubMsg.getBytes()); message.setQos(pubQoS); message.setRetained(false); // メッセージをブローカーにパブリッシュ System.out.println("Publishing to topic \"" + topic + "\" qos " + pubQoS); MqttDeliveryToken token = null; try { // パブリッシュ token = topic.publish(message); // ブローカーへの送信完了を待つ token.waitForCompletion(); Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } } // ブローカーから切断 try { myClient.disconnect(); } catch (Exception e) { e.printStackTrace(); } } @Override public void deliveryComplete(IMqttDeliveryToken token) { } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { } }

QuickStart の MQTT ブローカーホストは quickstart.messaging.internetofthings.ibmcloud.com です。ここに 1883 番ポートでアクセスします。またこの例ではデバイスID を "91a19d112233" という固定値にしていますが、実際にはここはユニークな値(MAC アドレスなど)を使ってください。

このプログラムの肝になっているのは runClient() 関数部分です。この runClient() の中でまず MQTT クライアントインスタンスを生成しています。そして
 d:quickstart:MyDevice:(デバイスID)
というクライアント ID を指定して、MQTT ブローカーに接続しています。

接続後は "iot-2/evt/myeventtype/fmt/json" というトピックを指定しながら for ループで10回メッセージを送信(パブリッシュ)する、という内容の処理を実行するプログラム構成になっています。


そして、Node-RED でもアプリケーションを用意します。Node-RED アプリケーションは前回 Mosquitto で動作確認した時と同じ内容で、"91a19d112233" (Java のプログラムで指定しているデバイスIDと同じもの)というデバイス ID を指定した IBM IoT ノードを用意し、デバッグノードに接続しただけのものです。このアプリを Deploy して、動作させておきます:
2015073101


ここで上記の Java プログラムを実行してみます。ソースコード内のデバッグライト(青文字部分)が実行され、コンソールに接続時と送信時のメッセージが表示され、処理が実行されたことが分かります:
2015073102


同時に Node-RED 側のデバッグタブを見ると、いままさにこの Java プログラムからパブリッシュされたメッセージを受け取って表示する様子が分かります。1秒毎にメッセージが送信されて、計10回のメッセージが送られた所で止まります:
2015073103


よく見ると、デバイスタイプが "MyDevice"、イベントタイプが "myeventtype" という文字列になっています。これは MQTT パブリッシュをする際のクライアント ID とトピックで指定しているものなので、これらの値を変更したい場合はクライアント ID やトピックを変更すればよい、ということもわかりました。

というわけで、Java のプログラムから IBM IoT Foundations の QuickStart MQTT ブローカーにメッセージを送信し、その内容を Node-RED で受け取る、というオペレーションが実現できることが確認できました! これを応用すると、とりあえず何らかのデータを MQTT QuickStart に送り、Node-RED で処理して例えば DB に格納するとか、リアルタイム処理を加えるといった Node-RED 得意の土俵で扱うことができるようになりますね。



以前のブログエントリでオープンソース MQTT 環境である Mosquitto の導入方法を紹介しました。このブログエントリではラズベリーパイ用と CentOS 用の紹介をしていますが、Mosquitto 自体には Windows などのバイナリも用意されていて、多くの環境で使えます:
ラズベリーパイにオープンソース MQTT の Mosquitto をインストールする


この Mosquitto を使って、IBM IoTF(IoT Foundations) 環境に用意されている QuickStart と呼ばれる MQTT ブローカーに任意のメッセージデータを送る方法を紹介します。センサーシミュレータなどを使って QuickStart に送られたデータを集めて取り出して加工して・・・という作業は Node-RED 環境があれば簡単ですが、その前段になるデバイスから QuickStart にデータを送るにはどうするか?という手段の紹介です。今回は MQTT クライアント(パブリッシャー)を使った例を紹介します。


準備として、まずは上記のリンク先の情報から、Mosquitto のパブリッシャー(mosquitto_pub)をインストールする所までは済ませておいてください。今回は mosquitto_pub を使って IoTF QuickStart にデータを送信します。

次に、IoTF QuickStart に送られてきたメッセージデータを確認するための環境を整えておく必要があります。IBM Bluemix の IoT Foundations Starter ボイラープレートや Node-RED Starter ボイラープレートを使って、Node-RED 環境を用意してください。この辺りの詳しい手順がよく分からない場合はこちらを参照してください:


Node-RED 環境ができたら、次のような IBM IoT ノードと Debug ノードをつなげただけの、シンプルなフローを作成してください:
2015072701


IBM IoT ノードをダブルクリックし、このような属性値に設定します。Authentication を "QuickStart" に、Input Type を "Device Event" に、そして Device Id には任意のユニークな文字列(下の例では "91a19d112233" にしていますが、同じものを使わないでください。実際にはネットに接続された機器の MAC アドレスを想定しています)を入力します。最後に Name に適当な名前を入力して OK をクリックします:
2015072702


Debug ノードの属性は以下のようにします。Output はデフォルトの payload のままでも実用的にはいいのですが、今回は送られてくるメッセージ全体を確認したいので "complete msg object" に変更して OK をクリックします。最後に "Deploy" をクリックして、デプロイまで済ませておきます:
2015072703


これで Device Id が(今回の例であれば) "91a19d112233" に設定されたデバイスから QuickStart に送られてくるデータを取得してデバッグタブに表示する、というアプリが動いている状態になりました。

では Mosquitto を使って、そのようなデータを QuickStart にパブリッシュしてみます。Mosquitto を導入した環境にログインし、ターミナルのプロンプトから以下のようなコマンドを実行します:
$ mosquitto_pub -h quickstart.messaging.internetofthings.ibmcloud.com -t "iot-2/evt/myeventtype/fmt/json" -m '{"d":{"name1":"stringValue","name2":10}}' -i d:quickstart:MyDevice:91a19d112233

このコマンドは、
 (1) quickstart.messaging.internetofthings.ibmcloud.com という MQTT ブローカーに対して、
 (2) "iot-2/evt/myeventtype/fmt/json" というトピックを指定し、
 (3) '{ "d": { "name1":"stringValue", "name2": 10 } }' という JSON 形式のメッセージを、
 (4) d:quickstart:MyDevice:91a19d112233 というクライアント ID で
パブリッシュする、という処理内容のコマンドです。

※ちなみに IoTF では送信メッセージは '{ "d": { (ここが実際の送信内容) } }' という JSON 形式で送付することを推奨しています。

2015072704


このコマンドを実行した直後、Node-RED アプリのデバッグタブにメッセージが表示されるはずです! 実行したパブリッシュコマンドのメッセージが届いた証拠です:
2015072705


ここで届いたメッセージはこのような内容になっているはずです:
{
 "topic": "iot-2/type/MyDevice/id/91a19d112233/evt/myeventtype/fmt/json",
 "payload": {
  "d": {
   "name1": "stringValue",
   "name2": 10
  }
 },
 "deviceId": "91a19d112233",
 "deviceType": "MyDevice",
 "eventType": "myeventtype",
 "format": "json",
 "_msgid": "6e152038.91eae" 
}

(3) で指定したメッセージは、"payload" の中身として届けられています。また (2) で指定したトピックは "topic" と、"eventType" の値として表示されています。また (4) で指定したクライアント ID は "deviceId""deviceType" として届いていることもわかります。"deviceType" と "eventType" はパブリッシュする側で任意に設定できる、デバイスとイベントの分類用文字列です。


ということは、上記の mosquitto_pub コマンドで実行した MQTT パブリッシュ処理に相当するコマンドを quickstart.messaging.internetofthings.ibmcloud.com に対して実行することで QuickStart にメッセージを送ることができる、ということが分かりました。QuickStart にメッセージを送ることができれば、後は Node-RED を使って便利に処理できます。

このコマンドをプログラミング言語から実行するにはどのようにすればよいのか、それは別途紹介させていただくつもりです。
 

このページのトップヘ