こんにちは。ソリューションアーキテクトの篠原英治(@shinodogg)です。本日はAWS Database Blogに掲載されたIndexing Metadata in Amazon Elasticsearch Service Using AWS Lambda and Pythonをご紹介します。
Amit Sharma (@amitksh44) はAWSのソリューションアーキテクトです。
全てのデータをAmazon S3にストアし、真の意味でのシングルソースなデータレイクアーキテクチャを構築することはできますが、今回のアプローチにおいては、大量のデータを高い信頼性で保存するだけでなく、非常にハイスピードでデータを登録し、それを解析することを可能にします。オブジェクトの数が膨大になればなるほど、特定のオブジェクトを探し当てるのは困難になる傾向があり、そういった意味でも解析容易性は重要です - 干し草の山の中から針1本捜す
S3の中のオブジェクトは、オブジェクトの識別およびオブジェクトの属性に関するメタデータを持ちます。オブジェクトの数が膨大になった場合、このメタデータはオブジェクトを探し当てるための足掛かりになります。このメタデータを直接検索することは出来ませんが、Amazon Elasticsearch Service に保存することで、全てのS3オブジェクトのメタデータが検索可能になります。このブログポストではPythonとAWS Lambdaを使って、メタデータをAmazon Elasticsearch Service (Amazon ES)にストアする方法をステップ・バイ・ステップなインストラクションと共にご紹介します。

S3イベント通知をLambda起動のトリガーに
このポストでは、S3イベント通知および Lambdaを使ってAmazon ES上にあるS3オブジェクトのメタデータを管理します。S3イベント通知はS3バケットにおいてイベントが発生した際に通知を受け取ることを可能にします。これらのイベントは、例えばPUT, COPY, PUT, DELETEといったアクションです。S3イベント通知に関する詳細はS3のドキュメントをご覧ください。
S3イベント通知はLambda起動のトリガーに利用することができます。この機能を使うことで、S3イベントが発生した際に呼び出されるLambdaファンクションを作成することができます。これを行うには、Amazon S3においてバケットの通知設定で、どのタイプのイベントが起こった際にLambdaファンクションを呼び出すか定義をする必要があります。

High-level flow between S3 and Lambda
Putting it together
上記の図のパーツを統合するために以下のステップで設定を行っていきます。
AWS Lambda と Amazon S3 の設定
Lambda と S3の設定のためにAWSコンソール上でAWS Lambdaを開きます。

もし、Lambdaファンクションを作成するのがはじめてであれば、Get Started Now を選択します。

Configure triggersを選択します。

次のページでトリガーを選択します。

S3バケットとキャプチャしたいイベントを選択します。PrefixとSuffixは空欄のまま、もしくはユースケースに応じて記載します。
例えば、/appTier/appServer1 というフォルダに保存される全てのファイルであれば、そのパスをPrefixに登録します。同様に、.log, .jpg, .avi 等といった特定の拡張子を対象にする場合は、それをSuffixに登録します。PrefixとSuffixが両方マッチする場合にイベントはトリガーされます。そして、Enable Triggerチェックボックスを選択します。

次に、nameとdescriptionを埋めて、実行環境としてPython2.7を選択します。コードのアップロードは別途行うため、Code entryはUpload a .ZIP fileを選択します。ハンドラに関する設定はデフォルトの lambda_function.lambda_handler のままにしておきます。

それでは AWS Identity and Access Management (IAM) ロールを作成しパーミッションを関連付けることで、Lambdaファンクションが必要なAWSリソースにアクセスできるようにしましょう。これを行うには、Create a new role from template(s) を選択し、新しいロールに名前を付けます。そして Policy templates については、S3 object read-only-permission を選択します。

Advanced settings においては、Memory, Timeout, そして VPC の設定をデフォルトのままにしておきます。Next を選択すると、Lambdaファンクションが作成され、S3に関する適切なパーミッションが付与され、Lambdaファンクションが起動できる状態になります。この設定はS3のコンソール上で確認することができますので、上記で設定したS3バケットのPropertiesにおけるEventセクションを見てみましょう。

