Datadog API を使って Amazon EC2 インスタンスのリソースデータを取得してみる

本記事は TechHarmony Advent Calendar 12/4付の記事です

こんにちは、SCSK北村です。

Amazon EC2 インスタンスのリソースデータを取得するために、Datadog API を使ってみました。
結論として、期間を指定してCPUの使用率データをCSVファイルで出力することができました。
今回は、備忘も兼ねてファイル出力までの流れをまとめたいと思います。

事前準備

実行環境は以下の通りです。
OS:Windows
プログラミング言語:Python 3.12.0

Datadog API を使用するために必要なライブラリをインストールします。
対象:datadog-api-client
※Python3.7以上で利用可能

pip install datadog-api-client

 

実際にやったこと

基本的には下記リンク[※1]を見ながらスクリプトを作成していきました。

手順を追って紹介しますが、全体のコードをまず見たい方は読み飛ばしてください。

APIキーとAPPキーの準備

Datadog APIを利用する際、APIキーとAPPキーによる認証が必要になります。
今回、YAML形式で別ファイルに認証情報を記載し、プログラムから読み取っています。

datadog:
  api_key: <ここにAPI KEYを入力>
  app_key: <ここにAPP KEYを入力>

リクエストで送信するクエリを作成

リソースデータの取得には、TimeseriesFormulaQueryRequestメソッドを使用して、まずリクエストボディを作成します。
公式リファレンス[※1]にサンプルコードが掲載されているので、要件に合わせてボディをカスタマイズしていきます。

今回はCPU使用率データの取得ということで、以下のようにカスタマイズしました。
クエリを任意のものに書き換えれば、メモリ使用率やディスク使用率などのDatadogで収集しているデータは取得可能です!
Datadog のメトリクスエクスプローラーを使用すると、クエリを色々と試せるので便利です。

# リクエストで送信するクエリを定義
QUERY1 = 'avg:system.cpu.idle{name:'+target+'} by {name}' #EC2のNameタグ(target)でフィルタリング

# リクエストボディ
body = TimeseriesFormulaQueryRequest(
  data=TimeseriesFormulaRequest(
    attributes=TimeseriesFormulaRequestAttributes(
      formulas=[
        QueryFormula(
          formula='100 - q1',
          limit=FormulaLimit(
            count=10,
            order=QuerySortOrder.DESC,
          ),
        ),
      ],
      _from=time_from*1000, #データ取得の始点(UNIX時間)
      interval=300000, #データ取得間隔(ミリ秒)
      queries=TimeseriesFormulaRequestQueries(
        [
          MetricsTimeseriesQuery(
            data_source=MetricsDataSource.METRICS,
            query=QUERY1,
            name='q1',
          ),
        ]
      ),
      to=time_to*1000, #データ取得の終点(UNIX時間)
    ),
    type=TimeseriesFormulaRequestType.TIMESERIES_REQUEST,
  ),
)

Datadog APIへリクエスト送信

作成したリクエストボディをセットして、Datadog APIにリクエストを送信します。
その際に、前述の認証情報が必要なので、あわせてセットします。
問題なければ、response 変数にDatadog APIからのレスポンスデータが格納されます。

configuration = Configuration()
configuration.unstable_operations['query_timeseries_data'] = True
# Datadog API にリクエスト送信
with ApiClient(configuration) as api_client:
  api_client.default_headers['DD-API-KEY'] = api_key
  api_client.default_headers['DD-APPLICATION-KEY'] = app_key
  api_instance = MetricsApi(api_client)
  response = api_instance.query_timeseries_data(body=body)

レスポンスデータを整形してCSVに書き出す

まず、ファイルに書き込みたい情報をレスポンスデータから取得します。
データは、”<レスポンスデータのキー>.value”で取り出すことができました。

# ホスト名を取得
res_series = response['data']['attributes']['series'].value
group_tag = res_series[0]['group_tags'].value[0].split(':')
hostname = group_tag[-1]

