SNS (Simple Notification Service) Events
Amazon SNS allows you to process notifications from topics using Lambda functions.
Event Structure
class SNSEvent:
records: List[SNSMessage] # List of SNS messages
class SNSMessage:
message_id: str # Unique message ID
topic_arn: str # SNS topic ARN
message: str # Message content
subject: str # Message subject
timestamp: str # Message timestamp
message_attributes: Dict[str, Any] # Message attributes
Usage Examples
Basic Message Processing
from lambda_universal_router import Router
from lambda_universal_router.events import SNSEvent
router = Router()
@router.sns()
def process_notifications(event: SNSEvent, context):
for message in event.records:
print(f"Message ID: {message.message_id}")
print(f"Subject: {message.subject}")
print(f"Message: {message.message}")
print(f"From topic: {message.topic_arn}")
JSON Message Processing
import json
@router.sns()
def process_json(event: SNSEvent, context):
for message in event.records:
try:
data = json.loads(message.message)
if data['type'] == 'alert':
handle_alert(data)
elif data['type'] == 'notification':
handle_notification(data)
except json.JSONDecodeError:
print(f"Invalid JSON in message: {message.message_id}")
Topic-Based Processing
@router.sns()
def handle_by_topic(event: SNSEvent, context):
for message in event.records:
if 'alerts' in message.topic_arn:
process_alert(message)
elif 'notifications' in message.topic_arn:
process_notification(message)
elif 'metrics' in message.topic_arn:
process_metrics(message)
Message Attributes
@router.sns()
def process_with_attributes(event: SNSEvent, context):
for message in event.records:
# Check message attributes
message_type = message.message_attributes.get('MessageType', {}).get('Value')
if message_type == 'CRITICAL':
handle_critical(message)
else:
handle_normal(message)
Event Examples
- Basic SNS Event
{ "Records": [ { "EventSource": "aws:sns", "EventVersion": "1.0", "EventSubscriptionArn": "arn:aws:sns:us-east-1:123456789012:ExampleTopic", "Sns": { "Type": "Notification", "MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e", "TopicArn": "arn:aws:sns:us-east-1:123456789012:ExampleTopic", "Subject": "Example notification", "Message": "Hello from SNS!", "Timestamp": "2024-03-17T12:00:00.000Z", "MessageAttributes": { "MessageType": { "Type": "String", "Value": "CRITICAL" } } } } ] }
- JSON Message
{ "Records": [ { "Sns": { "Message": "{\"type\":\"alert\",\"severity\":\"high\",\"message\":\"System alert\"}", "MessageAttributes": { "AlertType": { "Type": "String", "Value": "SystemAlert" } } } } ] }
Best Practices
- Error Handling
@router.sns() def safe_process(event: SNSEvent, context): for message in event.records: try: process_message(message) except ValidationError: # Message validation failed log_validation_error(message) except ProcessingError as e: # Processing error handle_processing_error(message, e) except Exception as e: # Unexpected error log_error(message, e) raise
- Message Validation
@router.sns() def validate_messages(event: SNSEvent, context): for message in event.records: try: data = json.loads(message.message) if not is_valid_schema(data): log_invalid_schema(message) continue process_valid_message(data) except json.JSONDecodeError: log_invalid_json(message)
- Monitoring
@router.sns() def monitored_process(event: SNSEvent, context): metrics = { "processed": 0, "errors": 0, "by_type": {} } for message in event.records: try: msg_type = get_message_type(message) metrics["by_type"][msg_type] = metrics["by_type"].get(msg_type, 0) + 1 process_message(message) metrics["processed"] += 1 except Exception: metrics["errors"] += 1 log_metrics(metrics)
Configuration Tips
- SNS Topic Settings
- Configure appropriate access policies
- Set up topic subscriptions
- Enable message attributes if needed
- Lambda Settings
- Set appropriate memory and timeout
- Configure error handling
- Enable X-Ray tracing if needed
- Security
- Use encryption for sensitive data
- Validate message sources
- Implement proper access controls