まだプログラマーですが何か?

プログラマーネタ中心。たまに作成したウェブサービス関連の話も https://twitter.com/dotnsf

タグ:couchbase

IBM Cloud から提供されている 30 日間無料 Kubernetes サービスIBM Kubernetes Service 、以下 "IKS")環境を使って利用することのできるコンテナイメージを1日に1個ずつ 30 日間連続で紹介していきます。

環境のセットアップや制約事項については Day0 のこちらの記事を参照してください。

Day 6 からはデータベース系コンテナとその GUI ツールを中心に紹介してます。Day 17 では Day 16 に紹介した CouchDB の弟的な製品でもある CouchBase イメージをデプロイする例を紹介します。
couchbase



【イメージの概要】
CouchDB を作った Damien Katz さんが、CouchDB を新たに再設計して開発した NoSQL 型データベースです。非常に高速に動作するというだけでなく、低遅延かつ高持続スループットを実現しています。



【イメージのデプロイ】
まずはこちらのファイルを自分の PC にダウンロードしてください:
https://raw.githubusercontent.com/dotnsf/yamls_for_iks/main/couchbase.yaml

今回の CouchBase もシングルノード環境で利用する場合は特にパラメータ指定不要で、そのままデプロイすることができます。以下のコマンドを実行する前に Day 0 の内容を参照して ibmcloud CLI ツールで IBM Cloud にログインし、クラスタに接続するまでを済ませておいてください。


そして以下のコマンドを実行します:
$ kubectl apply -f couchbase.yaml

以下のコマンドで CouchBase 関連の Deployment, Service, Pod, Replicaset が1つずつ生成されたことと、サービスが 31210, 30091, 30092, 30093, 30094 番の各ポートで公開されていることを確認します:
$ kubectl get all

NAME                            READY   STATUS    RESTARTS   AGE
pod/couchbase-58b4b8c99-m8tmt   1/1     Running   0          105s

NAME                      TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                                                                       AGE
service/couchbaseserver   NodePort    172.21.76.112   <none>        11210:31210/TCP,8091:30091/TCP,8092:30092/TCP,8093:30093/TCP,8094:30094/TCP   105s
service/kubernetes        ClusterIP   172.21.0.1      <none>        443/TCP                                                                       27d

NAME                        READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/couchbase   1/1     1            1           106s

NAME                                  DESIRED   CURRENT   READY   AGE
replicaset.apps/couchbase-58b4b8c99   1         1         1       106s

この後に実際にサービスを利用するため、以下のコマンドでワーカーノードのパブリック IP アドレスを確認します(以下の例であれば 161.51.204.190):
$ ibmcloud ks worker ls --cluster=mycluster-free
OK
ID                                                       パブリック IP    プライベート IP   フレーバー   状態     状況    ゾーン   バージョン
kube-c3biujbf074rs3rl76t0-myclusterfr-default-000000df   169.51.204.190   10.144.185.144    free         normal   Ready   mil01    1.20.7_1543*

つまりこの時点で(上述の結果であれば)アプリケーションの UI は http://169.51.204.190:30091/ で稼働している、ということになります。CouchDB はこの URL が管理ダッシュボードの URL になっているので早速実行してみます。ウェブブラウザを使って、アプリケーションの URL(上述の方法で確認した URL)にアクセスしてみます:
couchbase1


初回はサーバーセットアップ画面になるので、管理者ユーザーの作成や接続先サーバーの情報を入力していきます:

couchbase2


最終的にデータまでが入力されると、この UI を使って内容や統計情報を確認することができるようになります:
couchbase3



【YAML ファイルの解説】
YAML ファイルはこちらを使っています:
apiVersion: v1
kind: Service
metadata:
  name: couchbaseserver
spec:
  selector:
    app: couchbase
  ports:
  - port: 11210
    name: port0
    protocol: TCP
    targetPort: 11210
    nodePort: 31210
  - port: 8091
    name: port1
    protocol: TCP
    targetPort: 8091
    nodePort: 30091
  - port: 8092
    name: port2
    protocol: TCP
    targetPort: 8092
    nodePort: 30092
  - port: 8093
    name: port3
    protocol: TCP
    targetPort: 8093
    nodePort: 30093
  - port: 8094
    name: port4
    protocol: TCP
    targetPort: 8094
    nodePort: 30094
  type: NodePort
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: couchbase
spec:
  replicas: 1
  selector:
    matchLabels:
      app: couchbase
  template:
    metadata:
      labels:
        app: couchbase
    spec:
      containers:
      - name: couchbase
        image: couchbase
        ports:
        - containerPort: 11210
        - containerPort: 8091
        - containerPort: 8092
        - containerPort: 8093
        - containerPort: 8094

