メインコンテンツまでスキップ

イベント処理実装ガイド

イベント処理 Lambda の実装プラクティスを記載したドキュメントです。ポイントは以下2点です。

  • WebAPI の構成と統一感を持ち、Service / Dao レイヤーの共通利用ができる
  • 1 イベントソースにつき 1 Lambda 構成を基本とする
    • イベント処理 Lambda は即時性を求めないことが多く、デバッグしやすい開発体験を維持するため

S3 にアップロードされたデータを処理する Event Lambda の実装

S3 にアップロードされたユーザーデータの一覧をDBに新規登録する処理を実装します。

S3 にアップロードされるユーザーデータ(CSV)の内容

以下のような samples_users.csv がアップロードされる想定とします。

name,country,age
Taro,Japan,30
Alice,USA,28
Bob,UK,35
Maria,Germany,25
Li,China,32

Event Routing 実装

Lambda 関数のエントリーポイント( src/sample_event/index.ts ) に、S3 から通知されたイベント情報が入った event オブジェクトが渡されます。

本カタログでは Event Lambda に関しては 1 イベントソースにつき 1 Lambda 構成を基本とするため、そのまま Action に振り分けます。

実装例

  • src/sample_event/index.ts
from sample_event.actions.data_import_action import DataImportAction
from aws_lambda_powertools import Logger

logger = Logger()


@logger.inject_lambda_context(clear_state=True)
def handler(event, context):
"""
Extract necessary information from the event according to the event format that triggers the Lambda.
The following is an example of an S3 Event.
"""
DataImportAction().handle(event)

Action を実装

「Action」は、以下の役割を担当します。

  • イベント処理の「入り口」と「出口」となる部分
  • 必要なパタメータの抽出、Service 実行
  • 処理結果を通知または保存

実装例

  • src/sample_event/actions/data_import_action.py
from sample_event.services.data_import_service import DataImportService
from aws_lambda_powertools import Logger

logger = Logger(child=True)


class DataImportAction:

def handle(self, s3_event):
logger.info(
'Initiate DataImportAction Process. Incoming s3_event %s', s3_event)

# Acquire bucket name and object key from S3 event
bucket = s3_event['Records'][0]['s3']['bucket']['name']
key = s3_event['Records'][0]['s3']['object']['key']

# Call main logics
service = DataImportService(bucket, key)
service.get_csv_and_parse()
service.save()

# Log / send / notify the result of this action
logger.info('DataImportedAction is successfully finished.')

Service を実装

「Service」は、イベント処理のメインロジック、ビジネスロジックを担当する部分です。

実装例

  • src/sample_event/services/data_import_service.py
import csv

from sample_event.aws.s3_client import S3Client
from sample_event.daos.users_dao import UsersDao


class DataImportService:
def __init__(self, __bucket_name, __bucket_key):
self.__bucket_name = __bucket_name
self.__bucket_key = __bucket_key
self.__reader = None

def get_and_parse_csv(self):
s3_client = S3Client()

# Acquire S3 object
csv_string = s3_client.get_object_as_string(
bucket_name=self.__bucket_name,
bucket_key=self.__bucket_key
)

# Parse CSV
self.__reader = csv.DictReader(csv_string.splitlines())

def save(self):
users_dao = UsersDao()
users_dao.create_users(self.__reader)

DAO 実装

「DAO」は、データベースにアクセスし、クエリーを実行する処理を担当します。

実装例

  • src/sample_event/daos/users_dao.py
import os, psycopg
from psycopg.rows import dict_row


class UsersDao:
def get_connection_info(self):
# psycopg3 のコネクション作成に必要なパラメータ情報を読み込みます。
# こちらのサンプルコードはカタログ AMI に含まれています。

def create_user(self, name, country, age):
try:
# データベースのコネクションを作成します
# 実際には、コネクションプールの仕組みを検討してください。
conn_info = self.get_connection_info()
conn = psycopg.connect(**conn_info)
with conn.cursor() as cur:
cur.execute(
"INSERT INTO users (name, country, age) VALUES (%s, %s, %s)",
(name, country, age)
)
conn.commit()
conn.close()

動作確認

ローカル実行によるデバッグとユニットテスト」を参照してください。