
Event Processing Implementation Guide

Documentation describing implementation practices for event-processing Lambdas. The two key points are:

  • Stays consistent with the WebAPI configuration, so the Service / Dao layers can be shared
  • Basic configuration of one Lambda per event source
    • Event-processing Lambdas often do not require immediacy, and this configuration keeps the development experience easy to debug

Implementation of Event Lambda to Process Data Uploaded to S3

Implement processing to register a list of user data uploaded to S3 as new entries in the database.

Content of User Data (CSV) Uploaded to S3

Assume that the following samples_users.csv will be uploaded.

name,country,age
Taro,Japan,30
Alice,USA,28
Bob,UK,35
Maria,Germany,25
Li,China,32
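Parsed with `csv.DictReader` (as the Service below does), this sample yields one dict per data row, with every value read as a string:

```python
import csv

# The same sample CSV content, inlined as a string for illustration.
csv_string = """name,country,age
Taro,Japan,30
Alice,USA,28
Bob,UK,35
Maria,Germany,25
Li,China,32
"""

rows = list(csv.DictReader(csv_string.splitlines()))
print(len(rows))   # 5
print(rows[0])     # {'name': 'Taro', 'country': 'Japan', 'age': '30'}
```

Note that `DictReader` does not convert types: `age` arrives as the string `'30'`, so cast it explicitly if the database column is numeric.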

Event Routing Implementation

An event object containing the event information notified from S3 is passed to the Lambda function entry point (src/sample_event/index.py).

Since this catalog basically follows a one-Lambda-per-event-source configuration for Event Lambdas, the entry point dispatches directly to an Action.
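For reference, a trimmed-down sketch of the S3 event-notification shape the handler receives, and the extraction the Action performs (the bucket name and key here are illustrative placeholders):

```python
from urllib.parse import unquote_plus

# Minimal shape of an S3 event notification; real events carry more fields.
event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "my-sample-bucket"},
                "object": {"key": "uploads/samples_users.csv"},
            }
        }
    ]
}

bucket = event["Records"][0]["s3"]["bucket"]["name"]
# Object keys in S3 event notifications are URL-encoded,
# so decode them before using them as S3 keys.
key = unquote_plus(event["Records"][0]["s3"]["object"]["key"])
print(bucket, key)
```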

Implementation Example

  • src/sample_event/index.py
from sample_event.actions.data_import_action import DataImportAction
from aws_lambda_powertools import Logger

logger = Logger()


@logger.inject_lambda_context(clear_state=True)
def handler(event, context):
    """
    Extract the necessary information from the event according to the event
    format that triggers the Lambda. The following is an example for an S3 event.
    """
    DataImportAction().handle(event)

Implement Action

"Action" is responsible for the following roles:

  • Acting as the "entry" and "exit" points for event processing
  • Extracting the necessary parameters and executing the Service
  • Notifying or saving the processing results

Implementation Example

  • src/sample_event/actions/data_import_action.py
from sample_event.services.data_import_service import DataImportService
from aws_lambda_powertools import Logger

logger = Logger(child=True)


class DataImportAction:

    def handle(self, s3_event):
        logger.info(
            'Initiate DataImportAction Process. Incoming s3_event %s', s3_event)

        # Acquire the bucket name and object key from the S3 event
        bucket = s3_event['Records'][0]['s3']['bucket']['name']
        key = s3_event['Records'][0]['s3']['object']['key']

        # Call the main logic
        service = DataImportService(bucket, key)
        service.get_and_parse_csv()
        service.save()

        # Log / send / notify the result of this action
        logger.info('DataImportAction finished successfully.')

Implement Service

"Service" is the part responsible for the main logic and business logic of event processing.

Implementation Example

  • src/sample_event/services/data_import_service.py
import csv

from sample_event.aws.s3_client import S3Client
from sample_event.daos.users_dao import UsersDao


class DataImportService:
    def __init__(self, bucket_name, bucket_key):
        self.__bucket_name = bucket_name
        self.__bucket_key = bucket_key
        self.__reader = None

    def get_and_parse_csv(self):
        s3_client = S3Client()

        # Acquire the S3 object
        csv_string = s3_client.get_object_as_string(
            bucket_name=self.__bucket_name,
            bucket_key=self.__bucket_key
        )

        # Parse the CSV
        self.__reader = csv.DictReader(csv_string.splitlines())

    def save(self):
        users_dao = UsersDao()
        users_dao.create_users(self.__reader)

Implement DAO

"DAO" is responsible for processing that accesses the database and executes queries.

Implementation Example

  • src/sample_event/daos/users_dao.py
import os

import psycopg
from psycopg.rows import dict_row


class UsersDao:
    def get_connection_info(self):
        # Load the parameter information required to create a psycopg3 connection.
        # This sample code is included in the catalog AMI.
        ...

    def create_users(self, users):
        # Register each parsed CSV row as a new user.
        for user in users:
            self.create_user(user['name'], user['country'], user['age'])

    def create_user(self, name, country, age):
        # Create a database connection.
        # In practice, please consider a connection pool mechanism.
        conn_info = self.get_connection_info()
        with psycopg.connect(**conn_info) as conn:
            with conn.cursor() as cur:
                cur.execute(
                    "INSERT INTO users (name, country, age) VALUES (%s, %s, %s)",
                    (name, country, age)
                )
            conn.commit()
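For larger CSV files, opening a connection and issuing one INSERT per row becomes slow; the same insert can be batched with `executemany` over a single connection. A minimal sketch of the pattern, using an in-memory SQLite database as a stand-in (with psycopg3 the equivalent call is `cur.executemany(...)` with `%s` placeholders against PostgreSQL):

```python
import sqlite3

# Rows as (name, country, age) tuples, e.g. produced from the parsed CSV.
rows = [("Taro", "Japan", 30), ("Alice", "USA", 28)]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, country TEXT, age INTEGER)")
# One batched statement instead of one round trip per row.
conn.executemany("INSERT INTO users (name, country, age) VALUES (?, ?, ?)", rows)
conn.commit()
print(conn.execute("SELECT COUNT(*) FROM users").fetchone()[0])  # 2
```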

Operation Verification

Please refer to "Local Debugging and Unit Testing".