Amazon Kinesis Data Firehose の Dynamic Partitioning を AWS CloudFormation でプロビジョニングする

こんにちは、広野です。

Amazon Kinesis Data Firehose の Dynamic Partitioning を AWS CloudFormation でプロビジョニングしようとしたのですが、AWS公式ドキュメントに具体的なリファレンスやサンプルが無かったため、やってみて実際に動いたテンプレートを紹介します。

やりたいこと

Amazon Kinesis Data Firehose で受け取ったログストリームを、ログ出力先S3バケットに以下の条件でフォルダ分けして出力したい。

  • ログデータ内、key1 というキーに格納されているバリューごとに第一階層のフォルダ分けをする。
    “key1”: “TEST” というデータの場合、TESTというフォルダ内に、
    “key1”: “PROD” というデータの場合、PRODというフォルダ内にログが出力される。
  • 第二階層以降は、年、月、日、時間単位でフォルダ分けする。
    ※Amazon Athena でパーティション分けしやすいフォーマット。この記事では詳細には言及しない。

Amazon Kinesis Data Firehose の Dynamic Partitioning という機能を使用すれば実現でき、マネジメントコンソールで手作業でプロビジョニングするのは簡単なのですが、AWS CloudFormation だとどう書く?というのがテーマです。

マネジメントコンソール上の設定

以下の設定を CloudFormation で実装したいです。

この中で難儀だったのが、New line delimiter と Dynamic partitioning keys の部分です。

New line delimiter は、出力先ログファイル内で1レコードごとに改行を入れる設定です。
Dynamic partitioning keys は、フォルダ分けしたいキーの設定です。ここでは、key1を設定しています。

CloudFormation テンプレート

こうなります。
IAMロールの記述が長いです。ログ出力先となるS3バケットはワイルドカードで全てにしてあります。

AWSTemplateFormatVersion: 2010-09-09
Description: CloudFormation template example