Lambdaファンクションに関する詳細は、(ペンの形の)変更アイコンを選択することで確認することができます。
Amazon ESドメインの作成
それでは、Amazon ESのドメインを作成しましょう。Analyticsの中のElasticsearch Serviceを選択します。

Get Started ボタンを選択し、ドメインに名前を付けます (私は my-es-cluster にしました
)

以下のように、インスタンスタイプとインスタンス数(必要であれば両方ともに後から変更可能です)を選択します。もし、これをプロダクション環境で使おうとするのであれば、m3.mediumもしくはそれ以上のインスタンスを選択することをおすすめします。また、開発環境やちょっとした検証環境にはt2.microも良い選択肢となるでしょう。
ストレージに関しては、インスタンスストレージとAmazon EBSボリューム(General Purpose, Provisioned IOPS and Magnetic)を選択することができます。まずはGeneral Purpose EBSボリュームを選択し、FreeStorageSpace
, JVMMemoryPressure
, そして CPUUtilization
のメトリクスとクエリのレスポンスタイムを計測しておき、今後ストレージタイプを変えた場合にその違いを見ていきます。エラーのハンドリングやマイグレーションに関してはこちらのAWSのドキュメントを参照してください。
重要なポイントとして "どの程度のストレージが必要か?" というものがありますが、例えば、S3に1KBのメタデータが1000万個あった場合、プロビジョンすべきデータ容量は少なくとも20GBになります。内訳はプライマリインスタンスに10GBで、レプリカインスタンスに10GBです。より詳細にElasticsearchのスケーリングやキャパシティプランニングを検討する場合はElasticsearchのドキュメントが参考になります。

次に、アクセスポリシーを設定します。今回は特別にテストを容易にするために広く開放された設定になっていますが、あなたのクラスタでは決してこのような設定をしないでください。IP-basedもしくはuser-basedなテンプレートがありウィザード形式で制限を設定することもできます。より詳細になクラスタのアクセスコントロールの設定に関しては、こちらのブログポストをご覧ください。

そして最後にConfirm and create を押して完了です。しばらくするとクラスタが起動します。

