イベント処理実装ガイド
イベント処理 Lambda の実装プラクティスを記載したドキュメントです。ポイントは以下2点です。
- WebAPI の構成と統一感を持ち、Service / Dao レイヤーの共通利用ができる
- 1 イベントソースにつき 1 Lambda 構成を基本とする
- イベント処理 Lambda は即時性を求めないことが多く、デバッグしやすい開発体験を維持するため
S3 にアップロードされたデータを処理する Event Lambda の実装
S3 にアップロードされたユーザーデータの一覧をDBに新規登録する処理を実装します。
S3 にアップロードされるユーザーデータ(CSV)の内容
以下のような samples_users.csv がアップロードされる想定とします。
name,country,age
Taro,Japan,30
Alice,USA,28
Bob,UK,35
Maria,Germany,25
Li,China,32
Event Routing 実装
Lambda 関数のエントリーポイント( src/sample-event/index.ts
) に、S3 から通知されたイベント情報が入った event
オブジェクトが渡されます。
本カタログでは Event Lambda に関しては 1
イベントソースにつき 1
Lambda 構成を基本とするため、そのまま Action に振り分けます。
実装例
src/sample-event/index.ts
import type { Context, S3Event } from 'aws-lambda'
import { ProcessS3EventAction } from '~/sample-event/actions'
import { logger } from '~/sample-event/logger'
export const handler = async (event: S3Event, context: Context): Promise<void> => {
logger.addContext(context)
try {
logger.info('S3 event handler started', {
functionName: context.functionName,
requestId: context.awsRequestId,
recordCount: event.Records?.length || 0,
})
const processS3EventAction = new ProcessS3EventAction()
await processS3EventAction.run(event)
logger.info('S3 event handler completed successfully')
} catch (error) {
// 処理中にエラーが投げられた場合、Lambda の実行エラーとして扱わず、
// 一旦正常終了と扱い、ログの ubScription Filter でモニタリング機構を組むアプローチもあります
logger.error('Error in S3 event handler', { error })
throw error
}
}
Action を実装
「Action」は、以下の役割を担当します。
- イベント処理の「入り口」と「出口」となる部分
- 必要なパタメータの抽出、Service 実行
- 処理結果を通知または保存
実装例
src/sample-event/actions/ProcessS3EventAction.ts
import type { S3Event } from 'aws-lambda'
import { logger } from '~/sample-event/logger'
import { UserImportService } from '~/sample-event/services'
export class ProcessS3EventAction {
async run(event: S3Event): Promise<void> {
logger.info('S3 event received', { recordCount: event.Records.length })
for (const record of event.Records) {
const bucket = record.s3.bucket.name
const key = record.s3.object.key
const userImportService = new UserImportService()
const count = await userImportService.importUsersFromS3(bucket, key)
logger.info('Imported users from S3', { bucket, key, count })
}
}
}
Service を実装
「Service」は、イベント処理のメインロジック、ビジネスロジックを担当する部分です。
実装例
src/sample-event/services/UserImportService.ts
import { S3 } from '~/sample-event/aws'
import { UserDao } from '~/sample-event/daos'
import { CsvParserHelper } from '~/sample-event/helpers'
import { logger } from '~/sample-event/logger'
export class UserImportService {
private userDao: UserDao
private s3: S3
constructor() {
this.userDao = new UserDao()
this.s3 = new S3()
}
async importUsersFromS3(bucket: string, key: string): Promise<number> {
// Download CSV from S3
const csvContent = await this.s3.getObjectAsString(bucket, key)
if (!csvContent) {
throw new Error('Failed CSV content from S3 becuase it does not exists with the given object key.')
}
// Parse CSV and import to database
const users = CsvParserHelper.parseUsersFromCsv(csvContent)
const count = await this.userDao.createMany(users)
logger.info('Importing process completed', { bucket, key, count })
return count
}
}
DAO 実装
「DAO」は、データベースにアクセスし、クエリーを実行する処理を担当します。
実装例
src/sample-event/daos/UserDao.py
import { prisma } from '~/sample-api/libs'
import { logger } from '~/sample-api/logger'
import type { CreateUserPayload } from '~/sample-api/schemas'
export class UserDao {
async createMany(users: CreateUserPayload[]): Promise<number> {
logger.info('Creating users', { count: users.length })
const result = await prisma.user.createMany({
data: users,
skipDuplicates: true,
})
logger.info('Users created', { count: result.count })
return result.count
}
}
動作確認
「ローカル実行によるデバッグとユニットテスト」を参照してください。