まだプログラマーですが何か?

プログラマーネタとアスリートネタ中心。たまに作成したウェブサービス関連の話も http://twitter.com/dotnsf

タグ:node-red

ビジュアルデータフローエディタの Node-RED は、IoT を始めとするデータの取り込みや加工、書き出しを視覚的に行う便利なツールです。更に IBM Bluemix 環境であれば、「ボイラープレート」と呼ばれるテンプレート機能を使うことで、サーバー管理とかミドルウェアインストールとかを意識することなく、簡単に Node-RED 環境を構築して、すぐに使い始めることができます。


が、簡単すぎるが故の課題もあります。典型的な例の1つが「バージョンアップ」です。まあクラウドの宿命といえなくもないのですが、ミドルウェアやアプリケーションのバージョン管理をどうするか、という課題です。クラウド環境の場合、バージョン管理含めてクラウド業者に手放したい人もいれば、バージョン管理は自分でやりたいという人もいるので、1つの正解というものが存在しない、難しい問題ではあります。Bluemix では新規にサーバーを作る際のミドルウェア/アプリケーションバージョンは原則最新のものが用意されますが、一度作ったサーバーのミドルウェア/アプリケーションバージョンが勝手に変更されることはありません。つまり使い続ける間は利用者が管理する必要があります。 

・・・と、ここまではいいのですが、問題は「最初の一歩が簡単すぎる&中で何がどう動いているか分からなくても使い始めることができる故、バージョンアップがやけに難しく感じる」ことです(苦笑)。

一応難しく(というか、ややこしく)なっている理由を解説すると、Bluemix 環境では Node.js サーバーがランタイムとして用意されます。そしてその上に Node-RED アプリケーションが導入されて動いているわけですが、このアプリケーション部分である Node-RED のバージョンアップをする必要があるわけです。この Node-RED のバージョンアップの際に、前提となる Node.js のバージョンも合わせて上げる必要が生じるケースもあります。また Node.js では npm というパッケージ管理の仕組みが使われていて、npm の作法でバージョンを管理する必要があります。 普通の Node.js 環境の場合、自分で npm を管理したり、npm に指示を出すような設定ファイルを用意したりするのですが、Bluemix はその辺りを全く知らなくても(事前に何も用意しなくても)インターネット上に Node-RED 環境が作れてしまうのです。で、バージョンアップの段階になってこれらの用意がないことが話をややこしくする要素になるのでした。


という背景の説明はここまでにして、以下は実際に(数ヶ月前のバージョンが古かった頃から動いているような)Bluemix 上の Node-RED をバージョンアップする方法を紹介します。

まずは Node-RED 環境にアクセスし、画面右上のハンバーガーメニューから Node-RED のバージョンを確認します。この図では "0.13.1" というバージョンになっていることが確認できます。2017/Feb/09 時点での Node-RED の最新バージョンは "0.16.2" なので、この環境はバージョンアップが可能な状態にある、ということになります:
2017020801


(Node.js や)Node-RED のバージョンアップのためには Node-RED のスターターコードと呼ばれるファイル一式か、IBM DevOps サービス等を使ったソースコード一式が必要になります。バージョンアップの対象となる Node-RED 環境を作った際にこれらの環境ごと用意されているのであれば、そのソースコードを用意してください。 以下はスターターコードも IBM DevOps サービスも使わず、ソースコードが手元にない状態からの入手方法になります。

改めて IBM Bluemix にログインし、対象の Node-RED ランタイムのプロジェクトを選択します。そして「開始」タブを開くと、Node-RED のカスタマイズの節内に "DOWNLOAD STARTER CODE" と書かれたボタンがあります(バージョンによって微妙に表現が異なるかもしれません)。ここをクリックしてスターターコードの zip ファイルをダウンロードします:
2017020802


ダウンロードした zip ファイルを展開します。この中に package.json というファイルがあるので、これをテキストファイルで開きます。もともとスターターコードをダウンロード済みであったり、IBM DevOps サービス等でソースコードを管理済みだった場合もお手元のコードの package.json を開いてください:
2017020803


package.json ファイルの中身を確認してみます:
2017020901


この中で利用する各コンポーネントモジュールとそのバージョンを管理しています。まず Node-RED 自体は
      :
  "node-red": "0.x"
      :

と指定されていました。これは「0.ナントカの中で最新のもの」を使うよう指定されていることになり、この指定であれば現時点での最新版 Node-RED である 0.16.2 が使われることになります。もしこのような指定になっていなかった場合はこのように変更してください。

