Tag: amazon web services
Deleting MANY Lambda Function Versions
I recently came across a challenge where I wanted to purge old Lambda Function versions. Some of the functions had over 65 versions!
The code below iterates through a text file with one Lambda function name per line. For each function it gets a list of all the versions and deletes every version that is not the highest version or attached to an alias.
import boto3
from botocore.exceptions import ClientError

client = boto3.client("lambda", region_name='us-west-2')


def delete_function_version(function_name, version):
    # Deletes a single published version of a function
    try:
        client.delete_function(
            FunctionName=function_name,
            Qualifier=str(version)
        )
    except ClientError as e:
        print("Failed to delete version of", function_name, "version number", str(version))
        print(e)


def get_layer_versions(function_name):
    # Returns every published version, paging past the API's per-call limit
    try:
        response = client.list_versions_by_function(
            FunctionName=function_name
        )
        versions = response['Versions']
        while 'NextMarker' in response:
            response = client.list_versions_by_function(
                FunctionName=function_name,
                Marker=response['NextMarker']
            )
            versions.extend(response['Versions'])
        return versions
    except ClientError as e:
        print('failed to get info for', function_name)
        print(e)


if __name__ == "__main__":
    print("Starting lambda update")
    with open("lambda_list.txt", "r") as text:
        lambda_list = text.read().splitlines()

    for lambda_function in lambda_list:
        print("Working with lambda", lambda_function)
        lambda_versions = get_layer_versions(lambda_function)
        lambda_version_list = []
        for version in lambda_versions:
            version_number = version['Version']
            if version_number == "$LATEST":
                continue
            lambda_version_list.append(int(version_number))

        for lambda_version in lambda_version_list:
            if lambda_version == max(lambda_version_list):
                print("This is the latest version, skipping", str(lambda_version))
                continue
            print("Deleting version", lambda_version, "of", lambda_function)
            delete_function_version(lambda_function, lambda_version)
To use this script, first populate the text file with the Lambda function(s) that you want to evaluate, then execute the Python script.
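For example, lambda_list.txt (the filename the script reads) just needs one function name per line; the names below are placeholders:

my-api-handler
image-resizer
report-generator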
I hope that this helps you or a coworker!
EDIT: Thanks to Alex Iliev for testing and finding some bugs in this code!
Moving AWS Cloudfront Logs to DynamoDB
I think it's pretty obvious that I love DynamoDB. It has become one of my favorite AWS services; I use it almost every day at work and am getting better at using it for my personal projects as well.
I had a client approach me about getting logs from a CloudFront distribution. CloudFront has a native logging feature that spits out .GZ files to an S3 bucket. My client doesn't have any sort of log ingestion service, so rather than build one I decided we could parse the .GZ files and store the data in a DynamoDB table. To accomplish this I created a simple Lambda:
import boto3
import gzip
import uuid
from datetime import datetime
from datetime import timedelta
import time
from botocore.exceptions import ClientError


# Creates a time-to-live value 90 days out
def ttl_time():
    now = datetime.now()
    ttl_date = now + timedelta(90)
    final = str(time.mktime(ttl_date.timetuple()))
    return final


# Puts the log record into DynamoDB
def put_to_dynamo(record):
    client = boto3.resource('dynamodb', region_name='us-west-2')
    table = client.Table('YOUR_TABLE_NAME')
    try:
        response = table.put_item(
            Item=record
        )
        print(response)
    except ClientError as e:
        print("Failed to put record")
        print(e)
        return False
    return True


def lambda_handler(event, context):
    print(event)
    s3_key = event['Records'][0]['s3']['object']['key']
    s3 = boto3.resource("s3")
    obj = s3.Object("YOUR_BUCKET", s3_key)

    # CloudFront log files are gzipped, so decompress straight from the S3 stream
    with gzip.GzipFile(fileobj=obj.get()["Body"]) as gzipfile:
        content = gzipfile.read()

    lines = content.decode('utf8').splitlines()
    keys = []
    my_dict = {}
    for line in lines:
        if line.startswith("#Fields:"):
            # The header line defines the column names
            keys = line.split(" ")[1:]
        elif line.startswith("#"):
            # Skip any other comment lines, such as "#Version:"
            continue
        else:
            # Data lines are tab separated; map them onto the column names
            values = line.split("\t")
            my_dict = dict(zip(keys, values))

    print('- ' * 20)
    myuuid = str(uuid.uuid4())
    print(myuuid)
    my_dict["uuid"] = myuuid
    my_dict['ttl'] = ttl_time()
    print(my_dict)

    if put_to_dynamo(my_dict) == True:
        print("Successfully imported item")
        return True
    else:
        print("Failed to put record")
        return False
This Lambda runs every time an S3 object is created. It grabs the .GZ file and parses it into a dictionary that can be imported into DynamoDB. One other thing to note is that I append a UUID to each record so that I can track down errors.
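For reference, a stored item ends up looking roughly like the abbreviated sketch below. The field names come from the #Fields: header in your distribution's logs, and the values here are made up:

# Abbreviated, illustrative item; real items contain every field from the log header.
example_item = {
    "date": "2022-11-05",
    "time": "17:42:10",
    "x-edge-location": "SEA19-C1",
    "c-ip": "203.0.113.10",
    "cs-method": "GET",
    "cs-uri-stem": "/index.html",
    "sc-status": "200",
    "uuid": "0b1c9d3e-8a77-4c1e-9f64-1d2f3a4b5c6d",  # appended by the Lambda
    "ttl": "1675000000.0",                           # appended 90-day TTL
}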
I wrote a simple front end for the client that grabs records based on a date input and writes the logs to a CSV so they can parse them on their local machines. I have a feeling we will be implementing a log aggregation server soon!
If this code helps you please share it with your friends and co-workers!
Updating AWS Managed Prefix Lists
I was working with a customer the other day trying to come up with a way to import a bunch of IP addresses into a whitelist on AWS. We came up with the approach of using Managed Prefix Lists in VPC. I wrote some Python to grab the IP addresses from an API and then automatically put them into a prefix list.
The code takes input from an API that is managed by a 3rd party. We first parse the returned values into meaningful lists. After that, we pass each IP to the function, which checks whether the entry already exists. If it does, it skips the IP; if it doesn't, it adds it automatically.
import requests
import json
import boto3
from botocore.exceptions import ClientError


def check_for_existing(list_id, ip):
    # Returns True if the CIDR is already an entry in the prefix list
    client = boto3.client("ec2", region_name="us-west-2")
    try:
        response = client.get_managed_prefix_list_entries(
            PrefixListId=list_id,
            MaxResults=100,
        )
        for entry in response['Entries']:
            if entry['Cidr'] == ip:
                return True
        return False
    except ClientError as e:
        print(e)


def get_prefix_list_id(list_name):
    # Looks up the prefix list ID and current version by name
    client = boto3.client("ec2", region_name="us-west-2")
    response = client.describe_managed_prefix_lists(
        MaxResults=100,
        Filters=[
            {
                "Name": "prefix-list-name",
                "Values": [list_name]
            }
        ]
    )
    for p_list in response['PrefixLists']:
        return {"ID": p_list['PrefixListId'], "VERSION": p_list['Version']}


def update_managed_prefix_list(list_name, ip):
    client = boto3.client("ec2", region_name="us-west-2")
    prefix_list = get_prefix_list_id(list_name)
    if check_for_existing(prefix_list['ID'], ip) == True:
        print("Rule already exists")
        return False
    try:
        response = client.modify_managed_prefix_list(
            DryRun=False,
            PrefixListId=prefix_list['ID'],
            CurrentVersion=prefix_list['VERSION'],
            AddEntries=[
                {
                    "Cidr": ip
                }
            ]
        )
        return True
    except ClientError as e:
        print(e)
        print("Failed to update list")


if __name__ == "__main__":
    url = "https://<my IP address URL>"
    headers = {}
    r = requests.get(url, headers=headers)
    json_ips = json.loads(r.content)  # parse the 3rd-party response into a list of IPs

    ip = ""         # the CIDR you want to add, parsed from json_ips
    list_name = ""  # the name of your managed prefix list
    result = update_managed_prefix_list(list_name, ip)
    if result == True:
        print("Successfully updated lists")
    else:
        print("Failed to update lists")
If you are going to use this code it will need some modifications. I ultimately did not deploy this code but I had plans to run it as a Lambda function on a schedule so the lists would always be up to date.
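For reference, a minimal sketch of what that Lambda wrapper could look like, reusing the functions above. The environment variable names and the assumption that the API returns a flat JSON list of CIDRs are mine, not part of the original design; the schedule itself would come from an EventBridge (CloudWatch Events) rule.

import json
import os

import requests


# Sketch of a scheduled Lambda entry point built around the functions above.
def lambda_handler(event, context):
    url = os.environ["IP_FEED_URL"]               # hypothetical env var for the 3rd-party API
    list_name = os.environ["PREFIX_LIST_NAME"]    # hypothetical env var for the prefix list name
    added = []
    for ip in json.loads(requests.get(url).content):  # assumes the API returns a JSON list of CIDRs
        if update_managed_prefix_list(list_name, ip):
            added.append(ip)
    return {"added": added}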
If this code is helpful to you please share it with your friends!
Github
Automating Security Group Rule Removal
I’m using an Amazon Web Services Security Group as a way to allow traffic into an EC2 instance for the instance’s users. The users can give themselves access through a web interface that I wrote for them. Maybe I’ll cover that in a different post.
I recently found that the Security Group was nearing its maximum number of rules. So I decided to start purging rules, which would ultimately force the users to re-add their IP addresses to the group.
Going in and manually removing rules is rather time-consuming. I figured I could write a script that would handle it for me. The first step was to update my previous script, which inserts the rule, so that it also adds a tag to each rule. The function below takes a list of Security Group IDs as input and returns all of their rules.
def get_sg_rules(sg_id):
    client = boto3.client('ec2')
    response = client.describe_security_group_rules(
        Filters=[
            {
                'Name': 'group-id',
                'Values': sg_id
            }
        ],
    )
    return response
The script below iterates through each of the returned rules and applies a "dateAdded" tag with a stringified date.
for sg_rule in get_sg_rules(sg_list)['SecurityGroupRules']:
    try:
        client = boto3.client('ec2')
        response = client.create_tags(
            DryRun=False,
            Resources=[
                sg_rule['SecurityGroupRuleId'],
            ],
            Tags=[
                {
                    'Key': 'dateAdded',
                    'Value': '2022-11-05'
                },
            ]
        )
    except ClientError as e:
        print(e)
I then wrote the following Lambda function that runs every day and checks for any expired rules. The schedule is set up by a CloudWatch Events rule.
import boto3
from datetime import datetime, timedelta
from botocore.exceptions import ClientError


def return_today():
    now = datetime.now()
    return now


def get_sg_rules(sg_id, old_date):
    client = boto3.client('ec2')
    response = client.describe_security_group_rules(
        Filters=[
            {
                'Name': 'group-id',
                'Values': sg_id
            },
            {
                'Name': 'tag:dateAdded',
                'Values': [old_date]
            }
        ],
    )
    return response


def lambda_handler(event, context):
    sg_list = ["xxxx", "xxx"]
    old_date = datetime.strftime(return_today() - timedelta(days=30), "%Y-%m-%d")
    print(old_date)
    for sg_rule in get_sg_rules(sg_list, old_date)['SecurityGroupRules']:
        try:
            client = boto3.client("ec2")
            response = client.revoke_security_group_ingress(
                GroupId=sg_rule['GroupId'],
                SecurityGroupRuleIds=[sg_rule['SecurityGroupRuleId']]
            )
            print(response)
            print("Successfully deleted the rule")
        except ClientError as e:
            print(e)
            print("Failed to delete rule")
You'll see that the code has a list of Security Groups to check. It compares the current date against the date 30 days prior; if a rule's "dateAdded" tag matches that date, the rule is removed.
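The daily trigger itself is a CloudWatch Events (EventBridge) rule. A minimal sketch of creating one with boto3 is below; the rule name and Lambda ARN are placeholders, and you would still need to grant EventBridge permission to invoke the function (for example with lambda add_permission).

import boto3

events = boto3.client("events", region_name="us-west-2")

# Placeholder names/ARNs; replace with your own.
events.put_rule(
    Name="purge-expired-sg-rules-daily",
    ScheduleExpression="rate(1 day)",
    State="ENABLED",
)
events.put_targets(
    Rule="purge-expired-sg-rules-daily",
    Targets=[{
        "Id": "sg-rule-cleanup",
        "Arn": "arn:aws:lambda:us-west-2:123456789012:function:sg-rule-cleanup",
    }],
)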
I hope this helps you automate your AWS Accounts. Below are links to the code repository so you can edit the code as needed. Please share it with your friends if this helps you!
EC2 Reservation Notification
I realized today that I haven't updated my EC2 reservations recently. Wondering why, I came to understand that I was never being notified that the reservations were expiring. I spent the day putting together a script that looks through my reservations, checks when they expire, and notifies me when one is within my threshold of three weeks.
I put this together as a local script, but it can also be adapted to run as a Lambda function, which is how I have it set up. As always, you can view my code below and on GitHub.
import boto3
from datetime import datetime, timezone, timedelta
from botocore.exceptions import ClientError
import os
import json

ec2_client = boto3.client("ec2", region_name="us-west-2")


def get_reserved_instances():
    response = ec2_client.describe_reserved_instances()
    reserved_instances = {}
    for reservedInstances in response['ReservedInstances']:
        reserved_instances.update({
            reservedInstances['ReservedInstancesId']: {
                "ExpireDate": reservedInstances['End'],
                "Type": reservedInstances['InstanceType']
            }
        })
    return reserved_instances


def determine_expirery(expirery_date):
    # True only when the reservation expires between 21 and 22 days from now
    now = datetime.now(timezone.utc)
    delta_min = timedelta(days=21)
    delta_max = timedelta(days=22)
    if expirery_date - now >= delta_min and expirery_date - now < delta_max:
        return True
    else:
        return False


# Send result to SNS
def sendToSNS(messages):
    sns = boto3.client('sns')
    try:
        send_message = sns.publish(
            TargetArn=os.environ['SNS_TOPIC'],
            Subject='EC2-Reservation',
            Message=messages,
        )
        return send_message
    except ClientError as e:
        print("Failed to send message to SNS")
        print(e)


if __name__ == "__main__":
    for reservation, res_details in get_reserved_instances().items():
        if determine_expirery(res_details['ExpireDate']) == True:
            sns_message = {"reservation": reservation, "expires": res_details['ExpireDate'].strftime("%m/%d/%Y, %H:%M:%S")}
            sendToSNS(json.dumps(sns_message))
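Adapting it to Lambda is just a matter of wrapping the same loop in a handler. A minimal sketch reusing the functions above (the return payload shape is my own choice, not part of the original script):

# Sketch of the Lambda entry point, reusing the functions above.
def lambda_handler(event, context):
    notified = []
    for reservation, res_details in get_reserved_instances().items():
        if determine_expirery(res_details['ExpireDate']):
            sns_message = {
                "reservation": reservation,
                "expires": res_details['ExpireDate'].strftime("%m/%d/%Y, %H:%M:%S")
            }
            sendToSNS(json.dumps(sns_message))
            notified.append(reservation)
    return {"notified": notified}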
I have an SNS topic set up that sends messages to a Lambda function on the backend so I can format my messages and send them to a Slack channel for notifications.
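That formatting Lambda isn't part of this post, but a minimal sketch of one could look like the following. It assumes a Slack incoming-webhook URL stored in a SLACK_WEBHOOK_URL environment variable, which is my assumption rather than something shown above.

import json
import os
import urllib.request


# Hypothetical Slack-forwarding Lambda subscribed to the SNS topic.
def lambda_handler(event, context):
    for record in event["Records"]:
        message = json.loads(record["Sns"]["Message"])
        text = f"EC2 reservation {message['reservation']} expires {message['expires']}"
        payload = json.dumps({"text": text}).encode("utf-8")
        req = urllib.request.Request(
            os.environ["SLACK_WEBHOOK_URL"],   # hypothetical env var holding the webhook URL
            data=payload,
            headers={"Content-Type": "application/json"},
        )
        urllib.request.urlopen(req)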
If you have any questions, feel free to comment or message me on Twitter!
Adding a Standard Bucket Policy
It is good practice to deny non-HTTPS traffic to your S3 buckets. For this reason, I wrote a script that I can use to apply a standard policy to each of my S3 buckets. While the script could be made more robust by iterating through every bucket in my account, I wrote it to take the bucket name as input and then apply the changes.
import boto3
from botocore.exceptions import ClientError
import json
import sys


def check_s3_policy(bucket_name):
    # Get the existing policy so that we don't overwrite anything
    client = boto3.client("s3", region_name='us-west-2')
    try:
        return client.get_bucket_policy(Bucket=bucket_name)
    except ClientError as e:
        print("failed to retrieve policy")
        print(e)
        return None


if __name__ == "__main__":
    bucket_name = sys.argv[1]
    source_aws_account = boto3.client('sts').get_caller_identity().get('Account')
    print("Our current account number: " + source_aws_account)

    standard_bucket_policy = {
        "Sid": "AWSHTTPSAccess",
        "Action": [
            "s3:*"
        ],
        "Effect": "Deny",
        "Resource": [
            "arn:aws:s3:::" + bucket_name,
            "arn:aws:s3:::" + bucket_name + "/*"
        ],
        "Condition": {
            "Bool": {
                "aws:SecureTransport": "false"
            }
        },
        "Principal": "*"
    }

    existing_policy = check_s3_policy(bucket_name)
    if existing_policy == None:
        print("No policy exists so let's create a new one")
        print("Applying our standard bucket policy that denies non-HTTPS traffic...")
        try:
            new_bucket_policy = {
                "Version": "2012-10-17",
                "Statement": [standard_bucket_policy]
            }
            client = boto3.client("s3", region_name='us-west-2')
            client.put_bucket_policy(Bucket=bucket_name, Policy=json.dumps(new_bucket_policy))
        except ClientError as e:
            print("failed to put bucket policy")
            print(e)
    else:
        print("There is a policy so we need to modify it")
        policy_to_modify = json.loads(existing_policy['Policy'])
        policy_to_modify['Statement'].append(standard_bucket_policy)
        try:
            client = boto3.client("s3", region_name="us-west-2")
            client.put_bucket_policy(Bucket=bucket_name, Policy=json.dumps(policy_to_modify))
        except ClientError as e:
            print("Error putting new bucket policy")
            print(e)

    print("Our bucket now follows all compliance ...")
    print("Exiting ...")
You can change the policy as needed and use this script to apply changes to your buckets!
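For example, if you save the script as apply_bucket_policy.py (the filename is up to you), you would run:
python3 apply_bucket_policy.py my-example-bucket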
I hope that this is helpful to someone. Please share it with your friends!
Github
Security Group ID Finder
I have been working on deploying resources to a lot of AWS accounts lately where each account has the same network infrastructure. When deploying Lambdas, I had the common name of the security group but not the ID. I wrote this utility to get the security group ID for me quickly.
import boto3
import sys


def get_security_group_id(common_name):
    ec2 = boto3.client("ec2", region_name="us-west-2")
    response = ec2.describe_security_groups()
    for security_group in response['SecurityGroups']:
        if security_group['GroupName'] == common_name:
            return security_group['GroupId']


if __name__ == '__main__':
    if sys.argv[1] == "help" or sys.argv[1] == "--help" or sys.argv[1] == "usage" or sys.argv[1] == "--usage":
        print("USAGE: python3 main.py <security group name>")
    else:
        sg_id = get_security_group_id(sys.argv[1])
        if sg_id == None:
            print("Security Group Not found")
        else:
            print(sg_id)
This is a simple tool that can be used on your command line by doing:
python3 main.py <security group name>
I hope this helps speed up your deployments. Feel free to share the code with your friends and team!
A Dynamo Data Migration Tool
Have you ever wanted to migrate data from one DynamoDB table to another? I haven't seen an AWS tool to do this, so I wrote one using Python.
A quick walkthrough video:

import sys
import boto3

## USAGE ############################################################################
## python3 dynamo.py <Source_Table> <destination table>                            ##
## Requires two profiles to be set in your AWS Config file "source", "destination" ##
#####################################################################################


def dynamo_bulk_reader():
    session = boto3.session.Session(profile_name='source')
    dynamodb = session.resource('dynamodb', region_name="us-west-2")
    table = dynamodb.Table(sys.argv[1])
    print("Exporting items from: " + str(sys.argv[1]))

    response = table.scan()
    data = response['Items']
    # Keep scanning until every page has been read
    while 'LastEvaluatedKey' in response:
        response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
        data.extend(response['Items'])

    print("Finished exporting: " + str(len(data)) + " items.")
    return data


def dynamo_bulk_writer():
    session = boto3.session.Session(profile_name='destination')
    dynamodb = session.resource('dynamodb', region_name='us-west-2')
    table = dynamodb.Table(sys.argv[2])
    print("Importing items into: " + str(sys.argv[2]))

    # Use a single batch writer so puts are actually batched
    with table.batch_writer() as batch:
        for table_item in dynamo_bulk_reader():
            batch.put_item(
                Item=table_item
            )

    print("Finished importing items...")


if __name__ == '__main__':
    print("Starting Dynamo Migrator...")
    dynamo_bulk_writer()
    print("Exiting Dynamo Migrator")
The process is pretty simple. First, we get all of our data from our source table. We store this in a list. Next, we iterate over that list and write it to our destination table using the ‘Batch Writer’.
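As the usage banner in the script notes, it expects two named profiles, source and destination. A minimal ~/.aws/config for that might look like the snippet below (the regions are just examples), with credentials for each profile defined in ~/.aws/credentials:

[profile source]
region = us-west-2

[profile destination]
region = us-west-2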
The program has been tested against tables containing over 300 items. Feel free to use it for your environments! If you do use it, please share it with your friends and link back to this article!
Querying and Editing a Single Dynamo Object
I have a workflow that creates a record inside of a DynamoDB table as part of a pipeline within AWS. The record's primary key is the CodePipeline job ID. Later in the pipeline, I wanted to edit that object to append the status of resources created by the pipeline.
In order to do this, I created two functions: one that returns the item from the table, and a second that performs the update and puts the updated item back into the table. Take a look at the code below and utilize it if you need to!
import boto3
from boto3.dynamodb.conditions import Key


def query_table(id):
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('XXXXXXXXXXXXXX')
    response = table.query(
        KeyConditionExpression=Key('PRIMARYKEY').eq(id)
    )
    return response['Items']


def update_dynanmo_status(id, resource_name, status):
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('XXXXXXXXXXXXX')
    items = query_table(id)
    for item in items:
        # Do your update here
        response = table.put_item(Item=item)
    return response
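As an illustration of the "do your update here" step, appending a per-resource status before writing the item back might look like the sketch below. The resource_status attribute is a hypothetical name, not part of the original table design.

# Illustrative version of the update step: append a status entry, then write the item back.
def update_dynamo_status_example(id, resource_name, status):
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('XXXXXXXXXXXXX')
    response = None
    for item in query_table(id):
        statuses = item.get('resource_status', {})   # hypothetical attribute holding per-resource status
        statuses[resource_name] = status
        item['resource_status'] = statuses
        response = table.put_item(Item=item)
    return response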
Where Is It Five O’Clock Pt: 3
So I left this project at a point where I felt it needed to be re-architected, since Flask was only executing the function once rather than every time the page loads.
I re-architected the application in my head to include an API that calls the Lambda function and returns lists of places where it is and is not acceptable to be drinking based on the 5 o'clock rule. These two lists will be JSON objects, each with a single key whose values are the matching timezones.
After the JSON objects are generated I can reference them through the web frontend and display them in an appropriate way.
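Illustratively, each object is a single key mapping to the list of matching timezones. The key names and timezones below are made up for the example, not the API's actual response:

{"its_5_oclock": ["Europe/London", "Africa/Lagos"]}
{"not_5_oclock": ["America/Los_Angeles", "Asia/Tokyo"]}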
At this point I have the API built out and fully functioning the way I think I want it. You can use it by executing the following:
curl https://5xztnem7v4.execute-api.us-west-2.amazonaws.com/whereisit5
I will probably only have this publicly accessible for a few days before locking it back down.
Hopefully, in part 4 of this series, I will have a frontend demo to show!