Building out a reusable Terraform framework for Flask Applications
I find myself using the same architecture whenever I deploy demo applications built with the great Python library Flask, and I’ve been using the same Terraform files over and over again to build out the infrastructure.
Last weekend I decided it was time to build a reusable framework for deploying these applications. So, I began building out the repository. The purpose of this repository is to give myself a jumping off point to quickly deploy applications for demonstrations or live environments.
Let’s take a look at the features:
- Customizable Environments within Terraform for managing the infrastructure across your development and production environments
- Modules for:
- Application Load Balancer
- Elastic Container Registry
- Elastic Container Service
- VPC & Networking components
- Dockerfile and Docker Compose file for launching and building the application
- Demo code for the Flask application
- Automated build and deploy for the container upon code changes
This framework is built for any developer who wants to get started quickly and deploy applications fast. It speeds up development by letting you focus solely on the application rather than the infrastructure.
Upcoming features:
- CI/CD features using either GitHub Actions or Amazon Web Services tools like CodePipeline and CodeBuild
- Custom Domain Name support for your application
If there are other features you would like to see me add, shoot me a message anytime!
Check out the repository here:
https://github.com/avansledright/terraform-flask-module
Create an Image Labeling Application using Artificial Intelligence
I have a PowerPoint party to go to soon. Yes you read that right. At this party everyone is required to present a short presentation about any topic they want. Last year I made a really cute presentation about a day in the life of my dog.
This year I have decided that I want to bore everyone to death and talk about technology: Python, Terraform, and Artificial Intelligence. Specifically, I built an application that allows a user to upload an image and get back a renamed file that is labeled based on the object or scene in the image. The architecture is fairly simple: a user connects to a load balancer, which routes traffic to our containers, and the containers connect to Bedrock and S3 for image processing and storage.
If you want to try it out, the site is hosted at https://image-labeler.vansledright.com. It will be up for some time; I haven’t decided how long I will host it, but at least through this weekend!
Here is the code that interacts with Bedrock and S3 to process the image:
def process_image():
    if not request.is_json:
        return jsonify({'error': 'Content-Type must be application/json'}), 400

    data = request.json
    file_key = data.get('fileKey')

    if not file_key:
        return jsonify({'error': 'fileKey is required'}), 400

    try:
        # Get the image from S3
        response = s3.get_object(Bucket=app.config['S3_BUCKET_NAME'], Key=file_key)
        image_data = response['Body'].read()

        # Check if image is larger than 5MB
        if len(image_data) > 5 * 1024 * 1024:
            logger.info("File size too large. Compressing image")
            image_data = compress_image(image_data)

        # Convert image to base64
        base64_image = base64.b64encode(image_data).decode('utf-8')

        # Prepare prompt for Claude
        prompt = """Please analyze the image and identify the main object or subject.
        Respond with just the object name in lowercase, hyphenated format.
        For example: 'coca-cola-can' or 'golden-retriever'."""

        # Call Bedrock with Claude
        bedrock_response = bedrock.invoke_model(
            modelId='anthropic.claude-3-sonnet-20240229-v1:0',
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 100,
                "messages": [
                    {
                        "role": "user",
                        "content": [
                            {
                                "type": "text",
                                "text": prompt
                            },
                            {
                                "type": "image",
                                "source": {
                                    "type": "base64",
                                    "media_type": response['ContentType'],
                                    "data": base64_image
                                }
                            }
                        ]
                    }
                ]
            })
        )

        response_body = json.loads(bedrock_response['body'].read())
        object_name = response_body['content'][0]['text'].strip()
        logging.info(f"Object found is: {object_name}")

        if not object_name:
            return jsonify({'error': 'Could not identify object in image'}), 422

        # Get file extension and create new filename
        _, ext = os.path.splitext(unquote(file_key))
        new_file_name = f"{object_name}{ext}"
        new_file_key = f'processed/{new_file_name}'

        # Copy object to new location
        s3.copy_object(
            Bucket=app.config['S3_BUCKET_NAME'],
            CopySource={'Bucket': app.config['S3_BUCKET_NAME'], 'Key': file_key},
            Key=new_file_key
        )

        # Generate download URL
        download_url = s3.generate_presigned_url(
            'get_object',
            Params={
                'Bucket': app.config['S3_BUCKET_NAME'],
                'Key': new_file_key
            },
            ExpiresIn=3600
        )

        return jsonify({
            'downloadUrl': download_url,
            'newFileName': new_file_name
        })

    except json.JSONDecodeError as e:
        logger.error(f"Error decoding Bedrock response: {str(e)}")
        return jsonify({'error': 'Invalid response from AI service'}), 500
    except Exception as e:
        logger.error(f"Error processing image: {str(e)}")
        return jsonify({'error': 'Error processing image'}), 500
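The compress_image helper referenced above isn’t shown in this snippet. A minimal sketch of what it could look like, assuming Pillow is available and JPEG output is acceptable, is:

import io
from PIL import Image  # Pillow; an assumption, not necessarily what the app uses

def compress_image(image_data, max_bytes=5 * 1024 * 1024, quality=85):
    """Re-encode the image as JPEG, lowering quality until it fits under max_bytes."""
    img = Image.open(io.BytesIO(image_data)).convert("RGB")
    while quality >= 20:
        buffer = io.BytesIO()
        img.save(buffer, format="JPEG", quality=quality)
        if buffer.tell() <= max_bytes:
            return buffer.getvalue()
        quality -= 10
    return buffer.getvalue()  # Best effort if we never get under the limit

Note that if the original upload wasn’t a JPEG, the media_type passed to Bedrock would need to be updated to image/jpeg after compression.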
If you think this project is interesting, feel free to share it with your friends or message me if you want all of the code!
Converting DrawIO Diagrams to Terraform
I’m going to start this post off by saying that I need testers: people to test this process from an interface perspective as well as a data perspective. I’m limited on the amount of test data that I have to put through the process.
With that said, I spent my Thanksgiving holiday writing code, building this project, and putting in way more time than I thought I would, but boy is it cool.
If you’re like me and working in a Cloud Engineering capacity then you probably have built a DrawIO diagram at some point in your life to describe or define your AWS architecture. Then you have spent countless hours using that diagram to write your Terraform. I’ve built something that will save you those hours and get you started on your cloud journey.
Enter https://drawiototerraform.com, my new tool that allows you to convert your DrawIO AWS architecture diagrams to Terraform just by uploading them. The process uses a combination of Python and LLMs to identify the components in your diagram and their relationships, write the base Terraform, analyze the initial Terraform for syntax errors, and ultimately test the Terraform by generating a Terraform plan.
All this is then delivered to you as a ZIP file for you to review, modify and ultimately deploy to your environment. By no means is it perfect yet and that is why I am looking for people to test the platform.
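For a sense of what that plan-testing step involves (this is purely illustrative, not the platform’s actual code), validating generated Terraform from Python can be as simple as shelling out to the CLI:

import subprocess

def validate_generated_terraform(workdir):
    """Run init/validate/plan against a directory of generated Terraform."""
    steps = [
        ["terraform", "init", "-backend=false", "-input=false"],
        ["terraform", "validate", "-json"],
        ["terraform", "plan", "-input=false", "-no-color"],
    ]
    for step in steps:
        result = subprocess.run(step, cwd=workdir, capture_output=True, text=True)
        if result.returncode != 0:
            # Surface the CLI output so the errors can be fed back for correction
            return False, result.stderr or result.stdout
    return True, "plan succeeded"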
If you, or someone you know, are interested in helping me test, reach out to me through the website’s support page and I will provide some free credits so that you can test out the platform with your own diagrams.
If you are interested in learning more about the project in any capacity, do not hesitate to reach out to me at any time.
Website: https://drawiototerraform.com
API For Pre-signed URLs
Pre-signed URLs are used for downloading objects from AWS S3 buckets. I’ve used them many times in the past for various reasons, but this idea was a new one: a proof of concept for an API that creates a pre-signed URL and returns it to the user.
This solution utilizes an API Gateway and an AWS Lambda function. The API Gateway takes two parameters, “key” and “expiration”. Ultimately, you could add another parameter for “bucket” if you wanted the gateway to be able to get objects from multiple buckets.
I used Terraform to create the infrastructure and Python to program the Lambda.
Take a look at the Lambda code below:
import boto3
import json
import os
from botocore.exceptions import ClientError

def lambda_handler(event, context):
    # Get the query parameters
    query_params = event.get('queryStringParameters', {})

    if not query_params or 'key' not in query_params:
        return {
            'statusCode': 400,
            'body': json.dumps({'error': 'Missing required parameter: key'})
        }

    object_key = query_params['key']
    expiration = int(query_params.get('expiration', 3600))  # Default 1 hour

    # Initialize S3 client
    s3_client = boto3.client('s3')
    bucket_name = os.environ['BUCKET_NAME']

    try:
        # Generate presigned URL
        url = s3_client.generate_presigned_url(
            'get_object',
            Params={
                'Bucket': bucket_name,
                'Key': object_key
            },
            ExpiresIn=expiration
        )

        return {
            'statusCode': 200,
            'headers': {
                'Access-Control-Allow-Origin': '*',
                'Content-Type': 'application/json'
            },
            'body': json.dumps({
                'url': url,
                'expires_in': expiration
            })
        }
    except ClientError as e:
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }
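To exercise the endpoint outside of Postman, a quick client-side call might look like the following; the invoke URL and resource path are placeholders for whatever API Gateway generates for your stage:

import requests

# Placeholder invoke URL; substitute the one output by your API Gateway stage
API_URL = "https://<api-id>.execute-api.us-west-2.amazonaws.com/prod/presign"

resp = requests.get(API_URL, params={"key": "reports/example.pdf", "expiration": 900})
resp.raise_for_status()
payload = resp.json()
print(payload["url"])         # Time-limited download link for the object
print(payload["expires_in"])  # Echoes back the requested expiration in seconds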
The Terraform will also output a Postman collection JSON file so that you can immediately import it for testing. If this code and pattern are useful to you, check it out on my GitHub below.
Securing AWS S3 Objects with Python: Implementing SSE-S3 Encryption
In the cloud-native world, data security is paramount, and securing Amazon Web Services (AWS) S3 storage is a critical task for any developer. In this article, we dive into a Python script designed to ensure that all your S3 objects are encrypted using Server-Side Encryption with S3-Managed Keys (SSE-S3). This method provides robust security by encrypting S3 objects at the server level using keys managed by S3.
Understanding the Python Script
Using the code located at https://github.com/avansledright/s3-object-re-encryption, we have a good framework for re-encrypting our objects.
The script utilizes the boto3 library, a Python SDK for AWS, enabling developers to integrate their applications with AWS services directly. It includes functions to list objects in an S3 bucket, check their encryption status, and apply SSE-S3 encryption if necessary.
Key Functions:
- Listing Objects: Retrieves all objects within a specified bucket and prefix, managing pagination to handle large datasets.
- Checking Encryption: Examines if each object is encrypted with SSE-S3 by accessing its metadata.
- Applying Encryption: Updates objects not encrypted with SSE-S3, ensuring all data is securely encrypted using copy_object with the ServerSideEncryption parameter (see the sketch after this list).
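To make those steps concrete, here is a minimal sketch of the core loop; the function and variable names are mine, not necessarily those used in the repository:

import boto3

s3 = boto3.client("s3")

def ensure_sse_s3(bucket, prefix=""):
    """List objects under a prefix and re-encrypt any that are not using SSE-S3."""
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            # head_object exposes the object's current ServerSideEncryption setting
            head = s3.head_object(Bucket=bucket, Key=key)
            if head.get("ServerSideEncryption") == "AES256":
                continue  # Already SSE-S3 encrypted
            # Copy the object onto itself with SSE-S3 requested
            s3.copy_object(
                Bucket=bucket,
                Key=key,
                CopySource={"Bucket": bucket, "Key": key},
                ServerSideEncryption="AES256",
            )
            print(f"Re-encrypted {key}")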
Why Encrypt with SSE-S3?
Encrypting your S3 objects with SSE-S3 ensures that data is automatically encrypted before being saved to disk and decrypted when accessed. This happens transparently, allowing you to secure your data without modifying your application code.
Running the Script
The script is executed via the command line, where users specify the S3 bucket and prefix. It then processes each object, ensuring encryption standards meet organizational and compliance requirements.
Expanding the Script
While this script provides a basic framework for S3 encryption, it can be expanded with additional error handling, logging, and perhaps integration into a larger AWS security auditing tool.
AWS developers looking to enhance their application security will find this script a valuable starting point for implementing standard security practices within their S3 environments. By automating the encryption process, developers can ensure consistency and security across all stored data.
For those who manage sensitive or regulated data in AWS, applying SSE-S3 encryption programmatically can help meet legal and compliance obligations while providing peace of mind about data security.
If you find this article helpful please share it with your friends!
Building a Generative AI Workflow with AWS Bedrock
I’ve finally been tasked with a Generative AI project to work on. I’ve done this workflow manually with ChatGPT in the past and it works quite well but, for this project, the requirement was to use Amazon Web Services’ new product “AWS Bedrock”.
The workflow takes in some code and writes a technical document to support a clear English understanding of what the code is going to accomplish. Using AWS Bedrock, the AI will write the document and output it to an S3 bucket.
The architecture involves uploading the initial code to an S3 bucket, which sends a message to an SQS queue and ultimately triggers a Lambda function to prompt the AI and upload the output to a separate S3 bucket. Because this was a proof of concept, a Lambda function provided sufficient compute; however, going forward I am going to look at placing this code into a Docker container so that it can scale for larger code inputs.
Here is the architecture diagram:
Let’s take a look at some of the important code. First is the prompt management. I wrote a function that will take input of the code as well as a parameter of “prompt_type”. This will allow the function to be scalable to accommodate other prompts in the future.
def return_prompt(code, prompt_type):
    if prompt_type == "testPrompt":
        prompt1 = f"Human: <your prompt>. Assistant:"
        return prompt1
The important thing to look at here is the format of the message. You have to include the “Human:” and the “Assistant:”. Without this formatting, your API call will error.
The next bit of code is what we use to prompt the Bedrock AI.
prompt_to_send = prompts.return_prompt(report_file, "testPrompt")
body = {
    "prompt": prompt_to_send,
    "max_tokens_to_sample": 300,
    "temperature": 0.1,
    "top_p": 0.9
}
accept = 'application/json'
contentType = 'application/json'

# Return pseudo code
bedrock_response = h.bedrock_actions.invoke_model(
    json.dumps(body, indent=2).encode('utf-8'),
    contentType,
    accept,
    modelId=modelid
)
def invoke_model(body, contentType, accept, modelId):
    print(f"Body being sent: {body}")
    try:
        response = bedrock_runtime.invoke_model(
            body=body,
            contentType=contentType,
            accept=accept,
            modelId=modelId
        )
        return response
    except ClientError as e:
        print("Failed to invoke Bedrock model")
        print(e)
        return False
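Once invoke_model returns, the generated document still needs to be written to the output S3 bucket described in the architecture. That step isn’t shown in the post; a minimal sketch could look like this, where the bucket and key names are illustrative and "completion" is the field the Claude text-completions API returns for the prompt format used above:

import json
import boto3

s3 = boto3.client("s3")

def store_generated_document(bedrock_response, output_bucket, output_key):
    # Parse the streaming response body and pull out the generated text
    response_body = json.loads(bedrock_response["body"].read())
    document_text = response_body["completion"]
    # Write the technical document to the output bucket
    s3.put_object(
        Bucket=output_bucket,
        Key=output_key,
        Body=document_text.encode("utf-8"),
        ContentType="text/plain",
    )
    return output_key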
The body of our request is what configures Bedrock to run and create a response. These values can be tweaked as follows:
- max_tokens_to_sample: The maximum number of tokens to generate in the response. Amazon recommends setting this to 4000.
- top_p: Use a lower value to ignore less probable options.
- top_k: Specify the number of token choices the model uses to generate the next token.
- temperature: Use a lower value to decrease randomness in the response.
You can read more about the inputs here. If you want to see more of this code, take a look at my GitHub repository below. Feel free to use it wherever you want. If you have any questions, be sure to reach out to me!
Automated Lambda Testing
Look, I know there are a bunch of test frameworks that you could use for your Lambda functions. But what if you wanted something simple? I spent an afternoon putting together what I would want in a testing pipeline that returns a simple “Success/Fail” type response to me via Email.
An architecture diagram for your eyes:
The idea is to create a JSON object whose keys are Lambda function names and whose values are the test events to pass to each Lambda. Once the file is uploaded to the S3 bucket, the pipeline is triggered and a CodeBuild job iterates through the Lambdas and their events. Each Lambda is invoked with its event and reports whether or not it succeeded. The results are then sent to an SNS topic to be distributed to the developers.
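As an illustration, here is roughly what that manifest and the test loop could look like; the function names and events are made up, and the real pipeline lives in the packaged solution mentioned below:

import json
import os
import boto3

# Hypothetical manifest: Lambda function name -> test event to invoke it with
manifest = {
    "user-signup-handler": {"httpMethod": "POST", "body": "{\"email\": \"test@example.com\"}"},
    "image-resizer": {"Records": [{"s3": {"object": {"key": "test.jpg"}}}]},
}

lambda_client = boto3.client("lambda")
sns_client = boto3.client("sns")

results = {}
for function_name, test_event in manifest.items():
    response = lambda_client.invoke(
        FunctionName=function_name,
        InvocationType="RequestResponse",
        Payload=json.dumps(test_event),
    )
    # A FunctionError entry means the function raised an exception during the test
    results[function_name] = "Fail" if response.get("FunctionError") else "Success"

# Publish the Success/Fail summary to the SNS topic for the developers
sns_client.publish(
    TopicArn=os.environ["RESULTS_TOPIC_ARN"],  # assumed to be provided to the build job
    Subject="Lambda test results",
    Message=json.dumps(results, indent=2),
)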
Going forward, I hope to automate adding new Lambda functions to the JSON file so that testing can also be scheduled.
I spent time packaging this solution up with all the appropriate Terraform files and code. If you are interested in this solution feel free to reach out and I can deliver the packaged application to you!
Sample Code: GitHub
SES Monitoring
I love AWS, but one thing they don’t do is build complete tools, and SES is one of them. I recently started getting emails about high usage for one of the identities that I have set up in SES. I assumed there was a way to track usage within CloudWatch, but for the life of me I couldn’t find one. So I guess that means I need to build something.
The idea here is pretty simple: within SES, identities can be configured to send notifications. I created an SNS topic, subscribed all delivery notifications to the topic, and then subscribed a Lambda function to the topic. The Lambda function acts as the processor for the records, formats them in a usable way, and puts them into DynamoDB, using the identity as the primary key. The result is a simple application architecture like the image below.
Every time an email is delivered, the Lambda function processes the event and checks the DynamoDB table to see if there is an existing record. If the identity is already present in the table, the lookup returns the “count” value so that we can increment it, and the “destination” value records the destination of the email being sent. Below is a sample of the code I used to put the object into the DynamoDB table.
def put_dynamo_object(dynamo_object):
    count = dynamo_get_item(dynamo_object)
    if count is None or count == 0:
        count = 1
    else:
        count = int(count) + 1

    # Get the email address from the long source string
    source_string = dynamo_object['source']
    match = re.search(r'[\w.+-]+@[\w-]+\.[\w.-]+', source_string)
    email = match.group(0)

    try:
        table.update_item(
            Key={
                'identity': email
            },
            AttributeUpdates={
                'details': {
                    'Value': {
                        'caller_identity': dynamo_object['caller_identity'],
                        'source': dynamo_object['source'],
                        'destination': dynamo_object['destination'],
                        'count': str(count)
                    }
                }
            }
        )
        return True
    except ClientError as e:
        print("Failed to put record")
        print(e)
        return False
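The dynamo_get_item helper referenced above isn’t included in the snippet. A minimal sketch, assuming the same table resource and key schema, could look like this:

def dynamo_get_item(dynamo_object):
    """Return the current delivery count for this identity, or None if no record exists."""
    source_string = dynamo_object['source']
    match = re.search(r'[\w.+-]+@[\w-]+\.[\w.-]+', source_string)
    email = match.group(0)
    try:
        response = table.get_item(Key={'identity': email})
    except ClientError as e:
        print("Failed to read record")
        print(e)
        return None
    item = response.get('Item')
    if not item:
        return None
    return item['details']['count']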
If you want to use this code feel free to reach out to me and I will share with you the Terraform to deploy the application and as always, reach out with questions or feedback!
Building a Discord Bot with Python and AWS
I’m a member of a lot of Discord servers. The one I participate in most is one with my brothers and our friends. In this server, we joke around a lot about people posting off-topic messages in the various text channels and we give them fake “warnings”. I decided to take this a step further and create a bot where we could track the warnings and then present them in a leaderboard.
The Discord bot API documentation is great and allowed me to quickly get a proof of concept up and running. I then relied on my Python, Terraform, and AWS skills to get the bot up and running quickly. Below is a simple architecture diagram that I started and will most likely be adding to as the members of the server request more features.
We currently have three commands: !warning, !feature, and !leaderboard. The !warning command takes a tagged user as input, then uses the Boto3 library for Python to add the attribute to that user in the table. Here is the code:
# Adds an attribute to a user
def add_warning_to_user(username, attribute):
    client = boto3.resource("dynamodb",
                            region_name="us-west-2",
                            aws_access_key_id=os.getenv('AWS_KEY'),
                            aws_secret_access_key=os.getenv('AWS_SECRET'))
    table = client.Table(table_name)
    print("adding", attribute, "to", str(username))
    try:
        response = table.update_item(
            Key={'username': str(username)},
            AttributeUpdates={attribute: {
                'Value': str(dynamodb.get_warning_count_of_user(username, attribute) + 1)
            }}
        )
        print(response)
    except ClientError as e:
        print("Failed to update count")
        print(e)
        return False
    return True
I have another function within this code that calls out to the DynamoDB table and gets the user’s current value so that we can increment the count; a minimal sketch of it is below.
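That lookup helper isn’t shown in the post, so the names here are assumed from the call above rather than taken from the actual bot code:

# Returns the user's current count for an attribute (0 if no record exists yet)
def get_warning_count_of_user(username, attribute):
    client = boto3.resource("dynamodb",
                            region_name="us-west-2",
                            aws_access_key_id=os.getenv('AWS_KEY'),
                            aws_secret_access_key=os.getenv('AWS_SECRET'))
    table = client.Table(table_name)
    response = table.get_item(Key={'username': str(username)})
    item = response.get('Item')
    if not item or attribute not in item:
        return 0  # First warning for this user starts the count at 1
    return int(item[attribute])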
The !leaderboard command takes an “attribute” as input. I built it this way so that future attributes can be added to users without having to rebuild everything from scratch. To get the data, I used the DynamoDB scan operation to retrieve the data for all users and then filtered within the Python application on just the attribute that the leaderboard was requested for (a sketch of that step appears after the formatting code below). I then have a function that formats the leaderboard into something that the bot can publish back to the server.
def create_table(data, attribute):
    if attribute == "warning_count":
        attribute = "Warnings"
    table = ""
    rows = []
    rows.append("``` ")
    rows.append(f"{attribute}: Leaderboard")
    for key, value in data.items():
        rows.append(f"{key}: {str(value)}")
    rows.append("``` ")
    for row in rows:
        table += " " + row + "\n "
    return table
I want to revisit this code to make the formatting cleaner as the list gets longer, but for now it works as intended.
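The scan-and-filter step mentioned earlier isn’t part of the snippets above; a minimal sketch, assuming a warning_count-style attribute stored per user, might be:

def get_leaderboard_data(attribute):
    client = boto3.resource("dynamodb", region_name="us-west-2")
    table = client.Table(table_name)
    response = table.scan()  # Small table, so a full scan is acceptable here
    leaderboard = {}
    for item in response.get('Items', []):
        if attribute in item:
            leaderboard[item['username']] = int(item[attribute])
    # Sort users by count, highest first, for display
    return dict(sorted(leaderboard.items(), key=lambda kv: kv[1], reverse=True))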
I created the last function so that users could submit feature requests. The code is very simple: the !feature command takes all text following the command and passes it to an SNS function I wrote, which sends me an email containing the user’s feature request. I have hopes that I can transition this to create some sort of Jira task or other workflow. Below is the bot’s code to handle this interaction:
@client.command(name="feature", help="sends a feature request") async def send_feature_request(ctx, *, args): print("THIS IS THE FEATURE REQUEST", args) if sns.send_message(args) == True: await ctx.send("Your request has been sent") else: await ctx.send("Failed to send your request. Plz try again later.")
Right now the bot is running inside a Docker container within my homelab. I need to create better logging and implement some sort of logging server so that I can better handle errors and monitor for any outages.
If you have questions about building Discord bots or AWS and its various components feel free to reach out to me at any time. This was a great project that I worked on over a few days and it was great to see it come together quickly!
Moving AWS Cloudfront Logs to DynamoDB
I think it’s pretty obvious that I love DynamoDB. It has become one of my favorite AWS services; I use it almost every day at work and am getting better at using it for my personal projects as well.
I had a client approach me about getting logs from a CloudFront distribution. CloudFront has a native logging function that spits out .GZ files to an S3 bucket. My client doesn’t have any sort of log ingestion service, so rather than build one, I decided we could parse the .GZ files and store the data in a DynamoDB table. To accomplish this I created a simple Lambda:
import boto3
import gzip
import uuid
from datetime import datetime
from datetime import timedelta
import time
from botocore.exceptions import ClientError

# Creates a time to live value
def ttl_time():
    now = datetime.now()
    ttl_date = now + timedelta(90)
    final = str(time.mktime(ttl_date.timetuple()))
    return final

# Puts the log json into dynamodb:
def put_to_dynamo(record):
    client = boto3.resource('dynamodb', region_name='us-west-2')
    table = client.Table('YOUR_TABLE_NAME')
    try:
        response = table.put_item(
            Item=record
        )
        print(response)
    except ClientError as e:
        print("Failed to put record")
        print(e)
        return False
    return True

def lambda_handler(event, context):
    print(event)
    s3_key = event['Records'][0]['s3']['object']['key']
    s3 = boto3.resource("s3")
    obj = s3.Object("YOUR_BUCKET", s3_key)
    with gzip.GzipFile(fileobj=obj.get()["Body"]) as gzipfile:
        content = gzipfile.read()
    # print(content)
    my_json = content.decode('utf8').splitlines()
    my_dict = {}
    for line in my_json:
        if line.startswith("#Fields:"):
            keys = line.split(" ")
        else:
            values = line.split("\t")
            # Map each field name to its value by position
            i = 0
            for item in keys:
                if item == "#Fields:":
                    continue
                my_dict[item] = values[i]
                i += 1
    print('- ' * 20)
    myuuid = str(uuid.uuid4())
    print(myuuid)
    my_dict["uuid"] = myuuid
    my_dict['ttl'] = ttl_time()
    print(my_dict)
    if put_to_dynamo(my_dict) == True:
        print("Successfully imported item")
        return True
    else:
        print("Failed to put record")
        return False
This Lambda runs every time an S3 object is created. It grabs the .GZ file and parses it into a dictionary that can be imported into DynamoDB. One other thing to note is that I append a UUID so that I can track down errors more easily.
I wrote a simple front end for the client that grabs records based on a date input and writes the logs to a CSV so they can parse them on their local machines. I have a feeling we will be implementing a log aggregation server soon!
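That front end isn’t included in this post, but a minimal sketch of the export step, assuming the CloudFront "date" field was stored as written by the Lambda above, could look like this:

import csv
import boto3
from boto3.dynamodb.conditions import Attr

def export_logs_for_date(date_string, output_path):
    """Scan the log table for a given date and dump the matching items to a CSV file."""
    table = boto3.resource('dynamodb', region_name='us-west-2').Table('YOUR_TABLE_NAME')
    response = table.scan(FilterExpression=Attr('date').eq(date_string))
    items = response.get('Items', [])
    # Follow pagination in case the result set is larger than one scan page
    while 'LastEvaluatedKey' in response:
        response = table.scan(FilterExpression=Attr('date').eq(date_string),
                              ExclusiveStartKey=response['LastEvaluatedKey'])
        items.extend(response.get('Items', []))
    if not items:
        return 0
    # Use the union of all keys as CSV headers since log rows can vary slightly
    fieldnames = sorted({key for item in items for key in item})
    with open(output_path, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(items)
    return len(items)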
If this code helps you please share it with your friends and co-workers!