次に Node.js のバージョンを確認します。Node-RED v0.16.0 からは Node.js のバージョンが 4.0 以上のみをサポートしており、Node.js のバージョンが古いと最新版の Node-RED は動きません。そこで Node.js のバージョンも合わせておく必要があります。こちらについては
      :
  "node": "4.x"
      :

と指定されている必要があります。もしこのような指定になっていなかった場合はこのように変更してください。


ここまでの変更・確認の上で(必要であれば他のモジュールや public フォルダ以下に静的ファイルを追加した上で)、cf コマンドでプッシュするか、IBM DevOps サービスを使って Deploy してください。

再デプロイ後に改めて Node-RED のバージョンを確認します:
2017020805


↑最新版になっていることが確認できれば成功です。

 

IBM Bluemix からも提供されている NodeRED データフローエディタの中にある TCP インプットノードを使ってみました:
2016021600


この TCP インプットノードはホスト名とポート番号を指定して、TCP サーバーソケットに直結してデータを受信する、というノードです。

実際に NodeRED で使ってみました。データが受信できることを確認したかったので、以下のような TCP インプットノード1つに、デバッグアウトプットノード1つをつなぐ、というシンプルなデータフローを作ります:
2016021601


TCP インプットノードをダブルクリックして属性を指定します。上から順にポート番号(下図では 12345)とホスト名を指定します。なおここで指定するホスト名のサーバーには、NodeRED サーバーから繋がる必要があります(つまりプライベートネットワーク上にある場合は正しくポートフォワードされている必要があり、加えてファイアウォールで指定ポートに穴を開けておく必要もあります)。また受け取るメッセージの形式を単体文字列に指定しています。この状態で NodeRED 上でデプロイします:
2016021602



一方の TCP サーバー側のアプリケーションを用意します。今回はこんなシンプルな Java アプリを、NodeRED のホストとして指定したサーバー上に作ってみました。(デフォルトで)12345 番ポートでクライアントを待ち受けて、接続があればメッセージを返す、というものです。待受ポート番号は起動時に指定することもできるようにしていますが、今回は使いません:
//. myTcpServer.java
import java.io.*;
import java.net.*;

public class myTcpServer{
  public static void main( String[] argv ) throws Exception{
    int port = 12345; //. デフォルトで待ち受けるポート番号
    try{
      //. 実行時のパラメータで待受ポート番号を変更できるようにする
      if( argv.length > 0 ){
        try{
          port = Integer.parseInt( argv[0] );
        }catch( Exception e ){
        }
      }

      //. サーバーソケット作成
      ServerSocket server = new ServerSocket( port );

      while( true ){
        Socket socket = server.accept(); //. 接続してくるクライアントを待つ

        //. 接続してきたら、メッセージを返す
        OutputStream output = socket.getOutputStream();
        String message = "Connected with port: " + port;
        byte[] out = message.getBytes( "UTF-8" );
        output.write( out );

        //. クライアントを破棄(ループで接続待ち状態へ戻る)
        socket.close();
      }
    }catch( Exception e ){
      e.printStackTrace();
    }
  }
}


これをコンパイルして、実行します:
# javac myTcpServer.java
# java myTcpServer

すると NodeRED 側のリクエストがこのサーバーアプリに繋がり、定期的にメッセージが返されることが分かります:
2016021603


デバッグメッセージをよく見ると、10秒おきにメッセージが送られてくることが分かります。これが TCP インプットノードの仕様なんですかね:
2016021604
ともあれ、TCP インプットノードを使って、TCP ポート直結でデータを受け取ることができる、ということが確認できました。

MQTT を使う必要はなく、ホスト名(IPアドレス)とTCP ポートから直結してデータを受け取ることもできる、というサンプルでした。ただ現実問題としてインターネットに公開されたホストからわざわざ TCP ポート直結のファイアウォールを空けてまでデータを受け取る必要があるか、と言われると、そこはやはり MQTT の方がいいのかなあ、とは思っています。まあどうしても MQTT が使えないようなケースに限った利用ケースかもしれません。


IBM BluemixNode-RED および IBM IoT Foundation 環境を使って集めたデータをリレーショナルデータベース(dashDB)に格納する、という手順を紹介します。

まず準備段階として、「何の」データを集めるか、という問題があります。技術的な要素としては IBM IoT Foundation QuickStart 環境に MQTT パブリッシュが可能なアプリケーションやデバイスであれば何でもいいのですが、後のデータ解析のことを考え、シミュレーターではなく実機のデータを集めることにします。今回は IBM developerWorks Recipes から提供されているサンプルの1つでもあるラズベリーパイを使うことにします:
Connect a Raspberry Pi to Internet of Things Foundation

