【AWS/Kinesis/DMS/KCL】KinesisとCDCを利用したストリーミング処理の構築チュートリアル
GWで時間があったので、少しストリーミング処理基盤に関して勉強してみました。とりあえず動く環境を作ることを一つのゴールにしてCloudformationの勉強もかねてサンプルのアプリケーションを作ってみました。一部、不出来なところは多いですが、細かいところは気にせずにとりあえず動くものを作ってみたい人は利用してみてください。
CloudformationのファイルやコードなどはGitHubに上げたので、利用してみてください。
https://github.com/jimatomo/dms-kinesis-kcl
※AWS CLIを実行できる環境が必要です。(Bashで実行できること)
チュートリアルの環境
データベースの更新を検知して自動でターゲットへ加工処理したデータを連携するデータパイプラインをイメージして作ってみています。
Kinesisをかますことで複数の加工処理を一つの変更データに対して実施することができます。(そうでなければ、単純なレプリケーションの方が安定します!)
技術要素
技術要素としては以下です。
Aurora PostgreSQL
データの発生元と連携先を兼ねています。(※お金節約のため、同じデータベースに書き戻すようにしています。)
AWS Data Migration Service(略してDMS)
Auroraの更新を検知してKinesisにデータを連携するための変更データキャプチャ(CDC)のタスクを実行してもらいます。(触ってみるとわかりますが、ちょっとデータ連携元を準備してあげるだけで、CDCの設定は全てマネージドでやってくれるので最高に便利です。さすがAWS)
Kinesis Client Library for Python (略してKCL)
Kinesis Data Streamsからデータを取得するアプリケーション(コンシューマー)を作成するためのライブラリです。今回は私がPythonの方が楽にコード書けるので、Pythonバージョンを利用しています。
https://github.com/awslabs/amazon-kinesis-client-python
EC2 (Cloud9) 上でKCLのアプリケーションを実行します。
MultiLangDaemonというJavaの実行環境と標準入力と標準出力でやりとりしている(?)ので、やや取り回しがめんどくさかったのが残念ポイントでした。
以下の記事を参考にしてデバッグしていました。大変だった…
stackoverflow.com
他にもやりようはあるようです。
github.com
最後に
リファクタリングする要素としては、DBの接続情報のホスト名のところもSSM Parameter Storeに登録して参照させたりしたいなというところです。
後、Lambdaでコンシューマーを実行できるようになるといいなと思っているので、今後時間があればやってみようかな~と思います。
(たぶんこっちの方は結構情報落ちていそうだし、実装も楽そうなイメージ)