こんにちは、広野です。
Amazon Kinesis Data Firehose の Dynamic Partitioning を AWS CloudFormation でプロビジョニングしようとしたのですが、AWS公式ドキュメントに具体的なリファレンスやサンプルが無かったため、やってみて実際に動いたテンプレートを紹介します。
やりたいこと
Amazon Kinesis Data Firehose で受け取ったログストリームを、ログ出力先S3バケットに以下の条件でフォルダ分けして出力したい。
- ログデータ内、key1 というキーに格納されているバリューごとに第一階層のフォルダ分けをする。
“key1”: “TEST” というデータの場合、TESTというフォルダ内に、
“key1”: “PROD” というデータの場合、PRODというフォルダ内にログが出力される。 - 第二階層以降は、年、月、日、時間単位でフォルダ分けする。
※Amazon Athena でパーティション分けしやすいフォーマット。この記事では詳細には言及しない。
Amazon Kinesis Data Firehose の Dynamic Partitioning という機能を使用すれば実現でき、マネジメントコンソールで手作業でプロビジョニングするのは簡単なのですが、AWS CloudFormation だとどう書く?というのがテーマです。
マネジメントコンソール上の設定
以下の設定を CloudFormation で実装したいです。
この中で難儀だったのが、New line delimiter と Dynamic partitioning keys の部分です。
New line delimiter は、出力先ログファイル内で1レコードごとに改行を入れる設定です。
Dynamic partitioning keys は、フォルダ分けしたいキーの設定です。ここでは、key1を設定しています。
CloudFormation テンプレート
こうなります。
IAMロールの記述が長いです。ログ出力先となるS3バケットはワイルドカードで全てにしてあります。
AWSTemplateFormatVersion: 2010-09-09
Description: CloudFormation template example
Resources:
# ------------------------------------------------------------#
# Kinesis Data Firehose Role (IAM)
# ------------------------------------------------------------#
FirehoseRole:
Type: AWS::IAM::Role
Properties:
RoleName: KinesisFirehoseRole
Description: This role allows Kinesis Data Firehose to delivery logs into S3.
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- firehose.amazonaws.com
Action:
- sts:AssumeRole
Path: /
Policies:
- PolicyName: KinesisFirehosePolicy
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- "glue:GetTable"
- "glue:GetTableVersion"
- "glue:GetTableVersions"
Resource:
- !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog"
- !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%"
- !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%"
- Effect: Allow
Action:
- "s3:AbortMultipartUpload"
- "s3:GetBucketLocation"
- "s3:GetObject"
- "s3:ListBucket"
- "s3:ListBucketMultipartUploads"
- "s3:PutObject"
Resource: "*"
- Effect: Allow
Action:
- "lambda:InvokeFunction"
- "lambda:GetFunctionConfiguration"
Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%"
- Effect: Allow
Action:
- "kms:GenerateDataKey"
- "kms:Decrypt"
Resource:
- !Sub "arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%"
Condition:
StringEquals:
"kms:ViaService": !Sub "s3.${AWS::Region}.amazonaws.com"
StringLike:
"kms:EncryptionContext:aws:s3:arn":
- "arn:aws:s3:::%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/*"
- "arn:aws:s3:::%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%"
- Effect: Allow
Action:
- "logs:PutLogEvents"
Resource:
- !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/*"
- !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%:log-stream:*"
- Effect: Allow
Action:
- "kinesis:DescribeStream"
- "kinesis:GetShardIterator"
- "kinesis:GetRecords"
- "kinesis:ListShards"
Resource:
- !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%"
- Effect: Allow
Action:
- "kms:Decrypt"
Resource:
- !Sub "arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%"
Condition:
StringEquals:
"kms:ViaService": !Sub "kinesis.${AWS::Region}.amazonaws.com"
StringLike:
"kms:EncryptionContext:aws:kinesis:arn": !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%"
# ------------------------------------------------------------#
# Kinesis Data Firehose delivery stream
# ------------------------------------------------------------#
FirehoseStreamActivityLogs:
Type: AWS::KinesisFirehose::DeliveryStream
Properties:
DeliveryStreamName: UserActivityLogStreams
DeliveryStreamType: DirectPut
ExtendedS3DestinationConfiguration:
BucketARN: arn:aws:s3:::bucket_name
Prefix: "userActivitylog/key=!{partitionKeyFromQuery:key1}/!{timestamp:'year='yyyy'/month='MM'/day='dd'/hour='HH}/"
ErrorOutputPrefix: "errorLog/!{firehose:error-output-type}/dt=!{timestamp:YYYY}-!{timestamp:MM}-!{timestamp:dd}/"
BufferingHints:
IntervalInSeconds: 300
SizeInMBs: 128
CompressionFormat: GZIP
RoleARN: !GetAtt FirehoseRole.Arn
ProcessingConfiguration:
Enabled: false
DynamicPartitioningConfiguration:
Enabled: true
RetryOptions:
DurationInSeconds: 300
CloudWatchLoggingOptions:
Enabled: true
LogGroupName: /aws/kinesisfirehose/UserActivityLogStreams
LogStreamName: S3Delivery
ProcessingConfiguration:
Enabled: true
Processors:
- Type: MetadataExtraction
Parameters:
- ParameterName: MetadataExtractionQuery
ParameterValue: '{key1: .key1}'
- ParameterName: JsonParsingEngine
ParameterValue: JQ-1.6
- Type: AppendDelimiterToRecord
Parameters:
- ParameterName: Delimiter
ParameterValue: "\\n"
DependsOn: LogGroupFirehoseActivityLogs
# ------------------------------------------------------------#
# Kinesis Data Firehose Error LogGroup (CloudWatch Logs)
# ------------------------------------------------------------#
LogGroupFirehoseActivityLogs:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: /aws/kinesisfirehose/UserActivityLogStreams
RetentionInDays: 365
- Type: AppendDelimiterToRecord
Parameters:
- ParameterName: Delimiter
ParameterValue: "\\n"
- Type: AppendDelimiterToRecord
また、Firehose への送信元の仕様によって、レコードごとにデフォルトで改行が入るものがあるようです。例えば Amazon SES のログなど。この場合、Firehose 側で改行を入れる設定をしてしまうと、余計な空行が入ってしまいデータ集計に異常をきたすので注意が必要です。
解説
以下の記述の部分が難儀した箇所です。
フォルダ分けのキーとなる key1 をまんま以下のように記述しました。デリミタは、このまま記述すればOKです。
ProcessingConfiguration:
Enabled: true
Processors:
- Type: MetadataExtraction
Parameters:
- ParameterName: MetadataExtractionQuery
ParameterValue: '{key1: .key1}'
- ParameterName: JsonParsingEngine
ParameterValue: JQ-1.6
- Type: AppendDelimiterToRecord
Parameters:
- ParameterName: Delimiter
ParameterValue: "\\n"
Dynamic Partitioning は、有効/無効の変更はできず、変更する際はリソースを作り直しになるので注意しましょう。
まとめ
いかがでしたでしょうか?
インターネット上にこれだ!という情報がなかったので書いてみました。みなさまのお役に立てれば幸いです。

