イベント処理実装ガイド
イベント処理 Lambda の実装プラクティスを記載したドキュメントです。ポイントは以下2点です。
- WebAPI の構成と統一感を持ち、Service / Dao レイヤーの共通利用ができる
- 1 イベントソースにつき 1 Lambda 構成を基本とする
- イベント処理 Lambda は即時性を求めないことが多く、デバッグしやすい開発体験を維持するため
S3 にアップロードされたデータを処理する Event Lambda の実装
S3 にアップロードされたユーザーデータの一覧をDBに新規登録する処理を実装します。
S3 にアップロードされるユーザーデータ(CSV)の内容
以下のような samples_users.csv がアップロードされる想定とします。
name,country,age
Taro,Japan,30
Alice,USA,28
Bob,UK,35
Maria,Germany,25
Li,China,32
Event Routing 実装
Lambda 関数のエントリーポイント( src/sample_event/index.ts
) に、S3 から通知されたイベント情報が入った event
オブジェクトが渡されます。
本カタログでは Event Lambda に関しては 1
イベントソースにつき 1
Lambda 構成を基本とするため、そのまま Action に振り分けます。
実装例
src/sample_event/index.ts
from sample_event.actions.data_import_action import DataImportAction
from aws_lambda_powertools import Logger
logger = Logger()
@logger.inject_lambda_context(clear_state=True)
def handler(event, context):
"""
Extract necessary information from the event according to the event format that triggers the Lambda.
The following is an example of an S3 Event.
"""
DataImportAction().handle(event)
Action を実装
「Action」は、以下の役割を担当します。
- イベント処理の「入り口」と「出口」となる部分
- 必要なパタメータの抽出、Service 実行
- 処理結果を通知または保存
実装例
src/sample_event/actions/data_import_action.py
from sample_event.services.data_import_service import DataImportService
from aws_lambda_powertools import Logger
logger = Logger(child=True)
class DataImportAction:
def handle(self, s3_event):
logger.info(
'Initiate DataImportAction Process. Incoming s3_event %s', s3_event)
# Acquire bucket name and object key from S3 event
bucket = s3_event['Records'][0]['s3']['bucket']['name']
key = s3_event['Records'][0]['s3']['object']['key']
# Call main logics
service = DataImportService(bucket, key)
service.get_csv_and_parse()
service.save()
# Log / send / notify the result of this action
logger.info('DataImportedAction is successfully finished.')
Service を実装
「Service」は、イベント処理のメインロジック、ビジネスロジックを担当する部分です。
実装例
src/sample_event/services/data_import_service.py
import csv
from sample_event.aws.s3_client import S3Client
from sample_event.daos.users_dao import UsersDao
class DataImportService:
def __init__(self, __bucket_name, __bucket_key):
self.__bucket_name = __bucket_name
self.__bucket_key = __bucket_key
self.__reader = None
def get_and_parse_csv(self):
s3_client = S3Client()
# Acquire S3 object
csv_string = s3_client.get_object_as_string(
bucket_name=self.__bucket_name,
bucket_key=self.__bucket_key
)
# Parse CSV
self.__reader = csv.DictReader(csv_string.splitlines())
def save(self):
users_dao = UsersDao()
users_dao.create_users(self.__reader)
DAO 実装
「DAO」は、データベースにアクセスし、クエリーを実行する処理を担当します。
実装例
src/sample_event/daos/users_dao.py
import os, psycopg
from psycopg.rows import dict_row
class UsersDao:
def get_connection_info(self):
# psycopg3 のコネクション作成に必要なパラメータ情報を読み込みます。
# こちらのサンプルコードはカタログ AMI に含まれています。
def create_user(self, name, country, age):
try:
# データベースのコネクションを作成します
# 実際には、コネクションプールの仕組みを検討してください。
conn_info = self.get_connection_info()
conn = psycopg.connect(**conn_info)
with conn.cursor() as cur:
cur.execute(
"INSERT INTO users (name, country, age) VALUES (%s, %s, %s)",
(name, country, age)
)
conn.commit()
conn.close()
動作確認
「ローカル実行によるデバッグとユニットテスト」を参照してください。