管理不要なAmazon Redshift Database Loader
こんにちは、ソリューションアーキテクトの岩永です。 Principal Solutions ArchitectのIan MeyersがAWS Big Data Blogに投稿した A Zero-Administration Amazon Redshift Database Loaderを翻訳しました。AWS Lambdaを利用して、S3にファイルが上がったら自動でAmazon Redshiftにデータを投入できるようになります。DynamoDBを利用したジョブ管理なども参考になる部分があるかと思います!
この新しいAWS Lambda ファンクションによって、Amazon Redshiftへのデータロードが今までになく簡単になります!Amazon S3の好きな場所にファイルをpushするだけで自動的にAmazon Redshiftクラスタにロードされます。
AWS Lamda を Amazon Redshiftと組み合わせる
Amazon Redshiftは完全に管理されたペタバイト級のデータウェアハウスを$1000/TB/年以下で使え、AWSのお客様にご自身のアプリケーションやビジネスのすべてをとてもパワフルに分析するサービスを提供しています。クラスタにロードするために、サードパーティが管理するFTPや、ファイルを生成する内部アプリケーションといった、たくさんの種類のデータソースからデータを取り込みます。Amazon Redshiftにロードするベストプラクティスは、COPYコマンドを利用してAmazon S3やAmazon DynamoDBやAmazon EMR上のHDFSから並列でデータをロードする方法です。
いずれの方法であれ、新しいデータが来ているかをチェックするサーバや、そのデータをロードしたり問題が起こったら対処するワークフローを管理するサーバを立てておく必要があります。そこで、我々はAWS LambdaベースのAmazon Redshift loader(http://github.com/awslabs/aws-lambda-redshift-loader)を開発しました。これによって、皆さんはサーバ管理をすることなく、S3にファイルを置いていくつものAmazon Redshiftクラスタ上の多数のテーブルに自動的にロードすることができます。これは、AWS Lambdaがイベント駆動で管理不要なコンピュートサービスであるからこそ実現可能となっています。開発者はアプリケーションを開発するだけで、自動的にホストされスケールするので、きめ細かい価格構造をとることができます。
このファンクションはS3上のAmazon RedshiftにロードされるべきすべてのファイルのリストをDynamoDBを用いて管理します。このリストによって1つのファイルが1度だけロードされることを保証し、同時にファイルがいつどのテーブルにロードされるかを皆さんがコントロールできるようになります。入力場所に見つかったファイルは指定したバッチサイズまでバッファに貯められるか、もしくは時間ベースの閾値を指定してロードをトリガーすることができます。
どんなCOPYコマンドのオプションも利用可能であり、CSV(もしくは任意の区切り文字の)ファイルはもちろん、JSONファイル(JSON pathとの組み合わせ)もサポートしています。すべてのパスワードやアクセスキーは安全のため暗号化されています。AWS Lambdaによって、自動的なスケールと高可用性、そしてAmazon CloudWatchによる組み込みのロギングを手にすることができます。
最後に、組み込みの構成管理やバッチ状態やトラブルシューティングの監視機能によって、ロード処理の状態管理をするためのツールを提供しています。Amazon SNSを通じてロード状態の通知もサポートしているので、皆さんはロード処理がどのぐらい時間超過しているかを見て取ることができます。
AWS Lambda Amazon Redshift Database Loaderへのアクセス方法
このAWS Lambda ファンクションはAWSLabsからダウロードできます( http://github.com/awslabs/aws-lambda-redshift-loader )。例えば、ローカルセットアップのためには以下のステップを実行します。
git clone https://github.com/awslabs/aws-lambda-redshift-loader.git
cd aws-lambda-redshift-loader
npm install
まずはじめに:Amazon Redshiftクラスタを準備する
クラスタにロードするために、まずAWS Lambdaが接続可能にする必要があります。そのためには、クラスタのセキュリティグループはパブリックインターネットからのアクセスを許可しなければなりません。将来的には、AWS Lambdaがまるで皆さんのVPC内にいるように振る舞う機能がサポートされると思います。
クラスタのsecurity groupを設定する方法(※訳注 こちらの手順はEC2-Classicのものになります。VPCをお使いの方は、EC2のコンソールから同様のセキュリティグループの設定を行って下さい):
- Amazon Redshiftのコンソールにログインします。
- 左側のナビゲーションからSecurityを選びます。
- ご自身のクラスタに設定したセキュリティグループを選びます。
- 新しいConnection TypeでCIDR/IPを追加し、0.0.0.0/0と入力します。
- Authorizeを選び変更を保存します。
Amazon Redshift側では、ユーザにロードするテーブルへのINSERTのみを許可することをオススメします。CREATE USERコマンドを使って複雑なパスワードのユーザを作成し、GRANTコマンドを使ってINSERTのみを許可します。
まずはじめに:AWS Lambdaファンクションのデプロイ
ファンクションのデプロイ方法:
- S3バケットとAmazon Redshiftクラスタと同じリージョンでAWS Lambdaのコンソールを開きます。
- Create a Lambda functionを選び、例えばMyLambdaDBLoaderといった名前を入力します。
- Code entry typeでUpload a zip fileを選び、GitHubからダウンロードしたAWSLambdaRedshiftLoader-1.1.zipをアップロードします。
- File nameはデフォルトのindex.jsを、Handler nameもhandlerとし、AWS Lambdaのexecution role作成Wizardに従います。このファンクションではtimeoutの最大値を使うことをオススメします(プレビュー中は60秒が最大値となります)。
続いて、イベントソースを設定しS3 PUTイベントがAWS Lambdaファンクションに送られるようにします。
- デプロイしたファンクションで、Configure Event Sourceを選び、入力データとして使いたいバケットを選択します。lambda_invoke_roleを選ぶか、Create/Select機能でデフォルトのinvocation roleを作成します。
- Submitをクリックして変更を保存します。
これが終わると、AWS Lambdaファンクションがデプロイされていて、テストイベントを送信しCloudWatch Logsのログストリームを見ることができます。
バージョンの注意事項
過去にバージョン1.0をAWSLambdaRedshiftLoader.zipとして配布しました。このバージョンは暗号化のためにAmazon Key Management Serviceを使っていませんでした。もし過去にデプロイしていてバージョン1.0をお使いで、バージョン1.1にアップグレードしたい場合は以下の手順に従って下さい。
- node setup.jsを実行して、設定を再生成します。
- 接続パスワードやS3のシークレットキーを含む過去の値を再入力します。
- これでもうKey Management Serviceと会話できるようになっているので、次のセクションで説明される様にAWS Lambda Execution RoleのためのIAMポリシーをアップグレードします。
まずはじめに:Lambda Execution Role
加えて、以下にあるIAMポリシーをAWS Lambdaが実行時に使うroleに追加する必要があります。AWS LambdaがSNSを呼び出しDynamoDBを使い、AWS Key Management Serviceで暗号化を実施できるように、ファンクションをデプロイした後で以下のポリシーをlambda_exec_roleに追加します。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Stmt1424787824000",
"Effect": "Allow",
"Action": [
"dynamodb:DeleteItem",
"dynamodb:DescribeTable",
"dynamodb:GetItem",
"dynamodb:ListTables",
"dynamodb:PutItem",
"dynamodb:Query",
"dynamodb:Scan",
"dynamodb:UpdateItem",
"sns:GetEndpointAttributes",
"sns:GetSubscriptionAttributes",
"sns:GetTopicAttributes",
"sns:ListTopics",
"sns:Publish",
"sns:Subscribe",
"sns:Unsubscribe",
"kms:Decrypt",
"kms:DescribeKey",
"kms:GetKeyPolicy"
],
"Resource": [
"*"
]
}
]
}
まずはじめに:通知をサポート
このファンクションは必要であればバッチ処理の終了時に通知を送ることができます。SNSを使うと、eメールやアプリケーションへのHTTP Pushで通知を受け取ることや、後続の処理のためにキューに追加することができます。ロードの成功や失敗、またはその両方をSNS通知で受け取りたい場合、SNSのトピックを作成し、Amazon Resource Notations(ARN)形式のIDをメモしておいて下さい。
まずはじめに:設定開始
ファンクションがデプロイされたので、どういう時にどうやってS3からファイルがロードされるべきかを伝えるための設定を作成する必要があります。Getting Started with the SDK in Node.js tutorialとConfiguring the SDK in Node.js tutorialで説明されている様にAWS SDK for JavaScriptをインストールし認証情報を設定して下さい。ローカルにNode.jsの処理系も必要です。また以下のコマンドを使って依存するモジュールをインストールして下さい。
cd aws-lambda-redshift-loader && npm install
注意:正しいAWSリージョンとの通信を保証するために、AWS_REGION環境変数を必要なロケーションする必要が場合によってはあります。例えば、US Eastはus-east-1、EU West1はeu-west-1になります。
export AWS_REGION=eu-central-1
次に、setup.jsスクリプトを実行するためにnode setup.jsと入力します。このスクリプトはどのようにロードされるべきかを質問してきます。このドキュメントの最後にある付録にセットアップの概要があります。データベースのパスワードに加えて、Amazon RedshiftがS3にアクセスする時に使うシークレットキーもAmazon Key Management Serviceによって暗号化されます。セットアップはalias/LambdaRedshiftLoaderKeyというエイリアスのついた新しいマスターキーを作成します。
以前のバッチや状態を確認する
もしクラスタへのバッチロードで何が起きたかを見る必要があれば、queryBatches.jsスクリプトを使ってDynamoDBのLambdaRedshiftBatchesテーブルを覗くことができます。以下の引数を取ることができます:
- region: AWS Lambdaファンクションがデプロイされたリージョン
- status: 検索したいステータスで、’error’, ‘complete’, ‘pending’, ‘locked’のいずれか
- date: オプションで開始日を指定してバッチを検索
node queryBatches.js eu-west-1 error というコマンドを実行すると、以下の様に状態がerrorでEU(アイルランド)リージョンのすべてのバッチのリストが返ってきます。
[
{
"s3Prefix": "lambda-redshift-loader-test/input",
"batchId": "2588cc35-b52f-4408-af89-19e53f4acc11",
"lastUpdateDate": "2015-02-26-16:50:18"
},
{
"s3Prefix": "lambda-redshift-loader-test/input",
"batchId": "2940888d-146c-47ff-809c-f5fa5d093814",
"lastUpdateDate": "2015-02-26-16:50:18"
}
]
特定のバッチのより詳細が必要であれば、describeBatch.jsを使ってすべての詳細情報を見ることができます。以下の引数を取ります:
- region: Lambdaファンクションがデプロイされたリージョン
- batchid: 詳細を見たいバッチのID
- s3Prefix: バッチが作成されたS3のプレフィックス
これらの引数でDynamoDBに保存されているバッチの情報を得られます。
{
"batchId": {
"S": "7325a064-f67e-416a-acca-17965bea9807"
},
"manifestFile": {
"S": "my-bucket/manifest/manifest-2015-02-06-16:20:20-2081"
},
"s3Prefix": {
"S": "input"
},
"entries": {
"SS": [
"input/sample-redshift-file-for-lambda-loader.csv",
"input/sample-redshift-file-for-lambda-loader1.csv",
"input/sample-redshift-file-for-lambda-loader2.csv",
"input/sample-redshift-file-for-lambda-loader3.csv",
"input/sample-redshift-file-for-lambda-loader4.csv",
"input/sample-redshift-file-for-lambda-loader5.csv"
]
},
"lastUpdate": {
"N": "1423239626.707"
},
"status": {
"S": "complete"
}
}
処理されたファイルの掃除
デフォルトでは1つのファイルは1回しかロードされませんが、バッチが何かの理由でエラー状態になった時など、まれに再処理したくなることがあります。そういった時にはprocessedFiles.jsスクリプトで処理済ファイルリストを検索したり削除したりできます。このスクリプトはoperation typeとfilenameを引数にとります。-qを使うとファイルが処理済かどうか検索でき、-dでエントリを削除できます。以下は保存されている処理済ファイルの例です。
バッチを再実行する
例えばもし何かの理由で必要なファイルのロードに失敗してしまい、バッチを再実行する必要があるときは、reprocessBatch.jsスクリプトが使えます。このスクリプトはdescribeBatch.jsと同じ引数(region, batch ID, input location)を取ります。オリジナルの入力バッチには影響を与えず、その代わりにそのバッチの中にあった入力ファイルはLambdaRedshiftProcessedFilesテーブルから削除され、スクリプトがそのS3のファイルの場所で生成されるイベントを強制的に発生させます。これがファンクションによって捕捉されオリジナルと同様の形で再処理されます。バッチが’open'状態でない時に限って再処理することができます。
定期的なロードの保証
もしそれほど頻繁にファイルが置かれないプレフィックスがあって、それでもN分毎にちゃんとファイルがロードされていることを保証したかったら、以下の手順で定期的なロードを発生させて下さい。
設定を作成する時に、filenameFilterRegexに*\.csvの様な設定を追加して下さい。これは指定されたS3プレフィックスに置かれたもののなかでCSVファイルだけをロードするという意味です。それから、cronジョブを使ってN分毎にダミーファイル生成スクリプトを実行して下さい。
./path/to/function/dir/generate-dummy-file.py <region> <input bucket> <input prefix> <local working directory>
- region: ロード用の入力バケットのリージョン
- input bucket: 入力場所として設定されているバケット
- input prefix: 入力場所として設定されているプレフィックス
- local working directory: S3へのアップロード前にダミーファイルが生成される場所
ログを確認する
通常の運用であれば、管理者的なことは何もする必要はないでしょう。設定したS3の場所に置かれた新しいファイル数が設定したバッチサイズに達したらロードされます。失敗通知を処理するために運用的な処理を作成したくなるでしょうが、ローダーの状態を見るだけならAmazon CloudWatchを見ることで確認できます。CloudWatchのコンソールを開、左側のナビゲーションからLogsをクリックします。次いで、/aws/lambda/<My Function>といった名前のファンクションのロググループを選びます。
上のログはどれもAWS Lambdaファンクションが起動すると作成され、定期的にローテーションします。最後の取り込み時間を見ると、AWS Lambdaが最後にCloudWatch Logsにイベントをプッシュした時が分かります。
ログストリームを確認して、ファンクションが単にファイルをバッファに入れたイベントを見ることができます:
または、ロードが実行されたこともわかります:
拡張や新機能開発
- Java: 既存のJava APIとつなぐブリッジAPI(https://www.npmjs.com/package/java & `npm install java`) - デフォルトパスにlibjvm.soがあることとjavacがインストールされている必要があります
- JDBC: JDBCのラッパーモジュール(https://www.npmjs.com/package/jdbc & `npm install jdbc`)
- Async: 非同期プログラムのための高階関数と共通パターン((https://www.npmjs.com/package/async & `npm install async`)
- Node UUID: RFC41222 (v1とv4) UUIDの厳格な実装((https://www.npmjs.com/package/node-uuid & 'npm install node-uuid`)
付録:サンプルローダー
GitHubプロジェクトは、このファンクションを試す手助けになるサンプルディレクトリが含まれています。このサンプルには、サンプルデータをロードするためのデータベース設定のセットアップスクリプトに加え、サンプル設定を生成するスクリプトが含まれています。
まず、GitHubのdistフォルダーにあるAWSLambdaRedshiftLoader-1.1.zipを、「まずはじめに」のセクションの様にデプロイし、依存モジュールをインストールします()。Amazon Redshiftクラスタをセットアップし、エンドポイントアドレス、ポート、サンプルを実行したいデータベース名、それにサンプルのために作成したユーザのユーザ名とパスワードを確認します。またPostgreSQLのコマンドラインクライアントとbashターミナルを準備します。
準備ができたら、PUTTYの様なターミナルを使ってsample/scriptsディレクトリにあるconfigureSample.shを実行します。このスクリプトは引数としてクラスタのエンドポイントアドレス、ポート、データベース名、ユーザ名をこの順番にとり、実行するとパスワードを尋ねてきます。そうしたらこのスクリプトは以下を実行します:
- test_lambda_load_userというデータベースユーザを作成します。サンプルが終了したら削除してかまいません
- lambda_redshift_sampleという3つの整数型カラムを持つテーブルを、新しいユーザ所有で作成します。
- 設定スクリプトを実行し、必要な設定情報、例えばサンプルに使いたいS3バケットやアクセスキーの情報を尋ねてきます。
これでデータベースへのロードを試すことができます。単純にsample/dataディレクトリのファイルを、セットアップスクリプトに与えたS3バケットのinputというプレフィックスに転送してみましょう。例えばAWS CLIを使うとこういう感じです:
aws s3 sync ../data s3://<my bucket>/input --region <region for my bucket>
そうしたらデプロイされたAWS Lambdaファンクションに行き、CloudWatch Logsのログを確認すると、2つのバッチが2つのファイル毎にロードされていて、1ファイルがopenなバッチになっていることがわかると思います。
デモローダーを綺麗に掃除するには、cleanup.shを引数無しで実行して下さい。Amazon Redshiftのテーブルとユーザが削除され、DynamoDBの設定テーブルも削除されます。
もし質問や提案があれば、ぜひこのポストの最後にコメントをして下さい。
付録:注意点と設定のリファレンス
次の表はサポートされる設定のオプションのガイダンスになります。
バッチサイズなどの項目では、Lambdaファンクションのタイムアウトがプレビュー中は60秒であることに注意して下さい。つまり、COPYコマンドは必ず約50秒以下で完了する必要があります。そうすることで、Lambdaファンクションがバッチのメタデータの書き込みを終える時間を確保できます。COPYの時間はファイルサイズとコードされるファイルの数、クラスタのサイズ、そして他の処理がどれくらいWorkLoadManagementキューのスロットを使っているかの関数になります。
ITEM |
REQUIRED |
NOTES |
---|---|---|
Enter the Region for the Redshift Load Configuration |
Y |
AWSのリージョンをこちらから選びます http://docs.aws.amazon.com/general/latest/gr/rande.html。短縮名を使って下さい (例: US East N. Virginiaはus-east-1) |
Enter the S3 Bucket & Prefix to watch for files |
Y |
S3のパスを以下の形式で入れます <bucket name>/<prefix>. Prefixはオプションです。 |
Enter a Filename Filter Regex |
N |
input prefixに追加されたファイルを処理をする前にフィルタする正規表現を入力します。 |
Enter the Cluster Endpoint |
Y |
データをロードするAmazon Redshiftクラスタのエンドポイントアドレスを入力します。 |
Enter the Cluster Port |
Y |
Amazon Redshiftクラスタに設定したポート番号を入力します。 |
Enter the Database Name |
Y |
ロード先のテーブルがあるデータベースの名前を入力します。 |
Enter the Database Username |
Y |
COPYコマンドを実行するためのユーザのユーザ名を入力します。テーブル所有者しかCOPYを実行できないので、コピー先のテーブルの所有者でなければなりません。 |
Enter the Database Password |
Y |
データベースユーザのパスワードを入力します。DynamoDBに保存される前に暗号化されます。 |
Enter the Table to be Loaded |
Y |
入力データがロードされるテーブル名を入力します。 |
Should the Table be Truncated before Load? (Y/N) |
N |
ロードの前にテーブルをtruncateするかを選択します。このオプションは入力データを続けて処理をしていて、かつ新しいデータのみをこのELT処理の中で見たい時に使います。 |
Enter the Data Format (CSV or JSON) |
Y |
データ形式が、文字分割された値か、JSONかを決めます。Amazon Redshift documentationにより詳細な情報があります。 |
If CSV, Enter the CSV Delimiter |
Yes if Data Format = CSV |
分割文字を1文字で入力します。‘,’ (コンマ) や ‘|’ (パイプ)の様になります。 |
If JSON, Enter the JSON Paths File Location on S3 (or NULL for Auto) |
Yes if Data Format = JSON |
アトリビュートをデータベースのテーブルにマッピングするためのJSON pathファイルの場所を入力します。もし入力しなかった場合、‘json = auto’というオプションがCOPYコマンドで使われるので、アトリビュートはロード先テーブルのカラム名と一致する必要があります。 |
Enter the S3 Bucket for Redshift COPY Manifests |
Y |
COPYコマンドのマニフェストファイルを保存するS3のバケットを入力します。これはロードデータの入力場所と同じにすべきではありません。 |
Enter the Prefix for Redshift COPY Manifests |
Y |
COPYマニフェストのprefixを入力します。 |
Enter the Prefix to use for Failed Load Manifest Storage |
N |
もしCOPYが失敗した時に、別の場所にそのマニフェストファイルをコピーしておくことができます。prefixを入力すると、他のCOPYマニフェストと同じバケットに保存されます。 |
Enter the Access Key used by Redshift to get data from S3 |
Y |
Amazon RedshiftはS3のデータを読み取るために認証情報を与える必要があります。アカウントかAmazon Redshiftを扱えるIAMユーザのアクセスキーを入力します。 |
Enter the Secret Key used by Redshift to get data from S3 |
Y |
S3からデータを取得するアクセスキーのシークレットキーを入力します。これはDynamoDBに保存する前に暗号化されます。 |
Enter the SNS Topic ARN for Failed Loads |
N |
もし、ロードが失敗した時にSNSトピックへ通知を飛ばしたい場合、ARNを入力します。ARNは‘arn:aws:sns:<region>:<account number>:<topic name>といった形式になります。 |
Enter the SNS Topic ARN for Successful Loads |
N |
COPYが成功した時に通知をしたいSNSトピックのARNを入力します。 |
How many files should be buffered before loading? |
Y |
COPYが現在のバッチが開始されるまでに入力場所に保持するファイルの数を入力します。クラスタのCPUの数の偶数倍をオススメします。2〜5分毎にロードが起きる様な数の倍数にするのが良いです。 |
How old should we allow a Batch to be before loading (seconds)? |
N |
AWS Lambdaはここで入力した秒より古いバッチを一気に片付けようとします。この処理は、Filename Filter Regexにマッチしてもしなくても入力場所のすべてのS3イベントで発生します。120より低い値は推奨されません。 |
Additional Copy Options to be added |
N |
追加で使いたいCOPYのオプションを指定します(http://docs.aws.amazon.com/redshift/latest/dg/r_COPY.htmlに概要があります)。高頻度なロード環境でのCOPYのオプションでのベストプラクティスの情報がhttp://blogs.aws.amazon.com/bigdata/post/Tx2ANLN1PGELDJU/Best-Practices-for-Micro-Batch-Loading-on-Amazon-Redshiftにもあるので参照して下さい。 |
付録:GitHubの目次
以下のファイルが配布物に含まれています:
Filename |
Arguments |
Purpose |
index.js |
|
AWS Lambdaファンクションのメインメソッドです。AWS Lambdaで実行されるhandlerメソッドをexportします。 |
constants.js |
|
コンソールアプリケーションの終了コードや、テーブル名などの、ファンクションやスクリプトで使われる定数です。 |
common.js |
|
ユーザの入力のバリデーションメソッドや、日付を扱うメソッドなどです。exportされます。 |
processedFiles.js |
region, process option (-d for delete, -q for query), filename |
LambdaRedshiftProcessedFilesテーブルから検索したり削除するスクリプトです。前のバッチで処理された1つの入力ファイルを再処理するときに使います。 |
describeBatch.js |
region, batch ID, input prefix |
指定したバッチのステータスを出力するスクリプトです。 |
queryBatches.js |
region, batch status (complete, error, locked, open) and optionally a search date |
ある期間や特定のバッチステータスのバッチのリストを出力するスクリプトです。 |
unlockBatch.js |
region, batch ID, input prefix |
lock状態のバッチをopen状態にするスクリプトです。これは通常の運用ではまず使う必要はありませんが、入力バッチを処理しているファンクションがバッチの状態を変更している途中で失敗してしまった時にのみ必要になります。 |
reprocessBatch.js |
region, batch ID, input prefix |
あるバッチを新規で再実行させるために、すべての入力イベントを発生させるスクリプトです。 |
test.js |
|
イベントを手入力してAWS Lambdaファンクションハンドラを実行することで、ローカルでのテストを可能にするテストツールです。 |
generate-trigger-file.py |
region, input bucket, input prefix, local directory for temp file |
‘lambda-redshift-trigger-file.dummy’ファイルを指定した場所に生成するPythonスクリプトです。低頻度なロードを発生させる必要があればスケジュールタスク(例えばcronジョブ)の中で使います。 |
コメント