Tag: dynamo

  • SES Monitoring

    I love AWS, but one thing they don’t always do is build complete tools, and SES is one of them. I recently started getting emails about high usage for one of the identities that I have set up for SES. I assumed there was a way to track usage within CloudWatch, but for the life of me I couldn’t find one. So I guess that means I need to build something.

    The idea here is pretty simple: within SES identities you can set up notifications. So, I created an SNS topic and subscribed all delivery notifications to it. Then, I subscribed a Lambda function to the topic. The Lambda function acts as the processor for the records: it formats them in a usable way and puts them into DynamoDB, using the identity as the primary key. The result is a simple application architecture like the image below.

    Every time an email is delivered, the Lambda function processes the event and checks the DynamoDB table to see if we have an existing record. If the identity is already present in the table, it returns the “count” value so that we can increment it, and the destination of the email being sent is appended to the “destination” value. Below is a sample of the code I used to put the object into the DynamoDB table.

    import re

    import boto3
    from botocore.exceptions import ClientError

    # DynamoDB table that stores the per-identity usage records (table name is a placeholder)
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('YOUR_TABLE_NAME')

    def put_dynamo_object(dynamo_object):
        # Look up the current count for this identity so we can increment it
        count = dynamo_get_item(dynamo_object)
        if count is None or count == 0:
            count = 1
        else:
            count = int(count) + 1
        # Pull the email address out of the longer source string
        source_string = dynamo_object['source']
        match = re.search(r'[\w.+-]+@[\w-]+\.[\w.-]+', source_string)
        email = match.group(0)
        try:
            table.update_item(
                Key={
                    'identity': email
                },
                AttributeUpdates={
                    'details': {
                        'Value': {
                            'caller_identity': dynamo_object['caller_identity'],
                            'source': dynamo_object['source'],
                            'destination': dynamo_object['destination'],
                            'count': str(count)
                        }
                    }
                }
            )
            return True
        except ClientError as e:
            print("Failed to put record")
            print(e)
            return False
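
    The dynamo_get_item helper referenced above isn’t shown in the post. A minimal sketch of what it might look like, assuming the same table object and the ‘identity’/‘details’ layout used above:

    def dynamo_get_item(dynamo_object):
        # Look up the existing record for this identity and return its current count, if any
        match = re.search(r'[\w.+-]+@[\w-]+\.[\w.-]+', dynamo_object['source'])
        email = match.group(0)
        try:
            response = table.get_item(Key={'identity': email})
        except ClientError as e:
            print(e)
            return None
        if 'Item' not in response:
            return None
        return int(response['Item']['details']['count'])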

    If you want to use this code, feel free to reach out to me and I will share the Terraform to deploy the application. As always, reach out with questions or feedback!

  • Building a Discord Bot with Python and AWS

    I’m a member of a lot of Discord servers. The one I participate in most is one with my brothers and our friends. In this server, we joke around a lot about people posting off-topic messages in the various text channels and we give them fake “warnings”. I decided to take this a step further and create a bot where we could track the warnings and then present them in a leaderboard.

    The Discord bot API documentation is great and allowed me to quickly get a proof of concept up and running. I then relied on my Python, Terraform, and AWS skills to get the bot deployed quickly. Below is a simple architecture diagram that I started with and will most likely add to as the members of the server request more features.

    We currently have three commands: !warning, !feature, and !leaderboard. The !warning command takes a tagged user as input. It then uses the Boto3 library for Python to add the attribute to the user in the table. Here is the code:

    # Adds an attribute to a user
    import os

    import boto3
    from botocore.exceptions import ClientError

    # table_name and the dynamodb helper module are defined elsewhere in the bot's code

    def add_warning_to_user(username, attribute):
        client = boto3.resource("dynamodb", region_name="us-west-2",
                                aws_access_key_id=os.getenv('AWS_KEY'),
                                aws_secret_access_key=os.getenv('AWS_SECRET'))
        table = client.Table(table_name)
        print("adding", attribute, "to", str(username))

        try:
            response = table.update_item(
                Key={'username': str(username)},
                AttributeUpdates={attribute: {
                    'Value': str(dynamodb.get_warning_count_of_user(username, attribute) + 1)
                    }
                }
            )
            print(response)
        except ClientError as e:
            print("Failed to update count")
            print(e)
            return False
        return True
    
    

    I have another function within this code that calls out to the DynamoDB table and gets the user’s current value so that we can increment the count.
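
    That helper isn’t shown in the post. A rough sketch of what get_warning_count_of_user might look like, mirroring the table wiring above (the attribute layout is an assumption):

    def get_warning_count_of_user(username, attribute):
        # Return the user's current count for the given attribute, defaulting to 0
        client = boto3.resource("dynamodb", region_name="us-west-2",
                                aws_access_key_id=os.getenv('AWS_KEY'),
                                aws_secret_access_key=os.getenv('AWS_SECRET'))
        table = client.Table(table_name)
        response = table.get_item(Key={'username': str(username)})
        item = response.get('Item')
        if item is None or attribute not in item:
            return 0
        return int(item[attribute])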

    The !leaderboard command takes an “attribute” as input. I built it this way so that we can add future attributes to users without having to rebuild everything from scratch. To get the data, I used the DynamoDB scan function to retrieve the data for all of the users and then filtered within the Python application on just the attribute that we are requesting the leaderboard for. I then have a function that formats the leaderboard into something that the bot can publish back to the server.

    # Formats the leaderboard data into a Discord code block the bot can post
    def create_table(data, attribute):
        if attribute == "warning_count":
            attribute = "Warnings"
        table = ""
        rows = []
        rows.append("``` ")
        rows.append(f"{attribute}: Leaderboard")
        for key, value in data.items():
            rows.append(f"{key}: {str(value)}")
        rows.append("``` ")
        for row in rows:
            table += " " + row + "\n "
        return table
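
    The scan-and-filter step described above isn’t shown in the post. A sketch of what it might look like (the function name and the sorting are my assumptions; the table wiring mirrors the code above):

    def get_leaderboard_data(attribute):
        # Scan the whole table, then keep only users that have the requested attribute
        # (pagination via LastEvaluatedKey omitted for brevity)
        client = boto3.resource("dynamodb", region_name="us-west-2",
                                aws_access_key_id=os.getenv('AWS_KEY'),
                                aws_secret_access_key=os.getenv('AWS_SECRET'))
        table = client.Table(table_name)
        items = table.scan()['Items']
        data = {}
        for item in items:
            if attribute in item:
                data[item['username']] = int(item[attribute])
        # Sort descending so the highest counts appear first in the leaderboard
        return dict(sorted(data.items(), key=lambda kv: kv[1], reverse=True))

    The result can be passed straight into create_table(data, attribute).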

    I want to revisit the formatting function to make the output cleaner as the list gets longer, but for now it works as intended.

    I created the last function so that users could submit feature requests. The code is very simple: the !feature command takes all text following the command as input and passes it to an SNS function I wrote, which sends me an email containing the user’s feature request. I have hopes that I can transition this to create some sort of Jira task or other workflow. Below is the bot’s code to handle this interaction:

    @client.command(name="feature", help="sends a feature request")
    async def send_feature_request(ctx, *, args):
        print("THIS IS THE FEATURE REQUEST", args)
        if sns.send_message(args) == True:
            await ctx.send("Your request has been sent")
        else:
            await ctx.send("Failed to send your request. Plz try again later.")
    
    

    Right now the bot is running inside a Docker container within my homelab. I need to create better logging and implement some sort of logging server so that I can better handle errors and monitor for any outages.

    If you have questions about building Discord bots or AWS and its various components, feel free to reach out to me at any time. This was a fun project that I worked on over a few days, and it was great to see it come together quickly!

  • Moving AWS Cloudfront Logs to DynamoDB

    I think it’s pretty obvious that I love DynamoDB. It has become one of my favorite AWS services; I use it almost every day at work and am getting better at using it for my personal projects as well.

    I had a client approach me about getting logs from a CloudFront distribution. CloudFront has a native logging function that spits out .GZ files to an S3 bucket. My client doesn’t have any sort of log ingestion service, so rather than build one, I decided we could parse the .GZ files and store the data in a DynamoDB table. To accomplish this I created a simple Lambda:

    import boto3
    import gzip
    import uuid
    from datetime import datetime
    from datetime import timedelta
    import time
    from botocore.exceptions import ClientError
    
    # Creates a time-to-live value 90 days out; DynamoDB TTL expects a Number (epoch seconds)
    def ttl_time():
        now = datetime.now()
        ttl_date = now + timedelta(days=90)
        return int(time.mktime(ttl_date.timetuple()))
    
    #Puts the log json into dynamodb:
    def put_to_dynamo(record):
        client = boto3.resource('dynamodb', region_name='us-west-2')
        table = client.Table('YOUR_TABLE_NAME')
        try:
            response = table.put_item(
                Item=record
            )
            print(response)
        except ClientError as e:
            print("Failed to put record")
            print(e)
            return False
    
        return True

    def lambda_handler(event, context):
        print(event)
        s3_key = event['Records'][0]['s3']['object']['key']
        s3 = boto3.resource("s3")
        obj = s3.Object("YOUR_BUCKET", s3_key)
        with gzip.GzipFile(fileobj=obj.get()["Body"]) as gzipfile:
            content = gzipfile.read()
        #print(content)
        my_json = content.decode('utf8').splitlines()
    
        # Grab the field names from the "#Fields:" header line and the values from the
        # log line, skipping any other comment lines (e.g. "#Version:")
        keys = []
        values = []
        for x in my_json:
            if x.startswith("#Fields:"):
                keys = x.split(" ")
            elif x.startswith("#"):
                continue
            else:
                values = x.split("\t")

        # Zip the field names (minus the "#Fields:" token) with the values
        my_dict = {}
        for item, value in zip(keys[1:], values):
            my_dict[item] = value
    
    
        print('- ' * 20)
        myuuid = str(uuid.uuid4())
        print(myuuid)
        my_dict["uuid"] = myuuid
        my_dict['ttl'] = ttl_time()
    
        print(my_dict)
        if put_to_dynamo(my_dict) == True:
            print("Successfully imported item")
            return True
        else:
            print("Failed to put record")
            return False

    This Lambda runs every time an S3 object is created. It grabs the .GZ file and parses it into a dictionary that can be imported into DynamoDB. One other thing to note is that I append a UUID so that I can help track down errors.

    I wrote a simple front end for the client that grabs records based on a date input and writes the logs to a CSV so they can parse them on their local machines. I have a feeling we will be implementing a log aggregation server soon!
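
    That front end isn’t included in this post. A rough sketch of the lookup, assuming the CloudFront “date” field is stored as an attribute on each item and using a filtered scan (table name and function name are placeholders):

    import csv

    import boto3
    from boto3.dynamodb.conditions import Attr

    def export_logs_to_csv(date_input, filename):
        # Pull every log record for the given date and dump it to a CSV file
        dynamodb = boto3.resource('dynamodb', region_name='us-west-2')
        table = dynamodb.Table('YOUR_TABLE_NAME')
        response = table.scan(FilterExpression=Attr('date').eq(date_input))
        items = response['Items']
        while 'LastEvaluatedKey' in response:
            response = table.scan(FilterExpression=Attr('date').eq(date_input),
                                  ExclusiveStartKey=response['LastEvaluatedKey'])
            items.extend(response['Items'])
        if not items:
            return False
        with open(filename, 'w', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=sorted(items[0].keys()))
            writer.writeheader()
            writer.writerows(items)
        return True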

    If this code helps you please share it with your friends and co-workers!

    Code on Github

  • A Dynamo Data Migration Tool

    Have you ever wanted to migrate data from one DynamoDB table to another? I haven’t seen an AWS tool to do this, so I wrote one using Python.

    A quick walkthrough video:

    import sys
    import boto3
    
    ## USAGE ############################################################################
    ## python3 dynamo.py <Source_Table> <destination table>                            ## 
    ## Requires two profiles to be set in your AWS Config file "source", "destination" ##
    #####################################################################################
    def dynamo_bulk_reader():
        session = boto3.session.Session(profile_name='source')
        dynamodb = session.resource('dynamodb', region_name="us-west-2")
        table = dynamodb.Table(sys.argv[1])
    
        print("Exporting items from: " + str(sys.argv[1]))
    
        response = table.scan()
        data = response['Items']
    
        while 'LastEvaluatedKey' in response:
            response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
            data.extend(response['Items'])
    
        print("Finished exporting: " + str(len(data)) + " items.")
        return data
    
    def dynamo_bulk_writer():
        session = boto3.session.Session(profile_name='destination')
        dynamodb = session.resource('dynamodb', region_name='us-west-2')
        table = dynamodb.Table(sys.argv[2])
        print("Importing items into: " + str(sys.argv[2]))
        # Open a single batch writer so the writes are actually batched
        with table.batch_writer() as batch:
            for table_item in dynamo_bulk_reader():
                batch.put_item(
                    Item=table_item
                )

        print("Finished importing items...")

    if __name__ == '__main__':
        print("Starting Dynamo Migrator...")
        dynamo_bulk_writer()
        print("Exiting Dynamo Migrator")

    The process is pretty simple. First, we get all of our data from our source table, following Dynamo’s scan pagination via LastEvaluatedKey until every item has been read, and store it in a list. Next, we iterate over that list and write it to our destination table using the ‘Batch Writer’.

    The program has been tested against tables containing over 300 items. Feel free to use it for your environments! If you do use it, please share it with your friends and link back to this article!

    Github: https://github.com/avansledright/dynamo-migrate

  • Querying and Editing a Single Dynamo Object

    I have a workflow that creates a record inside of a DynamoDB table as part of a pipeline within AWS. The record has the CodePipeline job ID as its primary key. Later in the pipeline, I wanted to edit that object to append the status of resources created by the pipeline.

    In order to do this, I created two functions: one that returns the item from the table, and a second that actually does the update and puts the updated item back into the table. Take a look at the code below and utilize it if you need to!

    import boto3 
    from boto3.dynamodb.conditions import Key
    
    def query_table(id):
        dynamodb = boto3.resource('dynamodb')
        table = dynamodb.Table('XXXXXXXXXXXXXX')
        response = table.query(
            KeyConditionExpression=Key('PRIMARYKEY').eq(id)
        )
        return response['Items']
    
    
    def update_dynamo_status(id, resource_name, status):
        dynamodb = boto3.resource('dynamodb')
        table = dynamodb.Table('XXXXXXXXXXXXX')
        items = query_table(id)
        for item in items:
            # Do your update here
            response = table.put_item(Item=item)
        return response
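
    As a hedged example of how this might be wired up, the update inside the loop could append a status entry for the resource before writing the item back, and the function is then called from a later pipeline stage. The attribute name and values below are my assumptions, not part of the original pipeline:

    # Example update inside the loop (assumed attribute layout):
    #     item.setdefault('resource_status', {})[resource_name] = status
    #
    # Example call from a later pipeline stage (assumed values):
    update_dynamo_status('example-codepipeline-job-id', 'example-resource', 'CREATE_COMPLETE')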