まだプログラマーですが何か?

プログラマーネタとアスリートネタ中心。たまに作成したウェブサービス関連の話も http://twitter.com/dotnsf

タグ:mqtt

MQTT は IoT の仕組みの中で使われることの多いプロトコルですが、以前からそれだけに使うのは勿体ないなあと思っていました。MQTT のリアルタイム性はチャットなどのメッセージングアプリケーションにも向いていると思っており、実際に Facebook Messenger の仕組みとしても使われているとの情報もあります。というわけで、MQTT を使ったチャットアプリの開発に挑戦してみました。

加えて、IoT といえば IBM Bluemix からも提供されている Node-RED が有名です。今回は IBM Bluemix 環境上の Node-REDIBM IoT Platform サービスの quickstart エディションを使ってチャットを作ってみることに挑戦しました。

何はともあれ、まずは Node-RED 環境を用意します。IBM Bluemix にログインし、ボイラープレートから Node-RED Starter を選択して、Node-RED が使えるアプリケーションサーバーインスタンスを用意します:
2017040301


IBM Bluemix の Node-RED を使わずに、自前等で Node-RED 環境を用意する場合は npm などで node-red-contrib-scx-ibmiotapp ノードをインストールして有効にしておく必要があります:
2017040302

(↑この ibmiot ノードが使える状態にしてください)


では Node-RED でチャットアプリを作ります、といっても実はかなりシンプルです。1つ1つノードを配置してもいいのですが、まずはインポートして中身を確認し、必要に応じて説明を加えることにします。画面右上のハンバーガーメニューを開き、 読み込み→クリックボード を選択します:
2017040303

 
「フローの読み込み」画面になったら、ここの内容をそっくりそのままコピー&ペーストして「読み込み」ボタンをクリックし、フロー定義を作成します:
2017040304


正しく読み込みが完了すると以下のような3本のデータフローが再現されるはずです。上から1つ目は GET /chat 実行時のチャット画面(HTML)の定義、2つ目はチャットメッセージを POST(POST /post) した時の処理、そして3つ目は IBM IoT サービスを使って MQTT 経由でチャット参加メンバーのメッセージを取り出す処理を定義しています。いずれもシンプルな処理で実現できていることが確認できます:
2017040305


画面内に2つの IBM IoT ノードが含まれています(青いノード、INPUT/OUTPUT が1つずつ)。それぞれダブルクリックすると、どちらにも Device Id を入力する項目があり、いずれも初期状態では空になっているはずです。この Device Id にはユニークな値を指定する必要があります。以下の例では "dotnsf.mqtt.chat.001" という値を設定していますが、ここには自分の名前や日付を含めるなどして、誰とも被らないユニークな値を設定します(2つのノード両方の Device Id に同じ値を指定します)。指定後「完了」ボタンをクリックします:
2017040306


また、2つ目のフローの中にある function ノードの中身を確認します。ここはチャット参加者が自分のメッセージを投稿した時に実行されるフローで、HTTP リクエストの本文(msg.req.body)の値を取り出して、その値を IoT の Payload に変換している部分です。これも非常にシンプルな処理内容が記述されていることが確認できます:
2017040307


改めて3つそれぞれのフローの中でどのような処理が定義されているのかを確認してみましょう。1つ目のフローはウェブブラウザから(サーバー名)/chat という URL に(GET リクエストで)アクセスした時に返される HTML の定義です。実際の HTML や CSS/JavaScript 定義そのものは「チャット画面」というテンプレートノードの中で定義されています(後述します):
2017040301


2つ目のフローはチャット画面の中で利用者が自分のメッセージをチャットに流した時に実行される処理です。チャットにメッセージを流すと(サーバー名)/post という URL に名前やメッセージ内容が HTTP POST され、その内容を(上記のように)取り出して MQTT の Payload に変換し、IBM IoT に転送(MQTT の処理でいうと「パブリッシュ」)しています。転送時にユニークな Device Id を指定していることで、同じテンプレートを使っても異なるアプリケーションであるとみなし、他の人が作ったチャットと混線しないようにしています。なお、緑色のノードはデバッグノードで、POST されたメッセージの内容をこの画面内からも確認できるようにしているだけで、実際の処理には無関係です(無くても動きます):
2017040302