# リソースデータのリストを取得
res_values = response['data']['attributes']['values'].value

# 時間データのリストを取得
res_times = response['data']['attributes']['times'].value

レスポンスデータに含まれる時間はUNIX時間なので、日本時間へ変換します。

# UNIX時間を日本時間に変換した結果を格納するリスト
timeseries = []

# UNIX時間を秒に変換 (ミリ秒から秒に変換)
for epoch_time in res_times:
  epoch_time_sec = epoch_time / 1000

  # UNIX時間を日本時間に変換
  jst = datetime.datetime.fromtimestamp(
    epoch_time_sec, 
    datetime.timezone(datetime.timedelta(hours=9))
  )

  # 指定された形式で日本時間をフォーマット
  jst_formatted = jst.strftime('%Y/%m/%d %H:%M')

  timeseries.append(jst_formatted)

複数のEC2インスタンスのデータを取得するので、インスタンス毎にファイルを分けて出力するようにしています。
CSVファイルの中身は、1列目が「時間」、2列目が「CPU使用率の値」としています。

# hostname ごとにデータをまとめる
grouped_data = {}
grouped_data[hostname] = {'values': res_values[0]}

# ファイル保存先
SAVE_DIR = f'result/{dt}/{TARGET_RESOURCE}/'
# SAVE_DIR が存在しなかったら作成
if not os.path.exists(SAVE_DIR):
  os.makedirs(SAVE_DIR)

# データをCSVファイルに書き込む
for group_data in grouped_data.values():
  values = group_data['values']

  filepath = SAVE_DIR + f'{hostname}_CPU.csv'
  with open(filepath, mode='w', newline='') as csvfile:
    writer = csv.writer(csvfile)

    for i in range(len(timeseries)):
      row = [timeseries[i], values.value[i]]
      writer.writerow(row)

作成したコード

  • cpu_util.py

今回作成したコードの全体です。

import os
import csv
import datetime
import logging.config
import yaml
from datadog_api_client import ApiClient, Configuration
from datadog_api_client.v2.api.metrics_api import MetricsApi
from datadog_api_client.v2.model.formula_limit import FormulaLimit
from datadog_api_client.v2.model.metrics_data_source import MetricsDataSource
from datadog_api_client.v2.model.metrics_timeseries_query import MetricsTimeseriesQuery
from datadog_api_client.v2.model.query_formula import QueryFormula
from datadog_api_client.v2.model.query_sort_order import QuerySortOrder
from datadog_api_client.v2.model.timeseries_formula_query_request import TimeseriesFormulaQueryRequest
from datadog_api_client.v2.model.timeseries_formula_request import TimeseriesFormulaRequest
from datadog_api_client.v2.model.timeseries_formula_request_attributes import TimeseriesFormulaRequestAttributes
from datadog_api_client.v2.model.timeseries_formula_request_queries import TimeseriesFormulaRequestQueries
from datadog_api_client.v2.model.timeseries_formula_request_type import TimeseriesFormulaRequestType

# 環境設定
CONFIG = 'config/credentials.yaml'
EC2_TARGETS = 'config/target_ec2_list.yaml'
TARGET_RESOURCE = 'cpu'
LOG_DIR = 'log/'
# LOG_DIR が存在しなかったら作成
if not os.path.exists(LOG_DIR):
  os.makedirs(LOG_DIR)

with open('config/log_config.yaml', 'r') as config_file:
  config = yaml.safe_load(config_file)

logging.config.dictConfig(config)
logger = logging.getLogger('cpu')

