« 2014年11月のAWS Black Belt Tech Webinarのご案内 | メイン | AWS環境におけるERP市場の現状と今後 »

Amazon Kinesisシリーズ(2) Kinesis Client Libraryを使ってKinesis Applicationを作ってみよう

パートナーソリューションアーキテクトの榎並です。
今回は、前回のBlogで紹介したAmazon Kinesis概要の中のKinesis Client Library(以下、KCL)について掘り下げたいと思います。
KCLは、Kinesis Applicationを開発するに辺り容易にデータ取得でき、かつ、耐障害性の高い仕組みを構築することができるライブラリです。Slideshareの資料にあるスライド24以降にKCLの機能をまとめましたのでご覧ください。

KCLは、Githubから取得できます。Amazon Kinesisの開発者ガイドにもKCLの実装方法などについて触れており、本ブログはその情報に補足するものになります。

では、Amazon Kinesisから受け取ったデータを標準出力するというとてもシンプルなKinesis Applicationをつくってみましょう。

Java SE Development Kit 7 (JDK7)以上とMavenがインストールされている前提で進めます。

1.Mavenを用いてProjectを作成します。

$ mvn archetype:generate -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false -DgroupId=jp.co.adsj -DartifactId=kinesisapp

Mavenプロジェクトが作成されます。
Projectフォルダのトップにpom.xmlが作成されてますので、編集します。

2.pom.xmlの編集

KCLとAWS SDKをDependenciesに追加します。
最新のライブラリバージョンは、Maven Centralでリポジトリを確認できます。
KCL:http://search.maven.org/#browse%7C-1421543558
AWS Java SDK:http://search.maven.org/#browse%7C-2028037748

以下、追加した情報になります。

<dependency>
  <groupId>com.amazonaws</groupId>
  <artifactId>amazon-kinesis-client</artifactId>
  <version>1.1.0</version>
</dependency>

<dependency>
  <groupId>com.amazonaws</groupId>
  <artifactId>aws-java-sdk</artifactId>
  <version>1.8.9.1</version>
</dependency>

上記、以外にも、BuildやPackage時に使用する設定も行います。

3.実装

自動生成された、App.javaに加えてFactory.javaとProcessor.javaを追加しました。
Factory.javaは、IRecordProcessorFactoryの実装でProcessorクラスを返します。

package jp.co.adsj;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;

public class Factory implements IRecordProcessorFactory {

	@Override
	public IRecordProcessor createProcessor() {
		return new Processor();
	}

}

Processor.javaは、IRecordProcessorの実装になり、Amazon Kinesisから取得したデータ処理を記述します。

IRecordProcessorは、initialze, processRecords, shutdownの3つのインタフェースがあり、それぞれ実装します。これらのメソッドがコールバックされる形になり、レコードが取得されるとprocessRecordsが呼び出されます。

  • initialize
  
 @Override
   public void initialize(String shardId){
     LOG.info("Initializing record processor for shard: " + shardId);
     this.kinesisShardId = shardId;
   }
  • processRecords
@Override
  public void processRecords(List records, IRecordProcessorCheckpointer checkpointer) {
    LOG.info("Processing " + records.size() + " records for kinesisShardId " + this.kinesisShardId);
    String data = null;
    for (Record record : records) {
      try{
        data = decoder.decode(record.getData()).toString();
        LOG.info(record.getSequenceNumber() + ", " + record.getPartitionKey() + ", " + data);
      } catch (CharacterCodingException e) {
        LOG.error("Malformed data: " + data, e);
        continue;
      }
    }
long currentTime = System.currentTimeMillis();
if (currentTime > nextCheckpointTimeInMillis) {
checkpoint(checkpointer);
nextCheckpointTimeInMillis = currentTime + CHECKPOINT_INTERVAL_MILLIS;
}
}

 

recordsにWorker(KCLが生成するデータ取得用スレッド)が取得したデータレコードが入ってきます。 record.getData()でデータの実体にアクセスできます。 getData()で取得されるデータは、ByteBufferなので、データ処理する際は注意が必要です。肝は、checkpoint(checkpointer)で、一定間隔(サンプルでは、CHECKPOINT_INTERVAL_MILLISを60秒としております)で、Checkpointをコールします。これにより、KCLは、DynamoDBに処理したシーケンス番号を書き込みます。
サンプルソースは、可読性を重視し、リトライ処理を入れておりませんが、リトライ処理をいれることをオススメします。

 

  • shutdown
  @Override
  public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
    LOG.info("Shutting down record processor for shard: " + kinesisShardId);
    if (reason == ShutdownReason.TERMINATE) {
       checkpoint(checkpointer);
    }
  }