そして3つ目のフローは上記2つ目のフローで処理されたメッセージを取り出すフローになります。自分だけでなく、同じチャット画面を見ている他のユーザーがメッセージを流した場合もこの処理が実行され、IBM IoT 経由で送られたメッセージが(サーバー名)/ws/chat という URL の WebSocket に送信されるよう記述されています。実際には1つ目の HTML の中で /ws/chat を監視しており、ここにメッセージが送られてきた場合の処理が記述されています:
2017040303


この状態でデプロイ(画面右上のボタン)をクリックすることで実際にチャットアプリケーションを使うことができるようになります。デプロイ後、PCやスマホのウェブブラウザで https://(Node-RED の動いているホスト名)/chat にアクセスしてみてください。Node-RED の一番上のフローが呼ばれ、テンプレートノードの中で定義された内容の HTML が表示されます。初期状態では↓のように名前の入力を求められます:
2017040301


適当な名前を入力して「入室」ボタンをクリックします(入室のタイミングで IoT と接続します):
2017040302


入室すると画面が切り替わり、自分の名前とメッセージ入力フィールドが画面下に表示されます。画面の大半はチャット履歴が表示される画面ですが、まだ何もメッセージがないので何も表示されていません:
2017040303


では試しに何かメッセージを入力してみます。入力を確定するには PC からであれば Enter キーを、スマホであれば「開く」などでメッセージを確定させてください:
2017040304


入力したメッセージがチャット履歴に表示されます。これは自分のメッセージなので右側に吹き出しがついて、緑色で表示されるようにしています:
2017040305


もう1つメッセージを送ると、メッセージが下に追加されます:
2017040306


試しに別のブラウザや別のスマホなどから同じ URL にアクセスして、別の名前で入室してメッセージを送信してみます。このユーザーから見ると入室前のメッセージは見れないので、自分のメッセージが一番最初に表示されます:
2017040307


が、元のユーザーからは別のユーザーが入室してきてメッセージを送信したことになります。その場合は白背景で、左側に吹き出しがある状態でチャット履歴に記録されます(この UI 見たことありますよね。意識して CSS を作ってます(笑)):
2017040308


同様にして別のユーザーが入室してくると、そのユーザーのメッセージも白背景で左に吹き出しが付く形で表示されていく、というものです。最低限のグループチャットの機能は実現できていると思ってます:
2017040309


さて、ではこのチャット画面の HTML はどうなっているのかを説明します。具体的な内容は PC ブラウザからアクセスして HTML ソース(と埋め込まれた CSS など)を直接参照していただきたいのですが、肝心な部分の JavaScript はこのようになっています(赤字はコメント):
  :

var socket; var wsUrl = 'wss://' + location.hostname + '/ws/chat'; //. WebSocket監視先URL function connect(){ //. 「入室」時に呼ばれる処理 console.log( "connect()" ); socket = new WebSocket(wsUrl); //. WebSocket 接続 socket.onmessage = function(e) { //. WebSocket にメッセージが来たら、以下を実行 var msg = JSON.parse(e.data); //. 送信データ(POST されたデータ)を JSON で取り出し //console.log( msg ); if( msg.id != deviceid ){ //. 自分のメッセージなのか、他人のメッセージなのかを判別 //. 自分以外の発言 var box = "<div class='question_box'><p class='notmymessage'>" + msg.name + "</p><div id='arrow_question'>" + msg.message + "</div></div>"; $('#contents').append( box ); }else{ //. 自分の発言 var box = "<div class='answer_box'><p class='mymessage'>" + msg.name + "</p><div id='arrow_answer'>" + msg.message + "</div></div>" $('#contents').append( box ); } } }

:

2つ目のフローで投稿したメッセージの内容が IBM IoT ノードに(MQTT で)送られていました。自分だけでなく同じチャットルームに入室している全ての人のメッセージがこのように MQTT データとして送信されます。 そしてその内容を3つ目のフローで取得し、/ws/chat というパスに WebSocket データとして送信していました。つまりチャットで誰かがメッセージを送ると、/ws/chat に WebSocket でデータが送られるということになります。そのデータを監視して、自分のメッセージか他人のメッセージかを判別して Dynamic HTML でチャット履歴に追加する、という部分の処理が上記になります。


そしてこれだけでチャットが実現できているということは、(気付いている人もいるかもしれませんが)少なくともここまでの処理に関してはデータベースを一切使わずに実現できていることを意味しています。確かにリアルタイムデータ処理なのでデータを保存する必要はないのですが、実際に保存せずに実現できるというのはなかなか興味深いのではないかと思っています。