上記ページの Recipe を参照してラズベリーパイにアプリケーション("iot" という名前のサービス)を導入すると、ラズベリーパイから1秒ごとに CPU 負荷率(%)、CPU 温度(℃)、そしてサインカーブを描くような -1 から 1 までの間の値、の3つの値が1秒おきに IBM IoT Foundation(MQTT ブローカー)に送られるようになります。今回はそのデータを集めてみます。

iot サービスは MAC アドレスを deviceId として IoT Foundation QuickStart にメッセージを MQTT パブリッシュする仕様になっています。そこで iot サービス導入済みのラズベリーパイにログインし、ifconfig コマンドでイーサネットポート(或いはワイアレスポート)の MAC アドレスを調べておきます(下図では b827ebb9ddc0 ):
2015121502

ちなみにこの MAC アドレスは本物なので、もしラズベリーパイのデータを集めたいのであれば同じアドレスを指定いただければ、僕の自宅のラズパイデータを収集することができますw


そして Bluemix 上に作成した Node-RED 環境に ibmiot インプットノードと、debug アウトプットノードを配置して、線を結びます:
2015121501


また ibmiot インプットノードをダブルクリックして開き、deviceId 欄に先程調べた MAC アドレスを入力して OK をクリックします:
2015121503


また debug アウトプットノードもダブルクリックして開き、Output 欄を "complete msg object" に変更します(実体である payload 以外のデータも出力するようにします):
2015121506


最後に Node-RED 画面右上の Deploy ボタンをクリックすると、このノードアプリケーションが動き出します。指定した deviceId のデータを IBM IoT Foundation QuickStart(MQTT ブローカー)を通じて取得し、画面内の debug タブに出力されます:
2015121504


この個々のデータをよく見るとこのようなデータが送られてきていることが確認できます:
2015121505

{
"topic": "iot-2/type/iotsample-raspberrypi/id/b827ebb9ddc0/evt/status/fmt/json",
"payload": { "d":{ "myName": "myPi", "cputemp": 40.08, CPU温度(℃) "cpuload": 0,   CPU負荷(%) "sine": 0.53    サインカーブの値 }
},
"deviceId": "b827ebb9ddc0",
"deviceType": "iotsample-raspberrypi",
"eventType": "status",
"format": "json",
"_msgid": "8ebe5e09.7141a" 一意のメッセージID }

上記の赤字で書かれたデータは個々のメッセージ毎に変わるデータなので、このデータを集めることにします。先程作ったパレットから線を削除し、function ノードを間に追加して線を繋ぎ直します:
2015121507


function ノードをダブルクリックして、ここで JSON データの変換を行うよう指定します。以下のに内容に書き換えて OK をクリックしてください:
2015121508

return { payload:{
 ID: msg._msgid,
 CPUTEMP: msg.payload.d.cputemp,
 CPULOAD: msg.payload.d.cpuload,
 SINE: msg.payload.d.sine
}};

これで送られてくるメッセージから、ID, CPUTEMP, CPULOAD, SINE の4つの値だけをフラットに取り出すことができるようになりました。この状態で再度 Deploy すると debug タブにはこのようなデータが流れてくるはずです:
2015121509


この payload 部分を dashDB に格納します。改めて Bluemix のプロジェクトに dashDB サービスを追加します:
2015121501


実際にデータを追加(insert)するには、その前にテーブルを定義しておく必要があります。テーブルを定義するために dashDB サービスをクリックし、"LAUNCH" ボタンをクリックしてウェブコンソール画面へ移動します:
2015121502


dashDB のウェブコンソール画面が表示されたら、左メニューから "Tables" を選択します:
2015121503


テーブル一覧画面で "Add Table" ボタンをクリックします:
2015121504


"Create a table" ダイアログボックスが表示されます。ここで CREATE TABLE の SQL を指定して、テーブルを作成します:
2015121505


集めたデータを格納できるよう、以下の内容で RPDATA テーブルのスキーマを指定し、最後に "Run DDL" ボタンをクリックします:
CREATE TABLE "RPDATA"
(
  "ID" VARCHAR(20),
  "CPUTEMP" DOUBLE,
  "CPULOAD" DOUBLE,
  "SINE"  DOUBLE
);

"DDL ran successfully" というメッセージが表示されれば成功です:
2015121506


改めて先ほどの画面に戻るとテーブル一覧の中に "RPDATA" テーブルが追加されています。"RPDATA" テーブルを選択すると RPDATA テーブルの設計要素が表示されます。これで集めたデータを格納するためのテーブルが定義できました:
2015121507