Deployment 1つと、Service 1つのごくごくシンプルな YAML ファイルですが、一応解説を加えておきます。アプリケーションそのものは 11210, 8091, 8092, 8093, 8094 番ポートで動作するように作られているため、NodePort 30080 番を指定して、外部からは 30080 番ポートでアクセスできるようにしています(NodePort として指定可能な番号の範囲は 30000 ~ 32767 です、指定しない場合は空いている番号がランダムに割り振られます)。また ReplicaSet は1つだけで作りました(データベースなので、別途クラスタ構成の準備をしない限りはこの数値だけを増やしてもあまり意味ないと思います)。


デプロイしたコンテナイメージを削除する場合はデプロイ時に使った YAML ファイルを再度使って、以下のコマンドを実行します。不要であれば削除しておきましょう:
$ kubectl delete -f couchbase.yaml


【紹介したイメージ】
https://hub.docker.com/_/couchbase


【紹介記録】
Dayカテゴリーデプロイ内容
0準備準備作業
1ウェブサーバーhostname
2Apache HTTP
3Nginx
4Tomcat
5Websphere Liberty
6データベースMySQL
7phpMyAdmin
8PostgreSQL
9pgAdmin4
10MongoDB
11Mongo-Express
12Redis
13RedisCommander
14ElasticSearch
15Kibana
16CouchDB
17CouchBase
18HATOYA
19プログラミングNode-RED
20Scratch
21Eclipse Orion
22Swagger Editor
23R Studio
24Jenkins
25アプリケーションFX
262048
27DOS Box
28VNC Server(Lubuntu)
29Drupal
30WordPress

こちらで書いた記事の続きです:
Twitter4J で Twetter の Stream API を使う

上記記事では Twitter 上のツイートをリアルタイムに取得する、という Stream API が一般公開され、Java から利用する場合のサンプルを紹介しました。


折角リアルタイムに取得した情報も、単に表示して終わり、ではつまらないです。後で再利用できるよう、データベースに格納してみます。ただどういったフィルタリングをかけるか(かけないのか)にもよりますが、リアルタイムに取得するツイート情報は、かなり膨大になる可能性があります。つまりそれなりのパフォーマンスを持ったデータストアを用意する必要がある、ということです。

という背景がある中で、今回は Couchbase Server をデータストアとして使ってみます。プログラムとしては先日紹介したこの記事のものをベースとして、取得したツイートを次々に Couchbase Server に格納していく、というロジックに変更します。なお CentOS に Couchbase サーバーを導入する手順についてはこちらの記事を参照してください:
CentOS に Couchbase サーバーを導入する

次にロジック変更のための準備として、こちらの記事を参考に Couchbase の Java SDK をダウンロードして、プロジェクトに追加します:
Java から Couchbase Server にアクセスする

次に、Couchbase Server には JSON データを格納することになるので、Java のオブジェクトを JSON 化するために Gson ライブラリをプロジェクトに追加します。以下のサイトから Gson ライブラリをダウンロード&展開し、gson-2.2.jar をプロジェクトに追加します:
https://google-gson.googlecode.com/files/google-gson-2.2.4-release.zip

更に、取得したツイートを Java オブジェクトとして格納するためのクラスを定義します。ここで定義したクラスのインスタンスとしてツイートを生成し、これを GSON で JSON に変換して Couchbase Server に格納する、という流れになります:
import java.util.Date;

public class Tweet {
	private long id;
	private long userid;
	private String username;
	private String text;
	private Date created;
	private Double lat;
	private Double lng;
	private String[] urls;
	private String[] medias;

	public Tweet( long id, long userid, String username, String text, Date created, Double lat, Double lng, String[] urls, String[] medias ){
		this.id = id;
		this.userid = userid;
		this.username = username;
		this.text = text;
		this.created = created;
		this.lat = lat;
		this.lng = lng;
		this.urls = urls;
		this.medias = medias;
	}

	public long getId() {
		return id;
	}

