Category: Python

  • Adding a Standard Bucket Policy

    It is good practice to deny non-HTTPS traffic to your S3 buckets. For this reason, I wrote a script that applies a standard policy to each of my S3 buckets. While it could be made more robust by iterating through every bucket in my account, I decided to have the script take a bucket name as input and then apply the changes.

    import boto3
    from botocore.exceptions import ClientError
    import json
    import sys
    
    def check_s3_policy(bucket_name):
        """Return the bucket's existing policy, or None if it has no policy."""
        client = boto3.client("s3", region_name="us-west-2")

        # Get the existing policy so that we don't overwrite anything
        try:
            return client.get_bucket_policy(Bucket=bucket_name)
        except ClientError as e:
            print("failed to retrieve policy")
            print(e)
            return None
    
    if __name__ == "__main__":
        bucket_name = sys.argv[1]
        source_aws_account = boto3.client('sts').get_caller_identity().get('Account')
        print("Our current account number: " + source_aws_account)
        standard_bucket_policy = {
            "Sid": "AWSHTTPSAccess",
            "Action": [
                "s3:*"
            ],
            "Effect": "Deny",
            "Resource": [
                "arn:aws:s3:::" + bucket_name,
                "arn:aws:s3:::" + bucket_name + "/*"
            ],
            "Condition": {
                "Bool": {
                    "aws:SecureTransport": "false"
                }
            },
            "Principal": "*"
        }
        
        existing_policy = check_s3_policy(bucket_name)
        if existing_policy is None:
            print("No policy exists, so let's create a new one")
            print("Applying our standard bucket policy that denies non-HTTPS traffic...")
            try:
                new_bucket_policy = {
                    "Version": "2012-10-17",
                    "Statement": [standard_bucket_policy]
                }
                client = boto3.client("s3", region_name='us-west-2')
                client.put_bucket_policy(Bucket=bucket_name, Policy=json.dumps(new_bucket_policy))
                
            except ClientError as e:
                print("failed to put bucket policy")
                print(e)
        else:
            print("There is an existing policy, so we need to modify it")
            policy_to_modify = json.loads(existing_policy['Policy'])
            policy_to_modify['Statement'].append(standard_bucket_policy)
            try:
                client = boto3.client("s3", region_name="us-west-2")
                client.put_bucket_policy(Bucket=bucket_name, Policy=json.dumps(policy_to_modify))
            except ClientError as e:
                print("Error putting new bucket policy")
                print(e)
            
        print("Our bucket now follows all compliance ...")
        print("Exiting ...")

    You can change the policy as needed and use this script to apply changes to your buckets!
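
    To run it, pass the bucket name as the first argument; for example (assuming you saved the script as, say, apply_bucket_policy.py):

    python3 apply_bucket_policy.py my-bucket-name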

    I hope that this is helpful to someone. Please share it with your friends!
    Github

  • Security Group ID Finder

    I have been working on deploying resources to a lot of AWS accounts lately where each account has the same network infrastructure. When deploying Lambdas, I had the common name of the security group but not the ID. I wrote this utility to get the security group ID for me quickly.

    import boto3
    import sys
    
    def get_security_group_id(common_name):
        ec2 = boto3.client("ec2", region_name="us-west-2")
    
        response = ec2.describe_security_groups()
        for security_group in response['SecurityGroups']:
            if security_group['GroupName'] == common_name:
                return security_group['GroupId']
            
    if __name__ == '__main__':
        if sys.argv[1] in ("help", "--help", "usage", "--usage"):
            print("USAGE: python3 main.py <security group name>")
        else:
            sg_id = get_security_group_id(sys.argv[1])
            if sg_id is None:
                print("Security group not found")
            else:
                print(sg_id)

    This is a simple tool that can be used on your command line by doing:

    python3 main.py <security group name>
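
    If you prefer to let AWS do the matching, describe_security_groups also accepts a server-side filter. Here is a small sketch of the same lookup using that approach (the rest of the script stays unchanged):

    def get_security_group_id(common_name):
        ec2 = boto3.client("ec2", region_name="us-west-2")
        # Let EC2 filter by group name instead of scanning every group client-side
        response = ec2.describe_security_groups(
            Filters=[{"Name": "group-name", "Values": [common_name]}]
        )
        groups = response["SecurityGroups"]
        return groups[0]["GroupId"] if groups else None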

    I hope this helps speed up your deployments. Feel free to share the code with your friends and team!

    Github

  • A Dynamo Data Migration Tool

    Have you ever wanted to migrate data from one DynamoDB table to another? I haven’t seen an AWS tool to do this, so I wrote one using Python.

    A quick walk-through video:
    import sys
    import boto3
    
    ## USAGE ############################################################################
    ## python3 dynamo.py <Source_Table> <destination table>                            ## 
    ## Requires two profiles to be set in your AWS Config file "source", "destination" ##
    #####################################################################################
    def dynamo_bulk_reader():
        session = boto3.session.Session(profile_name='source')
        dynamodb = session.resource('dynamodb', region_name="us-west-2")
        table = dynamodb.Table(sys.argv[1])
    
        print("Exporting items from: " + str(sys.argv[1]))
    
        response = table.scan()
        data = response['Items']
    
        while 'LastEvaluatedKey' in response:
            response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
            data.extend(response['Items'])
    
        print("Finished exporting: " + str(len(data)) + " items.")
        return data
    
    def dynamo_bulk_writer():
        session = boto3.session.Session(profile_name='destination')
        dynamodb = session.resource('dynamodb', region_name='us-west-2')
        table = dynamodb.Table(sys.argv[2])
        print("Importing items into: " + str(sys.argv[2]))

        # Open a single batch writer and stream every exported item through it
        with table.batch_writer() as batch:
            for table_item in dynamo_bulk_reader():
                batch.put_item(Item=table_item)

        print("Finished importing items...")

    if __name__ == '__main__':
        print("Starting Dynamo Migrator...")
        dynamo_bulk_writer()
        print("Exiting Dynamo Migrator")

    The process is pretty simple. First, we get all of our data from our source table. We store this in a list. Next, we iterate over that list and write it to our destination table using the ‘Batch Writer’.
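
    As the usage comment notes, the script expects two named profiles, “source” and “destination”. A minimal ~/.aws/credentials entry for them (the keys here are placeholders) might look like:

    [source]
    aws_access_key_id = <source account access key>
    aws_secret_access_key = <source account secret key>

    [destination]
    aws_access_key_id = <destination account access key>
    aws_secret_access_key = <destination account secret key>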

    The program has been tested against tables containing over 300 items. Feel free to use it for your environments! If you do use it, please share it with your friends and link back to this article!

    Github: https://github.com/avansledright/dynamo-migrate

  • Querying and Editing a Single Dynamo Object

    I have a workflow that creates a record in a DynamoDB table as part of a pipeline within AWS. The record uses the CodePipeline job ID as its primary key. Later in the pipeline I wanted to edit that object to append the status of resources created by the pipeline.

    In order to do this, I created two functions: one that returns the item from the table, and a second that does the update and puts the updated item back into the table. Take a look at the code below and utilize it if you need to!

    import boto3 
    from boto3.dynamodb.conditions import Key
    
    def query_table(id):
        dynamodb = boto3.resource('dynamodb')
        table = dynamodb.Table('XXXXXXXXXXXXXX')
        response = table.query(
            KeyConditionExpression=Key('PRIMARYKEY').eq(id)
        )
        return response['Items']
    
    
    def update_dynamo_status(id, resource_name, status):
        dynamodb = boto3.resource('dynamodb')
        table = dynamodb.Table('XXXXXXXXXXXXX')
        items = query_table(id)
        for item in items:
            # Do your update here
            response = table.put_item(Item=item)
        return response
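
    The update itself depends on your item schema. As a purely hypothetical sketch (the ResourceStatus attribute name is my own placeholder, not from the original table), the loop body could append a resource's status like this:

    for item in items:
        # Hypothetical update: record the latest status for this resource on the item
        item.setdefault('ResourceStatus', {})[resource_name] = status
        response = table.put_item(Item=item)
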
  • Searching S3 Buckets for an Object

    I wrote this code for a project that I was working on for a client. The goal was to look in an S3 bucket and find objects that match a specific identification number. Specifically, the client wanted to return audio logs from calls in an AWS Connect instance.

    In this script, we are utilizing Boto3 to iterate through objects in a provided S3 bucket and then returning the object keys. The keys are passed to a function that will generate a pre-signed URL for the user to utilize for downloading the object.

    import sys
    import boto3
    from botocore.exceptions import ClientError

    # USAGE: python3 main.py <item you want>
    object_to_find = sys.argv[1]
    bucket = "Your bucket name"

    s3 = boto3.client('s3')
    def get_objects(object_to_find):
        links = []
        response = s3.list_objects_v2(
            Bucket=bucket,
        )
        for x in response['Contents']:
            if object_to_find in x['Key']:
                links.append(x['Key'])
        return links
    
    def create_presigned_url(bucket_name, object_name, expiration=3600):
        s3_client = boto3.client('s3')
        # Confirm the object exists before generating a link for it
        try:
            s3_resource = boto3.resource('s3')
            s3_resource.Object(bucket_name, object_name).load()
        except ClientError as e:
            if e.response['Error']['Code'] == '404':
                return "Object doesn't exist " + object_name
        try:
            response = s3_client.generate_presigned_url('get_object',
                Params={'Bucket': bucket_name, 
                        'Key': object_name},
                ExpiresIn=expiration
                )
        except ClientError as e:
            print(e)
            return None
        return response

    links = get_objects(object_to_find)
    for x in links:
        print(create_presigned_url(bucket, x, expiration=3600))

    Test it out and let me know if you find it helpful!
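
    One caveat: list_objects_v2 returns at most 1,000 keys per call, so for larger buckets you will want to paginate. A minimal sketch of get_objects using a Boto3 paginator (same bucket variable and search term as above) could look like this:

    def get_objects(object_to_find):
        links = []
        paginator = s3.get_paginator('list_objects_v2')
        # Walk every page of results so buckets with more than 1,000 objects are covered
        for page in paginator.paginate(Bucket=bucket):
            for item in page.get('Contents', []):
                if object_to_find in item['Key']:
                    links.append(item['Key'])
        return links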

  • Subscribing All SES Identities to an SNS Topic

    I recently ran across an issue where I was experiencing many bounced emails on my Amazon SES account. So much so that Amazon reached out and put me on a warning notice.

    I realized that I had no logging in place to handle this. In order to create a logging mechanism I decided to send all “Bounce” notifications to a Slack channel so that I could better understand what was going on.

    To accomplish this I first had to subscribe an SNS topic to a Slack channel. There are a multitude of ways that you can do this so I won’t go into detail here. If you have questions please reach out.

    I wrote a simple function to loop through all of my identities in SES and then subscribe them to my SNS topic. Here is the code:

    import boto3
    ses = boto3.client('ses')
    response = ses.list_identities()
    
    for identity in response['Identities']:
        update = ses.set_identity_notification_topic(
            Identity=identity,
            NotificationType='Bounce',
            SnsTopic='<your SNS ARN here>'
        )
        print(update)

    You can see this is a pretty straightforward loop that utilizes the Boto3 library to collect all of the identities and subscribe each one to the topic.
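
    One thing to keep in mind is that list_identities is paginated, so a single call may not return every identity on a large account. A small sketch using a Boto3 paginator (same placeholder topic ARN as above) covers every page:

    import boto3

    ses = boto3.client('ses')
    paginator = ses.get_paginator('list_identities')

    # Walk every page of identities so none are skipped
    for page in paginator.paginate():
        for identity in page['Identities']:
            ses.set_identity_notification_topic(
                Identity=identity,
                NotificationType='Bounce',
                SnsTopic='<your SNS ARN here>'
            )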

    Feel free to use this code however you want and if you have any questions reach out via email or social media!

  • Building Dynamic DNS with Route53 and PFSense

    I use PFSense as my home router, firewall, VPN and much more. I’m sure a lot of my readers do as well. One thing that I have always set up is an entry in Route53 that points to my public IP address on my PFSense box. However, I use Comcast so, my IP address is changing every so often.

    Typically this isn’t a big deal because only a few applications utilize the DNS entry I have set up. But what if I could automate the changes by scheduling a job that automatically checks the IP address on the PFSense side and then updates the Route53 record?

    A couple of requirements:
    – PFSense with the API package installed
    – A subdomain setup in Route53 that points to your PFSense box

    Some Python to do some magic:

    import requests
    import json
    import boto3
    
    
    clientid = "<pfsense clientID here>"
    key = "<pfsense api key here>"
    route53 = boto3.client('route53')
    zoneID = "<route53 hosted zone here>"
    # be sure to include a trailing "." as this is how Route53 formats things
    # EX: https://google.com.
    pfsenseDNS = "<Your subdomain>"
    
    headers = {
        "Authorization": f"{clientid} {key}",
        "Content-type": 'application/json'
        }
    # Get the PFSense WAN IP from the ARP table
    def getWanIP():
        response = requests.get('https://<your subdomain>/api/v1/system/arp', headers=headers)
        arptable = json.loads(response.content)
        entries = arptable['data']
        wan = []
    
        for entry in entries:
            # change the interface code if necessary
            if entry['interface'] == 'igb0':
                wan.append(entry)
        for entry in wan:
            if entry['status'] == 'permanent':
                wanIP = entry['ip']
                return wanIP
    
    record_set = route53.list_resource_record_sets(
        HostedZoneId=zoneID
    )
    
    for record in record_set['ResourceRecordSets']:
        if record['Name'] == pfsenseDNS and record['Type'] == 'A':
            wan_ip = getWanIP()
            for entry in record['ResourceRecords']:
                if entry['Value'] != wan_ip:
                    print("The Records Do Not Match")
                    # UPSERT the A record so it points at the current WAN IP
                    response = route53.change_resource_record_sets(
                        HostedZoneId=zoneID,
                        ChangeBatch={
                            'Changes': [
                                {
                                    'Action': 'UPSERT',
                                    'ResourceRecordSet': {
                                        'Name': pfsenseDNS,
                                        'Type': 'A',
                                        'ResourceRecords': [
                                            {
                                                'Value': wan_ip,
                                            }
                                        ],
                                        'TTL': 300,
                                    },
                                }
                            ]
                        }
                    )

    What this code does is pretty simple. First, we have a function that gets the WAN IP from the ARP table of the PFSense box. We then retrieve our Route53 record sets and check them against that IP address.

    If the addresses do not match, the script will automatically change the entry in Route53 for you!
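
    To keep the record current, you can schedule the script on any host that has AWS credentials and network access to the PFSense API. For example, a crontab entry that runs it hourly (assuming you saved the script as, say, pfsense_ddns.py) might look like:

    0 * * * * /usr/bin/python3 /home/youruser/pfsense_ddns.py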

    To test out the function modify your Route53 entry to some bogus IP address and then run the script. If everything goes as planned you should see your DNS entry changed!

    If you found this helpful please share it with your friends. If you have questions feel free to comment or reach out to me via any method.

  • Pandas & NumPy with AWS Lambda

    Fun fact: Pandas and NumPy don’t work out of the box with Lambda. The libraries that you might download from your development machine probably won’t work either.

    The standard Lambda Python environment is very barebones by default. There is no point in loading in a bunch of libraries if they aren’t needed. This is why we package our Lambda functions into ZIP files to be deployed.

    My first time attempting to use Pandas on AWS Lambda was for concatenating Excel files. The point of this was to take a multi-sheet Excel file and combine it into one sheet for ingestion into a data lake. To accomplish this I used the Pandas library to build the new sheet. In order to automate the process I set up an S3 trigger on a Lambda function to execute the script every time a file was uploaded.

    And then I ran into this error:

    [ERROR] Runtime.ImportModuleError: Unable to import module 'your_module':
    IMPORTANT: PLEASE READ THIS FOR ADVICE ON HOW TO SOLVE THIS ISSUE!
    Importing the numpy c-extensions failed.

    I had clearly added the NumPy library to my ZIP file.

    So what was the problem? Well, apparently, the version of NumPy that I downloaded on both my Macbook and my Windows desktop is not compatible with Amazon Linux.

    To resolve this issue, I first attempted to download the package files manually from PyPI.org. I grabbed the latest “manylinux1_x86_64.whl” file for both NumPy and Pandas. I put them back into my ZIP file and re-uploaded it. This resulted in the same error.

    THE FIX THAT WORKED:

    The way to get this to work without failure is to spin up an Amazon Linux EC2 instance. Yes, this seems excessive, and it is. Not only did I have to spin up a new instance, I also had to install Python 3.8 because Amazon Linux ships with Python 2.7 by default. But once that was installed, I could use Pip to install the libraries to a directory by doing:

    pip3 install -t . <package name>

    This is useful for getting the libraries in the same location to ZIP back up for use. You can remove a lot of the files that are not needed by running:

    rm -r *.dist-info __pycache__

    After you have done the cleanup, you can ZIP up the files, move the archive back to your development machine, add your Lambda function code, and upload it through the Lambda console.
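
    The packaging step itself is just zipping the directory contents from inside that folder (the archive name here is arbitrary):

    zip -r lambda_package.zip .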

    Run a test! It should work as you intended now!

    If you need help with this please reach out to me on social media or leave a comment below.

  • Concatenating Multi-Sheet Excel Files with Python

    I recently came across a data source that used multiple sheets within an Excel file. My dashboard cannot read a multi-sheet Excel file, so I needed to combine them into one sheet.

    The file is uploaded into an S3 bucket and then needs to move through the data lake to be read into the dashboard. The final version of this script will be a Lambda function that is triggered on upload of the file, concatenates the sheets, and then places a new file into the next layer of the data lake.

    Using Pandas you can easily accomplish this task. One issue I did run into is that my version of Pandas would no longer read XLSX files (the xlrd engine it relied on dropped XLSX support), so I converted the file down into an XLS file, which is easily done through Excel. In the future this will also have to be done programmatically. Let’s get into the code.

    import pandas as pd

    workbook = pd.ExcelFile('file.xls')
    sheets = ['create', 'a', 'list']
    dataframe = []
    
    for sheet in sheets:
        df = pd.read_excel(workbook, sheet_name=sheet, skiprows=[list of rows to skip], skipfooter=number_of_rows_to_skip_from_bottom)
        df.columns = ['list', 'of', 'column', 'headers']
        dataframe.append(df)
    df = pd.concat(dataframe)
    df.to_excel("output.xls", index=False)
    

    To start, we import the Pandas library and then read in our Excel file. In a future revision of this script I will read the file from S3 via the Lambda event, so this part will need to change.

    The “sheets” variable is a list of the sheets that you want the script to look at. You can remove this if you want it to look at all of the sheets; my file had a few sheets that could be ignored. We also create an empty list called “dataframe”, which will be used to store each of the sheets that we want to concatenate. In the production version of this script there are some modifications that need to be done on each sheet. I accomplished this by adding “if/then” statements based on the sheet name.
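
    As a purely hypothetical illustration of those per-sheet “if/then” modifications (the sheet name and the cleanup step are placeholders, not taken from the production script), the inside of the loop might gain something like:

    if sheet == 'create':
        # Example per-sheet tweak: drop rows that are entirely empty
        df = df.dropna(how='all')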

    At the end of the “for” loop we append the data frame to our empty list. Once all the sheets have been added, we use Pandas to concatenate the objects and output the file. You can specify your own output file name. I also included “index=False”, which removes the first column of index numbers; it is not needed for my project.

    So there you have it, a simple Python script to concatenate a multi-sheet Excel file. If this script helps you please share it with your network!

  • A File Management Architecture

    This post is a continuation of my article: “A File Extraction Project”. This project has been a great learning experience for both frontend and backend application architecture and design. Below you will find a diagram and an explanation of all the pieces that make this work.

    1. The entire architecture is powered by Flask on an EC2 instance. When I move this project to production I intend to put an application load balancer in front to manage traffic. The frontend is also secured by Google Authentication, which authenticates against the user’s existing GSuite deployment so that only individuals within the organization can access the application.
    2. The first Lambda function handles the uploads. I am allowing for as many files as needed by the customer. The form also includes a single text field for specifying the value of the object tag. The function sends the objects into the first bucket, which is item #4 below.
    3. The second Lambda function is the search functionality. This function allows the user to provide a tag value. The function queries all objects in bucket #4 and creates a list of objects that match the query. It then moves the objects to bucket #5 where it packages them up and presents them to the user in the form of a ZIP file.
    4. The first bucket is the storage for all of the objects. This is the bucket where all the objects are uploaded to from the first Lambda function. It is not publicly accessible.
    5. The second bucket is temporary storage for files requested by the user. Objects are moved into this bucket from the first bucket. This bucket has a lifecycle policy that only allows objects to live inside it for 24 hours (a sketch of such a rule follows this list).
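
    The 24-hour cleanup on the temporary bucket can be applied with a lifecycle rule. This is a minimal Boto3 sketch (the bucket name and rule ID are placeholders; lifecycle expiration is expressed in whole days, so one day is the closest match to 24 hours):

    import boto3

    s3 = boto3.client('s3')
    # Expire every object in the temporary bucket one day after it is created
    s3.put_bucket_lifecycle_configuration(
        Bucket='your-temporary-bucket',
        LifecycleConfiguration={
            'Rules': [
                {
                    'ID': 'expire-temp-objects',
                    'Filter': {'Prefix': ''},
                    'Status': 'Enabled',
                    'Expiration': {'Days': 1}
                }
            ]
        }
    )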

    Lambda Function for File Uploading:

    # Assumes the surrounding Flask app provides `request`, `secure_filename`
    # (from werkzeug.utils), an `s3` boto3 client, and BUCKET_NAME.
    def upload():
        if request.method == 'POST':
            tag = request.form['tag']
            files = request.files.getlist('file')
            print(files)
            for file in files:
                print(file)
                if file:
                    # Sanitize the filename, save it locally, then push it to S3
                    filename = secure_filename(file.filename)
                    file.save(filename)
                    s3.upload_file(
                        Bucket=BUCKET_NAME,
                        Filename=filename,
                        Key=filename
                    )

                    s3.put_object_tagging(
                        Bucket=BUCKET_NAME,
                        Key=filename,
                        Tagging={
                            'TagSet': [
                                {
                                    'Key': 'Tag1',
                                    'Value': tag
                                },
                                {
                                    'Key': 'Tag2',
                                    'Value': 'Tag-value'
                                },
                            ]
                        },
                    )
            msg = "Upload Done ! "

    The function lives within the Flask application. I have AWS permissions set up on my EC2 instance to allow the “put_object” action. You can assign tags as needed; the first tag references the “tag” variable, which is provided by the form submission.

    For Google Authentication I utilized a project I found on Github here. In the “auth” route that it creates, I modified the code to authenticate against the “hd” parameter returned with the user’s token. You can see how this works here:

    @app.route('/auth')
    def auth():
        token = oauth.google.authorize_access_token()
        user = oauth.google.parse_id_token(token)
        session['user'] = user
        if "hd" not in user:
            abort(403)
        elif user['hd'] != 'Your hosted domain':
            abort(403)
        else:
            return redirect('/')

    If the “hd” parameter is not present in the token, the function aborts with a “403” error.

    If you are interested in this project and want more information feel free to reach out and I can provide more code examples or package up the project for you to deploy on your own!

    If you found this article helpful please share it with your friends.