まだプログラマーですが何か?

プログラマーネタとアスリートネタ中心。たまに作成したウェブサービス関連の話も http://twitter.com/dotnsf

MQTT は IoT の仕組みの中で使われることの多いプロトコルですが、以前からそれだけに使うのは勿体ないなあと思っていました。MQTT のリアルタイム性はチャットなどのメッセージングアプリケーションにも向いていると思っており、実際に Facebook Messenger の仕組みとしても使われているとの情報もあります。というわけで、MQTT を使ったチャットアプリの開発に挑戦してみました。

加えて、IoT といえば IBM Bluemix からも提供されている Node-RED が有名です。今回は IBM Bluemix 環境上の Node-REDIBM IoT Platform サービスの quickstart エディションを使ってチャットを作ってみることに挑戦しました。

何はともあれ、まずは Node-RED 環境を用意します。IBM Bluemix にログインし、ボイラープレートから Node-RED Starter を選択して、Node-RED が使えるアプリケーションサーバーインスタンスを用意します:
2017040301


IBM Bluemix の Node-RED を使わずに、自前等で Node-RED 環境を用意する場合は npm などで node-red-contrib-scx-ibmiotapp ノードをインストールして有効にしておく必要があります:
2017040302

(↑この ibmiot ノードが使える状態にしてください)


では Node-RED でチャットアプリを作ります、といっても実はかなりシンプルです。1つ1つノードを配置してもいいのですが、まずはインポートして中身を確認し、必要に応じて説明を加えることにします。画面右上のハンバーガーメニューを開き、 読み込み→クリックボード を選択します:
2017040303

 
「フローの読み込み」画面になったら、ここの内容をそっくりそのままコピー&ペーストして「読み込み」ボタンをクリックし、フロー定義を作成します:
2017040304


正しく読み込みが完了すると以下のような3本のデータフローが再現されるはずです。上から1つ目は GET /chat 実行時のチャット画面(HTML)の定義、2つ目はチャットメッセージを POST(POST /post) した時の処理、そして3つ目は IBM IoT サービスを使って MQTT 経由でチャット参加メンバーのメッセージを取り出す処理を定義しています。いずれもシンプルな処理で実現できていることが確認できます:
2017040305


画面内に2つの IBM IoT ノードが含まれています(青いノード、INPUT/OUTPUT が1つずつ)。それぞれダブルクリックすると、どちらにも Device Id を入力する項目があり、いずれも初期状態では空になっているはずです。この Device Id にはユニークな値を指定する必要があります。以下の例では "dotnsf.mqtt.chat.001" という値を設定していますが、ここには自分の名前や日付を含めるなどして、誰とも被らないユニークな値を設定します(2つのノード両方の Device Id に同じ値を指定します)。指定後「完了」ボタンをクリックします:
2017040306


また、2つ目のフローの中にある function ノードの中身を確認します。ここはチャット参加者が自分のメッセージを投稿した時に実行されるフローで、HTTP リクエストの本文(msg.req.body)の値を取り出して、その値を IoT の Payload に変換している部分です。これも非常にシンプルな処理内容が記述されていることが確認できます:
2017040307


改めて3つそれぞれのフローの中でどのような処理が定義されているのかを確認してみましょう。1つ目のフローはウェブブラウザから(サーバー名)/chat という URL に(GET リクエストで)アクセスした時に返される HTML の定義です。実際の HTML や CSS/JavaScript 定義そのものは「チャット画面」というテンプレートノードの中で定義されています(後述します):
2017040301


2つ目のフローはチャット画面の中で利用者が自分のメッセージをチャットに流した時に実行される処理です。チャットにメッセージを流すと(サーバー名)/post という URL に名前やメッセージ内容が HTTP POST され、その内容を(上記のように)取り出して MQTT の Payload に変換し、IBM IoT に転送(MQTT の処理でいうと「パブリッシュ」)しています。転送時にユニークな Device Id を指定していることで、同じテンプレートを使っても異なるアプリケーションであるとみなし、他の人が作ったチャットと混線しないようにしています。なお、緑色のノードはデバッグノードで、POST されたメッセージの内容をこの画面内からも確認できるようにしているだけで、実際の処理には無関係です(無くても動きます):
2017040302


