DynamoDB Stream Events
DynamoDB Streams enable you to capture changes to your DynamoDB tables and process them with Lambda functions.
Event Structure
class DynamoDBStreamEvent:
records: List[DynamoDBStreamRecord] # List of DynamoDB stream records
class DynamoDBStreamRecord:
event_id: str # Unique event ID
event_name: str # INSERT, MODIFY, or REMOVE
event_version: str # Event version
event_source: str # aws:dynamodb
aws_region: str # AWS region
dynamodb: Dict[str, Any] # Contains Keys, NewImage, OldImage
Usage Examples
Basic Change Processing
from lambda_universal_router import Router
from lambda_universal_router.events import DynamoDBStreamEvent
router = Router()
@router.dynamodb()
def process_changes(event: DynamoDBStreamEvent, context):
for record in event.records:
print(f"Event ID: {record.event_id}")
print(f"Event Type: {record.event_name}")
if record.event_name == "INSERT":
handle_insert(record.dynamodb['NewImage'])
elif record.event_name == "MODIFY":
handle_update(
record.dynamodb['OldImage'],
record.dynamodb['NewImage']
)
elif record.event_name == "REMOVE":
handle_delete(record.dynamodb['OldImage'])
Change Data Capture (CDC)
@router.dynamodb()
def sync_to_elasticsearch(event: DynamoDBStreamEvent, context):
for record in event.records:
if record.event_name in ["INSERT", "MODIFY"]:
# Get the new version of the document
document = record.dynamodb['NewImage']
# Index in Elasticsearch
index_document(document)
elif record.event_name == "REMOVE":
# Delete from Elasticsearch
delete_document(record.dynamodb['Keys'])
Filtering and Validation
@router.dynamodb()
def filtered_processing(event: DynamoDBStreamEvent, context):
for record in event.records:
# Only process high-priority items
if record.event_name != "REMOVE":
new_image = record.dynamodb['NewImage']
if new_image.get('priority', {}).get('S') == 'HIGH':
process_priority_item(new_image)
Cross-Region Replication
@router.dynamodb()
def replicate_changes(event: DynamoDBStreamEvent, context):
for record in event.records:
# Replicate to another region
if record.event_name in ["INSERT", "MODIFY"]:
replicate_item(
record.dynamodb['NewImage'],
target_region="us-west-2"
)
elif record.event_name == "REMOVE":
delete_item(
record.dynamodb['Keys'],
target_region="us-west-2"
)
Event Examples
- Insert Event
{ "Records": [ { "eventID": "1", "eventName": "INSERT", "eventVersion": "1.0", "eventSource": "aws:dynamodb", "awsRegion": "us-east-1", "dynamodb": { "Keys": { "Id": {"S": "101"} }, "NewImage": { "Id": {"S": "101"}, "Title": {"S": "Book 101"}, "ISBN": {"S": "978-1234567890"}, "Price": {"N": "29.99"} }, "SequenceNumber": "111", "SizeBytes": 26, "StreamViewType": "NEW_AND_OLD_IMAGES" } } ] }
- Modify Event
{ "Records": [ { "eventName": "MODIFY", "dynamodb": { "OldImage": { "Id": {"S": "101"}, "Price": {"N": "29.99"} }, "NewImage": { "Id": {"S": "101"}, "Price": {"N": "25.99"} } } } ] }
Best Practices
- Error Handling
@router.dynamodb() def safe_process(event: DynamoDBStreamEvent, context): for record in event.records: try: process_record(record) except ValidationError: # Data validation failed log_validation_error(record) except ConnectionError: # Failed to connect to downstream service # Re-throw to retry raise except Exception as e: # Unexpected error log_error(record, e) raise
- Batch Processing
@router.dynamodb() def batch_process(event: DynamoDBStreamEvent, context): inserts = [] updates = [] deletes = [] for record in event.records: if record.event_name == "INSERT": inserts.append(record.dynamodb['NewImage']) elif record.event_name == "MODIFY": updates.append(record.dynamodb['NewImage']) elif record.event_name == "REMOVE": deletes.append(record.dynamodb['Keys']) if inserts: batch_insert(inserts) if updates: batch_update(updates) if deletes: batch_delete(deletes)
- Monitoring
@router.dynamodb() def monitored_process(event: DynamoDBStreamEvent, context): metrics = { "inserts": 0, "updates": 0, "deletes": 0, "errors": 0 } for record in event.records: try: if record.event_name == "INSERT": metrics["inserts"] += 1 elif record.event_name == "MODIFY": metrics["updates"] += 1 elif record.event_name == "REMOVE": metrics["deletes"] += 1 process_record(record) except Exception: metrics["errors"] += 1 log_metrics(metrics)
Configuration Tips
- Stream Settings
- Choose appropriate StreamViewType
- Configure batch size
- Set proper retention period
- Lambda Settings
- Set memory based on record size
- Configure timeout for batch processing
- Enable X-Ray tracing
- Performance
- Use batch processing when possible
- Implement concurrent processing
- Monitor throughput and latency