まだプログラマーですが何か?

プログラマーネタとアスリートネタ中心。たまに作成したウェブサービス関連の話も http://twitter.com/dotnsf

タグ:quickstart

IBM BluemixNode-RED および IBM IoT Foundation 環境を使って集めたデータをリレーショナルデータベース(dashDB)に格納する、という手順を紹介します。

まず準備段階として、「何の」データを集めるか、という問題があります。技術的な要素としては IBM IoT Foundation QuickStart 環境に MQTT パブリッシュが可能なアプリケーションやデバイスであれば何でもいいのですが、後のデータ解析のことを考え、シミュレーターではなく実機のデータを集めることにします。今回は IBM developerWorks Recipes から提供されているサンプルの1つでもあるラズベリーパイを使うことにします:
Connect a Raspberry Pi to Internet of Things Foundation

上記ページの Recipe を参照してラズベリーパイにアプリケーション("iot" という名前のサービス)を導入すると、ラズベリーパイから1秒ごとに CPU 負荷率(%)、CPU 温度(℃)、そしてサインカーブを描くような -1 から 1 までの間の値、の3つの値が1秒おきに IBM IoT Foundation(MQTT ブローカー)に送られるようになります。今回はそのデータを集めてみます。

iot サービスは MAC アドレスを deviceId として IoT Foundation QuickStart にメッセージを MQTT パブリッシュする仕様になっています。そこで iot サービス導入済みのラズベリーパイにログインし、ifconfig コマンドでイーサネットポート(或いはワイアレスポート)の MAC アドレスを調べておきます(下図では b827ebb9ddc0 ):
2015121502

ちなみにこの MAC アドレスは本物なので、もしラズベリーパイのデータを集めたいのであれば同じアドレスを指定いただければ、僕の自宅のラズパイデータを収集することができますw


そして Bluemix 上に作成した Node-RED 環境に ibmiot インプットノードと、debug アウトプットノードを配置して、線を結びます:
2015121501


また ibmiot インプットノードをダブルクリックして開き、deviceId 欄に先程調べた MAC アドレスを入力して OK をクリックします:
2015121503


また debug アウトプットノードもダブルクリックして開き、Output 欄を "complete msg object" に変更します(実体である payload 以外のデータも出力するようにします):
2015121506


最後に Node-RED 画面右上の Deploy ボタンをクリックすると、このノードアプリケーションが動き出します。指定した deviceId のデータを IBM IoT Foundation QuickStart(MQTT ブローカー)を通じて取得し、画面内の debug タブに出力されます:
2015121504


この個々のデータをよく見るとこのようなデータが送られてきていることが確認できます:
2015121505

{
"topic": "iot-2/type/iotsample-raspberrypi/id/b827ebb9ddc0/evt/status/fmt/json",
"payload": { "d":{ "myName": "myPi", "cputemp": 40.08, CPU温度(℃) "cpuload": 0,   CPU負荷(%) "sine": 0.53    サインカーブの値 }
},
"deviceId": "b827ebb9ddc0",
"deviceType": "iotsample-raspberrypi",
"eventType": "status",
"format": "json",
"_msgid": "8ebe5e09.7141a" 一意のメッセージID }

上記の赤字で書かれたデータは個々のメッセージ毎に変わるデータなので、このデータを集めることにします。先程作ったパレットから線を削除し、function ノードを間に追加して線を繋ぎ直します:
2015121507


function ノードをダブルクリックして、ここで JSON データの変換を行うよう指定します。以下のに内容に書き換えて OK をクリックしてください:
2015121508

return { payload:{
 ID: msg._msgid,
 CPUTEMP: msg.payload.d.cputemp,
 CPULOAD: msg.payload.d.cpuload,
 SINE: msg.payload.d.sine
}};

これで送られてくるメッセージから、ID, CPUTEMP, CPULOAD, SINE の4つの値だけをフラットに取り出すことができるようになりました。この状態で再度 Deploy すると debug タブにはこのようなデータが流れてくるはずです:
2015121509


この payload 部分を dashDB に格納します。改めて Bluemix のプロジェクトに dashDB サービスを追加します:
2015121501


実際にデータを追加(insert)するには、その前にテーブルを定義しておく必要があります。テーブルを定義するために dashDB サービスをクリックし、"LAUNCH" ボタンをクリックしてウェブコンソール画面へ移動します:
2015121502


dashDB のウェブコンソール画面が表示されたら、左メニューから "Tables" を選択します:
2015121503


テーブル一覧画面で "Add Table" ボタンをクリックします:
2015121504