Lambdaファンクションの作成
S3にオブジェクトが作成された際のトリガーでメタデータを実際にプッシュするコードを作成していきます。Lambdaには上記でS3に対するread-onlyの権限を与えており、読み込みが出来る状態になっています。Pythonのコードは全体的には以下のようになります。
- S3イベントからメタデータを読み込む
- Amazon ESドメインのエンドポイントに接続する
- もしインデックスが存在しなければ作成する
- メタデータをAmazon ESに書き込む
Amazon ESに接続するためにPythonのコードはいくつかのライブラリ(Elasticsearch, RequestsHttpConnection, urllib)を使います。以下のコマンドを叩くことでこれらのライブラリのダウンロードが可能になります。最初にpipがインストールされているか確認しましょう - このステップに関してはpipのウェブサイトを参照ください。サンプルコードはこれらのライブラリを含めてダウンロードできるようになっているので必ずしもこのステップは必要でないかもしれませんが、ご理解のためにお役立て下さい。
pip install requests -t /path/to/project-dir
pip install Elasticsearch -t /path/to/project-dir
pip install urllib3 -t /path/to/project-dir
ライブラリがカレントディレクトリに配置されていることが確認できたらコードをみていきましょう。
以下はAmazon ESに接続するファンクションです。
def connectES(esEndPoint):
print ('Connecting to the ES Endpoint {0}'.format(esEndPoint))
try:
esClient = Elasticsearch(
hosts=[{'host': esEndPoint, 'port': 443}],
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection)
return esClient
except Exception as E:
print("Unable to connect to {0}".format(esEndPoint))
print(E)
exit(3)
このファンクションはドメインエンドポイントを引数として受け取って、Elasticsearchクライアントのインスタンスを返します。
esClientがドメインのエンドポイントと共に宣言されていることをご確認ください。
esClient = connectES("search-domainname-yourDomainEndpoint.REGION.es.amazonaws.com")
以下のファンクションはAmazon ESのインデックスを作成するものです。
def createIndex(esClient):
try:
res = esClient.indices.exists('metadata-store')
print("Index Exists ... {}".format(res))
if res is False:
esClient.indices.create('metadata-store', body=indexDoc)
return 1
except Exception as E:
print("Unable to Create Index {0}".format("metadata-store"))
print(E)
exit(4)
上記のconnectESファンクションによってElasticsearchクライアントのインスタンスがesClientとして返され、それが使われます。
また、
‘metadata-store
’は作成するインデックスの名前で、
‘indexDoc
’は以下のように定義するインデックスのマッピングです。
indexDoc = {
"dataRecord" : {
"properties" : {
"createdDate" : {
"type" : "date",
"format" : "dateOptionalTime"
},
"objectKey" : {
"type" : "string",
"format" : "dateOptionalTime"
},
"content_type" : {
"type" : "string"
},
"content_length" : {
"type" : "long"
},
"metadata" : {
"type" : "string"
}
}
},
"settings" : {
"number_of_shards": 1,
"number_of_replicas": 0
}
}
以下の5つの情報を保存します。
- createdDate
- objectKey
- content_type
- content_length
- metadata
ここではいくつか重要となるポイントを紹介します。
最初に、シャードに関するプランニング。ベストなプライマリとレプリカシャードの数はインスタンスのサイズ、データ量、データ登録の頻度、古いデータの削除の頻度、クエリのタイプ、といった複数の要素に依存します。例えば、ログファイルのような時系列データに関しては時間毎、日毎、週毎、といった形でデータ登録の頻度によって異なるインデックスを作成して管理することができます - 多くの場合は日毎のインデックス作成をおすすめしています。クエリされることのない古いインデックスに関してはプライマリシャードの数を減らしたり、インデックスそのものをドロップすることが出来るからです。より詳細な議論に関してはElasticsearchのドキュメントをご覧ください。
バルクでのインデクシングに関して、あまり頻度の高くないデータ登録に関しては今回のサンプルコードでも構いませんが - 例えば1ドキュメントが1KBで秒間100PUTまで。しかし、非常に高頻度で大きなボリュームのデータを登録しようとする場合、大きめなインスタンスを使い _bulk インデックスAPIを使って効率的にElasticsearchクラスタにデータを投入することをおすすめします。今後のフォローアップブログにてアーキテクチャパターンや_bulkインデックスAPIを効果的かつコスト効率よく使う方法をご紹介する予定です。
シャードの設定やクラスタのプランニングに関するより詳細な説明はElasticsearchのドキュメントをご覧ください。
以下のファンクションは実際にElasticsearchにメタデータを書き込むコードです。
def indexDocElement(esClient, key, response):
try:
indexObjectKey = key
indexcreatedDate = response['LastModified']
indexcontent_length = response['ContentLength']
indexcontent_type = response['ContentType']
indexmetadata = json.dumps(response['Metadata'])
retval = esClient.index(index='metadata-store', doc_type='images', body={
'createdDate': indexcreatedDate,
'objectKey': indexObjectKey,
'content_type': indexcontent_type,
'content_length': indexcontent_length,
'metadata': indexmetadata
})
except Exception as E:
print("Doc not indexed")
print("Error: ",E)
exit(5)
このファンクションは esClient, S3オブジェクトキー, そしてS3.get_objectファンクションのレスポンスを使います。このレスポンスには実際のメタデータが含まれます。レスポンス内の要素はesClient.indexを呼び出すことでインデックスされます。ドキュメントIDは自動的にElasticsearchによって採番されます。インデックスのオプションに関してはElasticsearchのドキュメントをご覧ください。
最後に以下のメインのLambdaハンドラのコードは上記のファンクションをトリガーされた際に全て呼び出します。
def lambda_handler(event, context):
esClient = connectES("search-domainname-yourDomainEndpoint.REGION.es.amazonaws.com ")
createIndex(esClient)
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'].encode('utf8'))
try:
response = s3.get_object(Bucket=bucket, Key=key)
print(response)
print("KEY: " + key)
print("CONTENT TYPE: " + response['ContentType'])
print("Metadata : " + json.dumps(response['Metadata']))
print("Custom 1: " + response['ResponseMetadata']['HTTPHeaders']['x-amz-meta-custom1'])
print("Custom 2: " + response['ResponseMetadata']['HTTPHeaders']['x-amz-meta-custom2'])
indexDocElement(esClient,key,response)
return response['ContentType']
except Exception as e:
print(e)
print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
raise e
上記のコードはこちらからダウンロードすることが可能です。
メタデータがAmazon ESに登録されたか確認するためにKibanaやElasticsearchの標準のAPIおよびクエリを使うことができます。例えば、以下のようにオブジェクト名前で検索することができます。
>curl -XGET https://ドメインのエンドポイント/metadata-store/images/_search?pretty&q=objectKey:YOURFILENAME
{
"took" : 1,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"failed" : 0
},
"hits" : {
"total" : 1,
"max_score" : 9.516893,
"hits" : [ {
"_index" : "metadata-store",
"_type" : "images",
"_id" : "AVgGSFxdQ43eQcLduwj9",
"_score" : 9.516893,
"_source" : {
"content_length" : 61194,
"objectKey" : "YOURFILENAME",
"metadata" : "{\"custom1\": \"banana\", \"custom2\": \"shake\"}",
"content_type" : "application/octet-stream",
"createdDate" : "2016-10-27T13:15:54+00:00"
}
} ]
}
}
以下は何件かドキュメントをインデクシングした後のKibanaのスクリーンショットです。

