パートナーソリューションアーキテクトの榎並です。
今回は、前回のBlogで紹介したAmazon Kinesis概要の中のKinesis Client Library(以下、KCL)について掘り下げたいと思います。
KCLは、Kinesis Applicationを開発するに辺り容易にデータ取得でき、かつ、耐障害性の高い仕組みを構築することができるライブラリです。Slideshareの資料にあるスライド24以降にKCLの機能をまとめましたのでご覧ください。
KCLは、Githubから取得できます。Amazon Kinesisの開発者ガイドにもKCLの実装方法などについて触れており、本ブログはその情報に補足するものになります。
では、Amazon Kinesisから受け取ったデータを標準出力するというとてもシンプルなKinesis Applicationをつくってみましょう。
Java SE Development Kit 7 (JDK7)以上とMavenがインストールされている前提で進めます。
1.Mavenを用いてProjectを作成します。
$ mvn archetype:generate -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false -DgroupId=jp.co.adsj -DartifactId=kinesisapp
Mavenプロジェクトが作成されます。
Projectフォルダのトップにpom.xmlが作成されてますので、編集します。
2.pom.xmlの編集
KCLとAWS SDKをDependenciesに追加します。
最新のライブラリバージョンは、Maven Centralでリポジトリを確認できます。
KCL:http://search.maven.org/#browse%7C-1421543558
AWS Java SDK:http://search.maven.org/#browse%7C-2028037748
以下、追加した情報になります。
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.8.9.1</version>
</dependency>
上記、以外にも、BuildやPackage時に使用する設定も行います。
3.実装
自動生成された、App.javaに加えてFactory.javaとProcessor.javaを追加しました。
Factory.javaは、IRecordProcessorFactoryの実装でProcessorクラスを返します。
package jp.co.adsj; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory; public class Factory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new Processor(); } }
Processor.javaは、IRecordProcessorの実装になり、Amazon Kinesisから取得したデータ処理を記述します。
IRecordProcessorは、initialze, processRecords, shutdownの3つのインタフェースがあり、それぞれ実装します。これらのメソッドがコールバックされる形になり、レコードが取得されるとprocessRecordsが呼び出されます。
- initialize
@Override public void initialize(String shardId){ LOG.info("Initializing record processor for shard: " + shardId); this.kinesisShardId = shardId; }
- processRecords
@Override public void processRecords(List records, IRecordProcessorCheckpointer checkpointer) { LOG.info("Processing " + records.size() + " records for kinesisShardId " + this.kinesisShardId); String data = null; for (Record record : records) { try{ data = decoder.decode(record.getData()).toString(); LOG.info(record.getSequenceNumber() + ", " + record.getPartitionKey() + ", " + data); } catch (CharacterCodingException e) { LOG.error("Malformed data: " + data, e); continue; } }
long currentTime = System.currentTimeMillis();
if (currentTime > nextCheckpointTimeInMillis) {
checkpoint(checkpointer);
nextCheckpointTimeInMillis = currentTime + CHECKPOINT_INTERVAL_MILLIS;
}
}
recordsにWorker(KCLが生成するデータ取得用スレッド)が取得したデータレコードが入ってきます。 record.getData()でデータの実体にアクセスできます。 getData()で取得されるデータは、ByteBufferなので、データ処理する際は注意が必要です。肝は、checkpoint(checkpointer)で、一定間隔(サンプルでは、CHECKPOINT_INTERVAL_MILLISを60秒としております)で、Checkpointをコールします。これにより、KCLは、DynamoDBに処理したシーケンス番号を書き込みます。
サンプルソースは、可読性を重視し、リトライ処理を入れておりませんが、リトライ処理をいれることをオススメします。
- shutdown
@Override public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { LOG.info("Shutting down record processor for shard: " + kinesisShardId); if (reason == ShutdownReason.TERMINATE) { checkpoint(checkpointer); } }
shutdownメソッドは、Split Shard、Merge Shardを対象のShardに行った時、または、Streamが削除されときに呼び出されます。
メインのApp.javaになります。
public static void main( String[] args ) { AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain(); KinesisClientLibConfiguration kinesisConfig = new KinesisClientLibConfiguration( System.getProperty("kinesis.app"), System.getProperty("kinesis.stream"), credentialsProvider, System.getProperty("kinesis.worker") ) .withKinesisEndpoint("https://kinesis.ap-northeast-1.amazonaws.com") .withFailoverTimeMillis( Long.valueOf(System.getProperty("kinesis.failovertime")) ); IRecordProcessorFactory factory = new Factory(); Worker worker = new Worker(factory, kinesisConfig); worker.run(); }
検証しやすいようにアプリケーション名、Stream、ワーカーID及びFailoverTimeを渡すようにしました。(値チェックなどしてませんが。。)
アプリケーション名
- このアプリケーション名と関連付けられたすべてのワーカーは、連係して同じストリームを処理していると見なされます。これらのワーカーは複数のインスタンスに分散している場合もあります。アプリケーション名が異なる場合、KCL は 同じストリームを処理するまったく別のアプリケーションと見なします。
- KCL はアプリケーション名を使って Amazon DynamoDB テーブルを作成し、このテーブルを使用してアプリケーションの状態情報(チェックポイントやワーカーとシャードのマッピングなど)を保存します。各アプリケーションには、それぞれ Amazon DynamoDB テーブルがあります。
ワーカーID
- ワーカーを特定するIDになり、実行するアプリケーション毎にユニークにする必要があります。どのワーカーがどのShardからデータを取得し、どのシーケンス番号まで処理しているかをDynamoDBのアイテムとして管理されます。
- インスタンス固有情報と乱数などを利用することで、複数インスタンスでアプリケーションを実行することも、1台のインスタンスに複数アプリケーションを実行することも可能になります。
Failover time
- KinesisClientLibConfigurationで設定できるパラメータの一つで、1つのワーカーが障害によりデータを取得できなくなった場合もう1つのワーカーにフェイルオーバーする時間
- 短い時間にするとDynamoDBへのアクセスが高くなることに注意
その他のパタメータ
- Failover time以外にもGetReord APIで取得するレコード数の上限値を決める、MaxRecordsなどが設定できます。
- 設定できる値は以下で参照することができます。
- https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java
4.コンパイルと実行
mavenでコンパイルとパッケージを行った後に実行します。
$ mvn install package
$ java -Dkinesis.app=testapp -Dkinesis.stream={StreamName} -Dkinesis.worker={WorkerName} -Dkinesis.failovertime={failover time} -jar target/KinesisApp.jar jp.co.adsj.App
動作確認は、AWS CLIを使ってKinesisに対してデータをPutします。
$ aws kinesis put-record --stream-name testStream --data "test data" --partition-key pkey
Kinesisアプリ側のコンソールにデータが表示されれば成功です。
また、マネジメントコンソールでUS-EAST-1のDynamoDBにアクセスすると、Kinesisアプリ名のテーブルが作成されていることも確認できます。
今回は、KCLの簡単な開発の流れをご説明しました。
このサンプルアプリを複数のインスタンスで実行させたうえで、1つのアプリを停止させた後もう一方のアプリが検知し、データを取得ことが確認できます。
ぜひ、KCLを用いて様々なAmazon Kinesisアプリケーションを開発頂ければ幸いです。