本記事は TechHarmony Advent Calendar 12/4付の記事です。 |
こんにちは、SCSK北村です。
Amazon EC2 インスタンスのリソースデータを取得するために、Datadog API を使ってみました。
結論として、期間を指定してCPUの使用率データをCSVファイルで出力することができました。
今回は、備忘も兼ねてファイル出力までの流れをまとめたいと思います。
事前準備
実行環境は以下の通りです。
OS:Windows
プログラミング言語:Python 3.12.0
Datadog API を使用するために必要なライブラリをインストールします。
対象:datadog-api-client
※Python3.7以上で利用可能
pip install datadog-api-client
実際にやったこと
基本的には下記リンク[※1]を見ながらスクリプトを作成していきました。
手順を追って紹介しますが、全体のコードをまず見たい方は読み飛ばしてください。
APIキーとAPPキーの準備
Datadog APIを利用する際、APIキーとAPPキーによる認証が必要になります。
今回、YAML形式で別ファイルに認証情報を記載し、プログラムから読み取っています。
datadog: api_key: <ここにAPI KEYを入力> app_key: <ここにAPP KEYを入力>
リクエストで送信するクエリを作成
リソースデータの取得には、TimeseriesFormulaQueryRequestメソッドを使用して、まずリクエストボディを作成します。
公式リファレンス[※1]にサンプルコードが掲載されているので、要件に合わせてボディをカスタマイズしていきます。
今回はCPU使用率データの取得ということで、以下のようにカスタマイズしました。
クエリを任意のものに書き換えれば、メモリ使用率やディスク使用率などのDatadogで収集しているデータは取得可能です!
Datadog のメトリクスエクスプローラーを使用すると、クエリを色々と試せるので便利です。
# リクエストで送信するクエリを定義 QUERY1 = 'avg:system.cpu.idle{name:'+target+'} by {name}' #EC2のNameタグ(target)でフィルタリング # リクエストボディ body = TimeseriesFormulaQueryRequest( data=TimeseriesFormulaRequest( attributes=TimeseriesFormulaRequestAttributes( formulas=[ QueryFormula( formula='100 - q1', limit=FormulaLimit( count=10, order=QuerySortOrder.DESC, ), ), ], _from=time_from*1000, #データ取得の始点(UNIX時間) interval=300000, #データ取得間隔(ミリ秒) queries=TimeseriesFormulaRequestQueries( [ MetricsTimeseriesQuery( data_source=MetricsDataSource.METRICS, query=QUERY1, name='q1', ), ] ), to=time_to*1000, #データ取得の終点(UNIX時間) ), type=TimeseriesFormulaRequestType.TIMESERIES_REQUEST, ), )
Datadog APIへリクエスト送信
作成したリクエストボディをセットして、Datadog APIにリクエストを送信します。
その際に、前述の認証情報が必要なので、あわせてセットします。
問題なければ、response 変数にDatadog APIからのレスポンスデータが格納されます。
configuration = Configuration() configuration.unstable_operations['query_timeseries_data'] = True # Datadog API にリクエスト送信 with ApiClient(configuration) as api_client: api_client.default_headers['DD-API-KEY'] = api_key api_client.default_headers['DD-APPLICATION-KEY'] = app_key api_instance = MetricsApi(api_client) response = api_instance.query_timeseries_data(body=body)
レスポンスデータを整形してCSVに書き出す
まず、ファイルに書き込みたい情報をレスポンスデータから取得します。
データは、”<レスポンスデータのキー>.value”で取り出すことができました。
# ホスト名を取得 res_series = response['data']['attributes']['series'].value group_tag = res_series[0]['group_tags'].value[0].split(':') hostname = group_tag[-1] # リソースデータのリストを取得 res_values = response['data']['attributes']['values'].value # 時間データのリストを取得 res_times = response['data']['attributes']['times'].value
レスポンスデータに含まれる時間はUNIX時間なので、日本時間へ変換します。
# UNIX時間を日本時間に変換した結果を格納するリスト timeseries = [] # UNIX時間を秒に変換 (ミリ秒から秒に変換) for epoch_time in res_times: epoch_time_sec = epoch_time / 1000 # UNIX時間を日本時間に変換 jst = datetime.datetime.fromtimestamp( epoch_time_sec, datetime.timezone(datetime.timedelta(hours=9)) ) # 指定された形式で日本時間をフォーマット jst_formatted = jst.strftime('%Y/%m/%d %H:%M') timeseries.append(jst_formatted)
複数のEC2インスタンスのデータを取得するので、インスタンス毎にファイルを分けて出力するようにしています。
CSVファイルの中身は、1列目が「時間」、2列目が「CPU使用率の値」としています。
# hostname ごとにデータをまとめる grouped_data = {} grouped_data[hostname] = {'values': res_values[0]} # ファイル保存先 SAVE_DIR = f'result/{dt}/{TARGET_RESOURCE}/' # SAVE_DIR が存在しなかったら作成 if not os.path.exists(SAVE_DIR): os.makedirs(SAVE_DIR) # データをCSVファイルに書き込む for group_data in grouped_data.values(): values = group_data['values'] filepath = SAVE_DIR + f'{hostname}_CPU.csv' with open(filepath, mode='w', newline='') as csvfile: writer = csv.writer(csvfile) for i in range(len(timeseries)): row = [timeseries[i], values.value[i]] writer.writerow(row)
作成したコード
- cpu_util.py
今回作成したコードの全体です。
import os import csv import datetime import logging.config import yaml from datadog_api_client import ApiClient, Configuration from datadog_api_client.v2.api.metrics_api import MetricsApi from datadog_api_client.v2.model.formula_limit import FormulaLimit from datadog_api_client.v2.model.metrics_data_source import MetricsDataSource from datadog_api_client.v2.model.metrics_timeseries_query import MetricsTimeseriesQuery from datadog_api_client.v2.model.query_formula import QueryFormula from datadog_api_client.v2.model.query_sort_order import QuerySortOrder from datadog_api_client.v2.model.timeseries_formula_query_request import TimeseriesFormulaQueryRequest from datadog_api_client.v2.model.timeseries_formula_request import TimeseriesFormulaRequest from datadog_api_client.v2.model.timeseries_formula_request_attributes import TimeseriesFormulaRequestAttributes from datadog_api_client.v2.model.timeseries_formula_request_queries import TimeseriesFormulaRequestQueries from datadog_api_client.v2.model.timeseries_formula_request_type import TimeseriesFormulaRequestType # 環境設定 CONFIG = 'config/credentials.yaml' EC2_TARGETS = 'config/target_ec2_list.yaml' TARGET_RESOURCE = 'cpu' LOG_DIR = 'log/' # LOG_DIR が存在しなかったら作成 if not os.path.exists(LOG_DIR): os.makedirs(LOG_DIR) with open('config/log_config.yaml', 'r') as config_file: config = yaml.safe_load(config_file) logging.config.dictConfig(config) logger = logging.getLogger('cpu') def get_cpu_utilization(api_key, app_key, time_from, time_to, target, dt): try: # リクエストで送信するクエリを定義 QUERY1 = 'avg:system.cpu.idle{name:'+target+'} by {name}' # リクエストボディ body = TimeseriesFormulaQueryRequest( data=TimeseriesFormulaRequest( attributes=TimeseriesFormulaRequestAttributes( formulas=[ QueryFormula( formula='100 - q1', limit=FormulaLimit( count=10, order=QuerySortOrder.DESC, ), ), ], _from=time_from*1000, interval=300000, queries=TimeseriesFormulaRequestQueries( [ MetricsTimeseriesQuery( data_source=MetricsDataSource.METRICS, query=QUERY1, name='q1', ), ] ), to=time_to*1000, ), type=TimeseriesFormulaRequestType.TIMESERIES_REQUEST, ), ) configuration = Configuration() configuration.unstable_operations['query_timeseries_data'] = True # Datadog API にリクエスト送信 with ApiClient(configuration) as api_client: api_client.default_headers['DD-API-KEY'] = api_key api_client.default_headers['DD-APPLICATION-KEY'] = app_key api_instance = MetricsApi(api_client) response = api_instance.query_timeseries_data(body=body) # ホスト名を取得 res_series = response['data']['attributes']['series'].value group_tag = res_series[0]['group_tags'].value[0].split(':') hostname = group_tag[-1] # リソースデータのリストを取得 res_values = response['data']['attributes']['values'].value # 時間データのリストを取得 res_times = response['data']['attributes']['times'].value # UNIX時間を日本時間に変換した結果を格納するリスト timeseries = [] # UNIX時間を秒に変換 (ミリ秒から秒に変換) for epoch_time in res_times: epoch_time_sec = epoch_time / 1000 # UNIX時間を日本時間に変換 jst = datetime.datetime.fromtimestamp( epoch_time_sec, datetime.timezone(datetime.timedelta(hours=9)) ) # 指定された形式で日本時間をフォーマット jst_formatted = jst.strftime('%Y/%m/%d %H:%M') timeseries.append(jst_formatted) # hostname ごとにデータをまとめる grouped_data = {} grouped_data[hostname] = {'values': res_values[0]} # ファイル保存先 SAVE_DIR = f'result/{dt}/{TARGET_RESOURCE}/' # SAVE_DIR が存在しなかったら作成 if not os.path.exists(SAVE_DIR): os.makedirs(SAVE_DIR) # データをCSVファイルに書き込む for group_data in grouped_data.values(): values = group_data['values'] # ファイル保存先パス・ファイル名 filepath = SAVE_DIR + f'{hostname}_CPU.csv' with open(filepath, mode='w', newline='') as csvfile: writer = csv.writer(csvfile) for i in range(len(timeseries)): row = [timeseries[i], values.value[i]] writer.writerow(row) logger.info(f"データが書き込まれました: '{filepath}'") except Exception as e: # 例外処理 logger.error(f'予期せぬエラーが発生しました: {str(e)}') else: # 処理成功 logger.info(f'[{target}] の処理が完了しました') def main(): try: logger.info(f'リソースデータ[{TARGET_RESOURCE}] の取得を開始します') # 設定ファイルを読み込む with open(CONFIG, 'r') as config_file: config = yaml.safe_load(config_file) # APIキーとAPPキーを取得 api_key = config['datadog']['api_key'] app_key = config['datadog']['app_key'] # 設定ファイルを読み込む with open(EC2_TARGETS, 'r') as ec2_file: ec2 = yaml.safe_load(ec2_file) # データ取得する対象を読み込む target_list = ec2['hostname'] # 現在の日付を取得 today = datetime.date.today() # 当月の初日を計算 first_day_of_current_month = datetime.datetime(today.year, today.month, 1) # 前月の初日を計算 if today.month == 1: # 1月の場合、前年の12月の初日になる first_day_of_last_month = datetime.datetime(today.year - 1, 12, 1) dt = str(today.year - 1) + str(12) # for 'SAVE_DIR' path else: # それ以外の場合、前月の初日になる first_day_of_last_month = datetime.datetime(today.year, today.month - 1, 1) dt = str(today.year) + str(today.month - 1) # for 'SAVE_DIR' path # UNIX時間に変換 time_from = int(first_day_of_last_month.timestamp()) # 前月初日 time_to = int(first_day_of_current_month.timestamp()) # 当月初日 # 対象毎にループ処理 for target in target_list: logger.info(f'[{target}] の処理を開始します') get_cpu_utilization(api_key, app_key, time_from, time_to, target, dt) except FileNotFoundError as e: # ファイルが存在しない場合の例外処理 logger.error(f'設定ファイルが見つかりません: {str(e)}') except IOError as e: # その他の入出力関連のエラーに対する例外処理 logger.error(f'入出力エラーが発生しました: {str(e)}') except Exception as e: # その他の例外に対する例外処理 logger.error(f'予期せぬエラーが発生しました: {str(e)}') else: # 処理成功 logger.info(f'リソースデータ[{TARGET_RESOURCE}] の取得が完了しました') if __name__ == '__main__': logger.info('START SCRIPT') main() logger.info('END SCRIPT')
- credentials.yaml
Datadog APIにリクエストを送信する際の認証情報をここに記載します。
datadog: api_key: <ここにAPI KEYを入力> app_key: <ここにAPP KEYを入力>
- log_config.yaml
loggingモジュールでログを出力させるための設定ファイルです。
ターミナルとファイルのそれぞれに出力できるように記載しています。
version: 1 formatters: simple_fmt: format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s" datefmt: '%Y/%m/%d %H:%M:%S' handlers: console: class: logging.StreamHandler level: DEBUG formatter: simple_fmt stream: ext://sys.stdout file: class: logging.handlers.TimedRotatingFileHandler level: DEBUG formatter: simple_fmt filename: log/app.log encoding: utf-8 when: midnight interval: 30 backupCount: 3 loggers: cpu: level: INFO handlers: [console, file] propagate: no root: level: DEBUG handlers: [console, file]
- target_ec2_list.yaml
データ取得対象のEC2インスタンスをこのファイルに記載し、プログラムから取得しています。
# EC2インスタンスのNameタグの値を以下に記載 hostname: - server01 - server02 - server03
- 出力ファイルのサンプル
今回作成したプログラムでは以下のように出力されます。
プログラムを実行した日付から、前月1か月分のCPU使用率が5分間隔で取得することができました。
2023/10/01 00:00,9.209855 2023/10/01 00:05,8.721778 2023/10/01 00:10,8.922941 2023/10/01 00:15,8.763545 2023/10/01 00:20,8.784494 2023/10/01 00:25,8.795538 2023/10/01 00:30,8.714313 2023/10/01 00:35,8.817114 2023/10/01 00:40,8.730093 2023/10/01 00:45,8.940528 2023/10/01 00:50,8.863131 2023/10/01 00:55,8.967478 ・・・
あとがき
運用業務の中でサーバリソースの使用状況を確認する場面が出てくると思います。
取得するサーバ台数やメトリクスが多くなると、コンソールのGUI操作でデータ取得作業は結構手間がかかりますよね。
今回、Datadog APIを使用して自動化ができたことで、この運用がかなり楽になりました。
今後はAWS Lambdaへの移行も検討していきたいと考えています。
本記事が皆様のお役に立てれば幸いです。