改めて Node-RED 画面に戻り、dashDB ノード(左側だけに接続パーツが付いているもの)をパレットに追加し、function ノードから紐付けます:
2015121508


dashDB ノードをダブルクリックして、Service には Bluemix 上のサービス名称(おそらく選択肢は1つだけなのでそれを選択)、Table にはこのデータを格納する RPDATA テーブル(上記で定義したテーブル)を指定し、OK ボタンをクリックします:
2015121509


この状態で改めて Deploy します。成功しても Node-RED 上の画面では特に変化はありませんが、ラズベリーパイから送られてくるデータは dashDB の RPDATA テーブルに格納され続けているはずです:
2015121510


しばらく待ってから dashDB のウェブコンソール画面に移動し、テーブル一覧で RPDATA テーブルを選択して Browse Data タブを選ぶと、その時点までに溜まったデータが表示されます:
2015121511


これでラズベリーパイのデータを IBM IoT Foundation QuickStart 経由で dashDB に格納する、という処理が実現できました。


(追記 このエントリの続編はこちらです)
 

このエントリの続きです:
IBM IoT Foundation サービスへのデバイス登録方法

IBM IoT Foundation サービスを使うために、同サービスにデバイスを登録する方法を上記で紹介しました。では IBM IoT Foundation サービスに登録したデバイスのセンサーデータを Node-RED で集めるための方法を紹介します。


まずは上記手順の完了した IBM IoT Foundation サービスの環境変数を参照し、apiKey と apiToken の値を確認しておきます。これらの値は後に利用します:
2015112600


次に Node-RED のフローエディタを開き、IBMIoT インプットノードを1つ用意します:
2015112601


同ノードをダブルクリックして、属性を編集する画面に切り替えます:
2015112602


IBM IoT Foundation サービスを使う場合、Authentication は QuickStart ではなく、API Key にする必要があります。また API Key は "Mine" を選択、その他は以下のようにチェックボックスを付けます(名前も IBM IoTF に変更しています):
2015112603


API Key の横にある鉛筆マークをクリックしてノードの属性を確認します。ここではデバイスを追加した IBM IoT Foundation サービスの API Key と API Token(上記で確認したもの)が入っていることを確認してください(入っていなかったら入力してください):
2015112604


最後にこのノードにデバッグ output ノードを足して&繋いで、送られてきたデータが参照できるようにしておきます。この状態でデプロイしておきましょう:
2015112605


次に IoT Foundation サービスにに登録したデバイスから実際に MQTT メッセージをパブリッシュして、Node-RED のフローに送られたデータが表示されることを確認してみましょう。今回想定している環境ではこのようなデバイスを IoT Foundation サービスに登録していました:
属性属性値
組織IDttb8bh
デバイスタイプMyDevice
デバイスID(MACアドレス)112233445566
認証トークン(自分で指定する接続パスワード)K.Kimura777


このデバイスの場合、MQTT パブリッシャーとしては以下のような条件で IBM IoT Foundation サーバーにメッセージをパブリッシュすることになります:
設定項目設定値
MQTT ブローカーホストttb8bh.messaging.internetofthings.ibmcloud.com
(組織ID).messaging.internetofthings.ibmcloud.com
MQTT ブローカーポート1883
(固定値)
クライアントIDd:ttb8bh:MyDevice:112233445566
d:(組織ID):(デバイスタイプ):(デバイスID)
認証ユーザーIDuse-token-auth
(固定値)
認証パスワードK.Kimura777
(認証トークン)
トピックiot-2/evt/event_id/fmt/json
(固定値)
メッセージ{"d":{"a":"x","b":"y","c":"z"}}
(任意のJSONテキスト)


実際に動くデバイスがあれば、この内容でパブリッシュするようなコードを記述することでメッセージを送信することができます。ここでは MQTTLens を使って同じ動きをエミュレートして、動作を確認してみることにします。

まずコネクション画面では以下の様な内容でホスト名、クライアントID、ユーザー名、パスワードを指定して接続してください:
iotf08


実際にパブリッシュする際には、上記の Topic と JSON メッセージを指定してパブリッシュします:
iotf09


メッセージが正しく送信されれば、Node-RED 側の Debug タブに送信した JSON が表示されるはずです:
2015112601


期待通りに動きました。これで QuickStart を使わずに IBM IoT Foundation サーバーを使う方法が分かりました。QuickStart でなければ QOS = 1 の制約もなく、より自由度の高い MQTT ブローカー利用が可能になりますね。


