まだプログラマーですが何か?

プログラマーネタとアスリートネタ中心。たまに作成したウェブサービス関連の話も http://twitter.com/dotnsf

タグ:streams

IBM Bluemix のサービスの1つに "Streaming Analytics" という、耳慣れないサービスが追加されました:


実はこれ、"IBM InfoSphere Streams" と呼ばれているソフトウェア製品の SaaS 版です。
http://www-03.ibm.com/software/products/ja/infosphere-streams


IBM InfoSphere Streams は非常に多くのデータソースからの情報を取り込みつつ、ディスクを使わずにデータを処理して、次のプロセスへつなげる、という『ストリーミング処理』を実現するソリューションです。ディスクへの書き込みを行わずに処理することで、リアルタイム分析を支援するためのソリューションを実現しています。

その InfoSphere Streams が Bluemix のサービスの1つとして気軽に使えるようになっているのでした(自分にはまだ VM 起動の権限がない、らしい↓):
2015072701


ウェブのコンソール画面はこんな感じ。
2015072702


自分の権限で使えるようになるのが楽しみです。


少し前の話になりますが、IBM Bluemix のハッカソンに参加してきました。

限られた時間の中で4~5人のチームをビルドして、アイデアを出し、実現可能性を調べた上で実装する、というハッカソン。今回は PaaS である IBM Bluemix を使うという前提で「旅をハックする」という制作テーマが与えられていました。

Bluemix は PaaS とはいえ、CloudFoundry をベースとしていることからかなりの自由度があり、また Bluemix から提供されている各種サービスを使うことでシステム構築の手間を大幅に減らすことができます。加えて DevOps によるチーム開発環境も提供されています。 実は私自身、このようなチーム制のハッカソンに参加したのは初体験でしたが、DevOps 環境にもすぐに馴染めたこともあり、Bluemix は非常に相性のいい環境であると感じました。

各チームに与えられた時間は8時間プラスアルファ。チーム内で役割分担して開発するとはいえ、必ずしも充分な時間とはいえません。そのため出されたアイデアのどの部分を実装するのか(どの部分は諦めるのか)、という判断も必要になります。


我々のチームが開発したアプリケーションがこちらです(いちおう動きます):
http://tabihack2014teame.mybluemix.net/

独立した環境下で Twetter Streams API を使い、特定エリア(日本)の位置情報をリアルタイムに集めてDBに格納します。その情報を使い、地図上に表示されたエリア内で最近ツイートされたのはどんな情報か、を可視化するウェブアプリケーションです。

上記 URL にアクセスすると、まず位置情報 API によって、現在地が特定され、その周辺の地図が表示されます。同時に地図内の青みがかったエリア内で最近ツイートされた内容が画面右部に表示されます:
2014111702


また画面を下にスクロールすると、そのエリア内の宿泊施設や、周辺のマンホール(笑)の情報が表示される、というものです:
2014111703


地図をドラッグして青部分が移動すると、ツイート/宿泊施設情報/マンホール情報も、そのエリア内や周辺の情報に切り替わります。
2014111704


加えて、画面上部のファイル指定ボタンで位置情報が埋め込まれた画像を指定すると、その画像に含まれた位置まで地図が移動し、そのエリア周辺のツイートや宿泊施設の情報が表示されます。


今回のハッカソンで作れたのはここまでです。ひたすらに位置情報 API を組み合わせて感じ(苦笑)。本当はツイートを検索してもっと絞り込んだり、位置情報に付随した別の情報を表示したり、UIをより見やすくしたり、といったこともやりたかったのですが、優先度の判断で落としました。チームビルドやアイデア出しを含めて8時間で作ったにしてはまあまあの出来だと思っていますし、実行/開発環境やサービスをその場で手配して用意できる Bluemix 環境のすごさを目の当たりにした結果でもあると感じています。


このハッカソンで Twitter Streams API を使っての印象として、やはりこれだけの情報を集めることができるようになったことは、非常に面白いと感じました。地図上で可視化する、というアイデア1つにしてももっと別のアプローチもあるでしょうし、またほとんどが位置とは関係のない情報ばかりである一方で、たまに隠れている「その地域の役立つ情報」を如何に探し出して表示するのか、という新しい課題も浮き彫りになりました。 限られた時間の中での作業だからこそ、見えてくるものもあります。