"Create a table" ダイアログボックスが表示されます。ここで CREATE TABLE の SQL を指定して、テーブルを作成します:
2015121505


集めたデータを格納できるよう、以下の内容で RPDATA テーブルのスキーマを指定し、最後に "Run DDL" ボタンをクリックします:
CREATE TABLE "RPDATA"
(
  "ID" VARCHAR(20),
  "CPUTEMP" DOUBLE,
  "CPULOAD" DOUBLE,
  "SINE"  DOUBLE
);

"DDL ran successfully" というメッセージが表示されれば成功です:
2015121506


改めて先ほどの画面に戻るとテーブル一覧の中に "RPDATA" テーブルが追加されています。"RPDATA" テーブルを選択すると RPDATA テーブルの設計要素が表示されます。これで集めたデータを格納するためのテーブルが定義できました:
2015121507


改めて Node-RED 画面に戻り、dashDB ノード(左側だけに接続パーツが付いているもの)をパレットに追加し、function ノードから紐付けます:
2015121508


dashDB ノードをダブルクリックして、Service には Bluemix 上のサービス名称(おそらく選択肢は1つだけなのでそれを選択)、Table にはこのデータを格納する RPDATA テーブル(上記で定義したテーブル)を指定し、OK ボタンをクリックします:
2015121509


この状態で改めて Deploy します。成功しても Node-RED 上の画面では特に変化はありませんが、ラズベリーパイから送られてくるデータは dashDB の RPDATA テーブルに格納され続けているはずです:
2015121510


しばらく待ってから dashDB のウェブコンソール画面に移動し、テーブル一覧で RPDATA テーブルを選択して Browse Data タブを選ぶと、その時点までに溜まったデータが表示されます:
2015121511


これでラズベリーパイのデータを IBM IoT Foundation QuickStart 経由で dashDB に格納する、という処理が実現できました。


(追記 このエントリの続編はこちらです)
 

IBM IoT Foundation が無料提供している Quickstart MQTT ブローカーを使ったアプリのサンプルを紹介します。今回紹介するのは IBM Bluemix各サービスの中で止まっているサービスがあったらそれを知らせる、という機能を Node-RED で作成するような内容にします。

このアプリを作成する上で必要なサービスの稼働状況確認は estado (エスタド)を使って調べます:
http://estado.ng.bluemix.net/  
(※この URL は北米データセンターのサービス状況確認用です。英国データセンター用は URL 内文字列の ng の代わりに eu-gb を指定してください)

2015092001


estado は Bluemix ランタイムやサービスの単位での稼働状況を教えてくれるサービスサイトです。問題なく稼働しているランタイム/サービスについては "up" と表示されますが、稼働が止まってたり、なんらかの障害を抱えていると判断されたサービスに関しては赤背景で "down" と表示されます。
2015092002


Bluemix の各機能の稼動状態は上記 URL にアクセスすることで確認することはできます。ただそれでは「上記サイトを見に行った人が稼働状況を確認できる」というだけです。情報そのものは便利なのですが、稼働ステータスをもっとインタラクティブにプッシュしたり、ユーザーフレンドリーな形で提供することを考えてみましょう。

具体的にはこのようなシステムを作ります。この図の緑の丸で描かれた「Java アプリ」部分を実装します:
2015092004


この Java(スタンドアロン)アプリケーションは定期的(この例では1分に1度)に北米データセンターの estado に HTTP GET リクエストを出して Bluemix サービスの稼動状態を確認します。確認した結果、停止中のサービスの一覧を IBM IoT Foundation の Quickstart MQTT ブローカー(quickstart.messaging.internetofthings.ibmcloud.com:1883)へランダムに生成した deviceId を指定して JSON でパブリッシュする、というものです。この時の deviceId はアプリケーション起動時に動的に作成し、コンソールへ出力します。英国データセンター用に作り変えたり、実行感覚を変更したい場合はソースコードの該当箇所を適宜変更してください。
 

estado を参照した結果は Quickstart MQTT ブローカーに格納されるので、パブリッシュ時と同じ deviceId を指定すれば MQTT サブスクライバーを使って取り出せます。特に IBM Bluemix 上の IoT Starter や NodeRED Starter ビルドパックからプロジェクトを作成した場合であれば、IBM IoT Input ノードを使えばこの deviceId だけで簡単にメッセージを取り出すことができるので、その結果をデバッグタブに出力したり、メールで通知したり、といった管理機能が簡単に実現できます。この辺りは Bluemix で NodeRED を使ったことがある方であれば、あまり難しくないと思っています。この辺りの詳しい説明はこちらのエントリを参照ください:
Bluemix の Node-RED サービスで IoT アプリを作る(1/2)