※(注)最初にお断りしておきます。マジメぶって書いてますが馬鹿エントリです。


全てのきっかけは最近たまにみかけるこういった記事でした:

人工知能やコグニティブエンジンと呼ばれる技術の発達により、これまで人間の脳でないと判断できなかったようなことをコンピュータができるようになり、人間の仕事がより低コストな人工知能やそれらを搭載したロボットによって奪われてしまう時代がくる、という啓発記事です。

個人的にはそこまでそんな時代が身近に迫ってきているとは思っていません。ただし、その一方で企業間の競争が働いたこともあり、ここ数年における人工知能分野の発展はすさまじいものがあります。静止画像や個人の機械認識率はぐんと上がり、各社が API を公開している背景も手伝って、人工知能に触れる機会がより身近な世界になりつつあるのも事実だと思っています。以下は個人的見解ですが、クリエイティブな仕事(例えば小説を書く、など)を学習させるのはまだ難しいにせよ、脳を使わない単純作業や、ルーチン化された業務などは意外と早い段階で人間の効率を追い抜く日がやってくるかもしれない(そうなると作業コストで勝てるわけがないので、本当に仕事を失う日がやってくるかもしれない)、と思っています。

例えばお客様とお話ししていて、ただ頷いて聞いているだけ。お客様が話し終わったらすかさず相槌を打つ、そんなフローチャートのような業務では近い将来に職を奪われてしまうかもしれないのです!
(注 実在する誰かのことではありません)
2015022305


さて話は変わって、先日秋葉原でこのような部品を買ってしまいました:

2015022300



この SEN02281P はいわゆる「音センサー」です。画像左上にある大きな丸い部分がマイクになっていて、ここで音を拾って、その情報を電気回路を通じて外部に知らせることができる、というものです。買った後で知ったのですが、Arduino に接続したり、Raspberry PiGrovePi という拡張モジュールを取り付けて簡単に使う方法がネットなどで紹介されていました。

・・・ん、もしかすると、この SEN02281P と(例えばラズベリーパイとかの)演算機能を使えば、上記のような簡単なデータフローが実現できてしまうんじゃないだろうか? つまり「誰かが喋っている時は頷き、喋り終わったら相槌を入れる、という仕組みは、このセンサーとアルゴリズムを実装するプログラムだけで実現できてしまうんじゃないだろうか?」ということに気付いてしまったのです! というわけで、よく調べずにとりあえず買ってしまいました。


自分は「ラズベリーパイならメジャーだからまあ繋がるだろう、その方法もネットで見つかるだろう」とタカを括っていたのですが、これが意外と苦戦しました。結論からいうと GrovePi を使わない方法(ラズベリーパイの GPIO に直接繋げる方法)を見つけることができませんでした。えーマジで!?自分で調べるしかないの??電子回路は苦手なんだよなあ。。まあ挑戦してみました。以下、やってみたことのおさらいの意味で書いてます。当方こっち方面はド素人なので、間違いを見つけたり、こうするともっといいよ、という方法があればウェルカム、というか教えてください。

・・・改めてパーツを眺めてみました。接続端子はこの↓画面上部の白い四角の中に生えた4つの突起部分です:
Loudness_101020063_01

これを裏返すとこんな感じ(上図とは左右が逆になった状態):
2015022301


拡大するとこんな感じ。ちょっと見難いのですが、この画面の左から順に(反対から見た場合は右から順に) SIG / NC / VCC / GND と書かれています:
2015022302


GND はアース(Ground)、VCC は電圧、これは(他の部品とほぼ共通なので)分かる。上記の商品ページを見ると、このパーツの動作電圧は 5V(3.5~10V) と書かれているので、とりあえず 5V の電圧ピンにジャンパケーブルを繋いであればいいかな。そして SIG はシグナル、つまりここを GPIO27 とかに繋げて音の信号を受け取るんだろうな・・・ で、NC ?なんだこれ、見たことないぞ・・・

で、ここだけ調べて分かったのは NC = Not Connected 、つまり「どことも繋がない」という端子らしい。そうなんだ。。じゃ、なんで存在してるんだろ?? うーん・・・まあ、いいやw

というわけで、ジャンパケーブルを使ってこんな感じで自分のラズベリーパイの GPIO に接続しました:
2015022303

これで 5V の電源を供給し、アースも備え、SEN02281P が感知した音を取り込む仕組みが動くはずです。実際の写真はこんな感じです(メス-メスのジャンパーケーブルがあればもっと綺麗に接続できたのに・・・):
2016022400