ハッカソンは終わってしまいましたが、このアプリを今後少し改良してみるつもりです。ハッカソンにはこういう「終わった後の楽しみ」もあっていいですね。








 

特別目新しい情報ではなく今更感のある内容ですが、Twitter の Stream 検索 API を使ってみました。

Steam API はいわば「Twitter そのもの」です。自分のタイムラインに関連した内容だけでなく、世界中の Twitter ユーザーがつぶやいた内容をリアルタイムに(Stream として)取得することを目的として提供されています。さすがに全てのツイートを取得するのはごく一部の限られたユーザーにだけ公開されているようですが、一部の(全ツイートの1%程度と言われています)ツイートを取得する API は誰でも使うことができます。一部といっても全て取得していたら、それこそとんでもない量になりますけど・・・

なおずっと以前から公開されていた Twitter の API と区別する場合、Steam API と(従来の)REST API という名称で区別します。


で、その Twitter Stream API を Java から、Twitter4J を使って取得するコードを作って試しに動かしてみました。以下にその様子を紹介します。

まず、何はともあれ Twitter Application Managementへの Developer 登録と、アプリケーションの登録、そして CONSUMER KEY, CONSUMER SECRET, ACCESS TOKEN, ACCESS TOKEN SECRET といった各種キーの取得が必要です。この辺りは REST API の時とも変わらない作業なので、適当にググるなどして調べてください。

次に最新の Twitter4J をダウンロードして展開します。zip アーカイブのファイル名は twitter4j-[バージョン番号].zip です。展開後の lib ディレクトリに含まれる必要なファイル(最低限動かすためには twitter4j-core-[バージョン番号].jar と twitter4j-stream-[バージョン番号].jar の2つのファイル)を開発プロジェクトに取り込むか、CLASSPATH を通して参照可能にします。
2014102001


事前準備はこれでできました。後は以下の様なコードを記述します。冒頭部分で取得した CONSUMER KEY 等のキーに書き換える必要があります。この例では取得する条件を付けず、世の中の全てのツイート(の約1%)をそのまま取得しています。取得イベントが発生する度に onStatus メソッドが呼ばれ、そこで ID やユーザー名、本文といった各種情報を取り出して、最終的には System.out で標準出力しています(実際のアプリではここで DB に格納するなどという流れになると思います):
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

import twitter4j.FilterQuery;
import twitter4j.GeoLocation;
import twitter4j.MediaEntity;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.URLEntity;
import twitter4j.conf.Configuration;
import twitter4j.conf.ConfigurationBuilder;

public class Streams {

    private static final String CONSUMER_KEY = "(CONSUMER KEY)";
    private static final String CONSUMER_SECRET = "(CONSUMER SECRET)";
    private static final String ACCESS_TOKEN = "(ACCESS TOKEN)";
    private static final String ACCESS_TOKEN_SECRET = "(ACCESS TOKEN SECRET)";

    static class MyStatusListener implements StatusListener {

        public void onStatus(Status status) {
            Double lat = null;
            Double lng = null;
            String[] urls = null;
            String[] medias = null;

            //. 位置情報が含まれていれば取得する        	
            GeoLocation location = status.getGeoLocation();
            if( location != null ){
                double dlat = location.getLatitude();
                double dlng = location.getLongitude();
                lat = dlat;
                lng = dlng;
            }
            long id = status.getId(); //. ツイートID
            String text = status.getText(); //. ツイート本文
            long userid = status.getUser().getId(); //. ユーザーID
            String username = status.getUser().getScreenName(); //. ユーザー表示名
            Date created = status.getCreatedAt(); //. ツイート日時
            
            //. ツイート本文にリンクURLが含まれていれば取り出す
            URLEntity[] uentitys = status.getURLEntities();
            if( uentitys != null && uentitys.length > 0 ){
            	List list = new ArrayList();
                for( int i = 0; i < uentitys.length; i ++ ){
                    URLEntity uentity = uentitys[i];
                    String expandedURL = uentity.getExpandedURL();
                    list.add( expandedURL );
                }
	        urls = ( String[] )list.toArray( new String[0] );
            }
            
            //. ツイート本文に画像/動画URLが含まれていれば取り出す
            MediaEntity[] mentitys = status.getMediaEntities();
            if( mentitys != null && mentitys.length > 0 ){
            	List list = new ArrayList();
                for( int i = 0; i < mentitys.length; i ++ ){
                    MediaEntity mentity = mentitys[i];
                    String expandedURL = mentity.getExpandedURL();
                    list.add( expandedURL );
                }
                medias = ( String[] )list.toArray( new String[0] );
            }

            //. 取り出した情報を表示する(以下では id, username, text のみ)
            System.out.println( "id = " + id + ", username = " + username + ", text = " + text );
        }

