特別目新しい情報ではなく今更感のある内容ですが、Twitter の Stream 検索 API を使ってみました。
Steam API はいわば「Twitter そのもの」です。自分のタイムラインに関連した内容だけでなく、世界中の Twitter ユーザーがつぶやいた内容をリアルタイムに(Stream として)取得することを目的として提供されています。さすがに全てのツイートを取得するのはごく一部の限られたユーザーにだけ公開されているようですが、一部の(全ツイートの1%程度と言われています)ツイートを取得する API は誰でも使うことができます。一部といっても全て取得していたら、それこそとんでもない量になりますけど・・・
なおずっと以前から公開されていた Twitter の API と区別する場合、Steam API と(従来の)REST API という名称で区別します。
で、その Twitter Stream API を Java から、Twitter4J を使って取得するコードを作って試しに動かしてみました。以下にその様子を紹介します。
まず、何はともあれ Twitter Application Managementへの Developer 登録と、アプリケーションの登録、そして CONSUMER KEY, CONSUMER SECRET, ACCESS TOKEN, ACCESS TOKEN SECRET といった各種キーの取得が必要です。この辺りは REST API の時とも変わらない作業なので、適当にググるなどして調べてください。
次に最新の Twitter4J をダウンロードして展開します。zip アーカイブのファイル名は twitter4j-[バージョン番号].zip です。展開後の lib ディレクトリに含まれる必要なファイル(最低限動かすためには twitter4j-core-[バージョン番号].jar と twitter4j-stream-[バージョン番号].jar の2つのファイル)を開発プロジェクトに取り込むか、CLASSPATH を通して参照可能にします。

事前準備はこれでできました。後は以下の様なコードを記述します。冒頭部分で取得した CONSUMER KEY 等のキーに書き換える必要があります。この例では取得する条件を付けず、世の中の全てのツイート(の約1%)をそのまま取得しています。取得イベントが発生する度に onStatus メソッドが呼ばれ、そこで ID やユーザー名、本文といった各種情報を取り出して、最終的には System.out で標準出力しています(実際のアプリではここで DB に格納するなどという流れになると思います):

このコードのままだと確かに全ツイート(のランダムな一部)は取得できますが、特定のキーワードやハッシュタグなど、ある特定の目的で絞り込んだツイートだけを集めたいこともあると思います。その場合は main 関数を以下のように書き換えて、フィルタリングを指定します:
この例では { "wordpress", "#php", "ワードプレス" } という3つのフィルタを指定しています。これらのキーワードのいずれかが本文に含まれるツイートだけを集めてストリーミングする、という機能に切り替えています。
こちらを実行すると、先ほどのフィルタリングのないケースと比べてかなりゆっくり、少しずつツイートが記録されていく様子がわかると思います:

ただこのフィルタリングですが、特に日本語キーワードを正しく取得できる率があまり高くないようです。要は日本語文章の単語分割ロジックが英語の場合と異なることに起因しているのだと思いますが、注意が必要です。ただ日本語のハッシュタグの場合はかなり正しく取得できる、らしいです。
改めて、全レコードではないとはいえ、これだけのリアルタイムビッグデータを誰でも取得できる時代になった、ということだと思います。ビッグデータをリアルタイムに解析する生きたサンプルデータとして、Twitter は最適だと思います。ビッグデータ関連製品もいよいよこれらをどれだけ速く処理して、どれだけ遅れなく格納して、どれだけ最適化して、どれだけ簡易的に扱えるか、という段階になっていくんでしょうかね。
Steam API はいわば「Twitter そのもの」です。自分のタイムラインに関連した内容だけでなく、世界中の Twitter ユーザーがつぶやいた内容をリアルタイムに(Stream として)取得することを目的として提供されています。さすがに全てのツイートを取得するのはごく一部の限られたユーザーにだけ公開されているようですが、一部の(全ツイートの1%程度と言われています)ツイートを取得する API は誰でも使うことができます。一部といっても全て取得していたら、それこそとんでもない量になりますけど・・・
なおずっと以前から公開されていた Twitter の API と区別する場合、Steam API と(従来の)REST API という名称で区別します。
で、その Twitter Stream API を Java から、Twitter4J を使って取得するコードを作って試しに動かしてみました。以下にその様子を紹介します。
まず、何はともあれ Twitter Application Managementへの Developer 登録と、アプリケーションの登録、そして CONSUMER KEY, CONSUMER SECRET, ACCESS TOKEN, ACCESS TOKEN SECRET といった各種キーの取得が必要です。この辺りは REST API の時とも変わらない作業なので、適当にググるなどして調べてください。
次に最新の Twitter4J をダウンロードして展開します。zip アーカイブのファイル名は twitter4j-[バージョン番号].zip です。展開後の lib ディレクトリに含まれる必要なファイル(最低限動かすためには twitter4j-core-[バージョン番号].jar と twitter4j-stream-[バージョン番号].jar の2つのファイル)を開発プロジェクトに取り込むか、CLASSPATH を通して参照可能にします。

