まだプログラマーですが何か?

プログラマーネタとアスリートネタ中心。たまに作成したウェブサービス関連の話も http://twitter.com/dotnsf

タグ:couchbase

こちらで書いた記事の続きです:
Twitter4J で Twetter の Stream API を使う

上記記事では Twitter 上のツイートをリアルタイムに取得する、という Stream API が一般公開され、Java から利用する場合のサンプルを紹介しました。


折角リアルタイムに取得した情報も、単に表示して終わり、ではつまらないです。後で再利用できるよう、データベースに格納してみます。ただどういったフィルタリングをかけるか(かけないのか)にもよりますが、リアルタイムに取得するツイート情報は、かなり膨大になる可能性があります。つまりそれなりのパフォーマンスを持ったデータストアを用意する必要がある、ということです。

という背景がある中で、今回は Couchbase Server をデータストアとして使ってみます。プログラムとしては先日紹介したこの記事のものをベースとして、取得したツイートを次々に Couchbase Server に格納していく、というロジックに変更します。なお CentOS に Couchbase サーバーを導入する手順についてはこちらの記事を参照してください:
CentOS に Couchbase サーバーを導入する

次にロジック変更のための準備として、こちらの記事を参考に Couchbase の Java SDK をダウンロードして、プロジェクトに追加します:
Java から Couchbase Server にアクセスする

次に、Couchbase Server には JSON データを格納することになるので、Java のオブジェクトを JSON 化するために Gson ライブラリをプロジェクトに追加します。以下のサイトから Gson ライブラリをダウンロード&展開し、gson-2.2.jar をプロジェクトに追加します:
https://google-gson.googlecode.com/files/google-gson-2.2.4-release.zip

更に、取得したツイートを Java オブジェクトとして格納するためのクラスを定義します。ここで定義したクラスのインスタンスとしてツイートを生成し、これを GSON で JSON に変換して Couchbase Server に格納する、という流れになります:
import java.util.Date;

public class Tweet {
	private long id;
	private long userid;
	private String username;
	private String text;
	private Date created;
	private Double lat;
	private Double lng;
	private String[] urls;
	private String[] medias;

	public Tweet( long id, long userid, String username, String text, Date created, Double lat, Double lng, String[] urls, String[] medias ){
		this.id = id;
		this.userid = userid;
		this.username = username;
		this.text = text;
		this.created = created;
		this.lat = lat;
		this.lng = lng;
		this.urls = urls;
		this.medias = medias;
	}

	public long getId() {
		return id;
	}

	public void setId(long id) {
		this.id = id;
	}

	public long getUserid() {
		return userid;
	}

	public void setUserid(long userid) {
		this.userid = userid;
	}

	public String getUsername() {
		return username;
	}

	public void setUsername(String username) {
		this.username = username;
	}

	public String getText() {
		return text;
	}

	public void setText(String text) {
		this.text = text;
	}

	public Date getCreated() {
		return created;
	}

	public void setCreated(Date created) {
		this.created = created;
	}

	public Double getLat() {
		return lat;
	}

	public void setLat(Double lat) {
		this.lat = lat;
	}

	public Double getLng() {
		return lng;
	}

	public void setLng(Double lng) {
		this.lng = lng;
	}

	public String[] getUrls() {
		return urls;
	}

	public void setUrls(String[] urls) {
		this.urls = urls;
	}

	public String[] getMedias() {
		return medias;
	}

	public void setMedias(String[] medias) {
		this.medias = medias;
	}	
}

そして、先日の記事で作成した Stream クラスを以下のように書き換えます(変更箇所を青くしています):
import twitter4j.MediaEntity;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.URLEntity;
import twitter4j.conf.Configuration;
import twitter4j.conf.ConfigurationBuilder;

public class Streams {