        public void onDeletionNotice(StatusDeletionNotice sdn) {
            //System.out.println("onDeletionNotice.");
        }

        public void onTrackLimitationNotice(int i) {
            //System.out.println("onTrackLimitationNotice.(" + i + ")");
        }

        public void onScrubGeo(long lat, long lng) {
            //System.out.println("onScrubGeo.(" + lat + ", " + lng + ")");
        }

        public void onException(Exception excptn) {
            //System.out.println("onException.");
        }

        public void onStallWarning(StallWarning arg0) {
            // TODO Auto-generated method stub
        }
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Configuration configuration = new ConfigurationBuilder().setOAuthConsumerKey(CONSUMER_KEY)
                .setOAuthConsumerSecret(CONSUMER_SECRET)
                .setOAuthAccessToken(ACCESS_TOKEN)
                .setOAuthAccessTokenSecret(ACCESS_TOKEN_SECRET)
                .build();

        TwitterStream twStream = new TwitterStreamFactory(configuration).getInstance();
        twStream.addListener(new MyStatusListener());
        
        twStream.sample();
    }
}
このサンプルを実行するとツイッター本体が管理するツイートをリアルタイムに(ここ重要!)取得して、目で追えないほど次々に画面出力されていきます。1つ1つの内容を読むことが無理だと思いますが、この様子を眺めているだけで、いかに日本語ツイートの占める割合が高いかもわかると思います:
2014102002


このコードのままだと確かに全ツイート(のランダムな一部)は取得できますが、特定のキーワードやハッシュタグなど、ある特定の目的で絞り込んだツイートだけを集めたいこともあると思います。その場合は main 関数を以下のように書き換えて、フィルタリングを指定します:
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Configuration configuration = new ConfigurationBuilder().setOAuthConsumerKey(CONSUMER_KEY)
                .setOAuthConsumerSecret(CONSUMER_SECRET)
                .setOAuthAccessToken(ACCESS_TOKEN)
                .setOAuthAccessTokenSecret(ACCESS_TOKEN_SECRET)
                .build();

        TwitterStream twStream = new TwitterStreamFactory(configuration).getInstance();
        twStream.addListener(new MyStatusListener());
        
        //. フィルター
        String[] track = { "wordpress", "#php", "ワードプレス" };
        FilterQuery filter = new FilterQuery();
        filter.track( track );
        twStream.filter( filter );
    }

この例では { "wordpress", "#php", "ワードプレス" } という3つのフィルタを指定しています。これらのキーワードのいずれかが本文に含まれるツイートだけを集めてストリーミングする、という機能に切り替えています。

こちらを実行すると、先ほどのフィルタリングのないケースと比べてかなりゆっくり、少しずつツイートが記録されていく様子がわかると思います:
2014102003


ただこのフィルタリングですが、特に日本語キーワードを正しく取得できる率があまり高くないようです。要は日本語文章の単語分割ロジックが英語の場合と異なることに起因しているのだと思いますが、注意が必要です。ただ日本語のハッシュタグの場合はかなり正しく取得できる、らしいです。


改めて、全レコードではないとはいえ、これだけのリアルタイムビッグデータを誰でも取得できる時代になった、ということだと思います。ビッグデータをリアルタイムに解析する生きたサンプルデータとして、Twitter は最適だと思います。ビッグデータ関連製品もいよいよこれらをどれだけ速く処理して、どれだけ遅れなく格納して、どれだけ最適化して、どれだけ簡易的に扱えるか、という段階になっていくんでしょうかね。

 

このページのトップヘ