	public void setId(long id) {
		this.id = id;
	}

	public long getUserid() {
		return userid;
	}

	public void setUserid(long userid) {
		this.userid = userid;
	}

	public String getUsername() {
		return username;
	}

	public void setUsername(String username) {
		this.username = username;
	}

	public String getText() {
		return text;
	}

	public void setText(String text) {
		this.text = text;
	}

	public Date getCreated() {
		return created;
	}

	public void setCreated(Date created) {
		this.created = created;
	}

	public Double getLat() {
		return lat;
	}

	public void setLat(Double lat) {
		this.lat = lat;
	}

	public Double getLng() {
		return lng;
	}

	public void setLng(Double lng) {
		this.lng = lng;
	}

	public String[] getUrls() {
		return urls;
	}

	public void setUrls(String[] urls) {
		this.urls = urls;
	}

	public String[] getMedias() {
		return medias;
	}

	public void setMedias(String[] medias) {
		this.medias = medias;
	}	
}

そして、先日の記事で作成した Stream クラスを以下のように書き換えます(変更箇所を青くしています):
import twitter4j.MediaEntity;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.URLEntity;
import twitter4j.conf.Configuration;
import twitter4j.conf.ConfigurationBuilder;

public class Streams {

    private static final String CONSUMER_KEY = "(CONSUMER KEY)";
    private static final String CONSUMER_SECRET = "(CONSUMER SECRET)";
    private static final String ACCESS_TOKEN = "(ACCESS TOKEN)";
    private static final String ACCESS_TOKEN_SECRET = "(ACCESS TOKEN SECRET)";

    private static final String COUCHBASE_SERVER = "192.168.X.X"; //. CouchBase Server ホスト

    public static CouchbaseClient conn = null;
    public static Gson gson = null;

    static class MyStatusListener implements StatusListener {

        public void onStatus(Status status) {
            Double lat = null;
            Double lng = null;
            String[] urls = null;
            String[] medias = null;

            //. 位置情報が含まれていれば取得する        	
            GeoLocation location = status.getGeoLocation();
            if( location != null ){
                double dlat = location.getLatitude();
                double dlng = location.getLongitude();
                lat = dlat;
                lng = dlng;
            }
            long id = status.getId(); //. ツイートID
            String text = status.getText(); //. ツイート本文
            long userid = status.getUser().getId(); //. ユーザーID
            String username = status.getUser().getScreenName(); //. ユーザー表示名
            Date created = status.getCreatedAt(); //. ツイート日時
            
            //. ツイート本文にリンクURLが含まれていれば取り出す
            URLEntity[] uentitys = status.getURLEntities();
            if( uentitys != null && uentitys.length > 0 ){
            	List list = new ArrayList();
                for( int i = 0; i < uentitys.length; i ++ ){
                    URLEntity uentity = uentitys[i];
                    String expandedURL = uentity.getExpandedURL();
                    list.add( expandedURL );
                }
	        urls = ( String[] )list.toArray( new String[0] );
            }
            
            //. ツイート本文に画像/動画URLが含まれていれば取り出す
            MediaEntity[] mentitys = status.getMediaEntities();
            if( mentitys != null && mentitys.length > 0 ){
            	List list = new ArrayList();
                for( int i = 0; i < mentitys.length; i ++ ){
                    MediaEntity mentity = mentitys[i];
                    String expandedURL = mentity.getExpandedURL();
                    list.add( expandedURL );
                }
                medias = ( String[] )list.toArray( new String[0] );
            }

            //. 取り出した情報を JSON にして Couchbase に格納する
            Tweet tweet = new Tweet( id, userid, username, text, created, lat, lng, urls, medias );
            conn.set( "" + id, gson.toJson( tweet ) );            
        }

        public void onDeletionNotice(StatusDeletionNotice sdn) {
            //System.out.println("onDeletionNotice.");
        }

        public void onTrackLimitationNotice(int i) {
            //System.out.println("onTrackLimitationNotice.(" + i + ")");
        }

        public void onScrubGeo(long lat, long lng) {
            //System.out.println("onScrubGeo.(" + lat + ", " + lng + ")");
        }

        public void onException(Exception excptn) {
            //System.out.println("onException.");
        }