    private static final String CONSUMER_KEY = "(CONSUMER KEY)";
    private static final String CONSUMER_SECRET = "(CONSUMER SECRET)";
    private static final String ACCESS_TOKEN = "(ACCESS TOKEN)";
    private static final String ACCESS_TOKEN_SECRET = "(ACCESS TOKEN SECRET)";

    private static final String COUCHBASE_SERVER = "192.168.X.X"; //. CouchBase Server ホスト

    public static CouchbaseClient conn = null;
    public static Gson gson = null;

    static class MyStatusListener implements StatusListener {

        public void onStatus(Status status) {
            Double lat = null;
            Double lng = null;
            String[] urls = null;
            String[] medias = null;

            //. 位置情報が含まれていれば取得する        	
            GeoLocation location = status.getGeoLocation();
            if( location != null ){
                double dlat = location.getLatitude();
                double dlng = location.getLongitude();
                lat = dlat;
                lng = dlng;
            }
            long id = status.getId(); //. ツイートID
            String text = status.getText(); //. ツイート本文
            long userid = status.getUser().getId(); //. ユーザーID
            String username = status.getUser().getScreenName(); //. ユーザー表示名
            Date created = status.getCreatedAt(); //. ツイート日時
            
            //. ツイート本文にリンクURLが含まれていれば取り出す
            URLEntity[] uentitys = status.getURLEntities();
            if( uentitys != null && uentitys.length > 0 ){
            	List list = new ArrayList();
                for( int i = 0; i < uentitys.length; i ++ ){
                    URLEntity uentity = uentitys[i];
                    String expandedURL = uentity.getExpandedURL();
                    list.add( expandedURL );
                }
	        urls = ( String[] )list.toArray( new String[0] );
            }
            
            //. ツイート本文に画像/動画URLが含まれていれば取り出す
            MediaEntity[] mentitys = status.getMediaEntities();
            if( mentitys != null && mentitys.length > 0 ){
            	List list = new ArrayList();
                for( int i = 0; i < mentitys.length; i ++ ){
                    MediaEntity mentity = mentitys[i];
                    String expandedURL = mentity.getExpandedURL();
                    list.add( expandedURL );
                }
                medias = ( String[] )list.toArray( new String[0] );
            }

            //. 取り出した情報を JSON にして Couchbase に格納する
            Tweet tweet = new Tweet( id, userid, username, text, created, lat, lng, urls, medias );
            conn.set( "" + id, gson.toJson( tweet ) );            
        }

        public void onDeletionNotice(StatusDeletionNotice sdn) {
            //System.out.println("onDeletionNotice.");
        }

        public void onTrackLimitationNotice(int i) {
            //System.out.println("onTrackLimitationNotice.(" + i + ")");
        }

        public void onScrubGeo(long lat, long lng) {
            //System.out.println("onScrubGeo.(" + lat + ", " + lng + ")");
        }

        public void onException(Exception excptn) {
            //System.out.println("onException.");
        }

        public void onStallWarning(StallWarning arg0) {
            // TODO Auto-generated method stub
        }
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Configuration configuration = new ConfigurationBuilder().setOAuthConsumerKey(CONSUMER_KEY)
                .setOAuthConsumerSecret(CONSUMER_SECRET)
                .setOAuthAccessToken(ACCESS_TOKEN)
                .setOAuthAccessTokenSecret(ACCESS_TOKEN_SECRET)
                .build();

        //. Couchbase Server に接続用
        try{
            List hosts = Arrays.asList( new URI( "http://" + COUCHBASE_SERVER + ":8091/pools" ) );
            conn = new CouchbaseClient( hosts, "default", "" ); //. "Too many open files"
            gson = new Gson();
        }catch( Exception e ){
            e.printStackTrace();
        }

        TwitterStream twStream = new TwitterStreamFactory(configuration).getInstance();
        twStream.addListener(new MyStatusListener());
        
        twStream.sample();
    }
}

