メインコンテンツまでスキップ

イベント処理実装ガイド

イベント処理 Lambda の実装プラクティスを記載したドキュメントです。ポイントは以下2点です。

  • WebAPI の構成と統一感を持ち、Service / Dao レイヤーの共通利用ができる
  • 1 イベントソースにつき 1 Lambda 構成を基本とする
    • イベント処理 Lambda は即時性を求めないことが多く、デバッグしやすい開発体験を維持するため

S3 にアップロードされたデータを処理する Event Lambda の実装

S3 にアップロードされたユーザーデータの一覧をDBに新規登録する処理を実装します。

S3 にアップロードされるユーザーデータ(CSV)の内容

以下のような samples_users.csv がアップロードされる想定とします。

name,country,age
Taro,Japan,30
Alice,USA,28
Bob,UK,35
Maria,Germany,25
Li,China,32

Event Routing 実装

Lambda 関数のエントリーポイント( src/sample-event/index.ts ) に、S3 から通知されたイベント情報が入った event オブジェクトが渡されます。

本カタログでは Event Lambda に関しては 1 イベントソースにつき 1 Lambda 構成を基本とするため、そのまま Action に振り分けます。

実装例

  • src/sample-event/index.ts
import type { Context, S3Event } from 'aws-lambda'
import { ProcessS3EventAction } from '~/sample-event/actions'
import { logger } from '~/sample-event/logger'

export const handler = async (event: S3Event, context: Context): Promise<void> => {
logger.addContext(context)

try {
logger.info('S3 event handler started', {
functionName: context.functionName,
requestId: context.awsRequestId,
recordCount: event.Records?.length || 0,
})

const processS3EventAction = new ProcessS3EventAction()
await processS3EventAction.run(event)

logger.info('S3 event handler completed successfully')
} catch (error) {
// 処理中にエラーが投げられた場合、Lambda の実行エラーとして扱わず、
// 一旦正常終了と扱い、ログの ubScription Filter でモニタリング機構を組むアプローチもあります
logger.error('Error in S3 event handler', { error })
throw error
}
}

Action を実装

「Action」は、以下の役割を担当します。

  • イベント処理の「入り口」と「出口」となる部分
  • 必要なパタメータの抽出、Service 実行
  • 処理結果を通知または保存

実装例

  • src/sample-event/actions/ProcessS3EventAction.ts
import type { S3Event } from 'aws-lambda'
import { logger } from '~/sample-event/logger'
import { UserImportService } from '~/sample-event/services'

export class ProcessS3EventAction {

async run(event: S3Event): Promise<void> {
logger.info('S3 event received', { recordCount: event.Records.length })
for (const record of event.Records) {
const bucket = record.s3.bucket.name
const key = record.s3.object.key

const userImportService = new UserImportService()
const count = await userImportService.importUsersFromS3(bucket, key)
logger.info('Imported users from S3', { bucket, key, count })
}
}
}

Service を実装

「Service」は、イベント処理のメインロジック、ビジネスロジックを担当する部分です。

実装例

  • src/sample-event/services/UserImportService.ts
import { S3 } from '~/sample-event/aws'
import { UserDao } from '~/sample-event/daos'
import { CsvParserHelper } from '~/sample-event/helpers'
import { logger } from '~/sample-event/logger'

export class UserImportService {
private userDao: UserDao
private s3: S3

constructor() {
this.userDao = new UserDao()
this.s3 = new S3()
}

async importUsersFromS3(bucket: string, key: string): Promise<number> {
// Download CSV from S3
const csvContent = await this.s3.getObjectAsString(bucket, key)
if (!csvContent) {
throw new Error('Failed CSV content from S3 becuase it does not exists with the given object key.')
}

// Parse CSV and import to database
const users = CsvParserHelper.parseUsersFromCsv(csvContent)
const count = await this.userDao.createMany(users)
logger.info('Importing process completed', { bucket, key, count })

return count
}
}

DAO 実装

「DAO」は、データベースにアクセスし、クエリーを実行する処理を担当します。

実装例

  • src/sample-event/daos/UserDao.py
import { prisma } from '~/sample-api/libs'
import { logger } from '~/sample-api/logger'
import type { CreateUserPayload } from '~/sample-api/schemas'

export class UserDao {
async createMany(users: CreateUserPayload[]): Promise<number> {
logger.info('Creating users', { count: users.length })

const result = await prisma.user.createMany({
data: users,
skipDuplicates: true,
})

logger.info('Users created', { count: result.count })
return result.count
}
}

動作確認

ローカル実行によるデバッグとユニットテスト」を参照してください。