S3オブジェクトが削除された場合のメタデータの削除
S3のオブジェクトが削除された場合に関連するメタデータも削除するのも基本的には上記と同じステップとなります - event typeの選択が Object Removed Event となるところだけ異なります。

上記以外の全てのステップは同様です。オブジェクトが削除された場合のトリガーを作成し、2つの異なるイベント用のLambdaファンクションを作成します - 1つはオブジェクトPUT, COPY, POST用、もう1つはDELETE用。
以下は削除用のメインのLambdaハンドラコードです。
def lambda_handler(event, context):
esClient = connectES("search-domainname-yourDomainEndpoint.REGION.es.amazonaws.com ")
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'].encode('utf8'))
try:
clearMetaData(esClient,key)
return 'Removed metadata for ' + key
except Exception as e:
print(e)
print('Error removing object metadata from Elasticsearch Domain.)
raise e
clearMetaData
ファンクションは以下です。
def clearMetaData(esClient,key):
try:
retval = esClient.search(index='metadata-store', doc_type='images', q='objectKey:' + key, fielddata_fields='_id')
total = retval['hits']['total']
count = 0
while (count < total):
docId = retval['hits']['hits'][count]['_id']
print("Deleting: " + docId)
removeDocElement(esClient,docId)
count = count + 1
return 1
except Exception as E:
print("Removing metadata failed")
print("Error: ",E)
exit(5)
このファンクションは与えられたS3オブジェクトの名前でドメインを検索し更に別のremoveDocElementというファンクションを呼び出します。引数のドキュメントIDはドメインにおいてユニークです。removeDocElement
は以下のように定義されます。
def removeDocElement(esClient,docId):
try:
retval = esClient.delete(index='metadata-store', doc_type='images', id=docId)
print("Deleted: " + docId)
return 1
except Exception as E:
print("DocId delete command failed at Elasticsearch.")
print("Error: ",E)
exit(5)
このコードは、ユニークなドキュメントIDを使ってS3オブジェクトのキーい関連する情報を削除します。以下のように削除されたことを確認できます。
>curl -XGET https://ドメインのエンドポイント/metadata-store/images/_search?pretty\&q=objectKey:train.csv
{
"took" : 1,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"failed" : 0
},
"hits" : {
"total" : 0,
"max_score" : null,
"hits" : [ ]
}
}
以下はElasticsearchクラスタに関するAmazon CloudWatchモニタリング画面のスナップショットです - searchable documents, free storage space, cluster healthといったメトリクスを確認することができます。これらのメトリクスはコンピュートとストレージの観点から、どのようにクラスタをスケールさせるべきか決めるのに役立ちます。例えば、FreeStorageSpaceもしくはCPUUtilizationをモニタリングすることで、スケールアウトさせるべきかスケールアップさせるべきか決めることができるでしょう。
