データベース技術調査ブログ

LinuxやPostgreSQL、Oracleデータベース、AWSの知識をアウトプットしていきます

【AWS/Kinesis/DMS/KCL】KinesisとCDCを利用したストリーミング処理の構築チュートリアル

GWで時間があったので、少しストリーミング処理基盤に関して勉強してみました。とりあえず動く環境を作ることを一つのゴールにしてCloudformationの勉強もかねてサンプルのアプリケーションを作ってみました。一部、不出来なところは多いですが、細かいところは気にせずにとりあえず動くものを作ってみたい人は利用してみてください。

CloudformationのファイルやコードなどはGitHubに上げたので、利用してみてください。
https://github.com/jimatomo/dms-kinesis-kcl

AWS CLIを実行できる環境が必要です。(Bashで実行できること)

チュートリアルの環境

f:id:jimatomo:20210509131353p:plain
ストリーミング処理基盤チュートリアル環境

データベースの更新を検知して自動でターゲットへ加工処理したデータを連携するデータパイプラインをイメージして作ってみています。
Kinesisかますことで複数の加工処理を一つの変更データに対して実施することができます。(そうでなければ、単純なレプリケーションの方が安定します!)

技術要素

技術要素としては以下です。

Aurora PostgreSQL

データの発生元と連携先を兼ねています。(※お金節約のため、同じデータベースに書き戻すようにしています。)

AWS Data Migration Service(略してDMS)

Auroraの更新を検知してKinesisにデータを連携するための変更データキャプチャ(CDC)のタスクを実行してもらいます。(触ってみるとわかりますが、ちょっとデータ連携元を準備してあげるだけで、CDCの設定は全てマネージドでやってくれるので最高に便利です。さすがAWS

Kinesis Data Streams

ストリーミング処理のデータストアとして利用します。Apache Kafkaと同じようなサービスですが、完全にサーバレスなので、管理が楽です。

Kinesis Client Library for Python (略してKCL)

Kinesis Data Streamsからデータを取得するアプリケーション(コンシューマー)を作成するためのライブラリです。今回は私がPythonの方が楽にコード書けるので、Pythonバージョンを利用しています。
https://github.com/awslabs/amazon-kinesis-client-python

EC2 (Cloud9) 上でKCLのアプリケーションを実行します。

MultiLangDaemonというJavaの実行環境と標準入力と標準出力でやりとりしている(?)ので、やや取り回しがめんどくさかったのが残念ポイントでした。

以下の記事を参考にしてデバッグしていました。大変だった…
stackoverflow.com

他にもやりようはあるようです。
github.com


最後に

リファクタリングする要素としては、DBの接続情報のホスト名のところもSSM Parameter Storeに登録して参照させたりしたいなというところです。
後、Lambdaでコンシューマーを実行できるようになるといいなと思っているので、今後時間があればやってみようかな~と思います。
(たぶんこっちの方は結構情報落ちていそうだし、実装も楽そうなイメージ)