では話を Java アプリケーションの実装に戻します。今回は MQTT のオープンソース Java ライブラリである Paho と、Jakarta CommonsHTTP Client V3 を使ったサンプルを作成してみました。ソースコードのコンパイルや実行時には Paho の Java ライブラリ V3 の最新版(org.eclipse.paho.client.mqttv3-X.X.X.jar)が必要です。こちらからダウンロードしてください:
https://www.eclipse.org/paho/clients/java/


また HTTP Client の JAR ファイルも必要です。別途ダウンロードしておいてください。

なお、今回作成した Java アプリのソースコードは github で公開しました。必要であればこちらからダウンロードしておいてください:


公開したソースコードの内容は以下の様なものです:
import java.util.Calendar;
import java.util.TimeZone;

import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.GetMethod;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;

public class EstadoQuickstart implements MqttCallback {
  MqttClient myClient;
  MqttConnectOptions connOpt;

  static final String BROKER_URL = "tcp://quickstart.messaging.internetofthings.ibmcloud.com:1883"; //. IBM IoT Quickstart MQTT ブローカー	
  static final String dc = "ng"; //. 監視対象データセンター

  static String deviceId = "net.bluemix." + dc + ".estado.mqtt.publish.";
  static int interval = 60;

  @Override
  public void connectionLost(Throwable t) {
  }

  @Override
  public void deliveryComplete(IMqttDeliveryToken token) {
  }

  @Override
  public void messageArrived(String topic, MqttMessage message) throws Exception {
    // TODO Auto-generated method stub
    System.out.println("-------------------------------------------------");
    System.out.println("| Topic:" + topic);
    System.out.println("| Message: " + new String(message.getPayload()));
    System.out.println("-------------------------------------------------");
  }

