« 【Black Belt 再放送のお知らせ】AWS Black Belt Tech Webinar 2015 - Amazon API Gateway | メイン | AD ConnectorをつかったオンプレミスのActive DirectoryからAWSへの接続方法 »

ストリームデータへのインタラクティブクエリのための、Presto-Amazon Kinesisコネクタ

こちらはAWSアドバンスドテクノロジーパートナーであるQuboleの、Technical StaffのSivaramakrishnan NarayananとDirector of Product ManagementのXing Quanからのゲスト投稿になります。
 
Amazon Kinesisは巨大な分散したストリームデータのためのスケーラブルなフルマネージドサービスです。(特にモバイルやウェアラブルデバイスの)アプリケーションでより多くのデータを集め始める時に、Amazon KinesisはAWSにデータを投入する際の開始点となります。しかし、全てのデータは分析されビジネスディシジョンに影響を与えることができて初めて意味をもちます。多くのソリューションがKinesisのストリームを取り出して様々な方法で処理することができますが、SQLを使ってリアルタイムに近いレベルでKinesisにクエリをかけることはどれもできていません。我々Quboleはこれに気づいて、Kinesis用のPrestoコネクタを開発しました。
 
Prestoはインタラクティブでリアルタイムに近いレベルのSQLを発行するユースケースに適応するようにデザインされています。元々はFacebookによって開発されオープンソースとなり、それ以降Netflix, Dropbox, そしてAirbnbといった企業によってペタバイト級のデータセットに対してインタラクティブクエリを行う場面に広く適応してきました。QuboleではPresto as a ServiceをAWS上でエンドのお客様に提供していて、典型的にはデータサイエンティストやビジネスアナリストがインタラクティブなSQLクエリを簡単に使いはじめることができます。Quboleはジョブの要求に合わせてPrestoクラスタの設定や自動プロビジョンやスケールを管理しています。
 
多くのお客様はデータをAmazon Simple Storage Service (S3)に保存していて、Presto, Hive, Spark, またはMapReduce(すべてQuboleではサービスとして提供されています)を組み合わせてインタラクティブやバッチのワークロードを扱っています。しかしながら、Prestoの高速でのインタラクティブ処理をみて、リアルタイムのストリームソース(例えばAmazon Kinesis)に対してクエリをしたいという需要をお客様から伺うことが増えてきました。この需要に合わせて、我々はKinesisから直接に読んだりクエリしたりするためのPrestoのオープンソースコネクタを開発しました。下の図はコネクタがどのように動作するかを示しています(スケッチはMartin Kleppmanのブログに着想を得ています)。
 
Qubole_Image_1a
 
ユーザのアプリケーションはKinesisのshardにデータをpushします。Prestoクラスタはいくつかのworkerと1つのcoordinatorプロセスで構成されます。Kinesisコネクタはそれぞれのshardに対して1つのworkerを割り当ててGetRecords APIを使ってshardからデータを読み出します。Kinesisの各レコードは単なるblobなので、コネクタはそれらを論理的なテーブルの列としてどのように解釈すればいいかを知っている必要があります。ユーザは(後でご紹介しますが)テーブル定義の一部として、コネクタにblobをどの様に解釈するかを教えます。するとコネクタはKinesisのレコードをテーブルのレコードに変換して、それはPrestoエンジンで別のクエリのステージによって処理されます。各workerは各shardからどのぐらい読み込んだかをDynamoDBのテーブルに記録することもできます。これはこの後紹介するチェックポイント機能のために使われます。 
 