前回のコードと比べて、あまり違いはありません。Couchbase SDK を使って Couchbase サーバーに接続するための準備を main メソッド内で行い、onStatus メソッド内では実際に取得したツイートの内容を Java オブジェクト化したものを GSON を使って JSON 化して、後はひたすら ID をキー値として格納する、ということを実行しています。 なお上記コードではフィルタリングを定義していませんが、必要であれば適当なフィルタリング処理を加えていただくのもいいと思います。

このコードを実行すると(フィルタリングの有無や種類にもよりますが)条件に該当するツイートを次々に取得して、Couchbase サーバー内に格納していきます。その様子は Couchbase の管理コンソールからも確認できます:
2014102101

必要に応じて View を定義するなどして、この取得結果を見やすくしたり、別のプログラムから利用できるようにしていく予定です。そちらはまた別途。







 

IBM Bluemix の "Cloundant" サービスを試しに使ってみました。


まず Cloudant について説明します。Cloudant は JSON ドキュメントデータベースである Apache CouchDB をベースとする DBaaS サービスを提供しているマサチューセッツ州の企業です(2014年3月にIBM による買収が発表されました)。

このサービスが Could Foundry ベースの PaaS である IBM Bluemix のサービスの1つとして提供されている、ということになります。利用する側の観点で言えば「IBM Bluemix 内で使える Apache CouchDB」ということになります。価格等は後述します。


では実際に IBM Bluemix から Cloudant サービスを使う手順を紹介します。IBM Bluemix にログインし、ダッシュボードなどの画面から "ADD SERVICE" をクリックします:
2014081601


サービス一覧の "Data Management" カテゴリの中に "Cloudant NoSQL DB" を見つけることができます。これをクリックします:
2014081602


"Cloundant NoSQL DB" の説明が表示されます。この時点で特定のアプリケーションサーバーに紐付けるのであれば APP からアプリケーションを選択します(試しに使うだけなら特定のアプリケーションへの紐付けは不要です)。そして "CREATE" ボタンをクリックすると Cloudant サービスが生成されます:
2014081603


ちなみに、上記画面の下の方までスクロールするとサービス価格についての説明があります。データ1GBあたり 105円(1ドル?)ですが、2GB までは無料枠内で使えるようです。加えて API の実行回数についても無料枠と有料枠が設定されているようです。今回は無料枠内でしか使わないつもりです:
2014081604


Cloudant サービスが生成されるとダッシュボード内にこのようなパネルが追加されているはずです。この "Show Credentials" と書かれた箇所をクリックすると、Cloudant サーバーへアクセスするための Credentials 情報が表示されます(下図参照):
2014081605


Credentials 情報を確認している画面です。username や password などもありますが、とりあえずすぐ使うので "url" をメモしておきましょう。そしてサービス名(下図だと "Cloudant NoSQL DB-9c")が書かれた箇所をクリックしてサーバーの情報を表示します:
2014081606


Cloudant サービスについても説明が表示されている画面です。この右上に "LAUNCH" と書かれたボタンがあり、ここをクリックしてサーバーコンソールに接続します:
2014081607


別ウィンドウが起動して、Clondant のサーバーコンソールが表示されている画面です。ここでデータベースや複製、アカウントなどの情報を参照したり、実際にドキュメントを読み書きできます。画面ではデータベースタブが選択されていますが、まだデータベースを作成していないので "Add database" ボタンをクリックして最初のデータベースを作成しておきます:
2014081608


データベース名を指定します。ここでは "dotnsfdb" と指定しています:
2014081608a


"dotnsfdb" が作成されたので、Database タブ内の画面が変わりました(まだデータはありません)。画面内の "Create your first document" ボタンをクリックして最初のドキュメントを作成しておきます:
2014081609


中身はこんな感じにしました。 "value" キーに "こんにちは、Cloudant" という日本語の値を追加して、"Save" ボタンをクリックします:
2014081611


保存されました。"Return _all_docs" ボタンで一覧画面に戻ります:
2014081612