def get_cpu_utilization(api_key, app_key, time_from, time_to, target, dt):
  try:
    # リクエストで送信するクエリを定義
    QUERY1 = 'avg:system.cpu.idle{name:'+target+'} by {name}'
    
    # リクエストボディ
    body = TimeseriesFormulaQueryRequest(
      data=TimeseriesFormulaRequest(
        attributes=TimeseriesFormulaRequestAttributes(
          formulas=[
            QueryFormula(
              formula='100 - q1',
              limit=FormulaLimit(
                count=10,
                order=QuerySortOrder.DESC,
              ),
            ),
          ],
          _from=time_from*1000,
          interval=300000,
          queries=TimeseriesFormulaRequestQueries(
            [
              MetricsTimeseriesQuery(
                data_source=MetricsDataSource.METRICS,
                query=QUERY1,
                name='q1',
              ),
            ]
          ),
          to=time_to*1000,
        ),
        type=TimeseriesFormulaRequestType.TIMESERIES_REQUEST,
      ),
    )
    configuration = Configuration()
    configuration.unstable_operations['query_timeseries_data'] = True

    # Datadog API にリクエスト送信
    with ApiClient(configuration) as api_client:
      api_client.default_headers['DD-API-KEY'] = api_key
      api_client.default_headers['DD-APPLICATION-KEY'] = app_key
      api_instance = MetricsApi(api_client)
      response = api_instance.query_timeseries_data(body=body)

    # ホスト名を取得
    res_series = response['data']['attributes']['series'].value
    group_tag = res_series[0]['group_tags'].value[0].split(':')
    hostname = group_tag[-1]

    # リソースデータのリストを取得
    res_values = response['data']['attributes']['values'].value

    # 時間データのリストを取得
    res_times = response['data']['attributes']['times'].value

    # UNIX時間を日本時間に変換した結果を格納するリスト
    timeseries = []

    # UNIX時間を秒に変換 (ミリ秒から秒に変換)
    for epoch_time in res_times:
      epoch_time_sec = epoch_time / 1000

    # UNIX時間を日本時間に変換
    jst = datetime.datetime.fromtimestamp(
      epoch_time_sec, 
      datetime.timezone(datetime.timedelta(hours=9))
      )

    # 指定された形式で日本時間をフォーマット
    jst_formatted = jst.strftime('%Y/%m/%d %H:%M')
    timeseries.append(jst_formatted)

    # hostname ごとにデータをまとめる
    grouped_data = {}
    grouped_data[hostname] = {'values': res_values[0]}

    # ファイル保存先
    SAVE_DIR = f'result/{dt}/{TARGET_RESOURCE}/'
    # SAVE_DIR が存在しなかったら作成
    if not os.path.exists(SAVE_DIR):
      os.makedirs(SAVE_DIR)

    # データをCSVファイルに書き込む
    for group_data in grouped_data.values():
      values = group_data['values'] 

      # ファイル保存先パス・ファイル名
      filepath = SAVE_DIR + f'{hostname}_CPU.csv'
      with open(filepath, mode='w', newline='') as csvfile:
        writer = csv.writer(csvfile)

        for i in range(len(timeseries)):
          row = [timeseries[i], values.value[i]]
          writer.writerow(row)

        logger.info(f"データが書き込まれました: '{filepath}'")

  except Exception as e:
    # 例外処理
    logger.error(f'予期せぬエラーが発生しました: {str(e)}')
  else:
    # 処理成功
    logger.info(f'[{target}] の処理が完了しました')