そして、ラズベリーパイ側にはこのような Python プログラムを導入しました:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# SEN02281P ----- RaspberryPi GPIO
# =SIG ---------- 13
# =NC
# =VCC ---------- 2
# =GND ---------- 20

import paho.mqtt.client as mqtt
import time
import RPi.GPIO as GPIO

def on_connect(client,userdata,flags,rc):
	print( "Connection with result code " + str(rc) )
	client.subscribe( "sen02281p" )

def on_message(client,userdata,msg):
	print( msg.topic + " " + str(msg.payload) )

def reading(sensor):
	sum = -1 
	if sensor == 0:
		sum = 0
		for i in range(0,20):
			time.sleep(0.1)
			a = GPIO.input(SIG)
			sum += a
	else:
		print "Incorrect function."

	return sum

GPIO.setwarnings(False)
GPIO.setmode(GPIO.BOARD)
SIG = 13
GPIO.setup(SIG,GPIO.IN)

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

client.connect( "iot.eclipse.org", 1883 )
while client.loop() == 0:
	msg = reading(0);
	client.publish( "sen02281p", msg, 0, True )
	pass

GPIO.cleanup()
※事前に pip install paho-mqtt をして、Paho の Python ライブラリを導入済みです。

ちなみに上記ソースファイル(sen02281p.py)はこちらからダウンロードできます:
https://raw.githubusercontent.com/dotnsf/sen02281p/master/sen02281p.py


GPIO の(SEN02281P の SIG 端子とつながっている)13番ピンからのインプット情報を 0.1 秒ごとに20回(つまり2秒間)取得し、その20回中音が確認できた回数を MQTT ブローカー(iot.eclipse.org)に投げる、というものです。つまり MQTT ブローカーに対しては2秒おきにどれくらい音が識別できたかを 0 から 20 の整数値でパブリッシュする、というものです。トピックは上記例では "sen02281p" と指定していますが、皆さんがもしこのソースを使う場合は少し変えていただけると嬉しいです。

次に Bluemix 上の NodeRED 環境を使って、このパブリッシュされたメッセージを取り出す仕組みを用意します。MQTT インプットノードを用意し、ホスト名に上記の MQTT ブローカーホスト(iot.eclipse.org:1883)、トピックに "sen02281p" を指定しています。このノードからは2秒に1回、ラズベリーパイの接続された SEN02281P のマイクから拾った音の頻度が渡されてくる、という仕組みとなります。また、その取り出した結果を /ws/unzk_sensor というパスの WebSocket に出力しています:
2015022304


後はこの /ws/unzk_sensor からリアルタイムにデータを取り出して動く WebSocket アプリケーションを用意してあげればマイクで拾った音の頻度をリアルタイムに可視化するようなアプリケーションを作ることができる、ということになります。そのサンプルアプリ(やその中で使う画像)も合わせてこちらで公開しておきます:
https://github.com/dotnsf/sen02281p


このアプリを上記の NodeRED 環境にデプロイしてアプリケーション(hoho.html)を開くと、このような画面になります:
2016022401


ひたすら「頷いている」画面になっています。何もデータが送られていないとただ頷いているだけですが、一応この画面が出ればデプロイには成功していることになります。

ではラズベリーパイ側のアプリも起動します。ネットに接続されたラズベリーパイ上で先程の MQTT パブリッシャーアプリを実行します:
# python sen02281p.py

そして先程の頷き画面をリロードすると・・・ ラズベリーパイに接続されたマイクが音を拾っている間は頷き、音が途切れたと判断した時に「ほほー」と相槌を売ってくれるようになります!
2016022402


この「音を拾っているか」「音が途切れたか」の判断がまだ少し甘いところがあるかもしれませんが、一応それっぽく動いていることが確認できました。これで忙しい営業さんに変わってお客様のお話しを上手に引き出してくれるロボットができました(笑)。


実際に動いている動画を Ustream に上げておきます:

Live streaming video by Ustream




MQTT をプログラミングで実装することを試みる場合、真っ先に思いつくのは Paho のライブラリを使うことです。Paho はオープンソースの MQTT ライブラリであり、多くの言語向けにクライアントライブラリが提供されています。この Paho を使うことができれば、比較的簡単に MQTT パブリッシャー/サブスクライバー機能を実装することが可能です:
http://www.eclipse.org/paho/
2016012501