そして3つ目のフローは上記2つ目のフローで処理されたメッセージを取り出すフローになります。自分だけでなく、同じチャット画面を見ている他のユーザーがメッセージを流した場合もこの処理が実行され、IBM IoT 経由で送られたメッセージが(サーバー名)/ws/chat という URL の WebSocket に送信されるよう記述されています。実際には1つ目の HTML の中で /ws/chat を監視しており、ここにメッセージが送られてきた場合の処理が記述されています:
2017040303


この状態でデプロイ(画面右上のボタン)をクリックすることで実際にチャットアプリケーションを使うことができるようになります。デプロイ後、PCやスマホのウェブブラウザで https://(Node-RED の動いているホスト名)/chat にアクセスしてみてください。Node-RED の一番上のフローが呼ばれ、テンプレートノードの中で定義された内容の HTML が表示されます。初期状態では↓のように名前の入力を求められます:
2017040301


適当な名前を入力して「入室」ボタンをクリックします(入室のタイミングで IoT と接続します):
2017040302


入室すると画面が切り替わり、自分の名前とメッセージ入力フィールドが画面下に表示されます。画面の大半はチャット履歴が表示される画面ですが、まだ何もメッセージがないので何も表示されていません:
2017040303


では試しに何かメッセージを入力してみます。入力を確定するには PC からであれば Enter キーを、スマホであれば「開く」などでメッセージを確定させてください:
2017040304


入力したメッセージがチャット履歴に表示されます。これは自分のメッセージなので右側に吹き出しがついて、緑色で表示されるようにしています:
2017040305


もう1つメッセージを送ると、メッセージが下に追加されます:
2017040306


試しに別のブラウザや別のスマホなどから同じ URL にアクセスして、別の名前で入室してメッセージを送信してみます。このユーザーから見ると入室前のメッセージは見れないので、自分のメッセージが一番最初に表示されます:
2017040307


が、元のユーザーからは別のユーザーが入室してきてメッセージを送信したことになります。その場合は白背景で、左側に吹き出しがある状態でチャット履歴に記録されます(この UI 見たことありますよね。意識して CSS を作ってます(笑)):
2017040308


同様にして別のユーザーが入室してくると、そのユーザーのメッセージも白背景で左に吹き出しが付く形で表示されていく、というものです。最低限のグループチャットの機能は実現できていると思ってます:
2017040309


さて、ではこのチャット画面の HTML はどうなっているのかを説明します。具体的な内容は PC ブラウザからアクセスして HTML ソース(と埋め込まれた CSS など)を直接参照していただきたいのですが、肝心な部分の JavaScript はこのようになっています(赤字はコメント):
  :

var socket; var wsUrl = 'wss://' + location.hostname + '/ws/chat'; //. WebSocket監視先URL function connect(){ //. 「入室」時に呼ばれる処理 console.log( "connect()" ); socket = new WebSocket(wsUrl); //. WebSocket 接続 socket.onmessage = function(e) { //. WebSocket にメッセージが来たら、以下を実行 var msg = JSON.parse(e.data); //. 送信データ(POST されたデータ)を JSON で取り出し //console.log( msg ); if( msg.id != deviceid ){ //. 自分のメッセージなのか、他人のメッセージなのかを判別 //. 自分以外の発言 var box = "<div class='question_box'><p class='notmymessage'>" + msg.name + "</p><div id='arrow_question'>" + msg.message + "</div></div>"; $('#contents').append( box ); }else{ //. 自分の発言 var box = "<div class='answer_box'><p class='mymessage'>" + msg.name + "</p><div id='arrow_answer'>" + msg.message + "</div></div>" $('#contents').append( box ); } } }

:

2つ目のフローで投稿したメッセージの内容が IBM IoT ノードに(MQTT で)送られていました。自分だけでなく同じチャットルームに入室している全ての人のメッセージがこのように MQTT データとして送信されます。 そしてその内容を3つ目のフローで取得し、/ws/chat というパスに WebSocket データとして送信していました。つまりチャットで誰かがメッセージを送ると、/ws/chat に WebSocket でデータが送られるということになります。そのデータを監視して、自分のメッセージか他人のメッセージかを判別して Dynamic HTML でチャット履歴に追加する、という部分の処理が上記になります。


