Sitemap

The Finatext Tech Blog

THE Finatext Tech Blog

Press enter or click to view image in full size

Athena+Embulk+BigQueryによるアプリケーションログの分析環境構築

13 min readMar 18, 2021

--

はじめに

こんにちは、Finatextで証券プラットフォーム(Brokerage as a Service、以下BaaS)の開発に携わっている石橋(@bashi0501)です。過去のFinatextテックブログではTerraform、CDKとIaCをテーマにした記事しか書いたことがなかったのですが、今回はログの分析活用をテーマとします。

概要

弊社の証券事業ではECSによるワークロードを組んでいます。本テーマのアプリケーションログについては標準出力したものをawslogsログドライバーが回収してCloudWatch Logsに送信しています。
ログの検索という観点ではCloudWatch Logs Insightsというサービスでかなりリッチにフィルターや集計を行うことができるのですが、ログデータを元にしたユーザーのファネル分析や業務改善(後述します)に活かしていきたいという意図があるため、マーケや業務、カスタマーサポートといったチームで分析できる必要があります。
また弊社ではGoogle Workspaceを利用しており、BigQueryをデータレイク・DWHとして活用しているため、上記目的のためにBigQuery環境へのデータロードを最終目標とします。
すでにBigQueryへのデータロードは、EmbulkをAWS環境にて実行することで行っているため、特段の制約がない限り、同様にEmbulkを使っていきます。
その過程でAWS環境内部にて他のサービスとの連携を考えるとAthenaで検索をかけられるものにしておきたいという意図もあり、中間地点としてS3+Athenaを採用しています。
これらの前提を踏まえて、本ブログでは、CloudWatch Logsへ送信されるアプリケーションログをどのようにしてBigQueryへロードしているかについて紹介します。(予め断っておくとBigQueryへのリアルタイムでのロードは行っていません。いつかそのニーズが出てきて構築した暁にはブログにするかも)

早速の結論

早速ですが、どう構成しているかを簡潔に書くと以下のようになります。

  • CloudWatch LogsサブスクリプションフィルタによりKinesis Data Firehoseにリアルタイムでログイベントを送信する
  • Firehoseで一定のバッファリングの後にS3へパーティショニングをしながら蓄積する
  • S3に蓄積されたログデータからGlue Crawlerを使ってデータカタログを作成する
  • 作成したデータカタログに対してAthenaでクエリをかけ、その結果をcsvでS3に配置してEmbulkでBigQueryへロードする

各部分の詳細、その背景にある課題と解決方法について掘り下げていきます。

詳細・課題と解決方法

ログのフォーマットや仕様をどうすればよいか

CloudWatch LogsからBigQueryへ、という本題とは離れますが、そもそもログをどういったフォーマットで、どんな仕様で構築するのが良いか、という悩ましい問題があると思います。
JSONフォーマットで出力するというところはサクッと決めましたが、仕様をどうするのが良いかについては解のない問題で長い年月モヤモヤとしていました。例えば、JSONのルートKeyは何にすればよいか、ログごとに詳細データを詰めたい場合はどうすればよいのか、その詳細データの中にも本来ルートKeyに昇格すべきものがあるのではないか等。そうすると結局運用していく中でより良いものにしていくしかないか、という結論になってしまいます。(それはそれで良いと思いますが

そんなときに知ったのがCloudEventsというイベントデータの標準仕様を策定するというプロジェクトでした。このCloudEventsはCNCFのincubatorプロジェクトとなっています。そのため各種SaaSやクラウドサービスがWebhookで送ってくるイベントデータも長期的にはこの仕様になっていくのかなという期待もありつつ、「まぁこれでよいか」という解のない悩みの探索にも諦めがついたため、CloudEventsの仕様に基づいてログを出力するようにしました。(infoやerrorなどのログレベルも勝手に追加でルートKeyに置いていたりはします)
またOSSなので嬉しいことに主要な言語のSDKも開発されてたりします。

CloudWatch Logs -> Kinesis Data Firehose

ここは単純にCloudWatch Logsサブスクリプションフィルターを使っています。(一つのロググループに設定できるフィルター数には上限があるので要注意です)
Firehoseへ送信されるログイベントはメタデータも付与されてしまうためKinesis Data Firehose Lambda blueprintsを使ってデータ変換を行います。参照

S3への転送にはFirehoseの標準設定を用いています。またFirehoseの機能としてS3上のパーティション構成も行っています。ただしログイベントのTimestampではなく、FirehoseのArrivalTimestampに基づくため、その部分にシビアな要件がある場合は別の方法を検討ください

余談ですが、SQL-likeではないものの、CloudWatch Logs Insightsを使うことでかなりリッチにログデータの検索・集計等ができます。この辺りの使い分けについては、CloudWatch Logs Insightsは分析ではなく調査を目的として活用し、上記のFirehoseにてログデータの長期保存はS3にて行っているという前提でロググループにはretentionを設定するのが良いかなと思っています。そのretention期間はInsightsによる調査にはどれくらいの期間のものが残っていれば十分そうかで判断すると良いと思います。
またサブスクリプションフィルターではフィルター条件を設定するのが良いかどうかについては特に絞ることなく全量をS3に蓄積するのが良いかなと。後述するAthenaでクエリをかける上でスキャンデータサイズの関係で不都合が生じてきた時にAWS Glue Jobにて別のS3バケットに抽出し、そのバケットにクエリをかけるようにすればよく、後から解決できる問題かなと思います。

Glue Crawlerでデータカタログを作成・更新

そのままです。Glue Triggerでcron実行できます。Glue Jobも実行したい場合はGlue Workflowで依存関係もたせて実行もできます。

Athena -> BigQuery

前述したとおり、すでにEmbulkを使ったBigQueryへのデータロードをいくつか組んでいます。Fargateを使ったバッチ処理です。Embulkを通すことに強い要件があるわけではないですが、ここでもEmbulkを使ってみます。
このワークロードを組む上での制約として一つあるのが「IAM roleによってFargate taskにアクセス権限を付与すること」というものです。
その点を踏まえてembulk-input-athenaを見てみるとaccess keyとsecret keyによる認証方式しかありません。同様にembulk-input-s3も見てみるとassume roleによる方式がありません。一時的なsession tokenによる方式はサポートされています。
最終的にここをどうしたかと言うと、以下の手順を採用しました。

  • Fargateのタスクロールに自分自身をassumeできるようにpolicyを付けておく
  • shellでassume-roleして環境変数にaccess key、secret access key、session tokenをsetする
  • Liquidテンプレートエンジンで一時的なsession tokenによる方式でEmbulkからS3へアクセスできるようにしておく
  • shellにてaws athena start-query-executionを実行し、aws athena get-query-executionで実行結果を待つ
  • 完了したら結果が配置されたS3オブジェクトの情報を環境変数にsetし、上と同様にEmbulkからロードできるようにする

IAM role policyは以下のような感じです。

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "sts:AssumeRole",
"Resource": "arn:aws:iam::111122223333:role/your_self_role"
},
...