        public void onStallWarning(StallWarning arg0) {
            // TODO Auto-generated method stub
        }
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Configuration configuration = new ConfigurationBuilder().setOAuthConsumerKey(CONSUMER_KEY)
                .setOAuthConsumerSecret(CONSUMER_SECRET)
                .setOAuthAccessToken(ACCESS_TOKEN)
                .setOAuthAccessTokenSecret(ACCESS_TOKEN_SECRET)
                .build();

        //. Couchbase Server に接続用
        try{
            List hosts = Arrays.asList( new URI( "http://" + COUCHBASE_SERVER + ":8091/pools" ) );
            conn = new CouchbaseClient( hosts, "default", "" ); //. "Too many open files"
            gson = new Gson();
        }catch( Exception e ){
            e.printStackTrace();
        }

        TwitterStream twStream = new TwitterStreamFactory(configuration).getInstance();
        twStream.addListener(new MyStatusListener());
        
        twStream.sample();
    }
}

前回のコードと比べて、あまり違いはありません。Couchbase SDK を使って Couchbase サーバーに接続するための準備を main メソッド内で行い、onStatus メソッド内では実際に取得したツイートの内容を Java オブジェクト化したものを GSON を使って JSON 化して、後はひたすら ID をキー値として格納する、ということを実行しています。 なお上記コードではフィルタリングを定義していませんが、必要であれば適当なフィルタリング処理を加えていただくのもいいと思います。

このコードを実行すると(フィルタリングの有無や種類にもよりますが)条件に該当するツイートを次々に取得して、Couchbase サーバー内に格納していきます。その様子は Couchbase の管理コンソールからも確認できます:
2014102101

必要に応じて View を定義するなどして、この取得結果を見やすくしたり、別のプログラムから利用できるようにしていく予定です。そちらはまた別途。







 

Couchbase サーバーのデータをバックアップ&リストアする手順を紹介します。なお、以下で紹介する手順はバケットタイプが Couchbase のバケットに対してのみ可能な手順です。

まずバックアップを実行します。バックアップコマンドは /opt/couchbase/bin/cbbackup です:
# /opt/couchbase/bin/cbbackup http://couchbase.test.com:8091 /tmp/default_backup -u Administrator -p password -b default

上記例では couchbase.test.com という Couchbase サーバーのバケット default に対して、管理者名 Administrator 、管理者パスワード password でバックアップを実行し、その結果を /tmp/default_backup/ 以下に出力しています。

実行後、/tmp/default_backup/ 以下にはバックアップ結果のダンプファイルができています。ダンプファイルはクラスタ毎に作成されるので、クラスタ数と同じ数のダンプファイルが出力されているはずです:
/tmp/default_backup/bucket-default/node-couchbase.test.com%3A8091/data-0000.cbb

次にリストアを実行します。リストアコマンドは /opt/couchbase/bin/cbrestore です:
# /opt/couchbase/bin/cbrestore http://Administrator:password@couchbase.test.com:8091 --bucket-source=default --bucket-destination=test

このコマンドでは "default" というバケットから取得したバックアップダンプを(同じく) "test" というバケットに対してリストアする、という処理を指示しています。


Couchbase サーバーの特長の1つに「memcached互換」があります。

(ディスクではなく)メモリを一時キャッシュとして利用する memcached サーバーは比較的遅いリレーショナルデータベースのフロントエンドプロセッサーとして配置することで、部分的/仮想的に高速なデータベースサーバーを実現できます。ただし、あくまでメモリ上にのみ存在させて、ディスクへの読み書きを最小限に減らすことで高速化を実現しているため、そのままサーバーを落とすと中身が無くなり、再度メモリ上に展開して初めて再度利用することができるようになります。

Couchbase はこの memcached と互換性があります。memcached そのもののようにディスクへの退避のないデータストアとして利用することもできますし、データを非同期にディスクへ退避させつつメモリ中心のアクセスで高速化をはかる、という利用方法も選択できます(バケット単位で種類を指定します)。ちなみにデフォルト設定では後者になります。 ただいずれのモードでも単に memcached の挙動を真似ているというだけでなく、互換性があります。

具体的には、memcached は 11211 番ポートに TCP/IP で接続してデータを読み書きすることができますが、全く同じ TCP/IP リクエストで Couchbase サーバーのデータも読み書きできます。

実際に試してみましょう。Couchbase サーバーの環境構築についてはこちらを参照して、予め Couchbase サーバーが稼働している状態を用意しておきます:
CentOS に CouchBase サーバーを導入する

