Skip to main content

Event Processing Implementation Guide

This document describes implementation practices for event processing Lambda functions. The key points are as follows:

  • Maintains consistency with WebAPI configuration and enables shared use of Service/Dao layers
  • Based on a configuration of 1 Lambda per 1 event source
    • Event processing Lambda functions often don't require immediacy, so this maintains an easy-to-debug development experience

Implementing Event Lambda to Process Data Uploaded to S3

Implement processing to register a list of user data uploaded to S3 as new entries in the database.

Content of User Data (CSV) Uploaded to S3

Assume that a samples_users.csv file like the following is uploaded:

name,country,age
Taro,Japan,30
Alice,USA,28
Bob,UK,35
Maria,Germany,25
Li,China,32

Event Routing Implementation

An event object containing event information notified from S3 is passed to the Lambda function's entry point (src/sample-event/index.ts).

In this catalog, Event Lambda is based on a configuration of 1 Lambda per 1 event source, so it routes directly to the Action.

Implementation Example

  • src/sample-event/index.ts
import type { Context, S3Event } from 'aws-lambda'
import { ProcessS3EventAction } from '~/sample-event/actions'
import { logger } from '~/sample-event/logger'

export const handler = async (event: S3Event, context: Context): Promise<void> => {
logger.addContext(context)

try {
logger.info('S3 event handler started', {
functionName: context.functionName,
requestId: context.awsRequestId,
recordCount: event.Records?.length || 0,
})

const processS3EventAction = new ProcessS3EventAction()
await processS3EventAction.run(event)

logger.info('S3 event handler completed successfully')
} catch (error) {
// If an error is thrown during processing, one approach is to not treat it as a Lambda execution error,
// but instead treat it as a normal completion and build a monitoring mechanism with log Subscription Filters
logger.error('Error in S3 event handler', { error })
throw error
}
}

Implementing Action

"Action" is responsible for the following roles:

  • Entry and exit points for event processing
  • Extracting necessary parameters and executing Service
  • Notifying or saving processing results

Implementation Example

  • src/sample-event/actions/ProcessS3EventAction.ts
import type { S3Event } from 'aws-lambda'
import { logger } from '~/sample-event/logger'
import { UserImportService } from '~/sample-event/services'

export class ProcessS3EventAction {

async run(event: S3Event): Promise<void> {
logger.info('S3 event received', { recordCount: event.Records.length })
for (const record of event.Records) {
const bucket = record.s3.bucket.name
const key = record.s3.object.key

const userImportService = new UserImportService()
const count = await userImportService.importUsersFromS3(bucket, key)
logger.info('Imported users from S3', { bucket, key, count })
}
}
}

Implementing Service

"Service" is the part responsible for the main logic and business logic of event processing.

Implementation Example

  • src/sample-event/services/UserImportService.ts
import { S3 } from '~/sample-event/aws'
import { UserDao } from '~/sample-event/daos'
import { CsvParserHelper } from '~/sample-event/helpers'
import { logger } from '~/sample-event/logger'

export class UserImportService {
private userDao: UserDao
private s3: S3

constructor() {
this.userDao = new UserDao()
this.s3 = new S3()
}

async importUsersFromS3(bucket: string, key: string): Promise<number> {
// Download CSV from S3
const csvContent = await this.s3.getObjectAsString(bucket, key)
if (!csvContent) {
throw new Error('Failed CSV content from S3 becuase it does not exists with the given object key.')
}

// Parse CSV and import to database
const users = CsvParserHelper.parseUsersFromCsv(csvContent)
const count = await this.userDao.createMany(users)
logger.info('Importing process completed', { bucket, key, count })

return count
}
}

DAO Implementation

"DAO" is responsible for accessing the database and executing queries.

Implementation Example

  • src/sample-event/daos/UserDao.py
import { prisma } from '~/sample-api/libs'
import { logger } from '~/sample-api/logger'
import type { CreateUserPayload } from '~/sample-api/schemas'

export class UserDao {
async createMany(users: CreateUserPayload[]): Promise<number> {
logger.info('Creating users', { count: users.length })

const result = await prisma.user.createMany({
data: users,
skipDuplicates: true,
})

logger.info('Users created', { count: result.count })
return result.count
}
}

Testing

Please refer to "Local Debugging and Unit Testing".