dotnsfdb の一覧画面に戻りました。先程作成したドキュメントが表示されています。"_id" の値(この例では"c82b7fb996a83c0bdaead3f75bd44a34")を確認しておきます:
2014081613


このドキュメントを API 経由で呼び出してみます。Credentials 情報から取得した URL を使って、以下の URL にアクセスします:
 (https で始まる "url" の値)/(作成したデータベース名)/(_id の値)

"url" の値にはユーザー名とパスワードも含まれているはずなので、後はデータベース名と _id 値を指定して GET すれば、いま作成したドキュメントの値が取得できるはずです:
2014081615


当たり前ですが、ちゃんと CouchDB の API で目的のドキュメントの情報が取得できました。日本語情報も問題なさそうです。





 

Couchbase サーバーのデータをバックアップ&リストアする手順を紹介します。なお、以下で紹介する手順はバケットタイプが Couchbase のバケットに対してのみ可能な手順です。

まずバックアップを実行します。バックアップコマンドは /opt/couchbase/bin/cbbackup です:
# /opt/couchbase/bin/cbbackup http://couchbase.test.com:8091 /tmp/default_backup -u Administrator -p password -b default

上記例では couchbase.test.com という Couchbase サーバーのバケット default に対して、管理者名 Administrator 、管理者パスワード password でバックアップを実行し、その結果を /tmp/default_backup/ 以下に出力しています。

実行後、/tmp/default_backup/ 以下にはバックアップ結果のダンプファイルができています。ダンプファイルはクラスタ毎に作成されるので、クラスタ数と同じ数のダンプファイルが出力されているはずです:
/tmp/default_backup/bucket-default/node-couchbase.test.com%3A8091/data-0000.cbb

次にリストアを実行します。リストアコマンドは /opt/couchbase/bin/cbrestore です:
# /opt/couchbase/bin/cbrestore http://Administrator:password@couchbase.test.com:8091 --bucket-source=default --bucket-destination=test

このコマンドでは "default" というバケットから取得したバックアップダンプを(同じく) "test" というバケットに対してリストアする、という処理を指示しています。


Couchbase サーバーの特長の1つに「memcached互換」があります。

(ディスクではなく)メモリを一時キャッシュとして利用する memcached サーバーは比較的遅いリレーショナルデータベースのフロントエンドプロセッサーとして配置することで、部分的/仮想的に高速なデータベースサーバーを実現できます。ただし、あくまでメモリ上にのみ存在させて、ディスクへの読み書きを最小限に減らすことで高速化を実現しているため、そのままサーバーを落とすと中身が無くなり、再度メモリ上に展開して初めて再度利用することができるようになります。

Couchbase はこの memcached と互換性があります。memcached そのもののようにディスクへの退避のないデータストアとして利用することもできますし、データを非同期にディスクへ退避させつつメモリ中心のアクセスで高速化をはかる、という利用方法も選択できます(バケット単位で種類を指定します)。ちなみにデフォルト設定では後者になります。 ただいずれのモードでも単に memcached の挙動を真似ているというだけでなく、互換性があります。

具体的には、memcached は 11211 番ポートに TCP/IP で接続してデータを読み書きすることができますが、全く同じ TCP/IP リクエストで Couchbase サーバーのデータも読み書きできます。

実際に試してみましょう。Couchbase サーバーの環境構築についてはこちらを参照して、予め Couchbase サーバーが稼働している状態を用意しておきます:
CentOS に CouchBase サーバーを導入する

また、この手順では telnet を利用するので、telnet クライアントを用意しておきます。CentOS であれば以下のコマンドでインストールできます:
# yum install telnet

では実際に telnet で Couchbase サーバーに接続します。以下のコマンドの "couchbase.test.com" 部分を Couchbase サーバーのホスト名か IP アドレスに変更して実行してください:
# telnet couchbase.test.com 11211
Trying couchbase.test.com...
Connected to couchbase.test.com (127.0.0.1).
Escape character is '^]'.