これからコネクタの能力をQuboleを使ってお見せします。サンプルのアプリケーションはNASAがフロリダのNASAケネディスペースセンターのWWWサーバに来たHTTPリクエストを公開した90年代に連れて行ってくれます。それでは時間旅行をするために、以下のステップに従っていきます。
 
  1. KinesisストリームにアクセスするためにAWSアカウントが必要です。データを生成しようとしているマシン上でDefaultAWSCredentialsProviderChainを経てKinesisにアクセスできる様にしてください。例えば、AWS_ACCESS_KEY_IDとAWS_SECRET_ACCESS_KEYといった環境変数を設定します。
  2. kinesis-data-genというツールをダウンロードしてビルドして、tarを解凍します。このツールを使ってAmazon Kinesisにnasahttpという名前のストリームを作り、JSON形式のhttpログをpushします。シードデータとしてusesample_data.txtを使うことができます。
    
    [localhost datagen-0.0.1-SNAPSHOT]$ ./bin/kinesis-datagen -f etc/sample_data.txt -create -n nasahttp -s 1
    Jun 29, 11:01:26 AM com.qubole.kinesis.nasa.DataGenerator safeDeleteStream INFO: trying to delete nasahttp
    Jun 29, 11:01:32 AM com.qubole.kinesis.nasa.DataGenerator safeDeleteStream INFO: stream nasahttp not found
    Jun 29, 11:01:36 AM com.qubole.kinesis.nasa.DataGenerator safeCheckStream INFO: waiting for stream to become active
    Jun 29, 11:01:36 AM com.qubole.kinesis.nasa.DataGenerator safeCheckStream INFO: CREATING
    Jun 29, 11:01:38 AM com.qubole.kinesis.nasa.DataGenerator safeCheckStream INFO: CREATING
    
    Jun 29, 11:02:25 AM com.qubole.kinesis.nasa.DataGenerator safeCheckStream INFO: ACTIVE
    Jun 29, 11:02:25 AM com.qubole.kinesis.nasa.DataGenerator pushRecords INFO: opening file etc/sample_data.txt
    Jun 29, 11:02:25 AM com.qubole.kinesis.text.FileLineReader end INFO: records = 9
    Jun 29, 11:02:25 AM com.qubole.kinesis.executor.StreamProducerRunnable run INFO: producer is done
    Jun 29, 11:02:25 AM com.qubole.kinesis.executor.StreamExperiment runExperiment INFO: producer is done
    Jun 29, 11:02:25 AM com.qubole.kinesis.executor.StreamExperiment runExperiment INFO: waiting for consumers to finish
    Jun 29, 11:02:25 AM com.qubole.kinesis.nasa.RecordParser parseRecord SEVERE: unable to parse:
    Jun 29, 11:02:26 AM com.qubole.kinesis.executor.StreamConsumerRunnable run INFO: no-op count 1
    Jun 29, 11:02:26 AM com.qubole.kinesis.executor.StreamExperiment runExperiment INFO: Produced 9 records
    Jun 29, 11:02:26 AM com.qubole.kinesis.executor.StreamExperiment runExperiment INFO: Consumed 9 records
    Jun 29, 11:02:26 AM com.qubole.kinesis.executor.StreamExperiment runExperiment INFO: Total time 1.16748948 seconds
    Jun 29, 11:02:26 AM com.qubole.kinesis.executor.StreamExperiment runExperiment INFO: Records per second 7.7088489054308225
    
  3. もっと大きいストリームを作るために、NASAのHTTPリクエストデータセットをこちらからダウンロードします。これらのファイルをデータ生成のシードのサンプルファイルとして使うことができます。
  4. QuboleのPresto as a Serviceを使うために、Quboleにアカウントを作成します。作成したAmazon KinesisストリームにアクセスできるようにAWSのクレデンシャルを入力します。こちらの短いガイドを見るとPrestoクラスタをどのようにセットアップするかがわかります。
  5. Quboleのアカウントが作成されると、自動設定されたPrestoクラスタをClustersのページで見ることができるでしょう。editのアイコンをクリックするとクラスタの設定の詳細を見ることができ、Prestoの設定のセクションへスクロールダウンできます。 Qubole_Image_2
  6. このセクションでは以下のテキストを入力します。WikiのConnector ConfigurationTable Definitionsのセクションを読むと詳細が理解できます。
    
    config.properties:
    datasources=jmx,hive,kinesis
    
    catalog/kinesis.properties:
    connector.name=kinesis
    kinesis.table-names=nasahttp
    
    kinesis/nasahttp.json:
    {
    	"tableName": "nasahttp",
    	"schemaName": "default",
    	"streamName": "nasahttp",
    	"message": {
        	"dataFormat": "json",
        	"fields": [
            	{"name": "host", "mapping": "host", "type": "VARCHAR"},
            	{"name": "at", "mapping": "timestamp", "type": "VARCHAR"},
            	{"name": "request", "mapping": "request", "type": "VARCHAR"},
            	{"name": "code", "mapping": "code", "type": "BIGINT"},
            	{"name": "bytes", "mapping": "bytes", "type": "BIGINT"}
        	]}
    }
    
  7. Analyzeのページに戻りこのテーブルに対してPrestoコマンドを組み立てます。以下のスクリーンショットはAnalyzeページの異なるコンポーネントを示しています。最初のPrestoクエリはクラスタを起動するので、2分ぐらいかかります。クエリが実行している間は進捗情報のログを見ることができ、クエリが終了したら結果を見ることができます。この例のクエリはHTTPサーバに対して最大バイトのリクエストを行ったhostを見つけるクエリです。
    
    select host, sum(bytes) as total_bytes from kinesis.default.nasahttp group by host order by total_bytes desc limit 2;
    
    Qubole_Image_3