Fargateタスクが実行するshellの概要は以下です

ASSUME_RES=$(aws sts assume-role \
--role-arn ${ASSUME_ROLE_ARN} \
--role-session-name ${ASSUME_SESSION_NAME} \
--query '[Credentials.AccessKeyId, Credentials.SecretAccessKey, Credentials.SessionToken]' \
--output text)
export EMBULK_ACCESS_KEY_ID=$(echo $ASSUME_RES | cut -d ' ' -f 1)
export EMBULK_SECRET_ACCESS_KEY=$(echo $ASSUME_RES | cut -d ' ' -f 2)
export EMBULK_SESSION_TOKEN=$(echo $ASSUME_RES | cut -d ' ' -f 3)
// Athenaクエリを実行
QUERY_ID=$(aws athena start-query-execution \
--query-string \
"
SELECT hoge, fuga
FROM
<your table>
" \
--work-group ${ATHENA_WORK_GROUP} \
--result-configuration OutputLocation="s3://${RESULT_BACKET}/${RESULT_PATH}" \
--query-execution-context Database=<your database>,Catalog=AwsDataCatalog \
| jq -r '.QueryExecutionId')
// QUERY_IDを使ってathena get-query-executionにて処理状態確認export EMBULK_S3_PATH="${RESULT_PATH}/${QUERY_ID}.csv"// embulkバイナリの実行

EmbulkのLiquidはこんな感じ

in:
type: s3
bucket: {{ env.EMBULK_S3_BUCKET }}
path: {{ env.EMBULK_S3_PATH }}
auth_method: session
access_key_id: {{ env.EMBULK_ACCESS_KEY_ID }}
secret_access_key: {{ env.EMBULK_SECRET_ACCESS_KEY }}
session_token: {{ env.EMBULK_SESSION_TOKEN }}

あとはこのFargateタスクをEventBridgeから実行します。また必要に応じてshellの中でpartition tableの処理もします。

基本的には以上なのですが、一つ課題が残っています。CloudEventsのdataプロパティにはドメイン特有の情報が詰められて、Glue Crawlerにて動的にデータカタログのスキーマがStruct型にて更新されていきます。ただこのStruct型は単純にSELECTすると問題が生じます。例えばdata(structid:string,name:string)とスキーマ定義されているdataをSELECTすると、"id=test, name=test"といった形式で値が取得されます。
これをシリアライズされたJSON文字列にしたく、さらにスキーマが動的に更新されるのに合わせて、structから動的に展開するというのが理想でした。しかし、これをクエリにてうまくする方法がなかったため、現在は愚直にdata.id, data.nameのような形でクエリの上で展開しています。

最後に

今回作ったアプリケーションログによるイベントデータの分析環境はユーザーのファネル分析などに使うというのはわかりやすい例なのですが、他にも社内業務に関しても使えます。
証券事業だと、口座開設の審査フローというのが複数のステップが依存関係を持ちつつ組まれています。エンドユーザーだけでなく、社内業務のイベントもこの分析環境に乗せていくことで口座開設が完了するまでのリードタイムの中でどの業務ステップに時間がかかっているのか、といったことも可視化できるようになってきます。昨今流行りの「DXによる業務改善」にも役立ちそうですね。

実は課題については前述したstruct型のクエリ以外にもあります。それはすでに構築・稼働済みのサービスにこの仕組みをじわじわと広げていくところです。なのでFinatext では一緒に証券、保険やソリューション事業に一緒に取り組んでいただけるエンジニアを募集しております!弊社の開発体制などについて、もっと詳しく聞いてみたい方のために、3/29 の 19:00よりオンラインでの会社説明会を行います。気になる方はぜひ参加してみてください!

また3/22 の 19:00よりMeetup特別編と称して「フィンテックエンジニアの家計と資産管理」をテーマとしたイベントも行います。こちらでもFinatextのエンジニアの雰囲気がわかるのはもちろん、シンプルにためになるお金の話が聞けるかも!?

採用サイトはこちら

--

--

No responses yet