Tag: python3

  • Moving AWS Cloudfront Logs to DynamoDB

    I think it's pretty obvious that I love DynamoDB. It has become one of my favorite AWS services: I use it almost every day at work, and I'm getting better at using it in my personal projects as well.

    A client approached me about getting logs from a CloudFront distribution. CloudFront has a native logging feature that writes .GZ files to an S3 bucket. My client doesn't have any sort of log ingestion service, so rather than build one, I decided we could parse the .GZ files and store the data in a DynamoDB table. To accomplish this I created a simple Lambda function:

    import boto3
    import gzip
    import uuid
    from datetime import datetime
    from datetime import timedelta
    import time
    from botocore.exceptions import ClientError
    
    #Creates a time to live value 90 days out. DynamoDB's TTL feature expects
    #a Number attribute, so return the epoch timestamp as an integer.
    def ttl_time():
        now = datetime.now()
        ttl_date = now + timedelta(days=90)
        return int(time.mktime(ttl_date.timetuple()))
    
    #Puts the log json into dynamodb:
    def put_to_dynamo(record):
        client = boto3.resource('dynamodb', region_name='us-west-2')
        table = client.Table('YOUR_TABLE_NAME')
        try:
            response = table.put_item(
                Item=record
            )
            print(response)
        except ClientError as e:
            print("Failed to put record")
            print(e)
            return False
    
        return True
    def lambda_handler(event, context):
        print(event)
        s3_key = event['Records'][0]['s3']['object']['key']
        s3 = boto3.resource("s3")
        obj = s3.Object("YOUR_BUCKET", s3_key)
        with gzip.GzipFile(fileobj=obj.get()["Body"]) as gzipfile:
            content = gzipfile.read()
        #print(content)
        my_json = content.decode('utf8').splitlines()
    
        my_dict = {}
        # The "#Fields:" line holds the column names; data lines hold
        # tab-separated values (the last data line wins if there are several).
        for x in my_json:
            if x.startswith("#Fields:"):
                keys = x.split(" ")
            else:
                values = x.split("\t")
    
        # Pair each field name with its value, skipping the "#Fields:" label itself.
        x = 0
        for item in keys:
            if item == "#Fields:":
                pass
            else:
                my_dict[item] = values[x]
                x += 1
    
    
        print('- ' * 20)
        myuuid = str(uuid.uuid4())
        print(myuuid)
        my_dict["uuid"] = myuuid
        my_dict['ttl'] = ttl_time()
    
        print(my_dict)
        if put_to_dynamo(my_dict) == True:
            print("Successfully imported item")
            return True
        else:
            print("Failed to put record")
            return False

    This Lambda function runs every time an S3 object is created. It grabs the .GZ file and parses it into a dictionary that can be imported into DynamoDB. One other thing to note: I append a UUID to each item so I can track down errors later.

    I wrote a simple front end for the client that grabs records based on a date input and writes the logs to a CSV so they can work with them on their local machines. I have a feeling we will be implementing a log aggregation server soon!
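    As a rough illustration of that front end's core logic, here is a minimal sketch that pulls one day's items and writes them to a CSV. The table name, the scan-with-filter approach, and the function name are my assumptions, not the client-facing code itself:

    import csv
    import boto3
    from boto3.dynamodb.conditions import Attr
    
    dynamodb = boto3.resource("dynamodb", region_name="us-west-2")
    table = dynamodb.Table("YOUR_TABLE_NAME")
    
    def export_logs_for_date(log_date, output_file):
        # Scan with a filter on the CloudFront "date" field; fine for small
        # tables, though an index on the date would scale better.
        response = table.scan(FilterExpression=Attr("date").eq(log_date))
        items = response["Items"]
        while "LastEvaluatedKey" in response:
            response = table.scan(
                FilterExpression=Attr("date").eq(log_date),
                ExclusiveStartKey=response["LastEvaluatedKey"],
            )
            items.extend(response["Items"])
    
        if items:
            with open(output_file, "w", newline="") as f:
                writer = csv.DictWriter(f, fieldnames=sorted(items[0].keys()))
                writer.writeheader()
                writer.writerows(items)
        return len(items)
    
    # Example: export_logs_for_date("2022-11-05", "cloudfront-logs.csv")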

    If this code helps you please share it with your friends and co-workers!

    Code on Github

  • Updating AWS Managed Prefix Lists

    I was working with a customer the other day trying to come up with a way to import a bunch of IP addresses into a whitelist on AWS. We came up with the approach of using Managed Prefix Lists in VPC. I wrote some Python to grab the IP addresses from an API and then automatically put them into a prefix list.

    The code takes input from an API that is managed by a third party. We first parse the returned values into meaningful lists. After that, we pass each IP to a function that checks whether the entry already exists in the prefix list. If it does, the IP is skipped; if it doesn't, it is automatically added.

    import requests
    import json
    import os
    import boto3
    from botocore.exceptions import ClientError
    import ipaddress
    
    def check_for_existing(list_id, ip):
        client = boto3.client("ec2", region_name="us-west-2")
        try:
            response = client.get_managed_prefix_list_entries(
                PrefixListId=list_id,
                MaxResults=100,
            )
            for entry in response['Entries']:
                if entry['Cidr'] == ip:
                    return True
                else:
                    pass
            return False
        except ClientError as e:
            print(e)
    
    
    
    def get_prefix_list_id(list_name):
        client = boto3.client("ec2", region_name="us-west-2")
        response = client.describe_managed_prefix_lists(
            MaxResults=100,
            Filters=[
                {
                    "Name": "prefix-list-name",
                    "Values": [list_name]
                }
            ]
        )
        for p_list in response['PrefixLists']:
            return {"ID": p_list['PrefixListId'], "VERSION": p_list['Version']}
    
    def update_managed_prefix_list(list_name, ip):
        client = boto3.client("ec2", region_name="us-west-2")
        # Look the prefix list up once and reuse its ID and version
        prefix_list = get_prefix_list_id(list_name)
        if check_for_existing(prefix_list['ID'], ip):
            print("Rule already exists")
            return False
        else:
            try:
                response = client.modify_managed_prefix_list(
                    DryRun=False,
                    PrefixListId=prefix_list['ID'],
                    CurrentVersion=prefix_list['VERSION'],
                    AddEntries=[
                        {
                            "Cidr": ip
                        }
                    ]
                )
                return True
            except ClientError as e:
                print(e)
                print("Failed to update list")
                return False
    
    if __name__ == "__main__":
        url = "https://<my IP address URL>"
        headers = {}
        r = requests.get(url, headers=headers)
        json_ips = json.loads(r.content)
        ip = ""
        list_name = ""
        result = update_managed_prefix_list(list_name, ip)
        if result == True:
            print("Successfully Updates lists")
        else:
            print("Failed to update lists")

    If you are going to use this code, it will need some modifications. I ultimately did not deploy it, but I had planned to run it as a Lambda function on a schedule so the lists would always stay up to date.
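    For what it's worth, a scheduled version could be as simple as wrapping the functions above in a handler. This is only a sketch under assumptions: the feed URL, the shape of the returned JSON (a list of CIDR strings), and the prefix list name are all placeholders:

    import json
    import requests
    
    # Assumes update_managed_prefix_list() from the script above is available.
    def lambda_handler(event, context):
        url = "https://example.com/ip-feed"      # hypothetical feed URL
        r = requests.get(url, timeout=10)
        json_ips = json.loads(r.content)         # assumed to be a list of CIDRs
    
        list_name = "my-allow-list"              # hypothetical prefix list name
        added = 0
        for ip in json_ips:
            if update_managed_prefix_list(list_name, ip):
                added += 1
        print(f"Added {added} new entries to {list_name}")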

    If this code is helpful to you please share it with your friends!

    Github

  • Automating Security Group Rule Removal

    I’m using an Amazon Web Services Security Group as a way to allow traffic into an EC2 instance for the instance’s users. The users can give themselves access through a web interface that I wrote for them. Maybe I’ll cover that in a different post.

    I recently found that the Security Group was nearing its maximum number of rules, so I decided to start purging old rules, which would ultimately force users to re-add their IP addresses to the group.

    Going in and manually removing rules is rather time-consuming, so I figured I could write a script to handle it for me. The first step was to update my previous script, which inserts the rules, so that it also adds a tag to each rule. The function below takes a list of Security Group IDs and returns all of their rules.

    def get_sg_rules(sg_id):
        client = boto3.client('ec2')
        response = client.describe_security_group_rules(
            Filters=[
                {
                    'Name': 'group-id',
                    'Values': sg_id
                }
            ],
        )
        return response

    The snippet below iterates through each of the returned rules and adds a “dateAdded” tag with a stringified date.

    for sg_rule in get_sg_rules(sg_list)['SecurityGroupRules']:
        try:
            client = boto3.client('ec2')
            response = client.create_tags(
                DryRun=False,
                Resources=[
                    sg_rule['SecurityGroupRuleId'],
                ],
                Tags=[
                    {
                        'Key': 'dateAdded',
                        'Value': '2022-11-05'
                    },
                ]
            )
        except ClientError as e:
            print(e)

    I then wrote the following Lambda function, which runs every day and checks for any expired rules. The schedule is set up with a CloudWatch Events (EventBridge) rule; a sketch of creating one is included after the code.

    import boto3
    from datetime import datetime, timedelta
    from botocore.exceptions import ClientError
    
    def return_today():
        now = datetime.now()
        return now
    
    def get_sg_rules(sg_id, old_date):
        client = boto3.client('ec2')
        response = client.describe_security_group_rules(
            Filters=[
                {
                    'Name': 'group-id',
                    'Values': sg_id
                },
                {
                    'Name': 'tag:dateAdded',
                    'Values': [old_date]
                }
            ],
        )
        return response
    
    def lambda_handler(event, context):
        sg_list = ["xxxx", "xxx"]
        old_date = datetime.strftime(return_today() - timedelta(days=30), "%Y-%m-%d")
        print(old_date)
        for sg_rule in get_sg_rules(sg_list, old_date)['SecurityGroupRules']:
            try:
                client = boto3.client("ec2")
                response = client.revoke_security_group_ingress(
                    GroupId=sg_rule['GroupId'],
                    SecurityGroupRuleIds=[sg_rule['SecurityGroupRuleId']]
                    )
                print(response)
                print("Successfully deleted the rule")
            except ClientError as e:
                print(e)
                print("Failed to delete rule")

    You’ll see that the code has a list of Security Groups to check. It computes the date 30 days before today, and if a rule’s “dateAdded” tag matches that date, the rule is removed.
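    For reference, here is a rough sketch of creating that daily schedule with boto3. The rule name, target ID, and function ARN are placeholders, and the Lambda also needs a resource policy that allows events.amazonaws.com to invoke it:

    import boto3
    
    events = boto3.client("events", region_name="us-west-2")
    
    # Run the cleanup Lambda once a day (names and ARN below are placeholders)
    events.put_rule(
        Name="expire-sg-rules-daily",
        ScheduleExpression="rate(1 day)",
        State="ENABLED",
    )
    events.put_targets(
        Rule="expire-sg-rules-daily",
        Targets=[{
            "Id": "expire-sg-rules-lambda",
            "Arn": "arn:aws:lambda:us-west-2:111111111111:function:expire-sg-rules",
        }],
    )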

    I hope this helps you automate your AWS accounts. Below is a link to the code repository so you can edit the code as needed. Please share it with your friends if this helps you!

    Github

  • EC2 Reservation Notification

    I realized today that I haven’t updated my EC2 reservations recently. Wondering why, I came to understand that nothing was notifying me that the reservations were expiring. I spent the day putting together a script that looks through my reservations, checks when each one expires, and notifies me if it is nearing my threshold of three weeks.

    I put this together as a local script, but it can also be adapted to run as a Lambda function, which is what I ultimately set up. As always, you can view my code below and on GitHub.

    import boto3
    from datetime import datetime, timezone, timedelta
    from botocore.exceptions import ClientError
    import os
    import json
    ec2_client = boto3.client("ec2", region_name="us-west-2")
    
    def get_reserved_instances():
        response = ec2_client.describe_reserved_instances()
        reserved_instances = {}
        for reservedInstances in response['ReservedInstances']:
            reserved_instances.update({
                reservedInstances['ReservedInstancesId']: {
                    "ExpireDate": reservedInstances['End'],
                    "Type": reservedInstances['InstanceType']
                }
            })
        return reserved_instances
    #Returns True only when the reservation expires between 21 and 22 days from
    #now, so a daily run produces a single notification per reservation.
    def determine_expiry(expiry_date):
        now = datetime.now(timezone.utc)
        delta_min = timedelta(days=21)
        delta_max = timedelta(days=22)
        if expiry_date - now >= delta_min and expiry_date - now < delta_max:
            return True
        else:
            return False
    #Send Result to SNS
    def sendToSNS(messages):
        sns = boto3.client('sns')
        try:
            send_message = sns.publish(
                TargetArn=os.environ['SNS_TOPIC'],
                Subject='EC2-Reservation',
                Message=messages,
                )
            return send_message
        except ClientError as e:
            print("Failed to send message to SNS")
            print(e)
    
    
    if __name__ == "__main__":
    
        for reservation, res_details in get_reserved_instances().items():
            if determine_expiry(res_details['ExpireDate']):
                sns_message = {"reservation": reservation, "expires": res_details['ExpireDate'].strftime("%m/%d/%Y, %H:%M:%S")}
                sendToSNS(json.dumps(sns_message))

    I have an SNS topic set up that sends messages to a Lambda function on the back end, where I format the messages and send them to a Slack channel for notifications.
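    For completeness, here is a rough sketch of what that back-end formatter could look like if it posts to a Slack incoming webhook. The environment variable name and the message layout are my assumptions, not the exact code:

    import json
    import os
    import urllib.request
    
    def lambda_handler(event, context):
        for record in event["Records"]:
            # The SNS message body is the JSON string published by the script above
            message = json.loads(record["Sns"]["Message"])
            text = (f"EC2 reservation {message['reservation']} "
                    f"expires {message['expires']}")
            payload = json.dumps({"text": text}).encode("utf-8")
            req = urllib.request.Request(
                os.environ["SLACK_WEBHOOK_URL"],   # hypothetical env var
                data=payload,
                headers={"Content-Type": "application/json"},
            )
            urllib.request.urlopen(req)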

    If you have any questions, feel free to comment or message me on Twitter!

    GitHub

  • Security Group ID Finder


    I have been working on deploying resources to a lot of AWS accounts lately where each account has the same network infrastructure. When deploying Lambdas, I had the common name of the security group but not the ID. I wrote this utility to get the security group ID for me quickly.

    import boto3
    import sys
    
    def get_security_group_id(common_name):
        ec2 = boto3.client("ec2", region_name="us-west-2")
    
        response = ec2.describe_security_groups()
        for security_group in response['SecurityGroups']:
            if security_group['GroupName'] == common_name:
                return security_group['GroupId']
            
    if __name__ == '__main__':
        if sys.argv[1] in ("help", "--help", "usage", "--usage"):
            print("USAGE: python3 main.py <security group name>")
        else:
            sg_id = get_security_group_id(sys.argv[1])
            if sg_id is None:
                print("Security Group Not found")
            else:
                print(sg_id)

    This is a simple tool that can be used on your command line by doing:

    python3 main.py <security group name>

    I hope this helps speed up your deployments. Feel free to share the code with your friends and team!

    Github

  • A Dynamo Data Migration Tool


    Have you ever wanted to migrate data from one DynamoDB table to another? I haven’t seen an AWS tool to do this, so I wrote one using Python.

    A quick walk-through video is included in the original post.

    import sys
    import boto3
    
    ## USAGE ############################################################################
    ## python3 dynamo.py <Source_Table> <destination table>                            ## 
    ## Requires two profiles to be set in your AWS Config file "source", "destination" ##
    #####################################################################################
    def dynamo_bulk_reader():
        session = boto3.session.Session(profile_name='source')
        dynamodb = session.resource('dynamodb', region_name="us-west-2")
        table = dynamodb.Table(sys.argv[1])
    
        print("Exporting items from: " + str(sys.argv[1]))
    
        response = table.scan()
        data = response['Items']
    
        while 'LastEvaluatedKey' in response:
            response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
            data.extend(response['Items'])
    
        print("Finished exporting: " + str(len(data)) + " items.")
        return data
    
    def dynamo_bulk_writer():
        session = boto3.session.Session(profile_name='destination')
        dynamodb = session.resource('dynamodb', region_name='us-west-2')
        table = dynamodb.Table(sys.argv[2])
        print("Importing items into: " + str(sys.argv[2]))
        # Open the batch writer once so writes are actually batched,
        # rather than creating a new batch for every item.
        with table.batch_writer() as batch:
            for table_item in dynamo_bulk_reader():
                batch.put_item(
                    Item=table_item
                )
    
        print("Finished importing items...")
    if __name__ == '__main__':
        print("Starting Dynamo Migrater...")
        dynamo_bulk_writer()
        print("Exiting Dynamo Migrator")

    The process is pretty simple. First, we get all of our data from our source table. We store this in a list. Next, we iterate over that list and write it to our destination table using the ‘Batch Writer’.

    The program has been tested against tables containing over 300 items. Feel free to use it for your environments! If you do use it, please share it with your friends and link back to this article!
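    The two named profiles the script expects would live in your AWS credentials file, along the lines of this sketch (the keys are placeholders):

    [source]
    aws_access_key_id = AKIAXXXXXXXXXXXXXXXX
    aws_secret_access_key = xxxxxxxxxxxxxxxxxxxxxxxxxxxx
    
    [destination]
    aws_access_key_id = AKIAXXXXXXXXXXXXXXXX
    aws_secret_access_key = xxxxxxxxxxxxxxxxxxxxxxxxxxxx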

    Github: https://github.com/avansledright/dynamo-migrate

  • Querying and Editing a Single Dynamo Object

    I have a workflow that creates a record in a DynamoDB table as part of a pipeline within AWS. The record’s primary key is the CodePipeline job ID. Later in the pipeline, I wanted to edit that object to append the status of resources created by the pipeline.

    To do this, I created two functions: one that returns the item from the table, and a second that performs the update and puts the updated item back into the table. Take a look at the code below and use it if you need to!

    import boto3 
    from boto3.dynamodb.conditions import Key
    
    def query_table(id):
        dynamodb = boto3.resource('dynamodb')
        table = dynamodb.Table('XXXXXXXXXXXXXX')
        response = table.query(
            KeyConditionExpression=Key('PRIMARYKEY').eq(id)
        )
        return response['Items']
    
    
    def update_dynamo_status(id, resource_name, status):
        dynamodb = boto3.resource('dynamodb')
        table = dynamodb.Table('XXXXXXXXXXXXX')
        items = query_table(id)
        for item in items:
            # Do your update here, e.g. record the status of a created resource
            item[resource_name] = status
            response = table.put_item(Item=item)
        return response
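
    A quick, hypothetical example of calling it from a later pipeline stage (the job ID, resource name, and status value are placeholders):

    update_dynamo_status("example-codepipeline-job-id", "example-s3-bucket", "CREATE_COMPLETE")
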
  • EC2 Action Slack Notification

    I took a brief break from my Lambda function creation journey to go on vacation, but now I’m back!

    This function will notify a Slack channel of your choosing when an EC2 instance enters the running, stopping, stopped, or shutting-down state. I thought this might be useful for instances that sit behind a load balancer, where you can watch the fleet scale up or down in real time via Slack notifications.

    In order to use this function, you will need to create a Slack Application with an OAuth key and set that key as an environment variable in your Lambda function. If you are unsure of how to do this I can walk you through it!

    Please review the function below:

    import logging
    import os
    from slack import WebClient
    from slack.errors import SlackApiError
    logging.basicConfig(level=logging.DEBUG)
    
    # Check the EC2 state change and post it to Slack
    def lambda_handler(event, context):
        detail = event['detail']
        instance = detail['instance-id']
        eventname = detail['state']
    
        # Slack variables
        slack_token = os.environ["slackBot"]
        client = WebClient(token=slack_token)
        channel_string = "XXXXXXXXXXXXXXXXXXXX"
    
        # Post to Slack that the instance is running
        if eventname == 'running':
            try:
                response_string = f"The instance: {instance} has started"
                response = client.chat_postMessage(
                    channel=channel_string,
                    text="An Instance has started",
                    blocks=[{"type": "section", "text": {"type": "plain_text", "text": response_string}}]
                )
            except SlackApiError as e:
                assert e.response["error"]
    
        # Post to Slack that the instance is shutting down
        elif eventname == 'shutting-down':
            try:
                response_string = f"The instance: {instance} is shutting down"
                response = client.chat_postMessage(
                    channel=channel_string,
                    text="An Instance is Shutting Down",
                    blocks=[{"type": "section", "text": {"type": "plain_text", "text": response_string}}]
                )
            except SlackApiError as e:
                assert e.response["error"]
    
        # Post to Slack that the instance has stopped
        elif eventname == 'stopped':
            try:
                response_string = f"The instance: {instance} has stopped"
                response = client.chat_postMessage(
                    channel=channel_string,
                    text="An Instance has stopped",
                    blocks=[{"type": "section", "text": {"type": "plain_text", "text": response_string}}]
                )
            except SlackApiError as e:
                assert e.response["error"]
    
        # Post to Slack that the instance is stopping
        elif eventname == 'stopping':
            try:
                response_string = f"The instance: {instance} is stopping"
                response = client.chat_postMessage(
                    channel=channel_string,
                    text="An Instance is stopping",
                    blocks=[{"type": "section", "text": {"type": "plain_text", "text": response_string}}]
                )
            except SlackApiError as e:
                assert e.response["error"]
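
    For local testing, you can hand the handler an event shaped like the EC2 state-change notification it expects; the instance ID below is made up:

    sample_event = {
        "source": "aws.ec2",
        "detail-type": "EC2 Instance State-change Notification",
        "detail": {
            "instance-id": "i-0123456789abcdef0",   # made-up instance ID
            "state": "running"
        }
    }
    
    lambda_handler(sample_event, None)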
    

    As always the function is available on GitHub as well:
    https://github.com/avansledright/ec2ActionPostToSlack

    If you find this function helpful please share it with your friends or repost it on your favorite social media platform!

  • Lambda Function Post to Slack

    I wrote this script out of a need to practice my Python skills. The idea is that when a file gets uploaded to an S3 bucket, the function triggers and a message with that file name is posted to a Slack channel of your choosing.

    To utilize this you will need to include the Slack pip package as well as the slackclient pip package when you upload the function to the AWS Console.

    You will also need to create an OAuth key for a Slack application. If you are unfamiliar with this process, feel free to drop a comment below or shoot me a message, and I can walk you through it or write a second part of the guide.
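    For reference, here is a minimal sketch of the kind of handler the post describes, assuming the OAuth token is stored in a "slackBot" environment variable and the channel name is hard-coded (both are my assumptions, not necessarily the repo's exact code):

    import os
    from urllib.parse import unquote_plus
    from slack import WebClient
    from slack.errors import SlackApiError
    
    client = WebClient(token=os.environ["slackBot"])
    
    def lambda_handler(event, context):
        # S3 event object keys arrive URL-encoded
        key = unquote_plus(event["Records"][0]["s3"]["object"]["key"])
        try:
            client.chat_postMessage(
                channel="#general",              # hypothetical channel
                text=f"New file uploaded to S3: {key}",
            )
        except SlackApiError as e:
            print(e.response["error"])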

    Here is a link to the project:
    https://github.com/avansledright/posttoSlackLambda

    If this helps you please share this post on your favorite social media platform!

  • Automatically Transcribing Audio Files with Amazon Web Services


    I wrote this Lambda function to automatically transcribe audio files that are uploaded to an S3 bucket. This is written in Python3 and utilizes the Boto3 library.

    You will need to give your Lambda function permissions to access S3, Transcribe and CloudWatch.

    The script will create an AWS Transcribe job with the format: 'filetranscription'+YYYYMMDD-HHMMSS
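    As a sketch of that flow (the bucket and key handling, media format detection, and language setting are assumptions on my part, not the exact code):

    import boto3
    from datetime import datetime
    from urllib.parse import unquote_plus
    
    transcribe = boto3.client("transcribe")
    
    def lambda_handler(event, context):
        record = event["Records"][0]["s3"]
        bucket = record["bucket"]["name"]
        key = unquote_plus(record["object"]["key"])
    
        # Job name follows the 'filetranscription'+YYYYMMDD-HHMMSS format
        job_name = "filetranscription" + datetime.now().strftime("%Y%m%d-%H%M%S")
        transcribe.start_transcription_job(
            TranscriptionJobName=job_name,
            Media={"MediaFileUri": f"s3://{bucket}/{key}"},
            MediaFormat=key.split(".")[-1],     # e.g. "mp3", "wav"
            LanguageCode="en-US",
        )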

    I will be iterating over the script to hopefully add in a web front end as well as potentially branching to do voice call transcriptions for phone calls and Amazon Connect.

    You can view the code here

    If you have questions or comments feel free to reach out to me here or on any Social Media.