Category: Cloud Architecting
Automating Security Group Rule Removal
I’m using an Amazon Web Services Security Group as a way to allow traffic into an EC2 instance for the instance’s users. The users can give themselves access through a web interface that I wrote for them. Maybe I’ll cover that in a different post.
I recently found that the Security Group was nearing its maximum number of rules, so I decided to start purging old rules, which ultimately forces users to re-add their IP addresses to the group.
Going in and manually removing rules is rather time-consuming, so I figured I could write a script to handle it for me. The first step was to update my previous script, the one that inserts the rule, so that it also adds a tag to the rule. The function below takes a list of Security Group IDs as input and returns all of their rules.
    import boto3

    def get_sg_rules(sg_id):
        # sg_id is a list of Security Group IDs
        client = boto3.client('ec2')
        response = client.describe_security_group_rules(
            Filters=[
                {
                    'Name': 'group-id',
                    'Values': sg_id
                }
            ],
        )
        return response
The script below iterates through each of the returned rules and adds a “dateAdded” tag containing a date string.
    from botocore.exceptions import ClientError

    # sg_list is the list of Security Group IDs to tag
    client = boto3.client('ec2')
    for sg_rule in get_sg_rules(sg_list)['SecurityGroupRules']:
        try:
            response = client.create_tags(
                DryRun=False,
                Resources=[
                    sg_rule['SecurityGroupRuleId'],
                ],
                Tags=[
                    {
                        'Key': 'dateAdded',
                        'Value': '2022-11-05'
                    },
                ]
            )
        except ClientError as e:
            print(e)
I then wrote the following Lambda function that runs every day and checks for any expired rules. The schedule is set up by a CloudWatch Events rule.
    import boto3
    from datetime import datetime, timedelta
    from botocore.exceptions import ClientError

    def return_today():
        now = datetime.now()
        return now

    def get_sg_rules(sg_id, old_date):
        client = boto3.client('ec2')
        response = client.describe_security_group_rules(
            Filters=[
                {
                    'Name': 'group-id',
                    'Values': sg_id
                },
                {
                    'Name': 'tag:dateAdded',
                    'Values': [old_date]
                }
            ],
        )
        return response

    def lambda_handler(event, context):
        sg_list = ["xxxx", "xxx"]
        # The date exactly 30 days ago, formatted like the dateAdded tag
        old_date = datetime.strftime(return_today() - timedelta(days=30), "%Y-%m-%d")
        print(old_date)
        client = boto3.client("ec2")
        for sg_rule in get_sg_rules(sg_list, old_date)['SecurityGroupRules']:
            try:
                response = client.revoke_security_group_ingress(
                    GroupId=sg_rule['GroupId'],
                    SecurityGroupRuleIds=[sg_rule['SecurityGroupRuleId']]
                )
                print(response)
                print("Successfully deleted the rule")
            except ClientError as e:
                print(e)
                print("Failed to delete rule")
You’ll see that the code has a list of Security Groups to check. It calculates the date 30 days before today and looks up rules whose “dateAdded” tag matches that date exactly. Any rule that matches is removed.
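Because the lookup matches one exact date, a rule is only caught on the day it turns 30 days old; if the function misses a run, that rule stays in place. A possible variation, sketched below, compares the dates in Python instead. It assumes the rules were fetched with only the group-id filter and that each rule carries its tags in a Tags list, as in the describe_security_group_rules response. The helper names are my own for illustration.

```python
from datetime import datetime, timedelta


def rule_added_on(rule, tag_key="dateAdded"):
    """Return the date stored in the rule's tag, or None if missing or malformed."""
    for tag in rule.get("Tags", []):
        if tag.get("Key") == tag_key:
            try:
                return datetime.strptime(tag["Value"], "%Y-%m-%d").date()
            except (KeyError, ValueError):
                return None
    return None


def expired_rule_ids(rules, today, max_age_days=30):
    """Collect IDs of rules whose tag date is at least max_age_days before today."""
    cutoff = today - timedelta(days=max_age_days)
    expired = []
    for rule in rules:
        added = rule_added_on(rule)
        # Untagged or unparsable rules are left alone rather than removed
        if added is not None and added <= cutoff:
            expired.append(rule["SecurityGroupRuleId"])
    return expired
```

The returned IDs could then be passed to revoke_security_group_ingress in the same way as in the Lambda function.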
I hope this helps you automate your AWS Accounts. Below are links to the code repository so you can edit the code as needed. Please share it with your friends if this helps you!
EC2 Reservation Notification
I realized today that I haven’t updated my EC2 reservations recently. When I wondered why, I realized that nothing was notifying me when reservations were about to expire. I spent the day putting together a script that looks through my reservations, checks when each one expires, and notifies me when one is nearing my threshold of 3 weeks.
I put this together as a local script, but it can also be adapted to run as a Lambda function, which is how I have it set up. As always, you can view my code below and on GitHub.
    import boto3
    from datetime import datetime, timezone, timedelta
    from botocore.exceptions import ClientError
    import os
    import json

    ec2_client = boto3.client("ec2", region_name="us-west-2")

    def get_reserved_instances():
        response = ec2_client.describe_reserved_instances()
        reserved_instances = {}
        for reservedInstances in response['ReservedInstances']:
            reserved_instances.update({
                reservedInstances['ReservedInstancesId']: {
                    "ExpireDate": reservedInstances['End'],
                    "Type": reservedInstances['InstanceType']
                }
            })
        return reserved_instances

    def determine_expirery(expirery_date):
        # True only when the reservation ends between 21 and 22 days from now
        now = datetime.now(timezone.utc)
        delta_min = timedelta(days=21)
        delta_max = timedelta(days=22)
        return delta_min <= expirery_date - now < delta_max

    # Send Result to SNS
    def sendToSNS(messages):
        sns = boto3.client('sns')
        try:
            send_message = sns.publish(
                TargetArn=os.environ['SNS_TOPIC'],
                Subject='EC2-Reservation',
                Message=messages,
            )
            return send_message
        except ClientError as e:
            print("Failed to send message to SNS")
            print(e)

    if __name__ == "__main__":
        for reservation, res_details in get_reserved_instances().items():
            if determine_expirery(res_details['ExpireDate']):
                sns_message = {
                    "reservation": reservation,
                    "expires": res_details['ExpireDate'].strftime("%m/%d/%Y, %H:%M:%S")
                }
                sendToSNS(json.dumps(sns_message))
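The expiry check only fires during the single day when a reservation is between 21 and 22 days from its end, so a skipped run means a missed notification. If repeated reminders are acceptable, one alternative is to flag everything inside the threshold. This is a minimal sketch that works on the dictionary returned by get_reserved_instances; the function name expiring_within is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def expiring_within(reservations, now, threshold=timedelta(days=21)):
    """Return IDs of reservations that end after now but within the threshold."""
    soon = []
    for reservation_id, details in reservations.items():
        remaining = details["ExpireDate"] - now
        # Skip reservations that already ended or are still far from expiring
        if timedelta(0) <= remaining <= threshold:
            soon.append(reservation_id)
    return soon
```

Called with now=datetime.now(timezone.utc), this would report a reservation on every run until it is renewed or expires, which trades a noisier channel for fewer missed alerts.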
I have an SNS topic set up to send messages to a Lambda function in the backend so I can format my messages and send them to a Slack channel for notifications.
If you have any questions, feel free to comment or message me on Twitter!
Adding a Standard Bucket Policy
It is good practice to deny traffic that is not HTTPS to your S3 bucket. For this reason, I wrote a script that I can use to apply a standard policy to each of my S3 buckets. While the script could be made more robust by iterating through each bucket in my account, I decided to have it take the name of a bucket as input and apply the changes to that bucket.
    import boto3
    from botocore.exceptions import ClientError
    import json
    import sys

    def check_s3_policy(bucket_name):
        client = boto3.client("s3", region_name='us-west-2')
        # Get existing policy so that we don't overwrite anything
        try:
            return client.get_bucket_policy(Bucket=bucket_name)
        except ClientError as e:
            if e.response['Error']['Code'] == 'NoSuchBucketPolicy':
                return None
            # Any other failure must not be mistaken for "no policy exists"
            print("failed to retrieve policy")
            print(e)
            raise

    if __name__ == "__main__":
        bucket_name = sys.argv[1]
        source_aws_account = boto3.client('sts').get_caller_identity().get('Account')
        print("Our current account number: " + source_aws_account)
        standard_bucket_policy = {
            "Sid": "AWSHTTPSAccess",
            "Action": [
                "s3:*"
            ],
            "Effect": "Deny",
            "Resource": [
                "arn:aws:s3:::" + bucket_name,
                "arn:aws:s3:::" + bucket_name + "/*"
            ],
            "Condition": {
                "Bool": {
                    "aws:SecureTransport": "false"
                }
            },
            "Principal": "*"
        }

        existing_policy = check_s3_policy(bucket_name)
        client = boto3.client("s3", region_name='us-west-2')
        if existing_policy is None:
            print("No policy exists so lets create a new one")
            print("Applying our standard bucket policy that denies non-HTTPS traffic...")
            try:
                new_bucket_policy = {
                    "Version": "2012-10-17",
                    "Statement": [standard_bucket_policy]
                }
                client.put_bucket_policy(Bucket=bucket_name, Policy=json.dumps(new_bucket_policy))
            except ClientError as e:
                print("failed to put bucket policy")
                print(e)
        else:
            print("There is a policy so we need to modify")
            policy_to_modify = json.loads(existing_policy['Policy'])
            policy_to_modify['Statement'].append(standard_bucket_policy)
            try:
                client.put_bucket_policy(Bucket=bucket_name, Policy=json.dumps(policy_to_modify))
            except ClientError as e:
                print("Error putting new bucket policy")
                print(e)
        print("Our bucket now follows all compliance ...")
        print("Exiting ...")
You can change the policy as needed and use this script to apply changes to your buckets!
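One thing to watch for: running the script twice against the same bucket appends the same statement a second time. A small helper could make the merge repeatable by replacing any statement that has the same Sid. This is only a sketch, and merge_statement is a name I made up for it.

```python
import copy


def merge_statement(policy, statement):
    """Return a copy of policy with statement added, replacing any same-Sid entry."""
    if policy:
        merged = copy.deepcopy(policy)
    else:
        merged = {"Version": "2012-10-17", "Statement": []}
    existing = merged.get("Statement", [])
    if isinstance(existing, dict):
        # A policy may hold a single statement object instead of a list
        existing = [existing]
    sid = statement.get("Sid")
    kept = [s for s in existing if sid is None or s.get("Sid") != sid]
    merged["Statement"] = kept + [statement]
    return merged
```

The result can be passed through json.dumps and handed to put_bucket_policy in place of the appended policy.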
I hope that this is helpful to someone. Please share this with your friends!
Security Group ID Finder
I have been working on deploying resources to a lot of AWS accounts lately where each account has the same network infrastructure. When deploying Lambdas, I had the common name of the security group but not the ID. I wrote this utility to get the security group ID for me quickly.
    import boto3
    import sys

    def get_security_group_id(common_name):
        ec2 = boto3.client("ec2", region_name="us-west-2")
        response = ec2.describe_security_groups()
        for security_group in response['SecurityGroups']:
            if security_group['GroupName'] == common_name:
                return security_group['GroupId']

    if __name__ == '__main__':
        # Show usage when no argument is given, instead of failing with an IndexError
        if len(sys.argv) < 2 or sys.argv[1] in ("help", "--help", "usage", "--usage"):
            print("USAGE: python3 main.py <security group name>")
        else:
            sg_id = get_security_group_id(sys.argv[1])
            if sg_id is None:
                print("Security Group Not found")
            else:
                print(sg_id)
This is a simple tool that can be used on your command line by doing:
python3 main.py <security group name>
I hope this helps speed up your deployments. Feel free to share the code with your friends and team!
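In accounts with many security groups, a single describe_security_groups call may not return every group. A variation that filters by name on the AWS side and follows pagination could look like the sketch below. It takes the EC2 client as a parameter (for example boto3.client("ec2")), and the function name is hypothetical.

```python
def find_security_group_ids(ec2_client, common_name):
    """Return the IDs of all security groups whose GroupName equals common_name."""
    paginator = ec2_client.get_paginator("describe_security_groups")
    pages = paginator.paginate(
        Filters=[{"Name": "group-name", "Values": [common_name]}]
    )
    group_ids = []
    for page in pages:
        for group in page["SecurityGroups"]:
            group_ids.append(group["GroupId"])
    return group_ids
```

It returns a list because the same group name can exist in more than one VPC within an account.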
A Dynamo Data Migration Tool
Have you ever wanted to migrate data from one DynamoDB table to another? I haven’t seen an AWS tool to do this so I wrote one using Python.
    import sys
    import boto3

    ## USAGE ############################################################################
    ## python3 dynamo.py <Source_Table> <destination table>                            ##
    ## Requires two profiles to be set in your AWS Config file "source", "destination" ##
    #####################################################################################

    def dynamo_bulk_reader():
        session = boto3.session.Session(profile_name='source')
        dynamodb = session.resource('dynamodb', region_name="us-west-2")
        table = dynamodb.Table(sys.argv[1])
        print("Exporting items from: " + str(sys.argv[1]))
        response = table.scan()
        data = response['Items']
        # Keep scanning until DynamoDB stops returning a continuation key
        while 'LastEvaluatedKey' in response:
            response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
            data.extend(response['Items'])
        print("Finished exporting: " + str(len(data)) + " items.")
        return data

    def dynamo_bulk_writer():
        session = boto3.session.Session(profile_name='destination')
        dynamodb = session.resource('dynamodb', region_name='us-west-2')
        table = dynamodb.Table(sys.argv[2])
        print("Importing items into: " + str(sys.argv[2]))
        # Open the batch writer once so the puts are actually sent in batches
        with table.batch_writer() as batch:
            for table_item in dynamo_bulk_reader():
                batch.put_item(Item=table_item)
        print("Finished importing items...")

    if __name__ == '__main__':
        print("Starting Dynamo Migrator...")
        dynamo_bulk_writer()
        print("Exiting Dynamo Migrator")
The process is pretty simple. First, we get all of our data from our source table. We store this in a list. Next, we iterate over that list and write it to our destination table using the ‘Batch Writer’.
The program has been tested against tables containing over 300 items. Feel free to use it for your environments! If you do use it, please share it with your friends and link back to this article!
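For tables much larger than the ones I tested, holding every item in a list may use more memory than you want. A streaming variation might look like this sketch: it yields items page by page and writes them through a single batch writer. The two functions take DynamoDB Table resources as arguments, and their names are my own for illustration.

```python
def iter_items(table):
    """Yield every item in the table, following LastEvaluatedKey across pages."""
    kwargs = {}
    while True:
        response = table.scan(**kwargs)
        for item in response["Items"]:
            yield item
        last_key = response.get("LastEvaluatedKey")
        if last_key is None:
            break
        kwargs["ExclusiveStartKey"] = last_key


def copy_items(source_table, destination_table):
    """Copy all items using one batch writer; return how many were written."""
    count = 0
    with destination_table.batch_writer() as batch:
        for item in iter_items(source_table):
            batch.put_item(Item=item)
            count += 1
    return count
```

The tables would be created the same way as in the script, one from the "source" profile session and one from the "destination" profile session.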
Querying and Editing a Single Dynamo Object
I have a workflow that creates a record inside of a DynamoDB table as part of a pipeline within AWS. The record has a primary key of the CodePipeline job. Later in the pipeline I wanted to edit that object to append the status of resources created by this pipeline.
In order to do this, I created two functions. The first returns the item from the table, and the second does the update and puts the updated item back into the table. Take a look at the code below and use it if you need to!
    import boto3
    from boto3.dynamodb.conditions import Key

    def query_table(id):
        dynamodb = boto3.resource('dynamodb')
        table = dynamodb.Table('XXXXXXXXXXXXXX')
        response = table.query(
            KeyConditionExpression=Key('PRIMARYKEY').eq(id)
        )
        return response['Items']

    def update_dynamo_status(id, resource_name, status):
        dynamodb = boto3.resource('dynamodb')
        table = dynamodb.Table('XXXXXXXXXXXXX')
        items = query_table(id)
        response = None  # stays None when the query returns no items
        for item in items:
            # Do your update here
            response = table.put_item(Item=item)
        return response
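If the update is just setting one attribute, DynamoDB's update_item call can do it without reading the item first. The sketch below assumes the table has only a partition key named PRIMARYKEY (no sort key) and that the resource name is used as the attribute name. Both are assumptions for illustration, as is the function name.

```python
def set_resource_status(table, job_id, resource_name, status):
    """Set one attribute on the item keyed by job_id without rewriting the item."""
    return table.update_item(
        Key={"PRIMARYKEY": job_id},
        # Placeholders avoid clashes with reserved words and odd characters
        UpdateExpression="SET #resource = :status",
        ExpressionAttributeNames={"#resource": resource_name},
        ExpressionAttributeValues={":status": status},
        ReturnValues="UPDATED_NEW",
    )
```

Here table is the same kind of object returned by dynamodb.Table(...) in the functions above.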
Pandas & NumPy with AWS Lambda
Fun fact: Pandas and NumPy don’t work out of the box with Lambda. The libraries that you might download from your development machine probably won’t work either.
The standard Lambda Python environment is very barebones by default. There is no point in loading in a bunch of libraries if they aren’t needed. This is why we package our Lambda functions into ZIP files to be deployed.
My first attempt at using Pandas on AWS Lambda involved concatenating Excel files. The point was to take a multi-sheet Excel file and combine it into one sheet for ingestion into a data lake. To accomplish this I used the Pandas library to build the new sheet. To automate the process I set up an S3 trigger on a Lambda function to execute the script every time a file was uploaded.
And then I ran into this error:
[ERROR] Runtime.ImportModuleError: Unable to import module 'your_module': IMPORTANT: PLEASE READ THIS FOR ADVICE ON HOW TO SOLVE THIS ISSUE! Importing the numpy c-extensions failed.
I had clearly added the NumPy library into my ZIP file.
So what was the problem? Well, apparently, the version of NumPy that I downloaded on both my MacBook and my Windows desktop is not compatible with Amazon Linux.
To resolve this issue, I first attempted to download the package files manually from PyPI.org. I grabbed the latest “manylinux1_x86_64.whl” file for both NumPy and Pandas. I put them back into my ZIP file and re-uploaded the file. This resulted in the same error.
THE FIX THAT WORKED:
The way to get this to work without failure is to spin up an Amazon Linux EC2 instance. Yes, this seems excessive, and it is. Not only did I have to spin up a new instance, I also had to install Python 3.8 because Amazon Linux ships with Python 2.7 by default. But once it is installed, you can use Pip to install the libraries to a directory by doing:
pip3 install -t . <package name>
This is useful for getting the libraries in the same location to ZIP back up for use. You can remove a lot of the files that are not needed by running:
rm -r *.dist-info __pycache__
After you have done the cleanup, you can ZIP up the files, move them back to your development machine, add your Lambda function, and upload the result to the Lambda console.
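If you would rather script the cleanup and the ZIP step together, something like the following sketch could work. It uses only the Python standard library and skips the metadata and bytecode folders at every level instead of deleting them. The function name and the skip lists are my own choices, and the ZIP file should be written outside the source directory so it does not try to include itself.

```python
import os
import zipfile

SKIP_DIR_SUFFIXES = (".dist-info",)
SKIP_DIR_NAMES = ("__pycache__",)


def zip_package(source_dir, zip_path):
    """Zip source_dir into zip_path, skipping metadata and bytecode folders."""
    written = []
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for root, dirs, files in os.walk(source_dir):
            # Prune skipped folders in place so os.walk does not descend into them
            dirs[:] = [
                d for d in dirs
                if d not in SKIP_DIR_NAMES and not d.endswith(SKIP_DIR_SUFFIXES)
            ]
            for name in files:
                full_path = os.path.join(root, name)
                arcname = os.path.relpath(full_path, source_dir)
                archive.write(full_path, arcname)
                written.append(arcname)
    return sorted(written)
```

Paths inside the archive are relative to source_dir, so the libraries end up at the top level of the ZIP next to your function file.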
Run a test! It should work as you intended now!
If you need help with this please reach out to me on social media or leave a comment below.
A File Management Architecture
This post is a continuation of my article: “A File Extraction Project”. This project has been a great learning experience for both frontend and backend application architecture and design. Below you will find a diagram and an explanation of all the pieces that make this work.
- The entire architecture is powered by Flask on an EC2 instance. When I move this project to production I intend to put an application load balancer in front to manage traffic. The frontend is also secured by Google Authentication. This provides authentication against the users’ existing GSuite deployment so that only individuals within the organization can access the application.
- The first Lambda function processes the upload functions. I am allowing for as many files as needed by the customer. The form also includes a single text field for specifying the value of the object tag. The function sends the objects into the first bucket which is object #4.
- The second Lambda function is the search functionality. This function allows the user to provide a tag value. The function queries all objects in bucket #4 and creates a list of objects that match the query. It then moves the objects to bucket #5 where it packages them up and presents them to the user in the form of a ZIP file.
- The first bucket is the storage for all of the objects. This is the bucket where all the objects are uploaded to from the first Lambda function. It is not publicly accessible.
- The second bucket is a temporary storage for files requested by the user. Objects are moved into this bucket from the first bucket. This bucket has a deletion policy that only allows objects to live inside it for 24 hours.
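One way to get the 24-hour cleanup on the temporary bucket is an S3 lifecycle rule. This is a sketch of what that configuration might look like, not necessarily how the bucket in this project is set up. The rule ID and function names are placeholders, and the client is passed in (for example boto3.client("s3")).

```python
def one_day_expiry_rule(rule_id="expire-after-one-day"):
    """Build a lifecycle configuration that expires every object after 1 day."""
    return {
        "Rules": [
            {
                "ID": rule_id,
                "Status": "Enabled",
                "Filter": {"Prefix": ""},  # an empty prefix matches all objects
                "Expiration": {"Days": 1},
            }
        ]
    }


def apply_expiry(s3_client, bucket_name):
    """Attach the one-day expiry rule to the bucket."""
    return s3_client.put_bucket_lifecycle_configuration(
        Bucket=bucket_name,
        LifecycleConfiguration=one_day_expiry_rule(),
    )
```

Lifecycle expiration works in whole days and S3 removes expired objects asynchronously, so an object may live somewhat longer than exactly 24 hours.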
Lambda Function for File Uploading:
    def upload():
        if request.method == 'POST':
            tag = request.form['tag']
            files = request.files.getlist('file')
            print(files)
            for file in files:
                print(file)
                if file:
                    filename = secure_filename(file.filename)
                    file.save(filename)
                    s3.upload_file(
                        Bucket=BUCKET_NAME,
                        Filename=filename,
                        Key=filename
                    )
                    s3.put_object_tagging(
                        Bucket=BUCKET_NAME,
                        Key=filename,
                        Tagging={
                            'TagSet': [
                                {
                                    'Key': 'Tag1',
                                    'Value': tag
                                },
                                {
                                    'Key': 'Tag2',
                                    'Value': 'Tag-value'
                                },
                            ]
                        },
                    )
            msg = "Upload Done ! "
The function lives within the Flask application. I have AWS permissions set up on my EC2 instance to allow the S3 calls the function makes (uploading requires s3:PutObject and tagging requires s3:PutObjectTagging). You can assign tags as needed. The first tag uses the tag variable, which is provided by the form submission.
For Google Authentication I utilized a project I found on GitHub here. In the “auth” route that it creates, I modified the code to check the “hd” (hosted domain) value that Google returns for the user. You can see how this works here:
    @app.route('/auth')
    def auth():
        token = oauth.google.authorize_access_token()
        user = oauth.google.parse_id_token(token)
        session['user'] = user
        if "hd" not in user:
            abort(403)
        elif user['hd'] != 'Your hosted domain':
            abort(403)
        else:
            return redirect('/')
If the “hd” parameter is missing, or does not match your hosted domain, the route aborts with a “403” error.
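The domain check itself can be pulled into a small function so it is easy to test away from Flask. This is just a sketch; is_allowed_user is a hypothetical name, and user is assumed to be a dictionary-like object as in the route above.

```python
def is_allowed_user(user, allowed_domain):
    """Return True only when the user data carries the expected hosted domain."""
    hosted_domain = user.get("hd")
    return hosted_domain is not None and hosted_domain == allowed_domain
```

Inside the route, a False result would lead to abort(403) and a True result to the redirect.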
If you are interested in this project and want more information feel free to reach out and I can provide more code examples or package up the project for you to deploy on your own!
If you found this article helpful please share it with your friends.
A File Extraction Project
I had a client approach me regarding a set of files they had. The files were a set of certificates to support their products. They deliver these files to customers in the sales process.
The workflow currently involves manually packaging the files up into a deliverable format. The client asked me to automate this process across their thousands of documents.
As I started thinking through how this would work, I decided on a serverless approach: Amazon S3 for document storage, Lambda for the processing, and Amazon S3 with CloudFront to serve the front end of the application.
My current architecture involves two S3 buckets. One bucket to store the original PDF documents and one to pull in the documents that we are going to package up for the client before sending.
The idea is that we can tag each PDF file with its appropriate lot number supplied by the client. I will then use a simple form submission process to supply input into the function that will collect the required documents.
Here is the code for the web frontend:
    <!DOCTYPE html>
    <html>
    <head>
      <script src="https://ajax.googleapis.com/ajax/libs/jquery/2.2.4/jquery.min.js"></script>
      <script type="text/javascript">
        $(document).ready(function() {
          $("#submit").click(function(e) {
            e.preventDefault();
            var lot = $("#lot").val();
            $.ajax({
              type: "POST",
              url: 'API_URLHERE',
              contentType: 'application/json',
              data: JSON.stringify({
                'body': lot,
              }),
              success: function(res){
                $('#form-response').text('Query Was processed.');
              },
              error: function(){
                $('#form-response').text('Error.');
              }
            });
          })
        });
      </script>
    </head>
    <body>
      <form>
        <label for="lot">Lot</label>
        <input id="lot">
        <button id="submit">Submit</button>
      </form>
      <div id="form-response"></div>
    </body>
    </html>
This is a single field input form that sends a string to my Lambda function. Once the string is received we will convert it into a JSON object and then use that to find our objects within Amazon S3.
Here is the function:
    import boto3
    import json

    def lambda_handler(event, context):
        form_response = event['body']
        tag_list = json.loads(form_response)
        print(tag_list)
        tag_we_want = tag_list['body']
        s3 = boto3.client('s3')
        bucket = "source_bucket"
        destBucket = "destination_bucket"
        download_list = []

        # Get all the objects in a bucket
        get_objects = s3.list_objects(
            Bucket=bucket,
        )
        object_list = get_objects.get('Contents', [])
        object_keys = []
        for obj in object_list:
            object_keys.append(obj['Key'])

        # Record the value of the first tag on each object
        object_tags = []
        for key in object_keys:
            object_key = s3.get_object_tagging(
                Bucket=bucket,
                Key=key,
            )
            if not object_key['TagSet']:
                continue  # skip untagged objects
            object_tags.append(
                {
                    'Key': key,
                    'tags': object_key['TagSet'][0]['Value']
                }
            )

        # Copy every object whose tag matches into the destination bucket
        for tag in object_tags:
            if tag['tags'] == tag_we_want:
                object_name = tag['Key']
                s3.copy_object(
                    Bucket=destBucket,
                    CopySource={
                        'Bucket': bucket,
                        'Key': object_name,
                    },
                    Key=object_name,
                )
                download_list.append(object_name)
        return download_list, tag_we_want
In this code, we define our source and destination buckets first. With the string from the form submission, we first gather all the objects within the bucket and then iterate over each object to find matching tags.
Once we gather the files we want for our customers we then transfer these files to a new bucket. I return the list of files out of the function as well as the tag name.
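With thousands of documents, a single list call will not return everything, since S3 returns at most 1,000 keys per request. A paginated version of the matching step might look like the sketch below. It checks the wanted value against every tag on the object rather than only the first one, which is my own variation, and the function name is hypothetical.

```python
def keys_with_tag_value(s3_client, bucket, wanted_value):
    """Return keys of objects that carry wanted_value on any of their tags."""
    matches = []
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket):
        # An empty bucket (or page) has no "Contents" entry
        for entry in page.get("Contents", []):
            tagging = s3_client.get_object_tagging(Bucket=bucket, Key=entry["Key"])
            values = [tag["Value"] for tag in tagging["TagSet"]]
            if wanted_value in values:
                matches.append(entry["Key"])
    return matches
```

This still makes one tagging request per object, so for very large buckets it may be worth keeping the tag lookup somewhere faster to query.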
My next step is to package all the files required into a ZIP file for downloading. I first attempted to do this in Lambda but quickly ran into the fact that Lambda’s file system is read-only apart from the /tmp directory, which has limited space (512 MB by default).
Right now, I am thinking of utilizing Docker to spawn a worker which will generate the ZIP file, place it back into the bucket and provide a time-sensitive download link to the client.
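Wherever the worker ends up running, the packaging and the time-limited link could look roughly like this. It is a sketch under a few assumptions: the files are small enough to zip in memory, the file contents have already been downloaded into a dictionary, and the S3 client is passed in (for example boto3.client("s3")). The function names are my own.

```python
import io
import zipfile


def build_zip(files):
    """Pack a mapping of {name: bytes} into an in-memory ZIP and return its bytes."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as archive:
        for name, content in files.items():
            archive.writestr(name, content)
    return buffer.getvalue()


def publish_zip(s3_client, bucket, key, zip_bytes, expires_in=3600):
    """Upload the ZIP and return a download link valid for expires_in seconds."""
    s3_client.put_object(Bucket=bucket, Key=key, Body=zip_bytes)
    return s3_client.generate_presigned_url(
        "get_object",
        Params={"Bucket": bucket, "Key": key},
        ExpiresIn=expires_in,
    )
```

The presigned URL stops working after expires_in seconds, which gives the time-sensitive download link without making the bucket public.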
Stay tuned for more updates on this project.
A Self Hosted Server Health Check
I’m not big on creating dashboards. I find that I don’t look at them enough to warrant hosting the software on an instance and having to have the browser open to the page all the time.
Instead, I prefer to be alerted via Slack as much as possible. I wrote scripts to collect DNS records from Route53. I decided that I should expand on the idea and create a scheduled job that would execute at a time interval. This way my health checks are fully automated.
Before we get into the script, you might ask me why I don’t just use Route53 health checks! The answer is fairly simple. First, the cost of health checks for HTTPS doesn’t make sense for the number of web servers that I am testing. Second, I don’t want to test Route53 or any AWS resource from within AWS. Rather, I would like to use my own network to test as it is not connected to AWS.
You can find the code and the Lambda function hosted on GitHub. The overall program utilizes a few different AWS products:
- Lambda
- SNS
- CloudWatch Logs
It also uses Slack but that is an optional piece that I will explain. The main functions reside in “main.py”. This piece of code follows the process of:
- Iterating over Route53 Records
- Filtering out “A” records and compiling a list of domains
- Testing each domain and processing the response code
- Logging all of the results to CloudWatch Logs
- Sending errors to the SNS topic
I have the script running on a CRON job every hour.
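The response-code step boils down to a simple rule: a handful of codes are expected and only get logged, while anything else (or no response at all) also raises an alert. A minimal sketch of that rule as a pure function, using the same codes the script treats as quiet. The names are my own, and None stands for a request that failed outright.

```python
LOG_ONLY_CODES = {200, 301, 302, 401, 403}


def classify_status(code):
    """Return 'log' for expected codes and 'alert' for anything else."""
    if code is not None and code in LOG_ONLY_CODES:
        return "log"
    return "alert"


def triage(results):
    """Split {url: code_or_None} into (all log messages, alert messages)."""
    logged, alerts = [], []
    for url, code in results.items():
        message = f"The site {url} reports status code: {code}"
        logged.append(message)  # every result is logged
        if classify_status(code) == "alert":
            alerts.append(message)
    return logged, alerts
```

Keeping the rule separate from the HTTP and AWS calls makes it easy to adjust which codes should stay quiet.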
The second piece of this is the Lambda function. The function is all packaged in the “lambda_function.zip”, but I also added the function outside of the ZIP file for editing. You can modify this function to utilize your Slack credentials.
The Lambda function is subscribed to your SNS topic so that whenever a new message appears, that message is sent to your specified Slack channel.
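The part of the event that matters is the SNS message text. A small helper like the one below could pull it out of every record in the event instead of assuming there is exactly one. This is a sketch, and sns_messages is a hypothetical name.

```python
def sns_messages(event):
    """Return the Message string from every SNS record in a Lambda event."""
    return [
        record["Sns"]["Message"]
        for record in event.get("Records", [])
        if "Sns" in record
    ]
```

Each returned string could then be posted to Slack in turn, and an event without records simply yields an empty list.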
I have plans to test my Terraform skills to automate the deployment of the Lambda function, SNS topic, CloudWatch Logs, and the primary script in some form.
If you have any comments on how I could improve this function please post a comment here or raise an issue on GitHub. If you find this script helpful in any way feel free to share it with your friends!
Links:
Server Health Check – GitHub
Code – Main Function (main.py)
    import boto3
    import requests
    import time

    # AWS variables
    sns = boto3.client('sns')
    aws = boto3.client('route53')
    cw = boto3.client('logs')
    paginator = aws.get_paginator('list_resource_record_sets')
    response = aws.list_hosted_zones()
    hosted_zones = response['HostedZones']
    time_now = int(round(time.time() * 1000))
    LOG_GROUP_NAME = 'YOUR_LOG_GROUP_NAME'
    LOG_STREAM_NAME = 'YOUR_LOG_STREAM_NAME'

    # Status codes that are logged but do not trigger an alert
    QUIET_CODES = (200, 301, 302, 401, 403)

    # Create empty lists
    zone_id_to_test = []
    dns_entries = []
    zones_with_a_record = []

    # Create list of zone IDs to get record sets from
    for key in hosted_zones:
        zoneid = key['Id']
        final_zone_id = zoneid[12:]  # strip the "/hostedzone/" prefix
        zone_id_to_test.append(final_zone_id)

    # Collect the record sets of every zone
    def getARecord(zoneid):
        for zone in zoneid:
            try:
                response = paginator.paginate(HostedZoneId=zone)
                for record_set in response:
                    dns = record_set['ResourceRecordSets']
                    dns_entries.append(dns)
            except Exception as error:
                print('An Error')
                print(str(error))
                raise

    # Get records to test (despite the name, this keeps "A" records)
    def getCNAME(entry):
        for dns_entry in entry:
            for record in dns_entry:
                if record['Type'] == 'A':
                    url = record['Name']
                    final_url = url[:-1]  # drop the trailing dot
                    zones_with_a_record.append(f"https://{final_url}")

    # Send result to SNS
    def sendToSNS(messages):
        try:
            sns.publish(
                TargetArn='YOUR_SNS_TOPIC_ARN_HERE',
                Message=messages,
            )
        except Exception:
            print("something didn't work")

    def tester(urls):
        for url in urls:
            try:
                user_agent = {'User-agent': 'Mozilla/5.0'}
                status = requests.get(url, headers=user_agent, allow_redirects=True)
                code = status.status_code
                if code not in QUIET_CODES:
                    sendToSNS(f"The site {url} reports: {code}")
                writeLog(f"The site {url} reports status code: {code}")
            except Exception:
                # No status code is available when the request itself fails
                sendToSNS(f"The site {url} failed testing")
                writeLog(f"The site {url} failed testing")

    def writeLog(message):
        getToken = cw.describe_log_streams(
            logGroupName=LOG_GROUP_NAME,
            logStreamNamePrefix=LOG_STREAM_NAME,
        )
        logInfo = getToken['logStreams']
        event_args = {
            'logGroupName': LOG_GROUP_NAME,
            'logStreamName': LOG_STREAM_NAME,
            'logEvents': [
                {
                    'timestamp': time_now,
                    'message': message
                },
            ],
        }
        # A new stream has no sequence token yet, so only pass it when present
        nextToken = logInfo[0].get('uploadSequenceToken')
        if nextToken:
            event_args['sequenceToken'] = nextToken
        cw.put_log_events(**event_args)

    # Execute
    getARecord(zone_id_to_test)
    getCNAME(dns_entries)
    tester(zones_with_a_record)
Code: Lambda Function (lambda_function.py)
    import logging
    logging.basicConfig(level=logging.DEBUG)

    import os
    from slack import WebClient
    from slack.errors import SlackApiError

    slack_token = os.environ["slackBot"]
    client = WebClient(token=slack_token)

    def lambda_handler(event, context):
        detail = event['Records'][0]['Sns']['Message']
        response_string = f"{detail}"
        try:
            response = client.chat_postMessage(
                channel="YOUR CHANNEL HERE",
                text="SERVER DOWN",
                blocks=[{"type": "section", "text": {"type": "plain_text", "text": response_string}}]
            )
        except SlackApiError as e:
            assert e.response["error"]
        return