こんにちは、広野です。
Amazon Kinesis Data Firehose の Dynamic Partitioning を AWS CloudFormation でプロビジョニングしようとしたのですが、AWS公式ドキュメントに具体的なリファレンスやサンプルが無かったため、やってみて実際に動いたテンプレートを紹介します。
やりたいこと
Amazon Kinesis Data Firehose で受け取ったログストリームを、ログ出力先S3バケットに以下の条件でフォルダ分けして出力したい。
- ログデータ内、key1 というキーに格納されているバリューごとに第一階層のフォルダ分けをする。
“key1”: “TEST” というデータの場合、TESTというフォルダ内に、
“key1”: “PROD” というデータの場合、PRODというフォルダ内にログが出力される。 - 第二階層以降は、年、月、日、時間単位でフォルダ分けする。
※Amazon Athena でパーティション分けしやすいフォーマット。この記事では詳細には言及しない。
Amazon Kinesis Data Firehose の Dynamic Partitioning という機能を使用すれば実現でき、マネジメントコンソールで手作業でプロビジョニングするのは簡単なのですが、AWS CloudFormation だとどう書く?というのがテーマです。
マネジメントコンソール上の設定
以下の設定を CloudFormation で実装したいです。
この中で難儀だったのが、New line delimiter と Dynamic partitioning keys の部分です。
New line delimiter は、出力先ログファイル内で1レコードごとに改行を入れる設定です。
Dynamic partitioning keys は、フォルダ分けしたいキーの設定です。ここでは、key1を設定しています。
CloudFormation テンプレート
こうなります。
IAMロールの記述が長いです。ログ出力先となるS3バケットはワイルドカードで全てにしてあります。
AWSTemplateFormatVersion: 2010-09-09 Description: CloudFormation template example Resources: # ------------------------------------------------------------# # Kinesis Data Firehose Role (IAM) # ------------------------------------------------------------# FirehoseRole: Type: AWS::IAM::Role Properties: RoleName: KinesisFirehoseRole Description: This role allows Kinesis Data Firehose to delivery logs into S3. AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - firehose.amazonaws.com Action: - sts:AssumeRole Path: / Policies: - PolicyName: KinesisFirehosePolicy PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - "glue:GetTable" - "glue:GetTableVersion" - "glue:GetTableVersions" Resource: - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog" - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%" - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%" - Effect: Allow Action: - "s3:AbortMultipartUpload" - "s3:GetBucketLocation" - "s3:GetObject" - "s3:ListBucket" - "s3:ListBucketMultipartUploads" - "s3:PutObject" Resource: "*" - Effect: Allow Action: - "lambda:InvokeFunction" - "lambda:GetFunctionConfiguration" Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%" - Effect: Allow Action: - "kms:GenerateDataKey" - "kms:Decrypt" Resource: - !Sub "arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%" Condition: StringEquals: "kms:ViaService": !Sub "s3.${AWS::Region}.amazonaws.com" StringLike: "kms:EncryptionContext:aws:s3:arn": - "arn:aws:s3:::%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/*" - "arn:aws:s3:::%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%" - Effect: Allow Action: - "logs:PutLogEvents" Resource: - !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/*" - !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%:log-stream:*" - Effect: Allow Action: - "kinesis:DescribeStream" - "kinesis:GetShardIterator" - "kinesis:GetRecords" - "kinesis:ListShards" Resource: - !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%" - Effect: Allow Action: - "kms:Decrypt" Resource: - !Sub "arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%" Condition: StringEquals: "kms:ViaService": !Sub "kinesis.${AWS::Region}.amazonaws.com" StringLike: "kms:EncryptionContext:aws:kinesis:arn": !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%" # ------------------------------------------------------------# # Kinesis Data Firehose delivery stream # ------------------------------------------------------------# FirehoseStreamActivityLogs: Type: AWS::KinesisFirehose::DeliveryStream Properties: DeliveryStreamName: UserActivityLogStreams DeliveryStreamType: DirectPut ExtendedS3DestinationConfiguration: BucketARN: arn:aws:s3:::bucket_name Prefix: "userActivitylog/key=!{partitionKeyFromQuery:key1}/!{timestamp:'year='yyyy'/month='MM'/day='dd'/hour='HH}/" ErrorOutputPrefix: "errorLog/!{firehose:error-output-type}/dt=!{timestamp:YYYY}-!{timestamp:MM}-!{timestamp:dd}/" BufferingHints: IntervalInSeconds: 300 SizeInMBs: 128 CompressionFormat: GZIP RoleARN: !GetAtt FirehoseRole.Arn ProcessingConfiguration: Enabled: false DynamicPartitioningConfiguration: Enabled: true RetryOptions: DurationInSeconds: 300 CloudWatchLoggingOptions: Enabled: true LogGroupName: /aws/kinesisfirehose/UserActivityLogStreams LogStreamName: S3Delivery ProcessingConfiguration: Enabled: true Processors: - Type: MetadataExtraction Parameters: - ParameterName: MetadataExtractionQuery ParameterValue: '{key1: .key1}' - ParameterName: JsonParsingEngine ParameterValue: JQ-1.6 - Type: AppendDelimiterToRecord Parameters: - ParameterName: Delimiter ParameterValue: "\\n" DependsOn: LogGroupFirehoseActivityLogs # ------------------------------------------------------------# # Kinesis Data Firehose Error LogGroup (CloudWatch Logs) # ------------------------------------------------------------# LogGroupFirehoseActivityLogs: Type: AWS::Logs::LogGroup Properties: LogGroupName: /aws/kinesisfirehose/UserActivityLogStreams RetentionInDays: 365
- Type: AppendDelimiterToRecord Parameters: - ParameterName: Delimiter ParameterValue: "\\n"
- Type: AppendDelimiterToRecord
また、Firehose への送信元の仕様によって、レコードごとにデフォルトで改行が入るものがあるようです。例えば Amazon SES のログなど。この場合、Firehose 側で改行を入れる設定をしてしまうと、余計な空行が入ってしまいデータ集計に異常をきたすので注意が必要です。
解説
以下の記述の部分が難儀した箇所です。
フォルダ分けのキーとなる key1 をまんま以下のように記述しました。デリミタは、このまま記述すればOKです。
ProcessingConfiguration: Enabled: true Processors: - Type: MetadataExtraction Parameters: - ParameterName: MetadataExtractionQuery ParameterValue: '{key1: .key1}' - ParameterName: JsonParsingEngine ParameterValue: JQ-1.6 - Type: AppendDelimiterToRecord Parameters: - ParameterName: Delimiter ParameterValue: "\\n"
Dynamic Partitioning は、有効/無効の変更はできず、変更する際はリソースを作り直しになるので注意しましょう。
まとめ
いかがでしたでしょうか?
インターネット上にこれだ!という情報がなかったので書いてみました。みなさまのお役に立てれば幸いです。