  public static void main(String[] args) {
    try{
      if( args.length >; 0 ){
        interval = Integer.parseInt( args[0] );
      }

//. ランダムな deviceId の生成 String hex = "0123456789abcdef"; for( int i = 0; i < 8; i ++ ){ char c = hex.charAt( ( int )( Math.floor( Math.random() * 16 ) ) ); deviceId += c; } }catch( Exception e ){ } EstadoQuickstart eq = new EstadoQuickstart(); eq.runClient(); } public void runClient() { // TODO Auto-generated method stub String clientID = "d:quickstart:MyDevice:" + deviceId; System.out.println( "deviceId=" + deviceId ); connOpt = new MqttConnectOptions(); connOpt.setCleanSession( true ); connOpt.setKeepAliveInterval( 30 ); // Connect to Broker try{ myClient = new MqttClient( BROKER_URL, clientID ); myClient.setCallback( this ); myClient.connect( connOpt ); }catch( MqttException e ){ e.printStackTrace(); System.exit( -1 ); } String myTopic = "iot-2/evt/estado_" + dc + "/fmt/json"; //. MQTT トピック文字列を生成 MqttTopic topic = myClient.getTopic( myTopic ); while( true ){ try{
//. 日付文字列の生成 Calendar c = Calendar.getInstance(); TimeZone tz = TimeZone.getTimeZone( "Asia/Tokyo" ); c.setTimeZone( tz );; int y = c.get( Calendar.YEAR ); int m = c.get( Calendar.MONTH ) + 1; int d = c.get( Calendar.DAY_OF_MONTH ); int h = c.get( Calendar.HOUR_OF_DAY ); int n = c.get( Calendar.MINUTE ); int s = c.get( Calendar.SECOND ); String dt = y + "/" + ( m < 10 ? "0" : "" ) + m + "/" + ( d < 10 ? "0" : "" ) + d + "T" + ( h < 10 ? "0" : "" ) + h + ":" + ( n < 10 ? "0" : "" ) + n + ":" + ( s < 10 ? "0" : "" ) + s + "+09:00";
//. estado の HTML を取得 String out = ""; HttpClient client = new HttpClient(); GetMethod get = new GetMethod( "http://estado." + dc + ".bluemix.net/" ); int sc = client.executeMethod( get ); String html = get.getResponseBodyAsString();

//. HTML 内部を見て、サービスのステータスを確認 int x = html.indexOf( "<table" ); while( x >; -1 ){ int td1 = html.indexOf( "<td>;", x + 1 ); int td2 = html.indexOf( "</td>;", td1 + 4 ); int td3 = html.indexOf( "<td>;", td2 + 1 ); int td4 = html.indexOf( "</td>;", td3 + 4 ); if( td1 >; -1 && td2 >; -1 && td3 >; -1 && td4 >; -1 ){ String name = html.substring( td1 + 4, td2 ); String svalue = html.substring( td3 + 4, td4 ); if( svalue.equals( "down" ) ){ //. 止まっているサービスを探す String line = "\"" + name + "\""; //. 止まっているサービスの配列を返す if( out.length() >; 0 ){ line = "," + line; } out += line; } x = td4; } x = html.indexOf( "<tr ", x + 1 ); } out = "{\"datetime\":\"" + dt + "\",\"error_services\":[" + out + "]}"; //. MQTT Publish int pubQoS = 0; MqttMessage message = new MqttMessage( out.getBytes() ); message.setQos( pubQoS ); message.setRetained( false ); // Publish the message MqttDeliveryToken token = null; try{ // publish message to broker token = topic.publish( message ); // Wait until the message has been delivered to the broker token.waitForCompletion(); Thread.sleep( 1000 ); //. パブリッシュ後、とりあえず1秒待つ }catch( Exception e ){ e.printStackTrace(); } //. 次の実行タイミングを待つ Calendar c0 = Calendar.getInstance(); int s0 = ( c0.get( Calendar.SECOND ) % interval ); int w = 1000 * ( interval - s0 ); Thread.sleep( w ); }catch( Exception e ){ } } } }



Paho の Java ライブラリとソースコード(EstadoQuickstart.java)をダウンロードしてコンパイル&実行します。実行が成功すると、コンソール画面には次のように Quickstart MQTT ブローカーにパブリッシュした際の deviceId が表示されます。そして Ctrl+C などで終了するまでの間は(デフォルト状態であれば)60秒おきに estado をチェックしてその結果をパブリッシュする、という処理を繰り返します:
2015092101
 

IBM Bluemix のユーザーであれば、この結果を簡単に取り出すことができます。Bluemix 上に IoT Foundation Starter または NodeRED Starter のボイラープレートアプリケーションを作成して NodeRED 環境にアクセスします。そして IBM IoT Input ノードを1つ作成して、ノードの属性値に "Quickstart" と DeviceId に上述の Java アプリ実行時にコンソールに表示された deviceId 値と同じものを指定します:
2015092102


そしてとりあえず debug ノードを追加して IBM IoT ノードと線で結んでデプロイするだけで、このノードに60秒おきに停止している Bluemix サービスの情報が送られてくるようになります。
2015092003


このサンプルでは以下のような JSON フォーマットでメッセージを取得します。後はこのメッセージを Node-RED 側でハンドリングしてメールで通知したり、データベースに格納する、といった処理を実装することになります:
{
 "datetime": "2015/09/19T22:53:00+09:00",
 "error_services": [
  "account-usage", "meteringbackgroundprocess", "meteringdatabase", "runtime-usage-calculator", "dotnet", "alchemy_api [Free]", ・・・(止まっているサービス名の配列)・・・
 ] 
}



というわけで、後はこの Java アプリをどこかのクラウド上か、または(24時間電源ONが保証できる)ローカル環境で稼働させておけば、MQTT にステータスを送り続けてくれる環境が出来上がるので、このような Node-RED アプリを作って動かすことも可能になる、ということになります。

MQTT はデバイスを意識して作成された軽量なプロトコルですが、必ずしもデバイスでしか使えない、というものでもありません。今回のようなサーバーの死活情報を扱ってはいけない、というものではないし、現に某有名ソーシャルサイトのメッセージング機能で使われていたりもします。何をパブリッシュして、どこからサブスクライブするか、そしてサブスクライブした情報をどう扱うか(Node-RED はここが簡単にできる、というサービスです)、が重要だと思っています。みなさんも色んなアイデアを是非形にしてみてください。同様のアルゴリズムが実装できれば Java 以外の言語でも実現できると思っていますので、興味のある方は是非移植にも挑戦してみてください。



 

以前のブログエントリでオープンソース MQTT 環境である Mosquitto の導入方法を紹介しました。このブログエントリではラズベリーパイ用と CentOS 用の紹介をしていますが、Mosquitto 自体には Windows などのバイナリも用意されていて、多くの環境で使えます:
ラズベリーパイにオープンソース MQTT の Mosquitto をインストールする


この Mosquitto を使って、IBM IoTF(IoT Foundations) 環境に用意されている QuickStart と呼ばれる MQTT ブローカーに任意のメッセージデータを送る方法を紹介します。センサーシミュレータなどを使って QuickStart に送られたデータを集めて取り出して加工して・・・という作業は Node-RED 環境があれば簡単ですが、その前段になるデバイスから QuickStart にデータを送るにはどうするか?という手段の紹介です。今回は MQTT クライアント(パブリッシャー)を使った例を紹介します。


準備として、まずは上記のリンク先の情報から、Mosquitto のパブリッシャー(mosquitto_pub)をインストールする所までは済ませておいてください。今回は mosquitto_pub を使って IoTF QuickStart にデータを送信します。

次に、IoTF QuickStart に送られてきたメッセージデータを確認するための環境を整えておく必要があります。IBM Bluemix の IoT Foundations Starter ボイラープレートや Node-RED Starter ボイラープレートを使って、Node-RED 環境を用意してください。この辺りの詳しい手順がよく分からない場合はこちらを参照してください:


Node-RED 環境ができたら、次のような IBM IoT ノードと Debug ノードをつなげただけの、シンプルなフローを作成してください:
2015072701


IBM IoT ノードをダブルクリックし、このような属性値に設定します。Authentication を "QuickStart" に、Input Type を "Device Event" に、そして Device Id には任意のユニークな文字列(下の例では "91a19d112233" にしていますが、同じものを使わないでください。実際にはネットに接続された機器の MAC アドレスを想定しています)を入力します。最後に Name に適当な名前を入力して OK をクリックします:
2015072702


Debug ノードの属性は以下のようにします。Output はデフォルトの payload のままでも実用的にはいいのですが、今回は送られてくるメッセージ全体を確認したいので "complete msg object" に変更して OK をクリックします。最後に "Deploy" をクリックして、デプロイまで済ませておきます:
2015072703


これで Device Id が(今回の例であれば) "91a19d112233" に設定されたデバイスから QuickStart に送られてくるデータを取得してデバッグタブに表示する、というアプリが動いている状態になりました。

では Mosquitto を使って、そのようなデータを QuickStart にパブリッシュしてみます。Mosquitto を導入した環境にログインし、ターミナルのプロンプトから以下のようなコマンドを実行します:
$ mosquitto_pub -h quickstart.messaging.internetofthings.ibmcloud.com -t "iot-2/evt/myeventtype/fmt/json" -m '{"d":{"name1":"stringValue","name2":10}}' -i d:quickstart:MyDevice:91a19d112233

このコマンドは、
 (1) quickstart.messaging.internetofthings.ibmcloud.com という MQTT ブローカーに対して、
 (2) "iot-2/evt/myeventtype/fmt/json" というトピックを指定し、
 (3) '{ "d": { "name1":"stringValue", "name2": 10 } }' という JSON 形式のメッセージを、
 (4) d:quickstart:MyDevice:91a19d112233 というクライアント ID で
パブリッシュする、という処理内容のコマンドです。

※ちなみに IoTF では送信メッセージは '{ "d": { (ここが実際の送信内容) } }' という JSON 形式で送付することを推奨しています。

2015072704


このコマンドを実行した直後、Node-RED アプリのデバッグタブにメッセージが表示されるはずです! 実行したパブリッシュコマンドのメッセージが届いた証拠です:
2015072705


ここで届いたメッセージはこのような内容になっているはずです:
{
 "topic": "iot-2/type/MyDevice/id/91a19d112233/evt/myeventtype/fmt/json",
 "payload": {
  "d": {
   "name1": "stringValue",
   "name2": 10
  }
 },
 "deviceId": "91a19d112233",
 "deviceType": "MyDevice",
 "eventType": "myeventtype",
 "format": "json",
 "_msgid": "6e152038.91eae" 
}

(3) で指定したメッセージは、"payload" の中身として届けられています。また (2) で指定したトピックは "topic" と、"eventType" の値として表示されています。また (4) で指定したクライアント ID は "deviceId""deviceType" として届いていることもわかります。"deviceType" と "eventType" はパブリッシュする側で任意に設定できる、デバイスとイベントの分類用文字列です。


ということは、上記の mosquitto_pub コマンドで実行した MQTT パブリッシュ処理に相当するコマンドを quickstart.messaging.internetofthings.ibmcloud.com に対して実行することで QuickStart にメッセージを送ることができる、ということが分かりました。QuickStart にメッセージを送ることができれば、後は Node-RED を使って便利に処理できます。

このコマンドをプログラミング言語から実行するにはどのようにすればよいのか、それは別途紹介させていただくつもりです。
 

このページのトップヘ