Resources:
# ------------------------------------------------------------#
# Kinesis Data Firehose Role (IAM)
# ------------------------------------------------------------#
  FirehoseRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: KinesisFirehoseRole
      Description: This role allows Kinesis Data Firehose to delivery logs into S3.
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
              - firehose.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: KinesisFirehosePolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - "glue:GetTable"
                  - "glue:GetTableVersion"
                  - "glue:GetTableVersions"
                Resource:
                  - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog"
                  - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%"
                  - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%"
              - Effect: Allow
                Action:
                  - "s3:AbortMultipartUpload"
                  - "s3:GetBucketLocation"
                  - "s3:GetObject"
                  - "s3:ListBucket"
                  - "s3:ListBucketMultipartUploads"
                  - "s3:PutObject"
                Resource: "*"
              - Effect: Allow
                Action:
                  - "lambda:InvokeFunction"
                  - "lambda:GetFunctionConfiguration"
                Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%"
              - Effect: Allow
                Action:
                  - "kms:GenerateDataKey"
                  - "kms:Decrypt"
                Resource:
                  - !Sub "arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%"
                Condition:
                  StringEquals:
                    "kms:ViaService": !Sub "s3.${AWS::Region}.amazonaws.com"
                  StringLike:
                    "kms:EncryptionContext:aws:s3:arn":
                      - "arn:aws:s3:::%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/*"
                      - "arn:aws:s3:::%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%"
              - Effect: Allow
                Action:
                  - "logs:PutLogEvents"
                Resource:
                  - !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/*"
                  - !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%:log-stream:*"
              - Effect: Allow
                Action:
                  - "kinesis:DescribeStream"
                  - "kinesis:GetShardIterator"
                  - "kinesis:GetRecords"
                  - "kinesis:ListShards"
                Resource:
                  - !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%"
              - Effect: Allow
                Action:
                  - "kms:Decrypt"
                Resource:
                  - !Sub "arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%"
                Condition:
                  StringEquals:
                    "kms:ViaService": !Sub "kinesis.${AWS::Region}.amazonaws.com"
                  StringLike:
                    "kms:EncryptionContext:aws:kinesis:arn": !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%"

# ------------------------------------------------------------#
# Kinesis Data Firehose delivery stream
# ------------------------------------------------------------#
  FirehoseStreamActivityLogs:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: UserActivityLogStreams
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration:
        BucketARN: arn:aws:s3:::bucket_name
        Prefix: "userActivitylog/key=!{partitionKeyFromQuery:key1}/!{timestamp:'year='yyyy'/month='MM'/day='dd'/hour='HH}/"
        ErrorOutputPrefix: "errorLog/!{firehose:error-output-type}/dt=!{timestamp:YYYY}-!{timestamp:MM}-!{timestamp:dd}/"
        BufferingHints:
          IntervalInSeconds: 300
          SizeInMBs: 128
        CompressionFormat: GZIP
        RoleARN: !GetAtt FirehoseRole.Arn
        ProcessingConfiguration:
          Enabled: false
        DynamicPartitioningConfiguration:
          Enabled: true
          RetryOptions:
            DurationInSeconds: 300
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: /aws/kinesisfirehose/UserActivityLogStreams
          LogStreamName: S3Delivery
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: MetadataExtraction
              Parameters:
                - ParameterName: MetadataExtractionQuery
                  ParameterValue: '{key1: .key1}'
                - ParameterName: JsonParsingEngine
                  ParameterValue: JQ-1.6
            - Type: AppendDelimiterToRecord
              Parameters:
                - ParameterName: Delimiter
                  ParameterValue: "\\n"
    DependsOn: LogGroupFirehoseActivityLogs

# ------------------------------------------------------------#
# Kinesis Data Firehose Error LogGroup (CloudWatch Logs)
# ------------------------------------------------------------#
  LogGroupFirehoseActivityLogs:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: /aws/kinesisfirehose/UserActivityLogStreams
      RetentionInDays: 365
2024.7.3 追記
出力先ファイル内でレコードごとに改行を入れる設定は、以下の 2 パターンの記述のどちらでも動作しました。
- Type: AppendDelimiterToRecord
  Parameters:
    - ParameterName: Delimiter
      ParameterValue: "\\n"
- Type: AppendDelimiterToRecord

また、Firehose への送信元の仕様によって、レコードごとにデフォルトで改行が入るものがあるようです。例えば Amazon SES のログなど。この場合、Firehose 側で改行を入れる設定をしてしまうと、余計な空行が入ってしまいデータ集計に異常をきたすので注意が必要です。

解説

以下の記述の部分が難儀した箇所です。

フォルダ分けのキーとなる key1 をまんま以下のように記述しました。デリミタは、このまま記述すればOKです。

ProcessingConfiguration:
  Enabled: true
  Processors:
    - Type: MetadataExtraction
      Parameters:
      - ParameterName: MetadataExtractionQuery
        ParameterValue: '{key1: .key1}'
      - ParameterName: JsonParsingEngine
        ParameterValue: JQ-1.6
    - Type: AppendDelimiterToRecord
      Parameters:
        - ParameterName: Delimiter
          ParameterValue: "\\n"

Dynamic Partitioning は、有効/無効の変更はできず、変更する際はリソースを作り直しになるので注意しましょう。

まとめ

いかがでしたでしょうか?

インターネット上にこれだ!という情報がなかったので書いてみました。みなさまのお役に立てれば幸いです。

著者について
広野 祐司

AWS サーバーレスアーキテクチャを駆使して社内クラウド人材育成アプリとコンテンツづくりに勤しんでいます。React で SPA を書き始めたら快適すぎて、他の言語には戻れなくなりました。サーバーレス & React 仲間を増やしたいです。AWSは好きですが、それよりもフロントエンド開発の方が好きでして、バックエンド構築を簡単にしてくれたAWSには感謝の気持ちの方が強いです。
取得資格:AWS 認定は13資格、ITサービスマネージャ、ITIL v3 Expert 等
2020 - 2024 Japan AWS Top Engineer 受賞
2022 - 2024 AWS Ambassador 受賞
2023 当社初代フルスタックエンジニア認定
好きなAWSサービス:AWS Amplify / AWS AppSync / Amazon Cognito / AWS Step Functions / AWS CloudFormation

広野 祐司をフォローする

クラウドに強いによるエンジニアブログです。

SCSKクラウドサービス(AWS)は、企業価値の向上につながるAWS 導入を全面支援するオールインワンサービスです。AWS最上位パートナーとして、多種多様な業界のシステム構築実績を持つSCSKが、お客様のDX推進を強力にサポートします。

AWSアプリケーション開発クラウドソリューションデータ分析・活用基盤
シェアする
タイトルとURLをコピーしました