Skip to the content.

Examples

API Gateway Examples

REST API with Multiple Endpoints

from lambda_universal_router import Router
from lambda_universal_router.events import APIGatewayEvent

router = Router()

@router.apigateway("/users", method="GET")
def list_users(event: APIGatewayEvent, context):
    page = event.query_string_parameters.get('page', '1')
    limit = event.query_string_parameters.get('limit', '10')
    
    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": {
            "users": [{"id": 1, "name": "John"}],
            "page": int(page),
            "limit": int(limit)
        }
    }

@router.apigateway("/users/{user_id}", method="GET")
def get_user(event: APIGatewayEvent, context):
    user_id = event.path_parameters['user_id']
    return {
        "statusCode": 200,
        "body": {"id": user_id, "name": "John Doe"}
    }

@router.apigateway("/users", method="POST")
def create_user(event: APIGatewayEvent, context):
    user_data = json.loads(event.body)
    return {
        "statusCode": 201,
        "body": {"message": "User created", "user": user_data}
    }

Queue Processing Examples

SQS Message Processing

from lambda_universal_router import Router
from lambda_universal_router.events import SQSEvent

router = Router()

@router.sqs()
def process_orders(event: SQSEvent, context):
    for message in event.records:
        order = json.loads(message.body)
        print(f"Processing order {order['id']}")
        
        # Access message attributes
        priority = message.attributes.get('Priority')
        if priority == 'HIGH':
            process_priority_order(order)
        else:
            process_normal_order(order)

Batch Processing with Error Handling

@router.sqs()
def safe_process(event: SQSEvent, context):
    results = {
        "successful": [],
        "failed": []
    }
    
    for message in event.records:
        try:
            process_message(message)
            results["successful"].append(message.message_id)
        except Exception as e:
            results["failed"].append({
                "message_id": message.message_id,
                "error": str(e)
            })
    
    return results

S3 Event Examples

File Processing

from lambda_universal_router import Router
from lambda_universal_router.events import S3Event

router = Router()

@router.s3()
def process_uploads(event: S3Event, context):
    for record in event.records:
        if record.event_name.startswith('ObjectCreated:'):
            bucket = record.bucket.name
            key = record.s3_object.key
            size = record.s3_object.size
            
            print(f"Processing new file: {key}")
            print(f"Size: {size} bytes")
            
            if key.endswith('.jpg'):
                process_image(bucket, key)
            elif key.endswith('.pdf'):
                process_document(bucket, key)

DynamoDB Stream Examples

Change Data Capture

from lambda_universal_router import Router
from lambda_universal_router.events import DynamoDBStreamEvent

router = Router()

@router.dynamodb()
def sync_changes(event: DynamoDBStreamEvent, context):
    for record in event.records:
        table_name = record.dynamodb.get('TableName')
        
        if record.event_name == 'INSERT':
            handle_insert(record.dynamodb['NewImage'])
        elif record.event_name == 'MODIFY':
            handle_update(
                record.dynamodb['OldImage'],
                record.dynamodb['NewImage']
            )
        elif record.event_name == 'REMOVE':
            handle_delete(record.dynamodb['OldImage'])

EventBridge Examples

Scheduled Tasks

from lambda_universal_router import Router
from lambda_universal_router.events import EventBridgeEvent

router = Router()

@router.eventbridge()
def daily_cleanup(event: EventBridgeEvent, context):
    if event.detail_type == "Scheduled Event":
        print(f"Running daily cleanup at {event.time}")
        cleanup_old_data()

Custom Event Handler Example

Fallback Handler

from lambda_universal_router import Router
from lambda_universal_router.events import CustomEvent

router = Router()

@router.custom()
def handle_unknown(event: CustomEvent, context):
    # Log unknown event type
    print(f"Received unknown event: {event.event_data}")
    
    # Try to handle based on event structure
    if 'Records' in event.event_data:
        return handle_record_based_event(event.event_data)
    elif 'detail-type' in event.event_data:
        return handle_eventbridge_like_event(event.event_data)
    else:
        return {
            "statusCode": 200,
            "body": "Event processed by fallback handler"
        }

Complete Example

from lambda_universal_router import Router
from lambda_universal_router.events import (
    APIGatewayEvent, SQSEvent, S3Event,
    DynamoDBStreamEvent, CustomEvent
)

router = Router()

# API Gateway handler
@router.apigateway("/process", method="POST")
def process_api(event: APIGatewayEvent, context):
    return {"statusCode": 200, "body": "Processed"}

# SQS handler
@router.sqs()
def process_queue(event: SQSEvent, context):
    for msg in event.records:
        process_message(msg)

# S3 handler
@router.s3()
def process_files(event: S3Event, context):
    for record in event.records:
        process_file(record)

# DynamoDB handler
@router.dynamodb()
def sync_data(event: DynamoDBStreamEvent, context):
    for record in event.records:
        sync_record(record)

# Fallback handler
@router.custom()
def handle_unknown(event: CustomEvent, context):
    return {"status": "processed by fallback"}

# Main handler
def lambda_handler(event, context):
    return router.dispatch(event, context)