こちらで書いた記事の続きです:
Twitter4J で Twetter の Stream API を使う

上記記事では Twitter 上のツイートをリアルタイムに取得する、という Stream API が一般公開され、Java から利用する場合のサンプルを紹介しました。


折角リアルタイムに取得した情報も、単に表示して終わり、ではつまらないです。後で再利用できるよう、データベースに格納してみます。ただどういったフィルタリングをかけるか(かけないのか)にもよりますが、リアルタイムに取得するツイート情報は、かなり膨大になる可能性があります。つまりそれなりのパフォーマンスを持ったデータストアを用意する必要がある、ということです。

という背景がある中で、今回は Couchbase Server をデータストアとして使ってみます。プログラムとしては先日紹介したこの記事のものをベースとして、取得したツイートを次々に Couchbase Server に格納していく、というロジックに変更します。なお CentOS に Couchbase サーバーを導入する手順についてはこちらの記事を参照してください:
CentOS に Couchbase サーバーを導入する

次にロジック変更のための準備として、こちらの記事を参考に Couchbase の Java SDK をダウンロードして、プロジェクトに追加します:
Java から Couchbase Server にアクセスする

次に、Couchbase Server には JSON データを格納することになるので、Java のオブジェクトを JSON 化するために Gson ライブラリをプロジェクトに追加します。以下のサイトから Gson ライブラリをダウンロード&展開し、gson-2.2.jar をプロジェクトに追加します:
https://google-gson.googlecode.com/files/google-gson-2.2.4-release.zip

更に、取得したツイートを Java オブジェクトとして格納するためのクラスを定義します。ここで定義したクラスのインスタンスとしてツイートを生成し、これを GSON で JSON に変換して Couchbase Server に格納する、という流れになります:
import java.util.Date;

public class Tweet {
	private long id;
	private long userid;
	private String username;
	private String text;
	private Date created;
	private Double lat;
	private Double lng;
	private String[] urls;
	private String[] medias;

	public Tweet( long id, long userid, String username, String text, Date created, Double lat, Double lng, String[] urls, String[] medias ){
		this.id = id;
		this.userid = userid;
		this.username = username;
		this.text = text;
		this.created = created;
		this.lat = lat;
		this.lng = lng;
		this.urls = urls;
		this.medias = medias;
	}

	public long getId() {
		return id;
	}

	public void setId(long id) {
		this.id = id;
	}

	public long getUserid() {
		return userid;
	}

	public void setUserid(long userid) {
		this.userid = userid;
	}

	public String getUsername() {
		return username;
	}

	public void setUsername(String username) {
		this.username = username;
	}

	public String getText() {
		return text;
	}

	public void setText(String text) {
		this.text = text;
	}

	public Date getCreated() {
		return created;
	}

	public void setCreated(Date created) {
		this.created = created;
	}

	public Double getLat() {
		return lat;
	}

	public void setLat(Double lat) {
		this.lat = lat;
	}

	public Double getLng() {
		return lng;
	}

	public void setLng(Double lng) {
		this.lng = lng;
	}

	public String[] getUrls() {
		return urls;
	}

	public void setUrls(String[] urls) {
		this.urls = urls;
	}

	public String[] getMedias() {
		return medias;
	}

	public void setMedias(String[] medias) {
		this.medias = medias;
	}	
}

そして、先日の記事で作成した Stream クラスを以下のように書き換えます(変更箇所を青くしています):
import twitter4j.MediaEntity;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.URLEntity;
import twitter4j.conf.Configuration;
import twitter4j.conf.ConfigurationBuilder;

public class Streams {

    private static final String CONSUMER_KEY = "(CONSUMER KEY)";
    private static final String CONSUMER_SECRET = "(CONSUMER SECRET)";
    private static final String ACCESS_TOKEN = "(ACCESS TOKEN)";
    private static final String ACCESS_TOKEN_SECRET = "(ACCESS TOKEN SECRET)";

    private static final String COUCHBASE_SERVER = "192.168.X.X"; //. CouchBase Server ホスト

    public static CouchbaseClient conn = null;
    public static Gson gson = null;

    static class MyStatusListener implements StatusListener {