(参考資料)
https://docs.internetofthings.ibmcloud.com/ja/messaging/mqtt.html
https://docs.internetofthings.ibmcloud.com/ja/messaging/devices.html


 

Node-RED フローエディタの出現などもあって MQTT プロトコルに新たな可能性を感じています。

MQTT といえば「IoT のプロトコル」というイメージを持っている人もいると思います。必ずしも間違いではないと思っていますが、こんな軽量で便利なプロトコルを IoT のためだけに使うのはもったいないかな、、、とも感じるようになりました。それを実感するためにちょっとしたアプリを作ってみました、という内容です。


今回紹介するための例として選んだのが投資相場情報です。個人的にもたしなむ程度には投資をやってますが、デイトレードするようなタイプの投資家からすると、リアルタイムにより近い情報が提供されるのであれば有用なはずです。ただ IoT と投資を結びつけて考えることはあまりないですよね。でも IBM IoT Foundation や Node-RED と組み合わせて投資を考えるとどうなるでしょうか?

まず、ためしに外国為替の価格情報を表示するような、こんなウェブサービスを作ってみました:
http://fx.mybluemix.net/

↑上記 URL にアクセスすると、そのタイミングでの20種類の為替情報を JSON フォーマットで返してくれます。週末とかでない限り、為替相場は24時間変化し続けます。これ自体はごく一般的な REST のウェブサービスです。(ドメインを見ればわかりますが)これは Bluemix 上で動いていますが、このサービスがどこで動いているかは関係ありません:
2015090701


この仕組をこのまま(必要な時にこの URL にアクセスして取得する形で)使うこともできますが、MQTT を使うとプッシュのような仕組みを実現することもできるようになります。別途 MQTT パブリッシャーのアプリケーションを用意して、例えば1分おきに最新の為替状況を取り出して、MQTT ブローカーへパブリッシュする、という方法です。特にパブリッシュ先の MQTT ブローカーを IBM IoT Foundation の QuickStart(quickstart.messaging.internetofthings.ibmcloud.com:1883) に指定すると、Bluemix 上の Node-RED の QuickStart からも参照できるようになるので、Node-RED を使って簡単に為替情報を取り出すことができるようになります。また取り出したデータをデータベースに格納したり、取り出した数値を元にデータフローを記述して実行することは Node-RED の機能を使って簡単に実現することができるようになります:
2015090702


実際に QuickStart の MQTT ブローカーに対してメッセージをパブリッシュするアプリケーションを Java で開発する場合の詳細はこちらを参照ください:
QuickStart MQTT ブローカーに Java からパブリッシュする


今回のアプリでは ClientID(Node-RED の QuickStart で言うところの deviceId)に "net.mybluemix.fx.mqtt.publish" という文字列を指定して quickstart.messaging.internetofthings.ibmcloud.com:1883 にパブリッシュしています。なので、Node-RED 側でも同じ deviceId を指定すれば取り出せるようになる、というものです:
2015090703


実際にこのアプリを作って、ローカルホストで動かしてみました(MQTT パブリッシャーはローカルホストだろうが、プライベートネットワーク内だろうが、MQTT ブローカーに接続できる環境下であれば動きます)。同時に Bluemix 上の Node-RED では MQTT パブリッシャーと同じ deviceId を指定して QuickStart からデータを取り出してみます:
2015090704


とりあえずは普通に Debug アウトプットノードだけを足してデプロイすると・・・
2015090705


為替情報が(この例では)30秒おきに Debug タブに表示されるようになりました!期待通りに動いてます。
2015090706


この結果を WebSocket にも送るように改良してみます:
2015090707


WebSocket に送られてくるデータを視覚化するような HTML を用意するとこんな感じ。MQTT から送られてくる為替データを元にリアルタイム為替チャートが簡単にできちゃいました:
2015090708


後は為替を売買する(業者の)API があれば、これと組み合わせて実際に売買するシステムが作れちゃいそうです。もちろん為替である必要はなく、株式でも同様のデータが取得できて、同様の売買 API があれば同じような仕組みを作ることもできそうですね。

なお、WebSocket 経由で取得したデータの視覚化方法についてはこのページの情報を参考にしています。具体的には Google Visualization API を使っています:
Node-REDを使ってセンサーデータをWebSocketで出力する


MQTT はデバイスデータやセンサーデータを扱うだけでなく、色んな応用ができそうです。アイデア勝負の世界になりそうな感じですね。


(2015/Sep/11 追記)
この中で紹介している Quickstart MQTT ブローカーに為替情報をパブリッシュする Java アプリケーションのソースコードを GitHub で公開します:
https://github.com/dotnsf/FxQuickstart



このページのトップヘ