ところが、Paho では PHP のライブラリは提供されていません。比較的利用者が多いと思われる PHP で MQTT を実装する場合、Paho 以外のライブラリを探す必要があるのですが、その候補の1つになりそうなのがオープンソースの phpMQTT です:
https://github.com/bluerhinos/phpMQTT


利用のための準備は非常に簡単で、上記サイトから phpMQTT.php ファイルをダウンロードするだけです。後はこのライブラリを読み込んで実装するだけ、です。

例えば MQTT パブリッシャーを実装する場合は、以下の様なコードを記述します(ファイル名は publish.php として、phpMQTT.php と同じディレクトリに作成します)。ダウンロードした phpMQTT.php を require で呼び出した上で、ホスト名、ポート番号、クライアント ID、トピック、そしてメッセージ本文といったパーツを単純に指定して、connect() して publish() メソッドを実行しているだけです:
<?php
require( "./phpMQTT.php" );

$mqtt_host = "quickstart.messaging.internetofthings.ibmcloud.com"; # MQTT ブローカー
$mqtt_port = 1883; # MQTT ポート番号
$mqtt_clientid = "d:quickstart:MyDevice:me.juge.mqtt.test"; # クライアントID
$mqtt_topic = "iot-2/evt/status/fmt/json"; # トピック文字列
$mqtt_message = '{"val1":123,"val2":"ABC"}'; # パブリッシュするメッセージ

$mqtt = new phpMQTT( $mqtt_host, $mqtt_port, $mqtt_clientid );
if( $mqtt->connect() ){
  $mqtt->publish( $mqtt_topic, $mqtt_message, 0 );
  $mqtt->close();
}
?>

↑上記のコードは IBM の IoT Foundation Quickstart 環境を想定してブローカーやクライアントID、トピックを指定しています。デバイス ID として "me.juge.mqtt.test" を指定してメッセージをパブリッシュしています。なので、IBM Bluemix の Node-RED 環境で IBM IoT インプットノードのデバイス ID に同じ文字列 "me.juge.mqtt.test" を設定すれば Node-RED にメッセージを送付することができます:
2016012501


(注 皆さんが試す場合は別のユニークな文字列を指定してください。要は PHP 内で指定するデバイス ID と、Node-RED で指定するデバイス ID が同じものを指定する必要がある、という意味です)


この状態で IBMIoT インプットノードに debug アウトプットノードを繋げて Node-RED アプリケーションをデプロイし、上記の PHP ファイルを実行します:
# php -f publish.php

するとコード内で記述された処理が実行され、IBM IoT Foundation Quickstart の MQTT ブローカー(quickstart.messaging.internetofthings.ibmcloud.com:1883)にメッセージが送信され、Node-RED 内の IBMIoT ノードが受け取り、このように画面内の debug タブに表示されれば成功です:
2016012502


PHP でも MQTT プログラミングができること、そして IBM の無料 MQTT ブローカーである QuickStart 環境にメッセージがパブリッシュできることが確認できました。



 

Chrome 拡張アプリケーションとして MQTT パブリッシャー/サブスクライバーとして動作する MQTTLens なるものを使ってみました:
2015111201


これを Chrome ストアからダウンロード(インストール)して起動すると、MQTT のクライアントとして利用できます。パブリッシャーとしてもサブスクライバーとしても動作する、というものです。動作確認に便利そうです。


試しに IBM BluemixNode-RED + IoT Foundation QuickStart 環境で使ってみました。今回の例では MQTTLens を MQTT パブリッシャーとみなして IoT Foundation QuickStart にメッセージをパブリッシュし、Node-RED 側でそのメッセージを受け取って表示する、というシンプルな流れを試してみました。

まずは Node-RED 側を用意します。IBM Bluemix のボイラープレートを使って Node-RED 環境を構築し、Node-RED フローエディタを開き、IBMIOT (input)ノードと Debug (output)ノードを1つずつ配置して接続します:
2015111202


IBMIOT ノードの属性を以下のように指定します。今回は QuickStart 環境を使うので、"Authentication" には "QuickStart" を指定し、"Device Id" として、誰も思いつかないような適当な文字列(この例では "mydeviceid1234")を入力します:
2015111203


最後にこの状態で "Deploy" します。これで Node-RED 側の(つまり MQTT サブスクライバー側の=受け取り側の)準備は完了です。MQTT メッセージが送られてきさえすれば、そのメッセージ内容を表示できる準備が整いました。いつもながら Node-RED だと簡単です:
2015111204


