Category: Cloud Architecting

  • Building a Discord Bot with Python and AWS

    I’m a member of a lot of Discord servers. The one I participate in most is a server with my brothers and our friends. In this server, we joke around a lot about people posting off-topic messages in the various text channels, and we hand out fake “warnings” for it. I decided to take this a step further and build a bot that tracks the warnings and then presents them in a leaderboard.

    The Discord bot API documentation is great and allowed me to quickly get a proof of concept running. From there I relied on my Python, Terraform, and AWS skills to get the bot deployed. Below is a simple architecture diagram that I will most likely be adding to as the members of the server request more features.

    We currently have three commands: !warning, !feature, and !leaderboard. The !warning command takes a tagged user as input and then uses the Boto3 library for Python to increment that user’s warning attribute in the DynamoDB table. Here is the code:

    import os

    import boto3
    from botocore.exceptions import ClientError

    # Adds a warning attribute to a user
    # (table_name is defined elsewhere in the module, and dynamodb is the
    # bot's own helper module that holds get_warning_count_of_user)
    def add_warning_to_user(username, attribute):
        client = boto3.resource("dynamodb", region_name="us-west-2",
                                aws_access_key_id=os.getenv('AWS_KEY'),
                                aws_secret_access_key=os.getenv('AWS_SECRET'))
        table = client.Table(table_name)
        print("adding", attribute, "to", str(username))

        try:
            # Increment the current count and write it back as a string
            response = table.update_item(
                Key={'username': str(username)},
                AttributeUpdates={attribute: {
                    'Value': str(dynamodb.get_warning_count_of_user(username, attribute) + 1)
                    }
                }
            )
            print(response)
        except ClientError as e:
            print("Failed to update count")
            print(e)
            return False
        return True
    
    

    I have another function in this project that calls out to the DynamoDB table and gets the user’s current value so that we can increment the count.
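
    That helper isn’t shown in the post, but a minimal sketch of it could look like this (it would live in the bot’s dynamodb helper module referenced above; treating a missing user or attribute as zero is my assumption):

    # Sketch of the lookup helper: fetch the user's current count for an attribute
    def get_warning_count_of_user(username, attribute):
        client = boto3.resource("dynamodb", region_name="us-west-2")
        table = client.Table(table_name)
        try:
            response = table.get_item(Key={'username': str(username)})
        except ClientError as e:
            print("Failed to fetch current count")
            print(e)
            return 0
        item = response.get('Item', {})
        # A missing user or attribute counts as zero warnings (assumption)
        return int(item.get(attribute, 0))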

    The !leaderboard command takes an “attribute” as input. I built it this way so that we can add future attributes to users without having to rebuild everything from scratch. To get the data, I use the DynamoDB scan operation to retrieve the records for all of the users and then filter within the Python application on just the attribute we are building the leaderboard for (a rough sketch of that step appears after the formatting code below). I then have a function that formats the leaderboard into something that the bot can publish back to the server.

        def create_table(data, attribute):
            # Use a friendlier display name for the warning counter
            if attribute == "warning_count":
                attribute = "Warnings"
            table = ""
            rows = []
            # Wrap the output in ``` so Discord renders it as a code block
            rows.append("``` ")
            rows.append(f"{attribute}: Leaderboard")
            for key, value in data.items():
                rows.append(f"{key}: {str(value)}")
            rows.append("``` ")
            for row in rows:
                table += " " + row + "\n "
            return table

    I want to revisit this code to make the formatting cleaner as the list gets longer, but for now it works as intended.
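
    For reference, the scan-and-filter step mentioned above might look roughly like this (a sketch, not the bot’s actual code; it reuses the table_name and username key from the warning function):

    # Sketch: pull every user record, keep only the requested attribute,
    # and hand the result to create_table()
    def get_leaderboard_data(attribute):
        client = boto3.resource("dynamodb", region_name="us-west-2")
        table = client.Table(table_name)
        response = table.scan()
        items = response['Items']
        # Keep scanning if the results are paginated
        while 'LastEvaluatedKey' in response:
            response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
            items.extend(response['Items'])
        # Keep only the users that have the requested attribute
        data = {}
        for item in items:
            if attribute in item:
                data[item['username']] = item[attribute]
        return data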

    I created the last function so that users could submit feature requests. The code is very simple: the !feature command takes all of the text following the command and passes it to an SNS helper I wrote, which sends me an email containing the user’s feature request. I hope to eventually transition this to create a Jira task or some other workflow. Below is the bot’s code to handle this interaction:

    @client.command(name="feature", help="sends a feature request")
    async def send_feature_request(ctx, *, args):
        print("THIS IS THE FEATURE REQUEST", args)
        if sns.send_message(args):
            await ctx.send("Your request has been sent")
        else:
            await ctx.send("Failed to send your request. Please try again later.")
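
    The sns.send_message helper isn’t shown here; a minimal sketch of it might look like the following (reading the topic ARN from an environment variable is my assumption):

    # Sketch of the SNS helper: publish the feature request to a topic that
    # has an email subscription attached
    def send_message(message):
        client = boto3.client("sns", region_name="us-west-2")
        try:
            client.publish(
                TopicArn=os.getenv("SNS_TOPIC_ARN"),
                Subject="Discord Bot Feature Request",
                Message=message
            )
        except ClientError as e:
            print("Failed to publish feature request")
            print(e)
            return False
        return True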
    
    

    Right now the bot is running inside a Docker container within my homelab. I need to add better logging and implement some sort of logging server so that I can handle errors more effectively and monitor for any outages.

    If you have questions about building Discord bots or AWS and its various components, feel free to reach out to me at any time. This was a fun project that I worked on over a few days, and it was great to see it come together so quickly!

  • Moving AWS Cloudfront Logs to DynamoDB

    I think it’s pretty obvious that I love DynamoDB. It has become one of my favorite AWS services; I use it almost every day at work and am getting better at using it in my personal projects as well.

    A client approached me about getting logs from a CloudFront distribution. CloudFront has native logging that writes .GZ files to an S3 bucket. My client doesn’t have any sort of log ingestion service, so rather than build one, I decided we could parse the .GZ files and store the data in a DynamoDB table. To accomplish this I created a simple Lambda function:

    import gzip
    import time
    import uuid
    from datetime import datetime, timedelta

    import boto3
    from botocore.exceptions import ClientError

    # Creates a time-to-live value 90 days out (epoch seconds, as a number,
    # so DynamoDB's TTL feature can use it)
    def ttl_time():
        ttl_date = datetime.now() + timedelta(days=90)
        return int(time.mktime(ttl_date.timetuple()))

    # Puts the parsed log record into DynamoDB
    def put_to_dynamo(record):
        client = boto3.resource('dynamodb', region_name='us-west-2')
        table = client.Table('YOUR_TABLE_NAME')
        try:
            response = table.put_item(
                Item=record
            )
            print(response)
        except ClientError as e:
            print("Failed to put record")
            print(e)
            return False
        return True

    def lambda_handler(event, context):
        print(event)
        s3_key = event['Records'][0]['s3']['object']['key']
        s3 = boto3.resource("s3")
        obj = s3.Object("YOUR_BUCKET", s3_key)

        # Decompress the log file straight from S3
        with gzip.GzipFile(fileobj=obj.get()["Body"]) as gzipfile:
            content = gzipfile.read()
        lines = content.decode('utf8').splitlines()

        # The "#Fields:" header names the columns; the data rows are
        # tab-delimited (the last data row in the file is the one stored)
        my_dict = {}
        for line in lines:
            if line.startswith("#Fields:"):
                keys = line.split(" ")
            elif not line.startswith("#"):
                values = line.split("\t")

        # Zip the field names and values together, skipping the "#Fields:" token
        x = 0
        for item in keys:
            if item == "#Fields:":
                continue
            my_dict[item] = values[x]
            x += 1

        print('- ' * 20)
        myuuid = str(uuid.uuid4())
        print(myuuid)
        my_dict["uuid"] = myuuid
        my_dict['ttl'] = ttl_time()

        print(my_dict)
        if put_to_dynamo(my_dict):
            print("Successfully imported item")
            return True
        else:
            print("Failed to put record")
            return False

    This Lambda runs every time an S3 object is created. It grabs the .GZ file and parses it into a dictionary that can be imported into DynamoDB. One other thing to note is that I append a UUID to each record so that I can help track down errors.

    I also wrote a simple front end for the client that grabs records based on a date input and writes the logs to a CSV so they can work with them on their local machines. I have a feeling we will be implementing a log aggregation server soon!
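
    The front end itself is outside the scope of this post, but the export step it performs could look something like this sketch (the table name, the 'date' attribute, and the output file are placeholders):

    import csv

    import boto3
    from boto3.dynamodb.conditions import Attr

    # Sketch: pull log items for a given date and write them to a CSV file
    def export_logs_to_csv(date_value, output_file):
        dynamodb = boto3.resource('dynamodb', region_name='us-west-2')
        table = dynamodb.Table('YOUR_TABLE_NAME')
        response = table.scan(FilterExpression=Attr('date').eq(date_value))
        items = response['Items']
        # Keep scanning if the results are paginated
        while 'LastEvaluatedKey' in response:
            response = table.scan(FilterExpression=Attr('date').eq(date_value),
                                  ExclusiveStartKey=response['LastEvaluatedKey'])
            items.extend(response['Items'])
        if not items:
            return 0
        with open(output_file, 'w', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=sorted(items[0].keys()))
            writer.writeheader()
            writer.writerows(items)
        return len(items)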

    If this code helps you please share it with your friends and co-workers!

    Code on Github

  • Automating Security Group Rule Removal

    I’m using an Amazon Web Services Security Group as a way to allow traffic into an EC2 instance for the instance’s users. The users can give themselves access through a web interface that I wrote for them. Maybe I’ll cover that in a different post.

    I recently found that the Security Group was nearing its maximum number of rules, so I decided to start purging old rules, which ultimately forces users to re-add their IP addresses to the group.

    Going in and manually removing rules is rather time-consuming, so I figured I could write a script to handle it for me. The first step was to update my previous script, which inserts the rules, so that it also adds a tag to each rule. The function below takes a list of Security Group IDs as input and returns all of their rules.

    import boto3

    # sg_id is a list of Security Group IDs
    def get_sg_rules(sg_id):
        client = boto3.client('ec2')
        response = client.describe_security_group_rules(
            Filters=[
                {
                    'Name': 'group-id',
                    'Values': sg_id
                }
            ],
        )
        return response

    The script below iterates through each of the rules returned and attaches a “dateAdded” tag with a date string as its value.

    for sg_rule in get_sg_rules(sg_list)['SecurityGroupRules']:
        try:
            client = boto3.client('ec2')
            # Tag the rule with the date it was added
            response = client.create_tags(
                DryRun=False,
                Resources=[
                    sg_rule['SecurityGroupRuleId'],
                ],
                Tags=[
                    {
                        'Key': 'dateAdded',
                        'Value': '2022-11-05'
                    },
                ]
            )
        except ClientError as e:
            print(e)

    I then wrote the following Lambda function that runs every day and checks for any expired rules. The schedule is set up with a CloudWatch Events (EventBridge) rule.

    import boto3
    from botocore.exceptions import ClientError
    from datetime import datetime, timedelta

    def return_today():
        return datetime.now()

    # Returns the rules in the given Security Groups that carry the target
    # dateAdded tag value
    def get_sg_rules(sg_id, old_date):
        client = boto3.client('ec2')
        response = client.describe_security_group_rules(
            Filters=[
                {
                    'Name': 'group-id',
                    'Values': sg_id
                },
                {
                    'Name': 'tag:dateAdded',
                    'Values': [old_date]
                }
            ],
        )
        return response

    def lambda_handler(event, context):
        sg_list = ["xxxx", "xxx"]
        # Rules tagged exactly 30 days ago are up for removal
        old_date = datetime.strftime(return_today() - timedelta(days=30), "%Y-%m-%d")
        print(old_date)
        for sg_rule in get_sg_rules(sg_list, old_date)['SecurityGroupRules']:
            try:
                client = boto3.client("ec2")
                response = client.revoke_security_group_ingress(
                    GroupId=sg_rule['GroupId'],
                    SecurityGroupRuleIds=[sg_rule['SecurityGroupRuleId']]
                )
                print(response)
                print("Successfully deleted the rule")
            except ClientError as e:
                print(e)
                print("Failed to delete rule")

    You’ll see that the code has a list of Security Groups to check. It computes the date 30 days before the current date, and if a rule’s “dateAdded” tag matches that date, the rule is removed.
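
    The daily trigger can be created in the console, but if you want to script it, a boto3 sketch along these lines should work (the rule name, function name, and Lambda ARN are placeholders):

    import boto3

    # Sketch: create a CloudWatch Events (EventBridge) rule that invokes the
    # cleanup Lambda once a day
    events = boto3.client('events')
    lambda_client = boto3.client('lambda')

    rule = events.put_rule(
        Name='sg-rule-cleanup-daily',
        ScheduleExpression='rate(1 day)',
        State='ENABLED'
    )

    # Allow the rule to invoke the function, then attach it as a target
    lambda_client.add_permission(
        FunctionName='sg-rule-cleanup',
        StatementId='sg-rule-cleanup-daily-event',
        Action='lambda:InvokeFunction',
        Principal='events.amazonaws.com',
        SourceArn=rule['RuleArn']
    )
    events.put_targets(
        Rule='sg-rule-cleanup-daily',
        Targets=[{'Id': 'sg-rule-cleanup', 'Arn': 'YOUR_LAMBDA_ARN'}]
    )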

    I hope this helps you automate your AWS Accounts. Below are links to the code repository so you can edit the code as needed. Please share it with your friends if this helps you!

    Github

  • EC2 Reservation Notification

    I realized today that I haven’t updated my EC2 reservations recently. Wondering why, I came to understand that I was never being notified that the reservations were expiring. I spent the day putting together a script that looks through my reservations, checks when they expire, and notifies me if an expiration is nearing my threshold of three weeks.

    I put this together as a local script, but it can also be adapted to run as a Lambda function, which is how I have it deployed. As always, you can view my code below and on GitHub.

    import json
    import os
    from datetime import datetime, timezone, timedelta

    import boto3
    from botocore.exceptions import ClientError

    ec2_client = boto3.client("ec2", region_name="us-west-2")

    # Build a dict of reservation ID -> expiration date and instance type
    def get_reserved_instances():
        response = ec2_client.describe_reserved_instances()
        reserved_instances = {}
        for reserved_instance in response['ReservedInstances']:
            reserved_instances.update({
                reserved_instance['ReservedInstancesId']: {
                    "ExpireDate": reserved_instance['End'],
                    "Type": reserved_instance['InstanceType']
                }
            })
        return reserved_instances

    # Returns True only when the expiration is 21-22 days out, so a daily run
    # notifies exactly once per reservation
    def determine_expiry(expiry_date):
        now = datetime.now(timezone.utc)
        delta_min = timedelta(days=21)
        delta_max = timedelta(days=22)
        return delta_min <= expiry_date - now < delta_max

    # Send the result to SNS
    def sendToSNS(messages):
        sns = boto3.client('sns')
        try:
            send_message = sns.publish(
                TargetArn=os.environ['SNS_TOPIC'],
                Subject='EC2-Reservation',
                Message=messages,
            )
            return send_message
        except ClientError as e:
            print("Failed to send message to SNS")
            print(e)

    if __name__ == "__main__":
        for reservation, res_details in get_reserved_instances().items():
            if determine_expiry(res_details['ExpireDate']):
                sns_message = {
                    "reservation": reservation,
                    "expires": res_details['ExpireDate'].strftime("%m/%d/%Y, %H:%M:%S")
                }
                sendToSNS(json.dumps(sns_message))

    I have an SNS topic set up that sends messages to a Lambda function on the backend, where I format them and forward them to a Slack channel for notifications.
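
    That backend Lambda isn’t shown here, but a minimal sketch of an SNS-triggered function posting to a Slack incoming webhook might look like this (reading the webhook URL from an environment variable is my assumption):

    import json
    import os
    import urllib.request

    # Sketch: receive the SNS message and forward it to a Slack webhook
    def lambda_handler(event, context):
        message = json.loads(event['Records'][0]['Sns']['Message'])
        text = (f"EC2 reservation {message['reservation']} "
                f"expires on {message['expires']}")
        payload = json.dumps({"text": text}).encode("utf-8")
        request = urllib.request.Request(
            os.environ['SLACK_WEBHOOK_URL'],
            data=payload,
            headers={"Content-Type": "application/json"}
        )
        with urllib.request.urlopen(request) as response:
            return response.status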

    If you have any questions, feel free to comment or message me on Twitter!

    GitHub

  • Adding a Standard Bucket Policy

    It is good practice to deny non-HTTPS traffic to your S3 buckets. For this reason, I wrote a script that I can use to apply a standard policy to each of my S3 buckets. The script could be made more robust by iterating through every bucket in my account, but I decided to have it take a bucket name as input and then apply the changes.

    import boto3
    from botocore.exceptions import ClientError
    import json
    import sys
    
    def check_s3_policy(bucket_name):
        client = boto3.client("s3", region_name='us-west-2')
    
        # Get existing policy so that we don't overwrite anything
        try:
            result = client.get_bucket_policy(Bucket=bucket_name)
            if result == None:
                return None
            else:
                return result
        except ClientError as e:
            print("failed to retrieve policy")
            print(e)
            return None
    
    if __name__ == "__main__":
        bucket_name = sys.argv[1]
        source_aws_account = boto3.client('sts').get_caller_identity().get('Account')
        print("Our current account number: " + source_aws_account)
        # The statement below denies all S3 actions when the request is not HTTPS
        standard_bucket_policy = {
            "Sid": "AWSHTTPSAccess",
            "Action": [
                "s3:*"
            ],
            "Effect": "Deny",
            "Resource": [
                "arn:aws:s3:::" + bucket_name,
                "arn:aws:s3:::" + bucket_name + "/*"
            ],
            "Condition": {
                "Bool": {
                    "aws:SecureTransport": "false"
                }
            },
            "Principal": "*"
        }
        
        existing_policy = check_s3_policy(bucket_name)
        if existing_policy == None:
            print("No policy exists so lets create a new one")
            print("Applying our standard bucket policy that denies non-HTTPS traffic...")
            try:
                new_bucket_policy = {
                    "Version": "2012-10-17",
                    "Statement": [standard_bucket_policy]
                }
                client = boto3.client("s3", region_name='us-west-2')
                client.put_bucket_policy(Bucket=bucket_name, Policy=json.dumps(new_bucket_policy))
                
            except ClientError as e:
                print("failed to put bucket policy")
                print(e)
        else:
            print("There is a policy so we need to modify")
            policy_to_modify = json.loads(existing_policy['Policy'])
            policy_to_modify['Statement'].append(standard_bucket_policy)
            try:
                client = boto3.client("s3", region_name="us-west-2")
                client.put_bucket_policy(Bucket=bucket_name, Policy=json.dumps(policy_to_modify))
            except ClientError as e:
                print("Error putting new bucket policy")
                print(e)
            
        print("Our bucket now follows all compliance ...")
        print("Exiting ...")

    You can change the policy as needed and use this script to apply changes to your buckets!
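
    Assuming the script is saved as something like apply_bucket_policy.py (the file name is just an example), you would run it as:

    python3 apply_bucket_policy.py <bucket name>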

    I hope that this is helpful to someone. Please share this with your friends!
    Github

  • Security Group ID Finder

    I have been deploying resources to a lot of AWS accounts lately, where each account has the same network infrastructure. When deploying Lambdas, I had the common name of the security group but not its ID, so I wrote this utility to look up the security group ID quickly.

    import sys

    import boto3

    # Look up a security group's ID by its name
    def get_security_group_id(common_name):
        ec2 = boto3.client("ec2", region_name="us-west-2")

        response = ec2.describe_security_groups()
        for security_group in response['SecurityGroups']:
            if security_group['GroupName'] == common_name:
                return security_group['GroupId']

    if __name__ == '__main__':
        # Print usage when no argument or a help flag is given
        if len(sys.argv) < 2 or sys.argv[1] in ("help", "--help", "usage", "--usage"):
            print("USAGE: python3 main.py <security group name>")
        else:
            sg_id = get_security_group_id(sys.argv[1])
            if sg_id is None:
                print("Security Group Not found")
            else:
                print(sg_id)

    This is a simple tool that can be used on your command line by doing:

    python3 main.py <security group name>

    I hope this helps speed up your deployments. Feel free to share the code with your friends and team!

    Github

  • A Dynamo Data Migration Tool

    Have you ever wanted to migrate data from one DynamoDB table to another? I haven’t seen an AWS tool to do this, so I wrote one using Python.

    A quick walkthrough video:

    import sys
    import boto3
    
    ## USAGE ############################################################################
    ## python3 dynamo.py <Source_Table> <destination table>                            ## 
    ## Requires two profiles to be set in your AWS Config file "source", "destination" ##
    #####################################################################################
    def dynamo_bulk_reader():
        session = boto3.session.Session(profile_name='source')
        dynamodb = session.resource('dynamodb', region_name="us-west-2")
        table = dynamodb.Table(sys.argv[1])
    
        print("Exporting items from: " + str(sys.argv[1]))
    
        response = table.scan()
        data = response['Items']
    
        while 'LastEvaluatedKey' in response:
            response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
            data.extend(response['Items'])
    
        print("Finished exporting: " + str(len(data)) + " items.")
        return data
    
    def dynamo_bulk_writer():
        session = boto3.session.Session(profile_name='destination')
        dynamodb = session.resource('dynamodb', region_name='us-west-2')
        table = dynamodb.Table(sys.argv[2])
        print("Importing items into: " + str(sys.argv[2]))
        # Open one batch writer and let it buffer and flush the puts for us
        with table.batch_writer() as batch:
            for table_item in dynamo_bulk_reader():
                batch.put_item(Item=table_item)

        print("Finished importing items...")

    if __name__ == '__main__':
        print("Starting Dynamo Migrator...")
        dynamo_bulk_writer()
        print("Exiting Dynamo Migrator")

    The process is pretty simple. First, we get all of the data from the source table and store it in a list. Next, we iterate over that list and write each item to the destination table using the batch writer.

    The program has been tested against tables containing over 300 items. Feel free to use it for your environments! If you do use it, please share it with your friends and link back to this article!

    Github: https://github.com/avansledright/dynamo-migrate

  • Querying and Editing a Single Dynamo Object

    I have a workflow that creates a record inside a DynamoDB table as part of a pipeline within AWS. The record’s primary key is the CodePipeline job ID. Later in the pipeline, I wanted to edit that object to append the status of the resources created by the pipeline.

    To do this, I created two functions: one that returns the item from the table, and a second that performs the update and puts the updated item back into the table. Take a look at the code below and use it if you need to!

    import boto3
    from boto3.dynamodb.conditions import Key

    # Return the item(s) matching the pipeline job ID
    def query_table(id):
        dynamodb = boto3.resource('dynamodb')
        table = dynamodb.Table('XXXXXXXXXXXXXX')
        response = table.query(
            KeyConditionExpression=Key('PRIMARYKEY').eq(id)
        )
        return response['Items']


    def update_dynamo_status(id, resource_name, status):
        dynamodb = boto3.resource('dynamodb')
        table = dynamodb.Table('XXXXXXXXXXXXX')
        items = query_table(id)
        for item in items:
            # Do your update here, for example appending the resource status:
            # item[resource_name] = status
            response = table.put_item(Item=item)
        return response

  • Pandas & NumPy with AWS Lambda

    Fun fact: Pandas and NumPy don’t work out of the box with Lambda. The libraries that you might download on your development machine probably won’t work either.

    The standard Lambda Python environment is very barebones by default. There is no point in loading in a bunch of libraries if they aren’t needed. This is why we package our Lambda functions into ZIP files to be deployed.

    My first attempt at using Pandas on AWS Lambda involved concatenating Excel files. The goal was to take a multi-sheet Excel file and combine it into one sheet for ingestion into a data lake. To accomplish this I used the Pandas library to build the new sheet, and to automate the process I set up an S3 trigger on a Lambda function to execute the script every time a file was uploaded.

    And then I ran into this error:

    [ERROR] Runtime.ImportModuleError: Unable to import module 'your_module':
    IMPORTANT: PLEASE READ THIS FOR ADVICE ON HOW TO SOLVE THIS ISSUE!
    Importing the numpy c-extensions failed.

    I had clearly added the NumPy library to my ZIP file.

    So what was the problem? Well, apparently, the NumPy builds that I downloaded on both my MacBook and my Windows desktop are not compatible with Amazon Linux.

    To resolve this issue, I first attempted to download the package files manually from PyPI.org. I grabbed the latest “manylinux1_x86_64” wheel for both NumPy and Pandas, put them back into my ZIP file, and re-uploaded it. This resulted in the same error.

    THE FIX THAT WORKED:

    The way to get this to work without failure is to spin up an Amazon Linux EC2 instance. Yes, this seems excessive, and it is. Not only did I have to spin up a new instance, I also had to install Python 3.8 because Amazon Linux ships with Python 2.7 by default. But once that was installed, you can use pip to install the libraries into a directory by doing:

    pip3 install -t . <package name>

    This puts the libraries in one location so they can be zipped back up for use. You can remove a lot of unneeded files by running:

    rm -r *.dist-info __pycache__

    After you have done the cleanup, you can ZIP up the files, move the archive back to your development machine, add your Lambda function code, and upload it via the Lambda console.

    Run a test! It should work as you intended now!

    If you need help with this please reach out to me on social media or leave a comment below.

  • A File Management Architecture

    This post is a continuation of my article: “A File Extraction Project”. This project has been a great learning experience for both frontend and backend application architecture and design. Below you will find a diagram and an explanation of all the pieces that make this work.

    1. The entire architecture is powered by Flask on an EC2 instance. When I move this project to production, I intend to put an Application Load Balancer in front to manage traffic. The frontend is also secured by Google authentication, which authenticates against the users’ existing GSuite deployment so that only individuals within the organization can access the application.
    2. The first Lambda function processes the uploads. I allow as many files as the customer needs, and the form includes a single text field for specifying the value of the object tag. The function sends the objects into the first bucket, item #4 below.
    3. The second Lambda function is the search functionality. It allows the user to provide a tag value, queries all objects in bucket #4, and builds a list of objects that match. It then moves those objects to bucket #5, where it packages them up and presents them to the user as a ZIP file (a rough sketch of this step appears after the list).
    4. The first bucket is the storage for all of the objects; everything uploaded through the first Lambda function lands here. It is not publicly accessible.
    5. The second bucket is temporary storage for files requested by the user. Objects are moved into it from the first bucket, and a lifecycle policy deletes anything older than 24 hours.
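
    Here is a rough sketch of the search step described in item 3 (bucket names are placeholders, and the tag key matches the one used in the upload code below; it finds and copies the matching objects, with the zipping step omitted):

    import boto3

    # Sketch of the search step: find objects whose tag matches the requested
    # value and copy them to the temporary bucket for packaging
    def find_and_stage_objects(tag_value):
        s3 = boto3.client('s3')
        matched_keys = []
        paginator = s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket='SOURCE_BUCKET'):
            for obj in page.get('Contents', []):
                tags = s3.get_object_tagging(Bucket='SOURCE_BUCKET', Key=obj['Key'])
                for tag in tags['TagSet']:
                    if tag['Key'] == 'Tag1' and tag['Value'] == tag_value:
                        matched_keys.append(obj['Key'])
        for key in matched_keys:
            s3.copy_object(
                Bucket='TEMP_BUCKET',
                Key=key,
                CopySource={'Bucket': 'SOURCE_BUCKET', 'Key': key}
            )
        return matched_keys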

    Lambda Function for File Uploading:

    # s3, BUCKET_NAME, and the Flask request/secure_filename imports are
    # defined elsewhere in the application
    def upload():
        if request.method == 'POST':
            tag = request.form['tag']
            files = request.files.getlist('file')
            print(files)
            for file in files:
                print(file)
                if file:
                    # Save the upload locally, then push it to S3
                    filename = secure_filename(file.filename)
                    file.save(filename)
                    s3.upload_file(
                        Bucket=BUCKET_NAME,
                        Filename=filename,
                        Key=filename
                    )

                    # Tag the object; the first tag carries the user-supplied value
                    s3.put_object_tagging(
                        Bucket=BUCKET_NAME,
                        Key=filename,
                        Tagging={
                            'TagSet': [
                                {
                                    'Key': 'Tag1',
                                    'Value': tag
                                },
                                {
                                    'Key': 'Tag2',
                                    'Value': 'Tag-value'
                                },
                            ]
                        },
                    )
            msg = "Upload Done ! "

    The function lives within the Flask application. I have AWS permissions set up on my EC2 instance to allow the “put_object” action. You can assign tags as needed; the first tag references the tag variable, which is provided by the form submission.

    For Google authentication I utilized a project I found on GitHub. I modified the “auth” route it creates to check the “hd” parameter passed back during authentication. You can see how this works here:

    @app.route('/auth')
    def auth():
        token = oauth.google.authorize_access_token()
        user = oauth.google.parse_id_token(token)
        session['user'] = user
        # 'hd' is the hosted domain claim returned for Google Workspace accounts
        if "hd" not in user:
            abort(403)
        elif user['hd'] != 'Your hosted domain':
            abort(403)
        else:
            return redirect('/')

    If the “hd” parameter is not present, or it does not match the expected hosted domain, the function aborts with a 403 error.

    If you are interested in this project and want more information feel free to reach out and I can provide more code examples or package up the project for you to deploy on your own!

    If you found this article helpful please share it with your friends.