def main():
  try:
    logger.info(f'リソースデータ[{TARGET_RESOURCE}] の取得を開始します')
    # 設定ファイルを読み込む
    with open(CONFIG, 'r') as config_file:
      config = yaml.safe_load(config_file)

    # APIキーとAPPキーを取得
      api_key = config['datadog']['api_key']
      app_key = config['datadog']['app_key']

    # 設定ファイルを読み込む
    with open(EC2_TARGETS, 'r') as ec2_file:
      ec2 = yaml.safe_load(ec2_file)

    # データ取得する対象を読み込む
    target_list = ec2['hostname']

    # 現在の日付を取得
    today = datetime.date.today()

    # 当月の初日を計算
    first_day_of_current_month = datetime.datetime(today.year, today.month, 1)

    # 前月の初日を計算
    if today.month == 1:
      # 1月の場合、前年の12月の初日になる
      first_day_of_last_month = datetime.datetime(today.year - 1, 12, 1)
      dt = str(today.year - 1) + str(12) # for 'SAVE_DIR' path
    else:
      # それ以外の場合、前月の初日になる
      first_day_of_last_month = datetime.datetime(today.year, today.month - 1, 1)
      dt = str(today.year) + str(today.month - 1) # for 'SAVE_DIR' path

    # UNIX時間に変換
    time_from = int(first_day_of_last_month.timestamp()) # 前月初日
    time_to = int(first_day_of_current_month.timestamp()) # 当月初日

    # 対象毎にループ処理
    for target in target_list:
      logger.info(f'[{target}] の処理を開始します')
      get_cpu_utilization(api_key, app_key, time_from, time_to, target, dt)

  except FileNotFoundError as e:
    # ファイルが存在しない場合の例外処理
    logger.error(f'設定ファイルが見つかりません: {str(e)}')

  except IOError as e:
    # その他の入出力関連のエラーに対する例外処理
    logger.error(f'入出力エラーが発生しました: {str(e)}')

  except Exception as e:
    # その他の例外に対する例外処理
    logger.error(f'予期せぬエラーが発生しました: {str(e)}')

  else:
    # 処理成功
    logger.info(f'リソースデータ[{TARGET_RESOURCE}] の取得が完了しました')

if __name__ == '__main__':
  logger.info('START SCRIPT')
  main()
  logger.info('END SCRIPT')
  • credentials.yaml

Datadog APIにリクエストを送信する際の認証情報をここに記載します。

datadog:
 api_key: <ここにAPI KEYを入力>
 app_key: <ここにAPP KEYを入力>
  • log_config.yaml

loggingモジュールでログを出力させるための設定ファイルです。
ターミナルとファイルのそれぞれに出力できるように記載しています。

version: 1
formatters:
  simple_fmt:
    format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    datefmt: '%Y/%m/%d %H:%M:%S'
handlers:
  console:
    class: logging.StreamHandler
    level: DEBUG
    formatter: simple_fmt
    stream: ext://sys.stdout
  file:
    class: logging.handlers.TimedRotatingFileHandler
    level: DEBUG
    formatter: simple_fmt
    filename: log/app.log
    encoding: utf-8
    when: midnight
    interval: 30
    backupCount: 3
loggers:
  cpu:
    level: INFO
    handlers: [console, file]
    propagate: no
  root:
    level: DEBUG
    handlers: [console, file]
  • target_ec2_list.yaml

データ取得対象のEC2インスタンスをこのファイルに記載し、プログラムから取得しています。

# EC2インスタンスのNameタグの値を以下に記載
hostname:
  - server01
  - server02
  - server03
  • 出力ファイルのサンプル

今回作成したプログラムでは以下のように出力されます。
プログラムを実行した日付から、前月1か月分のCPU使用率が5分間隔で取得することができました。

2023/10/01 00:00,9.209855
2023/10/01 00:05,8.721778
2023/10/01 00:10,8.922941
2023/10/01 00:15,8.763545
2023/10/01 00:20,8.784494
2023/10/01 00:25,8.795538
2023/10/01 00:30,8.714313
2023/10/01 00:35,8.817114
2023/10/01 00:40,8.730093
2023/10/01 00:45,8.940528
2023/10/01 00:50,8.863131
2023/10/01 00:55,8.967478
・・・

あとがき

運用業務の中でサーバリソースの使用状況を確認する場面が出てくると思います。
取得するサーバ台数やメトリクスが多くなると、コンソールのGUI操作でデータ取得作業は結構手間がかかりますよね。
今回、Datadog APIを使用して自動化ができたことで、この運用がかなり楽になりました。
今後はAWS Lambdaへの移行も検討していきたいと考えています。

本記事が皆様のお役に立てれば幸いです。

タイトルとURLをコピーしました