パートナーソリューションアーキテクトの榎並です。
今回は、前回のBlogで紹介したKinesis Client Library(Java版)のPython版であるKinesis Client Library for Python(以下、KCL for Python)について掘り下げたいと思います。
2014年10月21日にKCL for Pythonがリリースされました。本ブログでは、AWS Blogを補足する形で、KCL for Pythonについて説明していきます。
1.KCL for Python概要
KCL for Pythonは、KCL for Javaを使ってます。
ここが勘違いしてしまうポイントかもしれません。Amazon Kinesisからのデータ取得、ロードバランシングや可用性を高める機能などは、KCL for Javaを利用している構造です。データ取得後の処理ロジックをPythonで記述することができる構造です。これができるようになったのもKCL for Java version 1.2からMulti-Language Support機能がついたからです。このMulti-Language Support機能は、常駐プロセスとして起動し、サブプロセスとしてPythonで記述されたデータ処理用プログラムを起動します。常駐プロセスとデータ処理用プログラム間は標準入出力を経由してデータ連携がされる構造になります。
標準入出力間は、JSONフォーマットのプロトコルが定められています。詳細は、こちらをご覧ください。
2.サンプルコードの実行
上記の内容をご理解頂ければ、Githubに記述されている手順も容易に理解できると思います。
Githubに書かれている手順と重複しますが、以下の手順でセットアップします。
$ git clone https://github.com/awslabs/amazon-kinesis-client-python.git
$ cd amazon-kinesis-client-python
$ sudo python setup.py install
amazon-kinesis-client-pythonフォルダ内のsamplesフォルダ内にKCL for Pythonのサンプルアプリとデータを入力するためのアプリが容易されております。まず、Amazon Kinesisにデータを入力いたします。
sample_kinesis_wordputter.pyを実行することでデータを入力することができます。実行するためのクレデンシャルの設定(~/.aws/credentialsに記述する、IAMロール利用するなど)を行ってください。
sample_kinesis_wordputter.pyでは、指定したAmazon KinesisのStreamがなかった場合には、自動的に作成してくれるので便利です。(以下は、ap-northeast-1を利用する用にしております。)
$ cd samples
$ sample_kinesis_wordputter.py --stream words -w cat -w dog -w bird -w lobster -r ap-northeast-1
KCL for Pythonのサンプルアプリ sample_kclpy_app.py を実行するために、まず、sample.propertiesに設定を行います。既にサンプルアプリが最低限動作ための設定はされているので、実行するKCL for Pythonのパスの設定と利用するリージョンをap-northeast-1に設定を以下の通り行ってください。
(5行目)
executableName = /home/ec2-user/amazon-kinesis-client-python/samples/sample_kclpy_app.py
(34行目)
regionName = ap-northeast-1
上記、設定を行うことで実行可能になります。
先にも記述している通り、KCL for JavaがKCL for Pythonで記述されたプログラムを起動するので、KCL for Javaを起動することになります。起動時の必要なライブラリの指定など面倒な作業を簡易化するためにamazon_kclpy_helper.pyというヘルパープログラムがsamplesフォルダ内にあります。以下の形でコマンドを実行するとKCL for Javaを起動するためのコマンドがプリントされます。
$ amazon_kclpy_helper.py --print_command -j /usr/bin/java -p /home/ec2-user/amazon-kinesis-client-python/samples/sample.properties
/usr/bin/java -cp /usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/jars/amazon-kinesis-client-1.2.0.jar:/usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/jars/jackson-annotations-2.1.1.jar:/usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/jars/commons-codec-1.3.jar:/usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/jars/commons-logging-1.1.1.jar:/usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/jars/joda-time-2.4.jar:/usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/jars/jackson-databind-2.1.1.jar:/usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/jars/jackson-core-2.1.1.jar:/usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/jars/aws-java-sdk-1.7.13.jar:/usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/jars/httpclient-4.2.jar:/usr/lib/python2.6/site-packages/amazon_kclpy-1.0.0-py2.6.egg/amazon_kclpy/jars/httpcore-4.2.jar:/home/ec2-user/amazon-kinesis-client-python/samples com.amazonaws.services.kinesis.multilang.MultiLangDaemon sample.properties
上記の出力結果を、コピーして、再度コンソールにペーストすることでKCL for Javaが実行されます。
実行すると、コンソール上にKCL for Javaが利用するAmazon DynamoDBのテーブル作成やAmazon Kinesisのシャードからのデータ処理ステータスなどが確認できます。
3.データ処理ロジックの実装
上記のサンプルアプリでは、肝心のデータ処理部分は何も実装されておりません。
kcl.RecordProcessorBaseの抽象クラスを継承して実装します。
initialize(self, shard_id)
process_records(self, records, checkpointer)
shutdown(self, checkpointer, reason)
initializeは起動時に1度実行されます。process_recordsは、複数回実行され、ここにデータ処理ロジックを実装することになります。shutdownは、KCL for JavaのMultiLangDaemonがシャットダウンしたときに呼ばれます。
KCL for JavaのMultiLangRecordProcessor.javaを見ていただくとわかるのですが、initializeメソッド内でデータ処理ロジック(サンプルだとsample_kclpy_app.py)のプロセスを起動し、processRecordsメソッド内で取得したデータを標準出力として書き込みを行います。データ処理ロジック側は常に標準入力を待っており、入力されたデータをチェックし、process_recordsに合致する内容であれば、データ処理ロジックのprocess_recordsが実行される流れになります。
KCL for Javaでは、Shard毎にWorkerが起動されるため、データ処理ロジックのプロセスもWorker数分起動される構造になります。
sample_kclpy_app.pyでは、95行目付近にロジックを入れることが可能です。
4.KCL for Pythonで指定できるパラメータ
sample.propertiesで設定できるパラメータは、以下の通りです。基本は、KCL for Javaで指定できるパタメータになります。
項目
|
内容
|
failoverTimeMillis
|
Workerが処理を継続できなくなり別なWorkerにフェイルオーバする時間(ミリ秒)
短い値を設定するとDynamoDBのPIOPS高くなるため注意が必要
|
maxRecords
|
1回のデータ取得で取得するレコード件数
|
idleTimeBetweenReadsInMillis
|
レコード取得間隔(ミリ秒)
|
callProcessRecordsEvenForEmptyRecordList
|
レコードデータが空でもレコード取得処理を継続するかの判断(True or Fault)
|
parentShardPollIntervalMillis
|
親のShardをチェックするインターバル
短い値を設定するとDynamoDBのPIOPS高くなるため注意が必要
|
cleanupLeasesUponShardCompletion
|
shradが終了した後に継続して処理を続けるかクリーンアップするかを指定 |
taskBackoffTimeMillis
|
KCLのバックオフタイムの設定
|
metricsBufferTimeMillis
|
CloudWatchのAPIコールする前の時間
|
metricsMaxQueueSize
|
CloudWatchのAPIコールする最大のキューサイズ
|
validateSequenceNumberBeforeCheckpointing
|
Checkpointingする前にシーケンス番号をチェックするかを指定 |
maxActiveThreads
|
MultiLangDaemonの最大スレッド数
|
Javaでの実装にハードルがあった方には、今回のPython対応はメリットがあるかと思います。また、KCL for JavaがMulti-Language対応されたので、今後、Python以外での実装も容易になったかと思います。
ぜひ、色々なKinesisアプリケーションを作成してください。