事前準備はこれでできました。後は以下の様なコードを記述します。冒頭部分で取得した CONSUMER KEY 等のキーに書き換える必要があります。この例では取得する条件を付けず、世の中の全てのツイート(の約1%)をそのまま取得しています。取得イベントが発生する度に onStatus メソッドが呼ばれ、そこで ID やユーザー名、本文といった各種情報を取り出して、最終的には System.out で標準出力しています(実際のアプリではここで DB に格納するなどという流れになると思います):
このサンプルを実行するとツイッター本体が管理するツイートをリアルタイムに(ここ重要!)取得して、目で追えないほど次々に画面出力されていきます。1つ1つの内容を読むことが無理だと思いますが、この様子を眺めているだけで、いかに日本語ツイートの占める割合が高いかもわかると思います:import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.List; import twitter4j.FilterQuery; import twitter4j.GeoLocation; import twitter4j.MediaEntity; import twitter4j.StallWarning; import twitter4j.Status; import twitter4j.StatusDeletionNotice; import twitter4j.StatusListener; import twitter4j.TwitterStream; import twitter4j.TwitterStreamFactory; import twitter4j.URLEntity; import twitter4j.conf.Configuration; import twitter4j.conf.ConfigurationBuilder; public class Streams { private static final String CONSUMER_KEY = "(CONSUMER KEY)"; private static final String CONSUMER_SECRET = "(CONSUMER SECRET)"; private static final String ACCESS_TOKEN = "(ACCESS TOKEN)"; private static final String ACCESS_TOKEN_SECRET = "(ACCESS TOKEN SECRET)"; static class MyStatusListener implements StatusListener { public void onStatus(Status status) { Double lat = null; Double lng = null; String[] urls = null; String[] medias = null; //. 位置情報が含まれていれば取得する GeoLocation location = status.getGeoLocation(); if( location != null ){ double dlat = location.getLatitude(); double dlng = location.getLongitude(); lat = dlat; lng = dlng; } long id = status.getId(); //. ツイートID String text = status.getText(); //. ツイート本文 long userid = status.getUser().getId(); //. ユーザーID String username = status.getUser().getScreenName(); //. ユーザー表示名 Date created = status.getCreatedAt(); //. ツイート日時 //. ツイート本文にリンクURLが含まれていれば取り出す URLEntity[] uentitys = status.getURLEntities(); if( uentitys != null && uentitys.length > 0 ){ Listlist = new ArrayList (); for( int i = 0; i < uentitys.length; i ++ ){ URLEntity uentity = uentitys[i]; String expandedURL = uentity.getExpandedURL(); list.add( expandedURL ); } urls = ( String[] )list.toArray( new String[0] ); } //. ツイート本文に画像/動画URLが含まれていれば取り出す MediaEntity[] mentitys = status.getMediaEntities(); if( mentitys != null && mentitys.length > 0 ){ List list = new ArrayList (); for( int i = 0; i < mentitys.length; i ++ ){ MediaEntity mentity = mentitys[i]; String expandedURL = mentity.getExpandedURL(); list.add( expandedURL ); } medias = ( String[] )list.toArray( new String[0] ); } //. 取り出した情報を表示する(以下では id, username, text のみ) System.out.println( "id = " + id + ", username = " + username + ", text = " + text ); } public void onDeletionNotice(StatusDeletionNotice sdn) { //System.out.println("onDeletionNotice."); } public void onTrackLimitationNotice(int i) { //System.out.println("onTrackLimitationNotice.(" + i + ")"); } public void onScrubGeo(long lat, long lng) { //System.out.println("onScrubGeo.(" + lat + ", " + lng + ")"); } public void onException(Exception excptn) { //System.out.println("onException."); } public void onStallWarning(StallWarning arg0) { // TODO Auto-generated method stub } } /** * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub Configuration configuration = new ConfigurationBuilder().setOAuthConsumerKey(CONSUMER_KEY) .setOAuthConsumerSecret(CONSUMER_SECRET) .setOAuthAccessToken(ACCESS_TOKEN) .setOAuthAccessTokenSecret(ACCESS_TOKEN_SECRET) .build(); TwitterStream twStream = new TwitterStreamFactory(configuration).getInstance(); twStream.addListener(new MyStatusListener()); twStream.sample(); } }

このコードのままだと確かに全ツイート(のランダムな一部)は取得できますが、特定のキーワードやハッシュタグなど、ある特定の目的で絞り込んだツイートだけを集めたいこともあると思います。その場合は main 関数を以下のように書き換えて、フィルタリングを指定します:
public static void main(String[] args) {
// TODO Auto-generated method stub
Configuration configuration = new ConfigurationBuilder().setOAuthConsumerKey(CONSUMER_KEY)
.setOAuthConsumerSecret(CONSUMER_SECRET)
.setOAuthAccessToken(ACCESS_TOKEN)
.setOAuthAccessTokenSecret(ACCESS_TOKEN_SECRET)
.build();
TwitterStream twStream = new TwitterStreamFactory(configuration).getInstance();
twStream.addListener(new MyStatusListener());
//. フィルター
String[] track = { "wordpress", "#php", "ワードプレス" };
FilterQuery filter = new FilterQuery();
filter.track( track );
twStream.filter( filter );
}
この例では { "wordpress", "#php", "ワードプレス" } という3つのフィルタを指定しています。これらのキーワードのいずれかが本文に含まれるツイートだけを集めてストリーミングする、という機能に切り替えています。
こちらを実行すると、先ほどのフィルタリングのないケースと比べてかなりゆっくり、少しずつツイートが記録されていく様子がわかると思います:

ただこのフィルタリングですが、特に日本語キーワードを正しく取得できる率があまり高くないようです。要は日本語文章の単語分割ロジックが英語の場合と異なることに起因しているのだと思いますが、注意が必要です。ただ日本語のハッシュタグの場合はかなり正しく取得できる、らしいです。
改めて、全レコードではないとはいえ、これだけのリアルタイムビッグデータを誰でも取得できる時代になった、ということだと思います。ビッグデータをリアルタイムに解析する生きたサンプルデータとして、Twitter は最適だと思います。ビッグデータ関連製品もいよいよこれらをどれだけ速く処理して、どれだけ遅れなく格納して、どれだけ最適化して、どれだけ簡易的に扱えるか、という段階になっていくんでしょうかね。
コメント