Tag: aws

  • EC2 Reservation Notification

    I realized today that I haven’t updated my EC2 reservations recently. Wondering why I never did this, I came to understand that I had no way of being notified that the reservations were expiring. I spent the day putting together a script that would look through my reservations, assess the time of their expiration, and then notify me if any of them was nearing my threshold of 3 weeks.

    I put this together as a local script but it can also be adapted to run as a lambda function which is what I have it set up to do. As always, you can view my code below and on GitHub.

    import boto3
    from datetime import datetime, timezone, timedelta
    from botocore.exceptions import ClientError
    import os
    import json
    ec2_client = boto3.client("ec2", region_name="us-west-2")
    
    def get_reserved_instances():
        """Return the reserved instances reported by EC2, keyed by reservation ID.

        Each value is a dict with two entries: "ExpireDate" (the reservation's
        'End' value) and "Type" (its 'InstanceType'), both taken from the
        ``describe_reserved_instances`` response of the module-level client.
        """
        described = ec2_client.describe_reserved_instances()
        return {
            entry['ReservedInstancesId']: {
                "ExpireDate": entry['End'],
                "Type": entry['InstanceType'],
            }
            for entry in described['ReservedInstances']
        }
    def determine_expirery(expirery_date, *, threshold_days=21, window_days=1, now=None):
        """Report whether an expiry date falls inside the notification window.

        The window is half-open: it starts ``threshold_days`` from ``now``
        (inclusive) and ends ``window_days`` later (exclusive). With the
        defaults this is the original "at least 21 but less than 22 days away"
        check, so a job that runs once a day reports each reservation once.

        Args:
            expirery_date: Timezone-aware datetime at which the reservation ends.
            threshold_days: How many days ahead of expiry the window opens.
            window_days: Width of the window in days.
            now: Timezone-aware reference time; defaults to the current UTC time.

        Returns:
            True when the expiry lies inside the window, otherwise False.
        """
        if now is None:
            now = datetime.now(timezone.utc)
        remaining = expirery_date - now
        window_start = timedelta(days=threshold_days)
        window_end = window_start + timedelta(days=window_days)
        return window_start <= remaining < window_end
    #Send Result to SNS
    def sendToSNS(messages):
        """Publish ``messages`` to the SNS target named by the SNS_TOPIC variable.

        Returns the publish response on success. When the publish call raises
        ClientError the failure is printed and None is returned instead.
        """
        sns_client = boto3.client('sns')
        publish_args = {
            "TargetArn": os.environ['SNS_TOPIC'],
            "Subject": 'EC2-Reservation',
            "Message": messages,
        }
        try:
            return sns_client.publish(**publish_args)
        except ClientError as err:
            print("Failed to send message to SNS")
            print(err)
        return None
    
    
    if __name__ == "__main__":
        # Send one SNS message for every reservation inside the expiry window.
        reservations = get_reserved_instances()
        for reservation_id in reservations:
            expires_at = reservations[reservation_id]['ExpireDate']
            if not determine_expirery(expires_at):
                continue
            payload = {
                "reservation": reservation_id,
                "expires": expires_at.strftime("%m/%d/%Y, %H:%M:%S"),
            }
            sendToSNS(json.dumps(payload))
    #  

    I have an SNS topic setup that is set to send messages to a Lambda function in the backend so I can format my messages and send them to a Slack channel for notifications.

    If you have any questions, feel free to comment or message me on Twitter!

    GitHub

  • Adding a Standard Bucket Policy

    It is good practice to deny traffic that is not HTTPS to your S3 bucket. For this reason, I wrote a script that I can use to apply a standard policy to each of my S3 buckets. While the script could be more robust to iterate through each bucket in my account, I decided to write this script to take input of the name of the bucket and then apply the changes.

    import boto3
    from botocore.exceptions import ClientError
    import json
    import sys
    
    def check_s3_policy(bucket_name):
        """Fetch the bucket's current policy so it can be extended, not replaced.

        Args:
            bucket_name: Name of the S3 bucket to inspect.

        Returns:
            The ``get_bucket_policy`` response (its 'Policy' entry holds the
            policy as a JSON string), or None when the bucket has no policy.

        Raises:
            ClientError: for any failure other than a missing policy. Returning
                None for errors such as access-denied would make the caller
                believe no policy exists and overwrite the real one.
        """
        client = boto3.client("s3", region_name='us-west-2')
        try:
            return client.get_bucket_policy(Bucket=bucket_name)
        except ClientError as e:
            # A bucket without a policy is reported as an error by the API;
            # that is the only case that legitimately means "nothing to keep".
            if e.response.get("Error", {}).get("Code") == "NoSuchBucketPolicy":
                return None
            print("failed to retrieve policy")
            print(e)
            raise
    
    if __name__ == "__main__":
        # The bucket name is the only required argument; give a usage hint
        # instead of an IndexError traceback when it is missing.
        if len(sys.argv) < 2:
            sys.exit("USAGE: python3 " + sys.argv[0] + " <bucket name>")
        bucket_name = sys.argv[1]
        source_aws_account = boto3.client('sts').get_caller_identity().get('Account')
        print("Our current account number: " + source_aws_account)

        # Statement that denies every S3 action on the bucket and its objects
        # whenever the request was not made over a secure transport.
        standard_bucket_policy = {
            "Sid": "AWSHTTPSAccess",
            "Action": [
                "s3:*"
            ],
            "Effect": "Deny",
            "Resource": [
                "arn:aws:s3:::" + bucket_name,
                "arn:aws:s3:::" + bucket_name + "/*"
            ],
            "Condition": {
                "Bool": {
                    "aws:SecureTransport": "false"
                }
            },
            "Principal": "*"
        }

        existing_policy = check_s3_policy(bucket_name)
        if existing_policy is None:
            print("No policy exists so lets create a new one")
            print("Applying our standard bucket policy that denies non-HTTPS traffic...")
            policy_document = {
                "Version": "2012-10-17",
                "Statement": [standard_bucket_policy]
            }
            failure_message = "failed to put bucket policy"
        else:
            print("There is a policy so we need to modify")
            policy_document = json.loads(existing_policy['Policy'])
            statements = policy_document.get('Statement', [])
            # A policy may carry a single statement object instead of a list.
            if isinstance(statements, dict):
                statements = [statements]
            # Drop any earlier copy of our statement so that re-running the
            # script does not add a second statement with the same Sid.
            statements = [
                statement for statement in statements
                if statement.get('Sid') != standard_bucket_policy['Sid']
            ]
            statements.append(standard_bucket_policy)
            policy_document['Statement'] = statements
            failure_message = "Error putting new bucket policy"

        try:
            client = boto3.client("s3", region_name='us-west-2')
            client.put_bucket_policy(Bucket=bucket_name, Policy=json.dumps(policy_document))
        except ClientError as e:
            print(failure_message)
            print(e)
            # Do not report compliance for a policy that was never applied.
            sys.exit(1)

        print("Our bucket now follows all compliance ...")
        print("Exiting ...")

    You can change the policy as needed and use this script to apply changes to your buckets!

    I hope that this is helpful to someone. Please share this to your friends!
    Github

  • Security Group ID Finder

    Security Group ID Finder

    I have been working on deploying resources to a lot of AWS accounts lately where each account has the same network infrastructure. When deploying Lambdas, I had the common name of the security group but not the ID. I wrote this utility to get the security group ID for me quickly.

    import boto3
    import sys
    
    def get_security_group_id(common_name):
        """Look up a security group's ID by its exact group name.

        Args:
            common_name: The 'GroupName' to search for (exact match).

        Returns:
            The 'GroupId' of the first group whose name matches, or None when
            no group with that name is found.
        """
        ec2 = boto3.client("ec2", region_name="us-west-2")

        # Walk every page of results; a single describe call can be truncated
        # when there are many groups, which would hide a group that exists.
        paginator = ec2.get_paginator("describe_security_groups")
        for page in paginator.paginate():
            for security_group in page['SecurityGroups']:
                if security_group['GroupName'] == common_name:
                    return security_group['GroupId']
        return None
            
    if __name__ == '__main__':
        # Words that request the usage text instead of a lookup.
        help_flags = ("help", "--help", "usage", "--usage")
        # A missing argument also prints the usage text rather than raising
        # IndexError on sys.argv[1].
        if len(sys.argv) < 2 or sys.argv[1] in help_flags:
            print("USAGE: python3 main.py <security group name>")
        else:
            sg_id = get_security_group_id(sys.argv[1])
            if sg_id is None:
                print("Security Group Not found")
            else:
                print(sg_id)

    This is a simple tool that can be used on your command line by doing:

    python3 main.py <security group name>

    I hope this helps speed up your deployments. Feel free to share the code with your friends and team!

    Github

  • A Dynamo Data Migration Tool

    A Dynamo Data Migration Tool

    Have you ever wanted to migrate data from one Dynamo DB table to another? I haven’t seen an AWS tool to do this so I wrote one using Python.

    A quick walk through video
    import sys
    import boto3
    
    ## USAGE ############################################################################
    ## python3 dynamo.py <Source_Table> <destination table>                            ## 
    ## Requires two profiles to be set in your AWS Config file "source", "destination" ##
    #####################################################################################
    def dynamo_bulk_reader():
        """Scan the source table named by sys.argv[1] and return all its items.

        Uses the "source" AWS profile. The scan is repeated, resuming from each
        'LastEvaluatedKey', until a response no longer contains one.
        """
        source_session = boto3.session.Session(profile_name='source')
        source_db = source_session.resource('dynamodb', region_name="us-west-2")
        source_table = source_db.Table(sys.argv[1])

        print("Exporting items from: " + str(sys.argv[1]))

        items = []
        scan_kwargs = {}
        while True:
            page = source_table.scan(**scan_kwargs)
            items.extend(page['Items'])
            if 'LastEvaluatedKey' not in page:
                break
            # Resume the next scan where this page stopped.
            scan_kwargs = {'ExclusiveStartKey': page['LastEvaluatedKey']}

        print("Finished exporting: " + str(len(items)) + " items.")
        return items
    
    def dynamo_bulk_writer():
        """Copy every item from the source table into the destination table.

        The destination table name comes from sys.argv[2] and is accessed with
        the "destination" AWS profile. The items are obtained by calling
        ``dynamo_bulk_reader()``.
        """
        session = boto3.session.Session(profile_name='destination')
        dynamodb = session.resource('dynamodb', region_name='us-west-2')
        table = dynamodb.Table(sys.argv[2])
        print("Importing items into: " + str(sys.argv[2]))
        items = dynamo_bulk_reader()
        # Open ONE batch writer for the whole run. Opening it inside the loop
        # flushed the buffer after every single item, so nothing was batched.
        with table.batch_writer() as batch:
            for table_item in items:
                batch.put_item(Item=table_item)

        print("Finished importing items...")
    if __name__ == '__main__':
        # Script entry point: run the migration between two progress messages.
        print("Starting Dynamo Migrater...")
        # dynamo_bulk_writer() calls dynamo_bulk_reader() itself, so this one
        # call performs both the export and the import.
        dynamo_bulk_writer()
        print("Exiting Dynamo Migrator")

    The process is pretty simple. First, we get all of our data from our source table. We store this in a list. Next, we iterate over that list and write it to our destination table using the ‘Batch Writer’.

    The program has been tested against tables containing over 300 items. Feel free to use it for your environments! If you do use it, please share it with your friends and link back to this article!

    Github: https://github.com/avansledright/dynamo-migrate

  • Querying and Editing a Single Dynamo Object

    I have a workflow that creates a record inside of a DynamoDB table as part of a pipeline within AWS. The record has a primary key of the Code Pipeline job. Later in the pipeline I wanted to edit that object to append the status of resources created by this pipeline.

    In order to do this, I created two functions. One that first returns the item from the table and the second that actually does the update and puts the updated item back into the table. Take a look at the code below and utilize it if you need to!

    import boto3 
    from boto3.dynamodb.conditions import Key
    
    def query_table(id):
        """Return the table items whose 'PRIMARYKEY' attribute equals ``id``."""
        target_table = boto3.resource('dynamodb').Table('XXXXXXXXXXXXXX')
        key_matches_id = Key('PRIMARYKEY').eq(id)
        result = target_table.query(KeyConditionExpression=key_matches_id)
        return result['Items']
    
    
    def update_dynanmo_status(id, resource_name, status):
        """Re-write every item matching ``id`` back to the table.

        This is a template: the place where ``resource_name`` and ``status``
        are meant to be applied to each item is marked below and left to the
        user.

        Args:
            id: Primary-key value passed to ``query_table``.
            resource_name: Not used by the template as written.
            status: Not used by the template as written.

        Returns:
            The response of the last ``put_item`` call, or None when the query
            matched no items.
        """
        dynamodb = boto3.resource('dynamodb')
        table = dynamodb.Table('XXXXXXXXXXXXX')
        # Pre-set so that an empty query result returns None instead of
        # raising UnboundLocalError at the return statement.
        response = None
        for item in query_table(id):
            # Do your update here
            response = table.put_item(Item=item)
        return response
  • Where Is It 5 O’Clock Pt: 4

    As much as I’ve scratched my head working on this project, it has been fun to learn some new things and build something that isn’t infrastructure automation. I’ve learned some frontend web development and some backend development, and utilized some new Amazon Web Services products.

    With all that nice stuff said I’m proud to announce that I have built a fully functioning project that is finally working the way I intended it. You can visit the website here:

    www.whereisitfiveoclock.net

    To recap, I bought this domain one night as a joke and thought “Hey, maybe one day I’ll build something”. I started off building a fully Python application backed by Flask. You can read about that in Part 1. This did not work out the way I intended, as it did not refresh the timezones on page load. In Part 3 I discussed how I was rearchitecting the project to include an API that would be called upon page load.

    The API worked great and delivered two JSON objects into my frontend. I then parsed the two JSON objects into two separate tables that display where you can be drinking and where you probably shouldn’t be drinking.

    This is a snippet of the JavaScript I wrote to iterate over the JSON objects while adding them into the appropriate table:

    // Fill the two HTML tables from the pair of JSON strings in `someinfo`.
    // someinfo[0] is parsed and its 'its5' entries go into table1;
    // someinfo[1] is parsed and its 'not5' entries go into table2.
    // NOTE: the function's closing brace is not part of this snippet.
    function buildTable(someinfo){
                    // Target tables, looked up by element id.
                    var table1 = document.getElementById('its5pmsomewhere')
                    var table2 = document.getElementById('itsnot5here')
                    // Both entries of someinfo are JSON text and must be parsed first.
                    var its5_json = JSON.parse(someinfo[0]);
                    var not5_json = JSON.parse(someinfo[1]);
                    // Declared but never used in the visible part of the function.
                    var its5_array = []
                    var not5_array = []
                    // One row per 'its5' value; the value goes in the first cell.
                    its5_json['its5'].forEach((value, index) => {
    
                        var row = `<tr>
                                    <td>${value}</td>
                                    <td></td>
                                    </tr>`
                    
                        // Append the row markup to the table's existing HTML.
                        table1.innerHTML += row
                    })  
                    // One row per 'not5' value; the value goes in the second cell.
                    not5_json['not5'].forEach((value, index) => {
    
                            var row = `<tr>
                                    <td></td>
                                    <td>${value}</td>
                                    </tr>`
                    
                        // Append the row markup to the table's existing HTML.
                        table2.innerHTML += row
                    })  

    First I reference two different HTML tables. I then parse the JSON from the API. Finally, I iterate over both JSON objects, building a table row for each timezone and appending it to the matching HTML table.

    If you want more information on how I did this feel free to reach out.

    I want to continue iterating over this application to add new features. I need to do some standard things like adding Google Analytics so I can track traffic. I also want to add a search feature and a map that displays the different areas of drinking acceptability.

    I also am open to requests. One of my friends suggested that I add a countdown timer to each location that it is not yet acceptable to be drinking.

    Feel free to reach out in the comments or on your favorite social media platform! And as always, if you liked this project please share it with your friends.

  • Where Is It Five O’Clock Pt: 3

    So I left this project at a point where I felt it needed to be re-architected based on the fact that Flask only executes the function once and not every time the page loads.

    I re-architected the application in my head to include an API that calls the Lambda function and returns a list of places where it is and is not acceptable to be drinking based on the 5 O’Clock rules. These two lists will be JSON objects that have a single key with multiple values. The values will be the timezones appropriate to be drinking in.

    After the JSON objects are generated I can reference them through the web frontend and display them in an appropriate way.

    At this point I have the API built out and fully functioning the way I think I want it. You can use it by executing the following:
    curl https://5xztnem7v4.execute-api.us-west-2.amazonaws.com/whereisit5

    I will probably only have this publicly accessible for a few days before locking it back down.

    Hopefully, in part 4 of this series, I will have a frontend demo to show!

  • Where Is It 5 O’Clock Pt: 2

    So I spent the evening deploying this web application to Amazon Web Services. In my test environment, everything appeared to be working great because every time I reloaded the page it reloaded the function as well.

    When I transferred this over to a live environment I realized the Python function only ran every time I committed a change and it was re-deployed to my Elastic Beanstalk environment.

    This poses a new problem. If the function doesn’t fire every time the page is refreshed the time won’t properly update and it will show incorrect areas of where it is 5 O’Clock. Ugh.

    So, over the next few weeks, in my spare time, I will be re-writing this entire application to function the way I intended it to.

    I think to do this I will write each function as an AWS Lambda function and then write a frontend that calls these functions on page load. Or, the entire thing will be one function and return the information and it will deploy in one API call.

    I also really want to display a map that shows the areas that it is 5PM or later but I think this will come in a later revision once the project is actually functioning correctly. Along with some more CSS to make it pretty and responsive so it works on all devices.

    The punch list is getting long…

    Follow along here: https://whereisitfiveoclock.net

  • Where Is It Five O’Clock Pt: 1

    I bought the domain whereisitfiveoclock.net a while back and have been sitting on it for quite some time. I had an idea to make a web application that would tell you where it is five o’clock. Yes, this is a drinking website.

    I saw this project as a way to learn more Python skills, as well as some more AWS skills, and boy, has it put me to the test. So I’m going to write this series of posts as a way to document my progress in building this application.

    Part One: Building The Application

    I know that I want to use Python because it is my language of choice. I then researched what libraries I could use to build the frontend with. I came across Flask as an option and decided to run with that. The next step I had to do was actually find out where it was 5PM.

    In my head, I came up with the process that if I could first get a list of all the timezones and identify the current time in each of them, I could filter out the timezones in which it was 5PM. Once I had established where it was 5PM, I could then get that information to Flask and figure out a way to display it.

    Here is the function for identifying the current time in all timezones and then storing each key pair of {Timezone : Current_Time }

    def getTime():
        """Return one ``{timezone_name: current_hour}`` dict per pytz timezone.

        A single UTC timestamp is taken up front and converted into every
        entry of ``pytz.all_timezones``; only the hour of each local time is
        kept.
        """
        utc_now = datetime.now(timezone('UTC'))
        return [
            {zone_name: utc_now.astimezone(timezone(zone_name)).hour}
            for zone_name in pytz.all_timezones
        ]

    Once everything was stored into tz_array I took that info and passed it through the following function to identify where it was 5PM. I have another function that identifies everything that is NOT 5PM.

    def find5PM(tz_array=None):
        """Return the names of the timezones where the hour is 17 or later.

        Args:
            tz_array: List of ``{timezone_name: hour}`` dicts, in the form
                built by ``getTime()``. When omitted, the module-level
                ``tz_array`` is used, which is what the function always read
                before this parameter existed.

        Returns:
            A list of timezone names, in input order, whose hour is >= 17.
        """
        if tz_array is None:
            # Backward-compatible fallback to the module-level list.
            tz_array = globals()["tz_array"]
        # The loop variable is deliberately not called ``timezone`` so that it
        # cannot be confused with the ``timezone`` callable used by getTime().
        return [
            zone_name
            for entry in tz_array
            for zone_name, hour in entry.items()
            if hour >= 17
        ]

    I made a new list and stored just the timezone name into that list and return it.

    Once I had all these together I passed them through as variables to Flask. This is where I first started to struggle. In my original revisions of the functions, I was only returning one of the values rather than returning ALL of the values. This resulted in hours of struggling to identify the cause of the problem. Eventually, I had to start over and completely re-work the code until I ended up with what you see above.

    The code was finally functional and I was ready to deploy it to Amazon Web Services for public access. I will discuss my design and deployment in Part Two.

    http://whereisitfiveoclock.net

  • EC2 Action Slack Notification

    I took a brief break from my Lambda function creation journey to go on vacation but, now I’m back!

    This function will notify a Slack channel of your choosing when an EC2 instance enters “Running, Stopping, Stopped, or Shutting-Down” status. I thought this might be useful for instances that reside under a load balancer. It would be useful to see when your load balancer is scaling up or down in real-time via Slack notification.

    In order to use this function, you will need to create a Slack Application with an OAuth key and set that key as an environment variable in your Lambda function. If you are unsure of how to do this I can walk you through it!

    Please review the function below

    import logging
    import requests
    import boto3
    import os
    from urllib.parse import unquote_plus
    from slack import WebClient
    from slack.errors import SlackApiError
    logging.basicConfig(level=logging.DEBUG)
    
    # Check EC2 Status
    def lambda_handler(event, context):
        """Post a Slack message when an EC2 instance enters a tracked state.

        Tracked states are 'running', 'shutting-down', 'stopped' and
        'stopping'; any other state is ignored.

        Args:
            event: Invocation event; ``event['detail']`` must provide
                'instance-id' and 'state'.
            context: Lambda context object (unused).

        Returns:
            None. A Slack API failure is logged rather than raised.
        """
        detail = event['detail']
        ids = detail['instance-id']
        eventname = detail['state']
        # Slack Variables
        slack_token = os.environ["slackBot"]
        client = WebClient(token=slack_token)
        channel_string = "XXXXXXXXXXXXXXXXXXXX"

        # state -> (notification text, phrase appended to the instance id).
        # One table replaces four copy-pasted branches that differed only in
        # these two strings.
        notifications = {
            'running': ("An Instance has started", "has started"),
            'shutting-down': ("An Instance is Shutting Down", "is shutting down"),
            'stopped': ("An Instance has stopped", "has stopped"),
            'stopping': ("An Instance is stopping", "is stopping"),
        }
        if eventname not in notifications:
            return None

        summary, phrase = notifications[eventname]
        response_string = f"The instance: {ids} {phrase}"
        try:
            client.chat_postMessage(
                channel=channel_string,
                text=summary,
                blocks=[{"type": "section", "text": {"type": "plain_text", "text": response_string}}],
            )
        except SlackApiError as e:
            # Log the failure: an ``assert`` here neither reports the error
            # nor survives running Python with -O.
            logging.error("Slack notification for %s failed: %s", ids, e.response["error"])
        return None
    

    As always the function is available on GitHub as well:
    https://github.com/avansledright/ec2ActionPostToSlack

    If you find this function helpful please share it with your friends or repost it on your favorite social media platform!