shutdownメソッドは、Split Shard、Merge Shardを対象のShardに行った時、または、Streamが削除されときに呼び出されます。

メインのApp.javaになります。 

 public static void main( String[] args )
 {
  AWSCredentialsProvider credentialsProvider = 
     new DefaultAWSCredentialsProviderChain();
  KinesisClientLibConfiguration kinesisConfig = new KinesisClientLibConfiguration(
    System.getProperty("kinesis.app"),
    System.getProperty("kinesis.stream"),
    credentialsProvider,
    System.getProperty("kinesis.worker")
   )
   .withKinesisEndpoint("https://kinesis.ap-northeast-1.amazonaws.com")		    
   .withFailoverTimeMillis(
      Long.valueOf(System.getProperty("kinesis.failovertime"))
   );
  IRecordProcessorFactory factory = new Factory();
  Worker worker = new Worker(factory, kinesisConfig);
  worker.run();
  }

検証しやすいようにアプリケーション名、Stream、ワーカーID及びFailoverTimeを渡すようにしました。(値チェックなどしてませんが。。)

アプリケーション名

  • このアプリケーション名と関連付けられたすべてのワーカーは、連係して同じストリームを処理していると見なされます。これらのワーカーは複数のインスタンスに分散している場合もあります。アプリケーション名が異なる場合、KCL は 同じストリームを処理するまったく別のアプリケーションと見なします。
  • KCL はアプリケーション名を使って Amazon DynamoDB テーブルを作成し、このテーブルを使用してアプリケーションの状態情報(チェックポイントやワーカーとシャードのマッピングなど)を保存します。各アプリケーションには、それぞれ Amazon DynamoDB テーブルがあります。

ワーカーID

  • ワーカーを特定するIDになり、実行するアプリケーション毎にユニークにする必要があります。どのワーカーがどのShardからデータを取得し、どのシーケンス番号まで処理しているかをDynamoDBのアイテムとして管理されます。
  • インスタンス固有情報と乱数などを利用することで、複数インスタンスでアプリケーションを実行することも、1台のインスタンスに複数アプリケーションを実行することも可能になります。

Failover time

  • KinesisClientLibConfigurationで設定できるパラメータの一つで、1つのワーカーが障害によりデータを取得できなくなった場合もう1つのワーカーにフェイルオーバーする時間
  • 短い時間にするとDynamoDBへのアクセスが高くなることに注意

その他のパタメータ

  • Failover time以外にもGetReord APIで取得するレコード数の上限値を決める、MaxRecordsなどが設定できます。
  • 設定できる値は以下で参照することができます。
    • https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java

4.コンパイルと実行

mavenでコンパイルとパッケージを行った後に実行します。

$ mvn install package
$ java -Dkinesis.app=testapp -Dkinesis.stream={StreamName} -Dkinesis.worker={WorkerName} -Dkinesis.failovertime={failover time} -jar target/KinesisApp.jar jp.co.adsj.App

動作確認は、AWS CLIを使ってKinesisに対してデータをPutします。

$ aws kinesis put-record --stream-name testStream --data "test data" --partition-key pkey

Kinesisアプリ側のコンソールにデータが表示されれば成功です。
また、マネジメントコンソールでUS-EAST-1のDynamoDBにアクセスすると、Kinesisアプリ名のテーブルが作成されていることも確認できます。

今回は、KCLの簡単な開発の流れをご説明しました。
このサンプルアプリを複数のインスタンスで実行させたうえで、1つのアプリを停止させた後もう一方のアプリが検知し、データを取得ことが確認できます。

ぜひ、KCLを用いて様々なAmazon Kinesisアプリケーションを開発頂ければ幸いです。

 

コメント

Twitter, Facebook

このブログの最新情報はTwitterFacebookでもお知らせしています。お気軽にフォローください。

2018年4 月

1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30