そしてこれだけでチャットが実現できているということは、(気付いている人もいるかもしれませんが)少なくともここまでの処理に関してはデータベースを一切使わずに実現できていることを意味しています。確かにリアルタイムデータ処理なのでデータを保存する必要はないのですが、実際に保存せずに実現できるというのはなかなか興味深いのではないかと思っています。


Apache HTTP サーバーのリダイレクト機能を使って、ウェブアプリケーションに○ニータイマー的な機能を実装してみました。

※最近は「○ニータイマー」と言われても知らないエンジニアも増えてるんだろうなあ。。
(注 ○ニータイマーとは? 参考 - Wikipedia



【やりたいこと】
普通に動いているアプリケーションに対して、ある日時以降は自動的にエラーページへ転送する


【作るもの】
既存のウェブアプリケーション(/xonytimer/ 以下)に対して、以下のようなファイル構成を追加します:
|- xonytimer/ (目的のアプリケーション)
|   |- index.html
|   |-    :
|
|- error.html (エラーページ)
|- .htaccess (リダイレクト設定用)

/xonytimer/ 以下に目的のウェブアプリケーションが展開されているものとします(現在普通に動いているものとします)。で、ある特定の日時以降になったら /xonytimer/ 以下へのアクセスは全て /error.html に転送するよう .htaccess に設定します。

error.html の内容は適当にこんな感じにしました:
<html>
<head>
<title>Error</title>
</head>
<body>
<h1>○ニータイマー発動。。。</h1>
</body>
</html>

肝心の転送設定は以下のような内容にします。これを .htaccess に追加(または新規作成)します:
RewriteEngine On

RewriteCond %{TIME} >20170403033000
RewriteRule ^xonytimer/(.*) /error.html [R=302,L]
 ↑注 RewriteCond の ">" と "20170403033000" との間にスペースを入れてはいけません

↑の場合、システム時間が 2017年04月03日 03時30分01秒以降となる時間以降に /xonytimer/ 以下にアクセスがあった場合は /error.html に転送する、という内容にしています。日時はあくまでサーバー側のシステム時間なので、日本時間なのかそうでないのかはシステムに依存します。設定されているタイムゾーンに併せて指定する必要がある点に注意してください。

この設定であれば 2017/04/03 03:30:00 までに /xonytimer/ 以下にアクセスした場合は普通に使えます:
2017040301


が、上記時刻以降に /xonytimer/ 以下にアクセスすると転送設定が有効になり、/error.html に強制転送されます(/xonytimer/ 以下へはアクセスできなくなります):
2017040302


簡単なリライトルールだけで設定できました。


【こんなもん何に使うのか?】

実際に使うのは、この逆のケースが多いと思っています。例えば .htaccess に記述する内容を以下のようにします:
RewriteEngine On

RewriteCond %{TIME} <20170410000000
RewriteRule ^newapplication/(.*) /underconstruction.html [R=302,L]
  ↑RewriteCond の不等号が逆向きになっている点に注意


この指定であれば、2017/04/10 00:00:00 までの間に /newapplication/ 以下にアクセスがあった場合、そのリクエストは /underconstruction.html (準備中、みたいなページを想定)に転送され、2017/04/10 00:00:00 を過ぎると /newapplication/ 以下はアクセス可能になります。

新しいアプリケーションや(申し込みサイトなど)期日を決めた一時アプリケーションを運用する場合に、URL だけは事前にお知らせてしておくとして、実際の運用は指定期日になったらアクセスを許すがそれまでは URL を知っていてもアクセスさせない、という場合に便利な転送設定となります。


先日、IBM の NoSQL DBaaS である Cloudant のデータベース単位でのバックアップ/リストアを行うツールを作って公開しました:

本エントリでは、そもそも何故このようなツールを作ったのか、実現においてどのような制約事項があり、どのように制約を回避したか、その背景を技術者視点で紹介します。


【そもそもそんなツールが必要なのか?】
「必要なのか?」と言われると、結構鋭い視点だと思います。Cloudant はいわゆる「マルチマスターレプリケーション」型の DBaaS であり、データは複数のサーバーインスタンスに分散して格納されます。つまり「どこか1つのサーバーが死んだ場合でもデータがどこかに残っている」ことはこの仕組自体の中で提供されています。これで充分、と考える人や用途であれば不要という判断もアリです。

とはいえ、データベースを手軽に丸ごと引越したり、(精神衛生上の問題から)手元にバックアップを残しておきたいと考えたり、そのバックアップファイルをストレージに保存してストレージ代を稼ぎたいクラウドベンダー/インテグレーター側の事情(苦笑)などからバックアップとリストアを行う需要はあると思っています。


【ツールがないとバックアップ/リストアはできないのか?】
これも難しい質問です。まず Cloudant はいわゆる CRUD 操作に REST API が用意されており、基本的にはこの REST API を使って読み書き更新削除検索・・といった操作を行うことになります。この API のリファレンスはこちらを参照ください:
https://console.ng.bluemix.net/docs/services/Cloudant/api/index.html#api-reference

このリファレンスを読むと分かるのですが、データベースに対して「データや添付ファイルも含めた全ドキュメントデータを取得する API」も「複数のドキュメントデータをまとめて書き込む API」も、どちらも存在しています。以下の記述では前者を「データ取得 API」、後者を「データ書込 API」と呼ぶことにします。

これら2つの API を組み合わせればバックアップとリストアは実現できそうに見えます。が、実はいくつかの問題があり(一部は後述します)、その中でも最も大きなものは「データ取得 API で得られるアウトプットデータと、データ書き込み API でインプットするデータのフォーマットが異なる」という点です。より正確に表現すると「データ取得 API で得られるアウトプットデータを、そのままデータ書込み API のインプットデータとして使った場合、書込み処理自体は成功し、全てのデータがインポートされるが、書き込まれた個々のドキュメントデータの JSON データとしてのフォーマット(データ階層)が元のデータのフォーマットとは異なる」という結果が待っているのでした。つまりバックアップ元とリストア先のデータベースの中身(データの JSON のフォーマット)が同じではなくなってしまうのでした。


シンプルに表現すると、バックアップ元のデータベースに以下のようなデータが入っていたとすると、、
{ "_id": "001", "name1": 123, "name2": "value1", "name3": true },
{ "_id": "002", "name1": 456, "name2": "value2", "name3": false },
{ "_id": "003", "name1": 789, "name2": "value3", "name3": true },
  :

上記 API でバックアップ&リストアした先のデータは以下のようになります:
{ "_id": "001", "doc": { "_id": "001", "name1": 123, "name2": "value1", "name3": true } },
{ "_id": "002", "doc": { "_id": "002", "name1": 456, "name2": "value2", "name3": false } },
{ "_id": "003", "doc": { "_id": "003", "name1": 789, "name2": "value3", "name3": true } },
  :

バックアップ元では、例えば "name1" という属性の値を参照しようとすると、(document).name1 でアクセスできた内容が、リストア先だと (document).doc.name1 という方法でアクセスすることになる、という違いが生じます。


ここは考えようで、「元のデータが残っているかどうか」という観点では残っているといえます。ただ JSON データとして見たときの階層情報が異なっていて、全く同じデータではない(特定の属性の値を取得する場合の取得方法は異なる)のです。これを許容できるかどうかの問題とも言えますが、一般的にバックアップ/リストアという言葉からイメージするのはデータフォーマットの変更が許されるものではないと思っています。その意味では上記 API をそのまま使って実現する方法は選択できなくなります。

また細かいことですが、データ書込み API を実行するには事前にデータベースを新規に作成しておく必要があります。わざわざ「データベース作成+文書リストア」という2度手間をかけるのも不便だし、そういった利便性まで含めて考えると1発で実現できるなんらかのツールがあるべきだし必要、という判断になります。


【ツールでの実現方法】
上記の理由により、リストア先でもデータフォーマットを変えずに復元できるよう、API をそのまま使う方法ではなく、処理の途中にフォーマット変換を行うようなバックアップ/リストア処理を行うツールを作ります。

この時点で考えられる実装アルゴリズムには以下のようなものが挙げられます:
(1) バックアップ時には上記のデータ取得 API をそのまま使い、その結果をバックアップファイルとする。リストア時にはバックアップファイルからデータを1つずつ取り出し、リストアしたい部分だけを取り出して1件ずつ書き込む。
(2) バックアップ時には上記のデータ取得 API をそのまま使い、その結果をバックアップファイルとする。リストア時には、まず上記のデータ書込み API で一括書込みした時にデータフォーマットが乱れないような形にバックアップファイルを加工する。その上でデータ書込み API を実行して一括書き込みする
(3) バックアップ時にはまず上記のデータ取得 API をそのまま使い、その結果をデータ書込み API 用に加工した上で保存し、バックアップファイルとする。リストア時にはデータ書き込み API をそのまま実行して一括書き込みする
  :


いずれも理論的には実現できそうなのですが、ここで API とは別の制約が問題になります。上記の (1) の方法で(データを1件ずつ)リストアすることは可能なのですが、その際に「Cloudant のスタンダードプランでは 1 秒間に 10 件しか書込みリクエストを処理できない」という制約事項を意識する必要があります。例えばバックアップファイルに 11 件以上のドキュメントが存在していた場合、データを書き込む API は1秒間に 10 回しかコールできません(どこまで厳密にこの制約が管理されているのかはわかりませんが、私自身がリクエスト上限によるエラー "You've exceeded your current limit of 10 requests per second for write class. Please try later." の発生を確認しています)。このエラーを回避しようとすると、プログラム内で実行タイミングを測り、10回書込みを実行したら、次の書込みリクエストは1秒以上待ってから実行する、という本質的ではない管理が必要になります(そしてこれは Node.js のような非同期処理を基本とする実行環境では非常に面倒な問題になります)。また仮にそこまで作り込んだとしても、多くのドキュメントを持つデータベースのリストアには非常に多くの時間がかかり、その多くは待ち時間となる可能性が高くなります。バックアップ取得にかかる時間に対し、リストアにあまりに多くの時間が取られる、という非生産的な制約ができてしまうので、上記の (1) の方法は断念することにしました。また上記の (1) ~ (3) 以外にも1件ずつ取り出して、1件ずつ書き込んで・・・という方法も考えられますが、同じ理由で非現実な方法となってしまうだけでなく、API の実行回数に比例するコスト上昇という問題も抱えてしまいます。

というわけで、(2) か (3) の方法での(一回の API 実行でまとめて書き込むような方法での)実装が候補として考えられるのですが、今回自分は (3) の方法を取ることにしたのでした。つまりバックアップ時にデータ取得 API を使って一括取得したデータをそのまま保存するのではなく、リストア用(データ書込み API 用)に加工してからバックアップファイルとして保存する、という方法です。そしてリストア時にはまずリストア先のデータベースを作成(既存の場合は削除した上で作成)した上で、取得したバックアップファイルをそのままデータ書込み API に渡して一括書込みを行うことにします。
バックアップツールの処理
  • データ取得 API を使って、バックアップ元データベースから全文書を一括取得
  • 取得した結果をデータ書込み API で一括書込みできるようなフォーマットに変換してから保存
リストアツールの処理
  • リストア先に指定されたデータベースが既に存在していたら削除する
  • リストア先データベースを作成
  • データ書込み API でバックアップファイル内の全文書データを一括書込み

なお、この方法であれば Node.js のような非同期処理の実行環境でも(一括で書込みを行うだけなので)あまり意識せずに実装することができます。

また、この (3) の方法であれば (2) の方法に比べてリストア時にはこのツールを使わなくても、バックアップファイルを指定して(curl などで)データ書込み API を直接実行してもリストアできる、という副産物的なアドバンテージもあります。


と、まあ色々を事情や背景を書き並べましたが、要は「自分はバックアップを手元に置きたい派」で、「自分で手軽にバックアップを取りたくて作った」のです。で、色々調べているうちに Cloudant API の制約事項の勉強にもなったし、一応動くものもできたし、いい勉強にもなりました。


このページのトップヘ