また、この手順では telnet を利用するので、telnet クライアントを用意しておきます。CentOS であれば以下のコマンドでインストールできます:
# yum install telnet

では実際に telnet で Couchbase サーバーに接続します。以下のコマンドの "couchbase.test.com" 部分を Couchbase サーバーのホスト名か IP アドレスに変更して実行してください:
# telnet couchbase.test.com 11211
Trying couchbase.test.com...
Connected to couchbase.test.com (127.0.0.1).
Escape character is '^]'.

上記のように "Escape character is '^]'. " と表示されれば接続できています。あっさり。

試しにこのプロトコルでデータを作成してみます。以下の様なコマンドを実行して、"my-third-document" という ID で、値が "My name is Couchbase." というドキュメントを作成してみます:
set my-third-document 0 0 21 My name is Couchbase.(改行)

memcached では set コマンドでデータを作成します。最初のパラメータは ID、2番目のパラメータは flag ですがここでは0を指定します。3番目のパラメータはデータの有効期限を秒単位で指定しますが、ゼロを指定すると無制限(ずっと残るデータ)になります。4番目のパラメータはデータサイズ、そして最後に実際のデータを入力します。

memcached(Couchbase)内のデータを参照するには get コマンドで ID を指定します:
get my-third-document(改行)
VALUE my-third-document 0 21
My name is Couchbase.
END

"My name is Couchbase." という入力した値が正しく取得できました。Couchbase サーバーに対して、memcached のプロトコルで値の読み書きができることが確認できました。

というわけで、Couchbase は memcached と同じように使えることが分かりました。memcached 用のライブラリの多くがそのまま Couchbase に対しても使えることになるので、これは便利な互換性といえます。

ちなみに、Couchbase サーバーへの telnet 接続を終了するには CTRL + ] を押して、プロンプトで quit を入力です:
(CTRL + ] を入力)
telnet> quit (telnetプロンプトで quit と入力)
# (元のシェルに戻る)







Couchbase サーバーに PHP SDK を使って PHP のプログラムからアクセスする環境の準備手順と、そのサンプルを紹介します。環境は CentOS 6.4 です。

まず Couchbase サーバーが用意できていない場合はサーバーを導入します。導入手順はこちらを参照してください:
CentOS に CouchBase サーバーを導入する


次に Couchbase Server 用の PHP のクライアントライブラリをインストールしますが、ここではいくつかの手順が必要になります。

まずは PHP をインストールします。後で pecl を使うので php-pear も含めてインストールします:
# yum install php php-mbstring php-devel php-pear

また、後で使うので openssl-devel モジュールもこのタイミングでインストールしておきます:
# yum install openssl-devel

次に libcouchbase.so という、Couchbase サーバー用の共用モジュールを導入します:
# wget -O /etc/yum.repos.d/couchbase.repo http://packages.couchbase.com/rpm/couchbase-centos62-x86_64.repo
# yum check-update
# yum install libcouchbase2-libevent libcouchbase-devel

ここまでの作業ができていると Couchbase サーバーの PHP SDK が導入できるようになります:
# pecl install couchbase

最後に PHP の拡張モジュールとして couchbase.so を指定します:
# vi /etc/php.d/json.ini
  :
  :
extension=json.so
extension=couchbase.so ←この行を追加

これで Couchbase 用の PHP SDK が使える状態になりました。以下の様なコードを記述することで PHP から Couchbase サーバーにアクセスすることができます("couchbase.test.com"部分は Couchbase サーバーのホスト名かIPアドレスを指定します):
<?php
$cb = new Couchbase( "couchbase.test.com:8091", "", "", "default" );
$cb->set( "my-second-document", "ハロー、CouchBase." );
var_dump( $cb->get( "my-second-document" ) );
?>

この例では Couchbase オブジェクトを作成し、"my-second-document" という ID のドキュメントを "ハロー、Couchbase" というプレーンテキストで作成し、そのデータを var_dump 関数で表示する、というだけのシンプルな内容です。この PHP プログラムが正しく実行されると以下の様な出力結果になります:
# php test.php
string(22) "ハロー、CouchBase."

Couchbase PHP SDK の公式なドキュメントはこちらを参照ください:
http://docs.couchbase.com/couchbase-sdk-php-1.1/




 

このページのトップヘ