次に MQTTLens 側を設定します。Chrome を起動して MQTTLens を開き(または上記のダウンロード URL からダウンロード後に「アプリを起動」ボタンをクリックし)ます:
2015111209


MQTTLens の初期画面が表示されたら、画面左上の "Connections" 横の+印をクリックします:
2015111205


"Connection Details" 画面が表示されたら、以下の赤枠の内容を指定します。"Connection name" は接続に付ける名称なので任意の文字列で構いません(図では "QuickStart")。"Hostname" には IBM IoT Foundation QuickStart サーバーを指定する必要があるので "quickstart.messaging.internetofthings.ibmcloud.com" を指定します。"Port" はデフォルト値の "1883" のままにします。"Client ID" には QuickStart 用のフォーマットである "d:quickstart:(任意の値):(Device Id)" を指定します。この中の (Device Id) の部分は、先程の Node-RED フローエディタの中で指定した Device Id の値(上記例では mydeviceid1234)と同じものを指定してください。最後に画面右下の "CREATE CONNECTION" ボタンをクリックして接続します:
2015111206


接続すると MQTTLens の画面が以下のように切り替わります(指定した "QuickStart" が "connected" な状態になっていることがわかります)。今回 MQTTLens は MQTT パブリッシャーとして使いたいので、"Publish" と書かれた下の部分を使います。まず "Topic" にはこれも QuickStart 用のフォーマットである "iot-2/evt/(任意の文字列)/fmt/json" と入力し、その下の "Message" に適当な文字列を入力します。最後に "Publish" ボタンをクリックして、このメッセージを QuickStart サーバーにパブリッシュします:
2015111207


正しくメッセージが送信されると、稼働中の Node-RED アプリケーションがこのメッセージを受け取り、Debug タブ内に Message で指定した内容が表示されるはずです:
2015111208


あっさり成功。MQTT パブリッシャー側のデバイスをブラックボックス化しての動作確認などには便利に使えそうで重宝しそうです。


IBM IoT Foundation が無料提供している Quickstart MQTT ブローカーを使ったアプリのサンプルを紹介します。今回紹介するのは IBM Bluemix各サービスの中で止まっているサービスがあったらそれを知らせる、という機能を Node-RED で作成するような内容にします。

このアプリを作成する上で必要なサービスの稼働状況確認は estado (エスタド)を使って調べます:
http://estado.ng.bluemix.net/  
(※この URL は北米データセンターのサービス状況確認用です。英国データセンター用は URL 内文字列の ng の代わりに eu-gb を指定してください)

2015092001


estado は Bluemix ランタイムやサービスの単位での稼働状況を教えてくれるサービスサイトです。問題なく稼働しているランタイム/サービスについては "up" と表示されますが、稼働が止まってたり、なんらかの障害を抱えていると判断されたサービスに関しては赤背景で "down" と表示されます。
2015092002


Bluemix の各機能の稼動状態は上記 URL にアクセスすることで確認することはできます。ただそれでは「上記サイトを見に行った人が稼働状況を確認できる」というだけです。情報そのものは便利なのですが、稼働ステータスをもっとインタラクティブにプッシュしたり、ユーザーフレンドリーな形で提供することを考えてみましょう。

具体的にはこのようなシステムを作ります。この図の緑の丸で描かれた「Java アプリ」部分を実装します:
2015092004


この Java(スタンドアロン)アプリケーションは定期的(この例では1分に1度)に北米データセンターの estado に HTTP GET リクエストを出して Bluemix サービスの稼動状態を確認します。確認した結果、停止中のサービスの一覧を IBM IoT Foundation の Quickstart MQTT ブローカー(quickstart.messaging.internetofthings.ibmcloud.com:1883)へランダムに生成した deviceId を指定して JSON でパブリッシュする、というものです。この時の deviceId はアプリケーション起動時に動的に作成し、コンソールへ出力します。英国データセンター用に作り変えたり、実行感覚を変更したい場合はソースコードの該当箇所を適宜変更してください。
 

