Moving AWS CloudFront Logs to DynamoDB

I think it's pretty obvious that I love DynamoDB. It has become one of my favorite AWS services: I use it almost every day at work, and I'm getting better at using it for my personal projects as well.

I had a client approach me about getting logs out of a CloudFront distribution. CloudFront has a native logging feature that writes .gz files to an S3 bucket. My client doesn't have any sort of log ingestion service, so rather than build one, I decided we could parse the .gz files and store the data in a DynamoDB table. To accomplish this I created a simple Lambda function:

import boto3
import gzip
import uuid
from datetime import datetime
from datetime import timedelta
import time
from botocore.exceptions import ClientError

# Creates a time-to-live value 90 days in the future.
# DynamoDB TTL expects a Number attribute holding an epoch timestamp, so return an int.
def ttl_time():
    now = datetime.now()
    ttl_date = now + timedelta(days=90)
    return int(time.mktime(ttl_date.timetuple()))

# Puts the parsed log record into DynamoDB
def put_to_dynamo(record):
    client = boto3.resource('dynamodb', region_name='us-west-2')
    table = client.Table('YOUR_TABLE_NAME')
    try:
        response = table.put_item(
            Item=record
        )
        print(response)
    except ClientError as e:
        print("Failed to put record")
        print(e)
        return False

    return True

def lambda_handler(event, context):
    print(event)
    s3_key = event['Records'][0]['s3']['object']['key']
    s3 = boto3.resource("s3")
    obj = s3.Object("YOUR_BUCKET", s3_key)
    with gzip.GzipFile(fileobj=obj.get()["Body"]) as gzipfile:
        content = gzipfile.read()
    #print(content)
    # Each log file is plain text: comment lines (#Version, #Fields) followed
    # by tab-separated log entries.
    lines = content.decode('utf8').splitlines()

    keys = []
    values = []
    for line in lines:
        if line.startswith("#Fields:"):
            # Field names are space separated; drop the leading "#Fields:" token
            keys = line.split(" ")[1:]
        elif not line.startswith("#"):
            # Log entries are tab separated
            # NOTE: if a file contains more than one entry, only the last one is kept
            values = line.split("\t")

    # Pair each field name with its corresponding value
    my_dict = dict(zip(keys, values))


    print('- ' * 20)
    myuuid = str(uuid.uuid4())
    print(myuuid)
    my_dict["uuid"] = myuuid
    my_dict['ttl'] = ttl_time()

    print(my_dict)
    if put_to_dynamo(my_dict):
        print("Successfully imported item")
        return True
    else:
        print("Failed to put record")
        return False
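
One thing the code above assumes is that TTL is actually enabled on the table; the ttl attribute is just a regular number until you tell DynamoDB to expire items on it. Here is a minimal one-time setup sketch with boto3, using the same placeholder table name as the Lambda:

import boto3

# One-time setup: tell DynamoDB to expire items based on the 'ttl' attribute
# that the Lambda writes. 'YOUR_TABLE_NAME' is the same placeholder used above.
client = boto3.client('dynamodb', region_name='us-west-2')
client.update_time_to_live(
    TableName='YOUR_TABLE_NAME',
    TimeToLiveSpecification={
        'Enabled': True,
        'AttributeName': 'ttl'
    }
)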

This Lambda runs every time an S3 object is created in the logging bucket. It grabs the .gz file and parses it into a dictionary that can be written to DynamoDB. One other thing to note is that I append a UUID to each record to help track down errors.
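
That trigger is just an S3 event notification on the logging bucket pointed at the Lambda. The sketch below is one hedged way to wire it up with boto3; the bucket name and function ARN are placeholders, and the function also needs a resource policy (lambda add_permission) allowing S3 to invoke it, which is omitted here.

import boto3

# Hypothetical one-time setup: invoke the Lambda for every new .gz log object.
# 'YOUR_BUCKET' and the function ARN below are placeholders for your own resources.
s3 = boto3.client('s3')
s3.put_bucket_notification_configuration(
    Bucket='YOUR_BUCKET',
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [
            {
                'LambdaFunctionArn': 'arn:aws:lambda:us-west-2:123456789012:function:cloudfront-log-parser',
                'Events': ['s3:ObjectCreated:*'],
                'Filter': {
                    'Key': {
                        'FilterRules': [
                            {'Name': 'suffix', 'Value': '.gz'}
                        ]
                    }
                }
            }
        ]
    }
)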

I wrote a simple front end for the client that grabs records based on a date input and writes the logs to a CSV so they can work with them on their local machines. I have a feeling we will be implementing a log aggregation server soon!
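
The front end itself isn't shown here, but the core of it is just a filtered read plus a CSV write. Here's a rough sketch under some assumptions: the same placeholder table name, that the CloudFront 'date' field was stored as an attribute, and that a scan with a filter is acceptable (a GSI keyed on date would scale better).

import csv
import boto3
from boto3.dynamodb.conditions import Attr

def export_logs_to_csv(date, filename):
    # Pull every log record for a given date (e.g. '2024-01-31') and write it to a CSV.
    table = boto3.resource('dynamodb', region_name='us-west-2').Table('YOUR_TABLE_NAME')
    items = []
    response = table.scan(FilterExpression=Attr('date').eq(date))
    items.extend(response['Items'])
    # Keep paging until the scan has covered the whole table
    while 'LastEvaluatedKey' in response:
        response = table.scan(FilterExpression=Attr('date').eq(date),
                              ExclusiveStartKey=response['LastEvaluatedKey'])
        items.extend(response['Items'])

    if not items:
        print("No records found for " + date)
        return

    with open(filename, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=sorted(items[0].keys()), extrasaction='ignore')
        writer.writeheader()
        writer.writerows(items)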

If this code helps you please share it with your friends and co-workers!

Code on GitHub