        public void onStatus(Status status) {
            Double lat = null;
            Double lng = null;
            String[] urls = null;
            String[] medias = null;

            //. 位置情報が含まれていれば取得する        	
            GeoLocation location = status.getGeoLocation();
            if( location != null ){
                double dlat = location.getLatitude();
                double dlng = location.getLongitude();
                lat = dlat;
                lng = dlng;
            }
            long id = status.getId(); //. ツイートID
            String text = status.getText(); //. ツイート本文
            long userid = status.getUser().getId(); //. ユーザーID
            String username = status.getUser().getScreenName(); //. ユーザー表示名
            Date created = status.getCreatedAt(); //. ツイート日時
            
            //. ツイート本文にリンクURLが含まれていれば取り出す
            URLEntity[] uentitys = status.getURLEntities();
            if( uentitys != null && uentitys.length > 0 ){
            	List list = new ArrayList();
                for( int i = 0; i < uentitys.length; i ++ ){
                    URLEntity uentity = uentitys[i];
                    String expandedURL = uentity.getExpandedURL();
                    list.add( expandedURL );
                }
	        urls = ( String[] )list.toArray( new String[0] );
            }
            
            //. ツイート本文に画像/動画URLが含まれていれば取り出す
            MediaEntity[] mentitys = status.getMediaEntities();
            if( mentitys != null && mentitys.length > 0 ){
            	List list = new ArrayList();
                for( int i = 0; i < mentitys.length; i ++ ){
                    MediaEntity mentity = mentitys[i];
                    String expandedURL = mentity.getExpandedURL();
                    list.add( expandedURL );
                }
                medias = ( String[] )list.toArray( new String[0] );
            }

            //. 取り出した情報を JSON にして Couchbase に格納する
            Tweet tweet = new Tweet( id, userid, username, text, created, lat, lng, urls, medias );
            conn.set( "" + id, gson.toJson( tweet ) );            
        }

        public void onDeletionNotice(StatusDeletionNotice sdn) {
            //System.out.println("onDeletionNotice.");
        }

        public void onTrackLimitationNotice(int i) {
            //System.out.println("onTrackLimitationNotice.(" + i + ")");
        }

        public void onScrubGeo(long lat, long lng) {
            //System.out.println("onScrubGeo.(" + lat + ", " + lng + ")");
        }

        public void onException(Exception excptn) {
            //System.out.println("onException.");
        }

        public void onStallWarning(StallWarning arg0) {
            // TODO Auto-generated method stub
        }
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Configuration configuration = new ConfigurationBuilder().setOAuthConsumerKey(CONSUMER_KEY)
                .setOAuthConsumerSecret(CONSUMER_SECRET)
                .setOAuthAccessToken(ACCESS_TOKEN)
                .setOAuthAccessTokenSecret(ACCESS_TOKEN_SECRET)
                .build();

        //. Couchbase Server に接続用
        try{
            List hosts = Arrays.asList( new URI( "http://" + COUCHBASE_SERVER + ":8091/pools" ) );
            conn = new CouchbaseClient( hosts, "default", "" ); //. "Too many open files"
            gson = new Gson();
        }catch( Exception e ){
            e.printStackTrace();
        }

        TwitterStream twStream = new TwitterStreamFactory(configuration).getInstance();
        twStream.addListener(new MyStatusListener());
        
        twStream.sample();
    }
}

前回のコードと比べて、あまり違いはありません。Couchbase SDK を使って Couchbase サーバーに接続するための準備を main メソッド内で行い、onStatus メソッド内では実際に取得したツイートの内容を Java オブジェクト化したものを GSON を使って JSON 化して、後はひたすら ID をキー値として格納する、ということを実行しています。 なお上記コードではフィルタリングを定義していませんが、必要であれば適当なフィルタリング処理を加えていただくのもいいと思います。

このコードを実行すると(フィルタリングの有無や種類にもよりますが)条件に該当するツイートを次々に取得して、Couchbase サーバー内に格納していきます。その様子は Couchbase の管理コンソールからも確認できます:
2014102101

必要に応じて View を定義するなどして、この取得結果を見やすくしたり、別のプログラムから利用できるようにしていく予定です。そちらはまた別途。