estado を参照した結果は Quickstart MQTT ブローカーに格納されるので、パブリッシュ時と同じ deviceId を指定すれば MQTT サブスクライバーを使って取り出せます。特に IBM Bluemix 上の IoT Starter や NodeRED Starter ビルドパックからプロジェクトを作成した場合であれば、IBM IoT Input ノードを使えばこの deviceId だけで簡単にメッセージを取り出すことができるので、その結果をデバッグタブに出力したり、メールで通知したり、といった管理機能が簡単に実現できます。この辺りは Bluemix で NodeRED を使ったことがある方であれば、あまり難しくないと思っています。この辺りの詳しい説明はこちらのエントリを参照ください:
Bluemix の Node-RED サービスで IoT アプリを作る(1/2)


では話を Java アプリケーションの実装に戻します。今回は MQTT のオープンソース Java ライブラリである Paho と、Jakarta CommonsHTTP Client V3 を使ったサンプルを作成してみました。ソースコードのコンパイルや実行時には Paho の Java ライブラリ V3 の最新版(org.eclipse.paho.client.mqttv3-X.X.X.jar)が必要です。こちらからダウンロードしてください:
https://www.eclipse.org/paho/clients/java/


また HTTP Client の JAR ファイルも必要です。別途ダウンロードしておいてください。

なお、今回作成した Java アプリのソースコードは github で公開しました。必要であればこちらからダウンロードしておいてください:


公開したソースコードの内容は以下の様なものです:
import java.util.Calendar;
import java.util.TimeZone;

import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.GetMethod;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;

public class EstadoQuickstart implements MqttCallback {
  MqttClient myClient;
  MqttConnectOptions connOpt;

  static final String BROKER_URL = "tcp://quickstart.messaging.internetofthings.ibmcloud.com:1883"; //. IBM IoT Quickstart MQTT ブローカー	
  static final String dc = "ng"; //. 監視対象データセンター

  static String deviceId = "net.bluemix." + dc + ".estado.mqtt.publish.";
  static int interval = 60;

  @Override
  public void connectionLost(Throwable t) {
  }

  @Override
  public void deliveryComplete(IMqttDeliveryToken token) {
  }

  @Override
  public void messageArrived(String topic, MqttMessage message) throws Exception {
    // TODO Auto-generated method stub
    System.out.println("-------------------------------------------------");
    System.out.println("| Topic:" + topic);
    System.out.println("| Message: " + new String(message.getPayload()));
    System.out.println("-------------------------------------------------");
  }

