小ネタ – Kinesis Data Firehose の Dynamic Partitioning を CloudFormation でプロビジョニングする [ Kinesis + CloudFormation ]

こんにちは、広野です。

Amazon Kinesis Data Firehose の Dynamic Partitioning を AWS CloudFormation でプロビジョニングしようとしたのですが、AWS公式ドキュメントに具体的なリファレンスやサンプルが無かったため、やってみて実際に動いたテンプレートを紹介します。

やりたいこと

Amazon Kinesis Data Firehose で受け取ったログストリームを、ログ出力先S3バケットに以下の条件でフォルダ分けして出力したい。

  • ログデータ内、key1 というキーに格納されているバリューごとに第一階層のフォルダ分けをする。
    “key1”: “TEST” というデータの場合、TESTというフォルダ内に、
    “key1”: “PROD” というデータの場合、PRODというフォルダ内にログが出力される。
  • 第二階層以降は、年、月、日、時間単位でフォルダ分けする。
    ※Amazon Athena でパーティション分けしやすいフォーマット。この記事では詳細には言及しない。

Amazon Kinesis Data Firehose の Dynamic Partitioning という機能を使用すれば実現でき、マネジメントコンソールで手作業でプロビジョニングするのは簡単なのですが、AWS CloudFormation だとどう書く?というのがテーマです。

マネジメントコンソール上の設定

以下の設定を CloudFormation で実装したいです。

この中で難儀だったのが、New line delimiter と Dynamic partitioning keys の部分です。

New line delimiter は、出力先ログファイル内で1レコードごとに改行を入れる設定です。
Dynamic partitioning keys は、フォルダ分けしたいキーの設定です。ここでは、key1を設定しています。

CloudFormation テンプレート

こうなります。
IAMロールの記述が長いです。ログ出力先となるS3バケットはワイルドカードで全てにしてあります。

AWSTemplateFormatVersion: 2010-09-09
Description: CloudFormation template example

Resources:
# ------------------------------------------------------------#
# Kinesis Data Firehose Role (IAM)
# ------------------------------------------------------------#
  FirehoseRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: KinesisFirehoseRole
      Description: This role allows Kinesis Data Firehose to delivery logs into S3.
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
              - firehose.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: KinesisFirehosePolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - "glue:GetTable"
                  - "glue:GetTableVersion"
                  - "glue:GetTableVersions"
                Resource:
                  - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog"
                  - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%"
                  - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%"
              - Effect: Allow
                Action:
                  - "s3:AbortMultipartUpload"
                  - "s3:GetBucketLocation"
                  - "s3:GetObject"
                  - "s3:ListBucket"
                  - "s3:ListBucketMultipartUploads"
                  - "s3:PutObject"
                Resource: "*"
              - Effect: Allow
                Action:
                  - "lambda:InvokeFunction"
                  - "lambda:GetFunctionConfiguration"
                Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%"
              - Effect: Allow
                Action:
                  - "kms:GenerateDataKey"
                  - "kms:Decrypt"
                Resource:
                  - !Sub "arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%"
                Condition:
                  StringEquals:
                    "kms:ViaService": !Sub "s3.${AWS::Region}.amazonaws.com"
                  StringLike:
                    "kms:EncryptionContext:aws:s3:arn":
                      - "arn:aws:s3:::%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/*"
                      - "arn:aws:s3:::%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%"
              - Effect: Allow
                Action:
                  - "logs:PutLogEvents"
                Resource:
                  - !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/*"
                  - !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%:log-stream:*"
              - Effect: Allow
                Action:
                  - "kinesis:DescribeStream"
                  - "kinesis:GetShardIterator"
                  - "kinesis:GetRecords"
                  - "kinesis:ListShards"
                Resource:
                  - !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%"
              - Effect: Allow
                Action:
                  - "kms:Decrypt"
                Resource:
                  - !Sub "arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%"
                Condition:
                  StringEquals:
                    "kms:ViaService": !Sub "kinesis.${AWS::Region}.amazonaws.com"
                  StringLike:
                    "kms:EncryptionContext:aws:kinesis:arn": !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%"

# ------------------------------------------------------------#
# Kinesis Data Firehose delivery stream
# ------------------------------------------------------------#
  FirehoseStreamActivityLogs:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: UserActivityLogStreams
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration:
        BucketARN: arn:aws:s3:::bucket_name
        Prefix: "userActivitylog/key=!{partitionKeyFromQuery:key1}/!{timestamp:'year='yyyy'/month='MM'/day='dd'/hour='HH}/"
        ErrorOutputPrefix: "errorLog/!{firehose:error-output-type}/dt=!{timestamp:YYYY}-!{timestamp:MM}-!{timestamp:dd}/"
        BufferingHints:
          IntervalInSeconds: 300
          SizeInMBs: 128
        CompressionFormat: GZIP
        RoleARN: !GetAtt FirehoseRole.Arn
        ProcessingConfiguration:
          Enabled: false
        DynamicPartitioningConfiguration:
          Enabled: true
          RetryOptions:
            DurationInSeconds: 300
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: /aws/kinesisfirehose/UserActivityLogStreams
          LogStreamName: S3Delivery
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: MetadataExtraction
              Parameters:
                - ParameterName: MetadataExtractionQuery
                  ParameterValue: '{key1: .key1}'
                - ParameterName: JsonParsingEngine
                  ParameterValue: JQ-1.6
            - Type: AppendDelimiterToRecord
              Parameters:
                - ParameterName: Delimiter
                  ParameterValue: "\\n"
    DependsOn: LogGroupFirehoseActivityLogs

# ------------------------------------------------------------#
# Kinesis Data Firehose Error LogGroup (CloudWatch Logs)
# ------------------------------------------------------------#
  LogGroupFirehoseActivityLogs:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: /aws/kinesisfirehose/UserActivityLogStreams
      RetentionInDays: 365

解説

以下の記述の部分が難儀した箇所です。

フォルダ分けのキーとなる key1 をまんま以下のように記述しました。デリミタは、このまま記述すればOKです。

ProcessingConfiguration:
  Enabled: true
  Processors:
    - Type: MetadataExtraction
      Parameters:
      - ParameterName: MetadataExtractionQuery
        ParameterValue: '{key1: .key1}'
      - ParameterName: JsonParsingEngine
        ParameterValue: JQ-1.6
    - Type: AppendDelimiterToRecord
      Parameters:
        - ParameterName: Delimiter
          ParameterValue: "\\n"

Dynamic Partitioning は、有効/無効の変更はできず、変更する際はリソースを作り直しになるので注意しましょう。

まとめ

いかがでしたでしょうか?

インターネット上にこれだ!という情報がなかったので書いてみました。みなさまのお役に立てれば幸いです。

タイトルとURLをコピーしました