おめでとうございます!Amazon KinesisストリームにPrestoを使ったSQLクエリを初めて実行できました。同じkinesis-datagenツールを使って追加のデータをinsertすることができて(-createオプションは使わないで下さい)、同じクエリを再実行できます。total_bytesの数字は違う結果になると思います。

Presto-Amazon Kinesisコネクタはチェックポイントクエリもサポートします。もし最後にクエリして以降に入ってきた追加のデータに対してクエリを実行したいときには、この機能を使うことができます。例えば、毎分トップ5のリクエストを表示するダッシュボードを作るためにこの機能が使えます。この機能の使い方についてはcheckpointingのドキュメントをご覧ください。

コネクタはオープンソース(Apacleライセンス)でありgithubで入手可能です。READMEファイルに、特定のバージョンのPrestoに対してコネクタをどのようにビルドするか、どのようにバイナリをインストールするかが説明されています。wikiのconfigurationのページを見ていくとAmazon Kinesisにつなぐための正しい設定要素を見ることができます。wikiのtable definitionsのページでは、データ・フォーマットやフィールドマッピングを含めて、どのようにPresto-Amazon Kinesisテーブルを表現すればよいかが書かれています。instructionsではPrestoをラップトップはクラスタ上にインストールして実行する方法を見ることができます。

最後に、Amazon Kinesisは固定された期間だけデータを保持しますが、より分析向きのORCやParquetといったフォーマットにして日次でS3にオフロードすることが推奨されます。Hiveがこのユースケースには適しています。我々はAmazon Kinesisストレージハンドラを開発しオープンソースとしました(githubで入手可能です)。このストレージハンドラは、Hiveの中でAmazon Kinesisの直接クエリしてHDFSかS3上のパーティションされたテーブルにORCやParquetといった最適化されたフォーマットで書き込むために使うことができます。このコネクタも、時系列分析のために確実に一度だけデータが書き出されたことを保証するためにチェックポイントアクセスをサポートしています。

我々は Shubham Agarwal, Shubham Tagra そして Akhilesh Anand のこのプロジェクトに対する貢献に感謝しています。我々のお客様がAmazon Kinesisのストリームデータセットに対してインタラクティブにクエリできる様になったことに興奮しています。

質問やオススメがありましたら、ぜひ以下にコメントをお願いします。

原文: Presto-Amazon Kinesis Connector for Interactively Querying Streaming Data (翻訳: SA岩永)

コメント

Twitter, Facebook

このブログの最新情報はTwitterFacebookでもお知らせしています。お気軽にフォローください。

2018年4 月

1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30