  public static void main(String[] args) {
    try{
      if( args.length >; 0 ){
        interval = Integer.parseInt( args[0] );
      }

//. ランダムな deviceId の生成 String hex = "0123456789abcdef"; for( int i = 0; i < 8; i ++ ){ char c = hex.charAt( ( int )( Math.floor( Math.random() * 16 ) ) ); deviceId += c; } }catch( Exception e ){ } EstadoQuickstart eq = new EstadoQuickstart(); eq.runClient(); } public void runClient() { // TODO Auto-generated method stub String clientID = "d:quickstart:MyDevice:" + deviceId; System.out.println( "deviceId=" + deviceId ); connOpt = new MqttConnectOptions(); connOpt.setCleanSession( true ); connOpt.setKeepAliveInterval( 30 ); // Connect to Broker try{ myClient = new MqttClient( BROKER_URL, clientID ); myClient.setCallback( this ); myClient.connect( connOpt ); }catch( MqttException e ){ e.printStackTrace(); System.exit( -1 ); } String myTopic = "iot-2/evt/estado_" + dc + "/fmt/json"; //. MQTT トピック文字列を生成 MqttTopic topic = myClient.getTopic( myTopic ); while( true ){ try{
//. 日付文字列の生成 Calendar c = Calendar.getInstance(); TimeZone tz = TimeZone.getTimeZone( "Asia/Tokyo" ); c.setTimeZone( tz );; int y = c.get( Calendar.YEAR ); int m = c.get( Calendar.MONTH ) + 1; int d = c.get( Calendar.DAY_OF_MONTH ); int h = c.get( Calendar.HOUR_OF_DAY ); int n = c.get( Calendar.MINUTE ); int s = c.get( Calendar.SECOND ); String dt = y + "/" + ( m < 10 ? "0" : "" ) + m + "/" + ( d < 10 ? "0" : "" ) + d + "T" + ( h < 10 ? "0" : "" ) + h + ":" + ( n < 10 ? "0" : "" ) + n + ":" + ( s < 10 ? "0" : "" ) + s + "+09:00";
//. estado の HTML を取得 String out = ""; HttpClient client = new HttpClient(); GetMethod get = new GetMethod( "http://estado." + dc + ".bluemix.net/" ); int sc = client.executeMethod( get ); String html = get.getResponseBodyAsString();

//. HTML 内部を見て、サービスのステータスを確認 int x = html.indexOf( "<table" ); while( x >; -1 ){ int td1 = html.indexOf( "<td>;", x + 1 ); int td2 = html.indexOf( "</td>;", td1 + 4 ); int td3 = html.indexOf( "<td>;", td2 + 1 ); int td4 = html.indexOf( "</td>;", td3 + 4 ); if( td1 >; -1 && td2 >; -1 && td3 >; -1 && td4 >; -1 ){ String name = html.substring( td1 + 4, td2 ); String svalue = html.substring( td3 + 4, td4 ); if( svalue.equals( "down" ) ){ //. 止まっているサービスを探す String line = "\"" + name + "\""; //. 止まっているサービスの配列を返す if( out.length() >; 0 ){ line = "," + line; } out += line; } x = td4; } x = html.indexOf( "<tr ", x + 1 ); } out = "{\"datetime\":\"" + dt + "\",\"error_services\":[" + out + "]}"; //. MQTT Publish int pubQoS = 0; MqttMessage message = new MqttMessage( out.getBytes() ); message.setQos( pubQoS ); message.setRetained( false ); // Publish the message MqttDeliveryToken token = null; try{ // publish message to broker token = topic.publish( message ); // Wait until the message has been delivered to the broker token.waitForCompletion(); Thread.sleep( 1000 ); //. パブリッシュ後、とりあえず1秒待つ }catch( Exception e ){ e.printStackTrace(); } //. 次の実行タイミングを待つ Calendar c0 = Calendar.getInstance(); int s0 = ( c0.get( Calendar.SECOND ) % interval ); int w = 1000 * ( interval - s0 ); Thread.sleep( w ); }catch( Exception e ){ } } } }



Paho の Java ライブラリとソースコード(EstadoQuickstart.java)をダウンロードしてコンパイル&実行します。実行が成功すると、コンソール画面には次のように Quickstart MQTT ブローカーにパブリッシュした際の deviceId が表示されます。そして Ctrl+C などで終了するまでの間は(デフォルト状態であれば)60秒おきに estado をチェックしてその結果をパブリッシュする、という処理を繰り返します:
2015092101
 

IBM Bluemix のユーザーであれば、この結果を簡単に取り出すことができます。Bluemix 上に IoT Foundation Starter または NodeRED Starter のボイラープレートアプリケーションを作成して NodeRED 環境にアクセスします。そして IBM IoT Input ノードを1つ作成して、ノードの属性値に "Quickstart" と DeviceId に上述の Java アプリ実行時にコンソールに表示された deviceId 値と同じものを指定します:
2015092102


そしてとりあえず debug ノードを追加して IBM IoT ノードと線で結んでデプロイするだけで、このノードに60秒おきに停止している Bluemix サービスの情報が送られてくるようになります。
2015092003


このサンプルでは以下のような JSON フォーマットでメッセージを取得します。後はこのメッセージを Node-RED 側でハンドリングしてメールで通知したり、データベースに格納する、といった処理を実装することになります:
{
 "datetime": "2015/09/19T22:53:00+09:00",
 "error_services": [
  "account-usage", "meteringbackgroundprocess", "meteringdatabase", "runtime-usage-calculator", "dotnet", "alchemy_api [Free]", ・・・(止まっているサービス名の配列)・・・
 ] 
}



というわけで、後はこの Java アプリをどこかのクラウド上か、または(24時間電源ONが保証できる)ローカル環境で稼働させておけば、MQTT にステータスを送り続けてくれる環境が出来上がるので、このような Node-RED アプリを作って動かすことも可能になる、ということになります。

MQTT はデバイスを意識して作成された軽量なプロトコルですが、必ずしもデバイスでしか使えない、というものでもありません。今回のようなサーバーの死活情報を扱ってはいけない、というものではないし、現に某有名ソーシャルサイトのメッセージング機能で使われていたりもします。何をパブリッシュして、どこからサブスクライブするか、そしてサブスクライブした情報をどう扱うか(Node-RED はここが簡単にできる、というサービスです)、が重要だと思っています。みなさんも色んなアイデアを是非形にしてみてください。同様のアルゴリズムが実装できれば Java 以外の言語でも実現できると思っていますので、興味のある方は是非移植にも挑戦してみてください。



 

このページのトップヘ