上記のように "Escape character is '^]'. " と表示されれば接続できています。あっさり。

試しにこのプロトコルでデータを作成してみます。以下の様なコマンドを実行して、"my-third-document" という ID で、値が "My name is Couchbase." というドキュメントを作成してみます:
set my-third-document 0 0 21 My name is Couchbase.(改行)

memcached では set コマンドでデータを作成します。最初のパラメータは ID、2番目のパラメータは flag ですがここでは0を指定します。3番目のパラメータはデータの有効期限を秒単位で指定しますが、ゼロを指定すると無制限(ずっと残るデータ)になります。4番目のパラメータはデータサイズ、そして最後に実際のデータを入力します。

memcached(Couchbase)内のデータを参照するには get コマンドで ID を指定します:
get my-third-document(改行)
VALUE my-third-document 0 21
My name is Couchbase.
END

"My name is Couchbase." という入力した値が正しく取得できました。Couchbase サーバーに対して、memcached のプロトコルで値の読み書きができることが確認できました。

というわけで、Couchbase は memcached と同じように使えることが分かりました。memcached 用のライブラリの多くがそのまま Couchbase に対しても使えることになるので、これは便利な互換性といえます。

ちなみに、Couchbase サーバーへの telnet 接続を終了するには CTRL + ] を押して、プロンプトで quit を入力です:
(CTRL + ] を入力)
telnet> quit (telnetプロンプトで quit と入力)
# (元のシェルに戻る)







Couchbase サーバーに PHP SDK を使って PHP のプログラムからアクセスする環境の準備手順と、そのサンプルを紹介します。環境は CentOS 6.4 です。

まず Couchbase サーバーが用意できていない場合はサーバーを導入します。導入手順はこちらを参照してください:
CentOS に CouchBase サーバーを導入する


次に Couchbase Server 用の PHP のクライアントライブラリをインストールしますが、ここではいくつかの手順が必要になります。

まずは PHP をインストールします。後で pecl を使うので php-pear も含めてインストールします:
# yum install php php-mbstring php-devel php-pear

また、後で使うので openssl-devel モジュールもこのタイミングでインストールしておきます:
# yum install openssl-devel

次に libcouchbase.so という、Couchbase サーバー用の共用モジュールを導入します:
# wget -O /etc/yum.repos.d/couchbase.repo http://packages.couchbase.com/rpm/couchbase-centos62-x86_64.repo
# yum check-update
# yum install libcouchbase2-libevent libcouchbase-devel

ここまでの作業ができていると Couchbase サーバーの PHP SDK が導入できるようになります:
# pecl install couchbase

最後に PHP の拡張モジュールとして couchbase.so を指定します:
# vi /etc/php.d/json.ini
  :
  :
extension=json.so
extension=couchbase.so ←この行を追加

これで Couchbase 用の PHP SDK が使える状態になりました。以下の様なコードを記述することで PHP から Couchbase サーバーにアクセスすることができます("couchbase.test.com"部分は Couchbase サーバーのホスト名かIPアドレスを指定します):
<?php
$cb = new Couchbase( "couchbase.test.com:8091", "", "", "default" );
$cb->set( "my-second-document", "ハロー、CouchBase." );
var_dump( $cb->get( "my-second-document" ) );
?>

この例では Couchbase オブジェクトを作成し、"my-second-document" という ID のドキュメントを "ハロー、Couchbase" というプレーンテキストで作成し、そのデータを var_dump 関数で表示する、というだけのシンプルな内容です。この PHP プログラムが正しく実行されると以下の様な出力結果になります:
# php test.php
string(22) "ハロー、CouchBase."

Couchbase PHP SDK の公式なドキュメントはこちらを参照ください:
http://docs.couchbase.com/couchbase-sdk-php-1.1/




 

このページのトップヘ