Moving AWS CloudFront Logs to DynamoDB

I think it's pretty obvious that I love DynamoDB. It has become one of my favorite AWS services; I use it almost every day at work and am getting better at using it for my personal projects as well.

I had a client approach me about getting logs from a CloudFront distribution. CloudFront has a native logging function that writes .gz files to an S3 bucket. My client doesn’t have any sort of log ingestion service, so rather than build one I decided we could parse the .gz files and store the data in a DynamoDB table. To accomplish this I created a simple Lambda:

import boto3
import gzip
import uuid
from datetime import datetime
from datetime import timedelta
import time
from botocore.exceptions import ClientError

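#Creates a time to live value 90 days from now
def ttl_time():
    now = datetime.now()
    ttl_date = now + timedelta(days=90)
    # DynamoDB TTL expects a Number holding epoch seconds, so return an int
    final = int(time.mktime(ttl_date.timetuple()))
    return final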

#Puts the log record (a dict) into dynamodb:
def put_to_dynamo(record):
    client = boto3.resource('dynamodb', region_name='us-west-2')
    table = client.Table('YOUR_TABLE_NAME')
    try:
        table.put_item(Item=record)
    except ClientError as e:
        print("Failed to put record:", e)
        return False

    return True

def lambda_handler(event, context):
    s3_key = event['Records'][0]['s3']['object']['key']
    s3 = boto3.resource("s3")
    obj = s3.Object("YOUR_BUCKET", s3_key)
    with gzip.GzipFile(fileobj=obj.get()["Body"]) as gzipfile:
        content = gzipfile.read()
    lines = content.decode('utf8').splitlines()

    # The "#Fields:" header names the columns, separated by spaces
    keys = []
    for line in lines:
        if line.startswith("#Fields:"):
            keys = line.split()[1:]

    success = True
    for line in lines:
        # Skip the "#Version" and "#Fields" headers and any blank lines
        if not line or line.startswith("#"):
            continue
        # Data lines are tab separated, in the same order as the field names
        my_dict = dict(zip(keys, line.split("\t")))
        my_dict["uuid"] = str(uuid.uuid4())
        my_dict['ttl'] = ttl_time()
        if not put_to_dynamo(my_dict):
            success = False

    if success:
        print("Successfully imported items")
        return True
    print("Failed to put one or more records")
    return False

This Lambda runs every time an object is created in the S3 bucket. It grabs the .gz file and parses it into a dictionary that can be imported into DynamoDB. One other thing to note is that I append a UUID so that I can track down errors.
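To make the parsing step concrete, here is a minimal sketch that runs locally without AWS. It assumes the standard CloudFront log layout: a `#Version` line, a space-separated `#Fields:` line, then tab-separated rows. The function name `parse_cloudfront_log` and the sample log text are made up for illustration, and the sample carries only four of the real fields.

```python
import gzip


def parse_cloudfront_log(raw_bytes):
    """Turn a gzipped CloudFront log into a list of dicts, one per request."""
    lines = gzip.decompress(raw_bytes).decode("utf8").splitlines()
    keys = []
    records = []
    for line in lines:
        if line.startswith("#Fields:"):
            # Field names are space separated; drop the "#Fields:" token itself
            keys = line.split()[1:]
        elif line and not line.startswith("#"):
            # Data rows are tab separated, in the same order as the field names
            records.append(dict(zip(keys, line.split("\t"))))
    return records


# Hypothetical sample with only four fields, for illustration
sample = (
    "#Version: 1.0\n"
    "#Fields: date time cs-method cs-uri-stem\n"
    "2019-12-04\t21:02:31\tGET\t/index.html\n"
    "2019-12-04\t21:02:35\tGET\t/about.html\n"
)
records = parse_cloudfront_log(gzip.compress(sample.encode("utf8")))
print(records[0]["cs-uri-stem"])
```

Keeping the parsing in a plain function like this makes it easy to test against a saved log file before wiring it up to the S3 trigger.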

I wrote a simple front end for the client to grab records based on date input which writes the logs to a CSV so they can parse them on their local machines. I have a feeling we will be implementing a log aggregation server soon!
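The export side can be sketched in a few lines. This is not the client's front end, only an illustration under assumptions: the records are taken to be already fetched from the table as dictionaries with a `date` field in `YYYY-MM-DD` form (as CloudFront writes it), and `records_to_csv` is a hypothetical helper name.

```python
import csv
import io


def records_to_csv(records, start_date, end_date):
    """Return CSV text for records whose 'date' falls in [start_date, end_date]."""
    # ISO dates (YYYY-MM-DD) compare correctly as plain strings
    selected = [r for r in records if start_date <= r.get("date", "") <= end_date]
    # Use the union of keys so rows with missing fields still fit the header
    fieldnames = sorted({key for r in selected for key in r})
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(selected)
    return buffer.getvalue()


# Hypothetical items, shaped like the dictionaries stored in the table
items = [
    {"date": "2019-12-03", "cs-uri-stem": "/old.html", "uuid": "a"},
    {"date": "2019-12-04", "cs-uri-stem": "/index.html", "uuid": "b"},
]
print(records_to_csv(items, "2019-12-04", "2019-12-05"))
```

Writing to an in-memory buffer keeps the sketch free of file paths; a real front end could just as well hand the text back as a download.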

If this code helps you, please share it with your friends and co-workers!

Code on Github


