AWSTemplateFormatVersion: '2010-09-09'
Description: 'Template to create Lambda function using Cloudformation'

# Deploys a Lambda function subscribed to an existing SNS topic. For every SNS
# record whose JSON message has a notificationType equal to the EventType
# parameter, the function writes the raw message to the CloudWatch Logs group
# named by the CloudWatchGroupName parameter.

Parameters:
  CloudWatchGroupName:
    Description: Cloudwatch Group Name for bounce notifications.
    Default: /aws/ses/bounce_logs
    Type: String
    AllowedPattern: .+
    ConstraintDescription: Cloudwatch Group Name for bounce notifications.
  SNSTopicARN:
    Description: Add SNS Topic ARN.
    Type: String
    AllowedPattern: .+
    ConstraintDescription: Add SNS Topic ARN.
  EventType:
    Description: AWS SES Event Type to log to the CloudWatchGroupName
    Type: String
    Default: Bounce
    AllowedValues:
      - Bounce
      - Complaint
      - Delivery

Resources:
  # Execution role assumed by the Lambda function.
  LambdaRole:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Policies:
        - PolicyName: cloudwatch_write_policy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              # Destination log group written to by the handler.
              - Effect: Allow
                Action:
                  - 'logs:CreateLogGroup'
                  - 'logs:CreateLogStream'
                  - 'logs:PutLogEvents'
                  - 'logs:DescribeLogStreams'
                Resource:
                  - 'arn:aws:logs:*:*:log-group:/aws/ses/*'
                  # Also allow the group actually configured, so that a
                  # CloudWatchGroupName outside /aws/ses/ does not end in
                  # AccessDenied at runtime.
                  - !Sub 'arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:${CloudWatchGroupName}'
                  - !Sub 'arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:${CloudWatchGroupName}:*'
              # The function's own log group; without this the handler's
              # logging output is never delivered.
              - Effect: Allow
                Action:
                  - 'logs:CreateLogGroup'
                  - 'logs:CreateLogStream'
                  - 'logs:PutLogEvents'
                Resource:
                  - !Sub 'arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/*'
      Path: /

  # Delivers messages published on the SNS topic to the Lambda function.
  SnsSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      Protocol: lambda
      Endpoint: !GetAtt LambdaFunction.Arn
      TopicArn: !Ref SNSTopicARN

  # Lets SNS invoke the function, restricted to the configured topic.
  LambdaInvokePermission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      Principal: sns.amazonaws.com
      SourceArn: !Ref SNSTopicARN
      FunctionName: !Ref LambdaFunction

  LambdaFunction:
    Type: 'AWS::Lambda::Function'
    DependsOn: LambdaRole
    Properties:
      Environment:
        Variables:
          group_name: !Ref CloudWatchGroupName
          event_type: !Ref EventType
          LOG_LEVEL: 'INFO'
      Role: !GetAtt LambdaRole.Arn
      Timeout: 60
      Handler: index.lambda_handler
      # python3.6 is a deprecated Lambda runtime and can no longer be used to
      # create functions.
      Runtime: python3.12
      MemorySize: 128
      Code:
        ZipFile: |
          import boto3
          import time
          import json
          import secrets
          import os
          import logging

          client = boto3.client('logs')
          log_group = os.getenv("group_name")
          event_type = os.getenv("event_type")

          LOG_LEVELS = ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')


          def lambda_handler(event, context):
              """Copy matching SNS messages into CloudWatch Logs.

              Every record whose JSON message has a notificationType equal
              to the event_type environment variable is written, unmodified,
              to a freshly created log stream in the group_name log group.
              All other records are skipped.
              """
              log_level = str(os.environ.get('LOG_LEVEL')).upper()
              if log_level not in LOG_LEVELS:
                  log_level = 'ERROR'
              logging.getLogger().setLevel(log_level)
              logging.info(event)

              for record in event['Records']:
                  logs = record['Sns']['Message']
                  try:
                      logs_data = json.loads(logs)
                  except ValueError:
                      # Skip non-JSON messages rather than fail the whole
                      # invocation and trigger SNS retries.
                      logging.warning('Skipping non-JSON SNS message')
                      continue

                  # Skip (do not exit) on a mismatch so the remaining
                  # records are still processed and the invocation succeeds.
                  if (not isinstance(logs_data, dict)
                          or logs_data.get('notificationType') != event_type):
                      continue

                  log_stream = '{}{}{}'.format(
                      time.strftime('%Y/%m/%d'), '[$LATEST]',
                      secrets.token_hex(16))

                  # Creation is best-effort: an existing group or stream
                  # is not an error.
                  try:
                      client.create_log_group(logGroupName=log_group)
                  except client.exceptions.ResourceAlreadyExistsException:
                      pass
                  try:
                      client.create_log_stream(
                          logGroupName=log_group, logStreamName=log_stream)
                  except client.exceptions.ResourceAlreadyExistsException:
                      pass

                  response = client.describe_log_streams(
                      logGroupName=log_group,
                      logStreamNamePrefix=log_stream)
                  event_log = {
                      'logGroupName': log_group,
                      'logStreamName': log_stream,
                      'logEvents': [{
                          'timestamp': int(round(time.time() * 1000)),
                          'message': logs,
                      }],
                  }
                  # Tolerate an empty listing instead of raising IndexError.
                  streams = response.get('logStreams') or []
                  if streams and 'uploadSequenceToken' in streams[0]:
                      event_log['sequenceToken'] = (
                          streams[0]['uploadSequenceToken'])
                  response = client.put_log_events(**event_log)
                  logging.info(response)