Author: Aaron VanSledright

  • Streamline Your S3 Management: How to Count Small Files Locally with Python

    I recently came across a need to count objects in an S3 bucket that were of a particular size. There isn’t a native way to do this within the S3 console, so I decided to script it out using Python and the AWS SDK (Boto3). You can see all of the code on my GitHub.

    The script is easy to use. The logic is as follows:

    Step 1: Setup

    First, the script initializes a client for the S3 service using Boto3. This requires your AWS credentials to be configured (for example through environment variables or ~/.aws/credentials); the script uses them to authenticate requests to your S3 buckets.

    Step 2: Input Parameters

    The script accepts two command-line arguments:

    • The name of the S3 bucket.
    • The maximum file size (in bytes) for which you want to count the files.

    Step 3: List and Count

    Using a Boto3 paginator, the script handles large buckets by fetching the object listing in batches (S3 returns at most 1,000 keys per list request). It iterates through each object in the specified bucket:

    • It checks if the object’s size is less than or equal to the size limit you specified.
    • It increments a count for each file that meets the criteria.
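
    The list-and-count step can be sketched roughly as follows. This is a minimal illustration rather than the script from the repository: the function names count_small_objects and list_pages are made up for this example, and the sample pages only imitate the shape of list_objects_v2 output so the counting logic can be tried without AWS access.

```python
from typing import Iterable, Iterator


def count_small_objects(pages: Iterable[dict], max_size: int) -> int:
    """Count objects whose Size (in bytes) is <= max_size across listing pages."""
    count = 0
    for page in pages:
        # A page for an empty bucket or prefix has no "Contents" key.
        for obj in page.get("Contents", []):
            if obj["Size"] <= max_size:
                count += 1
    return count


def list_pages(bucket: str) -> Iterator[dict]:
    """Yield list_objects_v2 pages for a bucket (needs boto3 and AWS credentials)."""
    import boto3  # imported lazily so the counting logic can be tried offline

    paginator = boto3.client("s3").get_paginator("list_objects_v2")
    yield from paginator.paginate(Bucket=bucket)


# Offline example with hand-made pages shaped like list_objects_v2 output
sample_pages = [
    {"Contents": [{"Key": "a.txt", "Size": 10}, {"Key": "b.bin", "Size": 2_000_000}]},
    {"Contents": [{"Key": "c.txt", "Size": 1_048_576}]},
    {},
]
small = count_small_objects(sample_pages, 1_048_576)
print(small)
```

    Against a real bucket, the same function would be called as count_small_objects(list_pages("my-example-bucket"), 1048576).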

    Step 4: Error Handling

    If the script encounters issues accessing the bucket or if an API call fails, it catches the exception and prints an error message. This helps in debugging and ensures you understand why a particular operation failed.

    Step 5: Output

    Finally, the script outputs the total count of files that meet the size criteria. This result can be used directly in reports, further automation scripts, or just for informational purposes.

    Usage:

    python count_small_files.py my-example-bucket 1048576

    Replace my-example-bucket with your bucket name and 1048576 with the maximum file size in bytes (1 MB in this example). This command will tell you how many files in my-example-bucket are 1 MB or smaller.

    This Python script is a practical tool for anyone looking to manage S3 storage more efficiently. By running this script, you can quickly gather insights into your data distribution, helping you make informed decisions about storage management and optimization.

    Stay tuned for more insights on managing cloud services and enhancing your productivity through automation. Don’t forget to subscribe for more updates and tutorials!

  • Securing AWS S3 Objects with Python: Implementing SSE-S3 Encryption

    In the cloud-native world, data security is paramount, and securing Amazon Web Services (AWS) S3 storage is a critical task for any developer. In this article, we dive into a Python script designed to ensure that all your S3 objects are encrypted using Server-Side Encryption with S3-Managed Keys (SSE-S3). This method provides robust security by encrypting S3 objects at the server level using keys managed by S3.

    Understanding the Python Script

    Using the code located at: https://github.com/avansledright/s3-object-re-encryption we have a good framework for re-encrypting our objects.

    The script utilizes the boto3 library, a Python SDK for AWS, enabling developers to integrate their applications with AWS services directly. It includes functions to list objects in an S3 bucket, check their encryption status, and apply SSE-S3 encryption if necessary.

    Key Functions:

    1. Listing Objects: Retrieves all objects within a specified bucket and prefix, managing pagination to handle large datasets.
    2. Checking Encryption: Examines if each object is encrypted with SSE-S3 by accessing its metadata.
    3. Applying Encryption: Updates objects not encrypted with SSE-S3, ensuring all data is securely encrypted using copy_object with the ServerSideEncryption parameter.
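
    The check-and-apply steps might look something like the sketch below. It is not the code from the repository: is_sse_s3, ensure_sse_s3, and the FakeS3 stand-in are hypothetical names used only for illustration, and the functions take the client as a parameter so they can be exercised without AWS access. The head_object and copy_object calls and the "AES256" value are the standard Boto3 ones.

```python
SSE_S3 = "AES256"  # value S3 reports for SSE-S3 in the ServerSideEncryption field


def is_sse_s3(client, bucket: str, key: str) -> bool:
    """Return True if head_object reports SSE-S3 for the object."""
    head = client.head_object(Bucket=bucket, Key=key)
    return head.get("ServerSideEncryption") == SSE_S3


def ensure_sse_s3(client, bucket: str, key: str) -> bool:
    """Copy the object onto itself with SSE-S3 if needed; True means it was rewritten."""
    if is_sse_s3(client, bucket, key):
        return False
    client.copy_object(
        Bucket=bucket,
        Key=key,
        CopySource={"Bucket": bucket, "Key": key},
        ServerSideEncryption=SSE_S3,
    )
    return True


class FakeS3:
    """In-memory stand-in for the two client calls used above (illustration only)."""

    def __init__(self, objects):
        self.objects = dict(objects)  # key -> encryption value, or None if unencrypted

    def head_object(self, Bucket, Key):
        encryption = self.objects[Key]
        return {"ServerSideEncryption": encryption} if encryption else {}

    def copy_object(self, Bucket, Key, CopySource, ServerSideEncryption):
        self.objects[Key] = ServerSideEncryption


fake = FakeS3({"plain.txt": None, "kms.txt": "aws:kms", "ok.txt": "AES256"})
results = {key: ensure_sse_s3(fake, "my-example-bucket", key) for key in list(fake.objects)}
print(results)
```

    With a real client you would pass boto3.client("s3") instead of FakeS3. Note that this sketch also rewrites objects encrypted with KMS keys, and that a single copy_object call only works for objects up to 5 GB; larger objects need a multipart copy.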

    Why Encrypt with SSE-S3?

    Encrypting your S3 objects with SSE-S3 ensures that data is automatically encrypted before being saved to disk and decrypted when accessed. This happens transparently, allowing you to secure your data without modifying your application code.

    Running the Script

    The script is executed via the command line, where users specify the S3 bucket and prefix. It then processes each object, ensuring encryption standards meet organizational and compliance requirements.

    Expanding the Script

    While this script provides a basic framework for S3 encryption, it can be expanded with additional error handling, logging, and perhaps integration into a larger AWS security auditing tool.

    AWS developers looking to enhance their application security will find this script a valuable starting point for implementing standard security practices within their S3 environments. By automating the encryption process, developers can ensure consistency and security across all stored data.

    For those who manage sensitive or regulated data in AWS, applying SSE-S3 encryption programmatically can help meet legal and compliance obligations while providing peace of mind about data security.

    If you find this article helpful please share it with your friends!

  • Creating a local “Lex” Environment

    This came up in a conversation I was having so I decided to take a stab at coding something to help. Let’s say you are building an Amazon Web Services chat bot using Lex and you want to test your Lambda logic locally before you deploy to your AWS account. Currently, as far as I know, there is no way to do this.

    So, I created a simple framework for “simulating” the Lex bot locally. I use quotes around simulating because this does not, in any form or fashion, compare to Lex’s ability to handle user input. But if you are providing it with basic inputs that you know would work, it could be a good starting point for your local assembly testing.

    Let’s get into the code.

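    if __name__ == "__main__":
        session_state = {"currentIntent": None, "slots": {}}
        while True:
            user_input = input("You: ")
            # process_input returns None when no utterance matches
            if process_input(user_input, session_state) is None:
                print("Bot: Sorry, I didn't understand that.")
                continue
            print(f"Bot: {session_state['bot_response']}")
            # generate_response (not shown here) builds the reply from the session state
            response = generate_response(session_state)
            print("Bot: ", response)
            print("Bot: What else can I help you with?")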

    The main code here is a while loop that prompts the user to provide input. We then match that input against a set of predefined intents.

    def process_input(user_input, session_state):
        # Logic to match user_input to an intent and fill slots
        # Update session_state with the current intent and slots
        for intent, intent_config in intents.items():
            if user_input in intent_config['utterances']:
                print(f"Found {intent} as input")
                session_state["currentIntent"] = intent
                session_state["bot_response"] = intent_config['response']
                return session_state
        # No utterance matched any intent
        return None

    This function takes a look at a separate variable that we have defined for intents. You would need to expand this out for your use case but the structure is pretty straightforward.

    intents = {
        "time": {
            "utterances": ['what time is it'],
            "slots": [],
            "response": "Checking to see what time it is"
        }
    }

    Once the utterance matches an intent, we update the session state and return it to the while loop, which then uses it to generate a response. This is where your logic would begin to get complex as you call your fulfillment Lambda to handle returning a valid response to the user.
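
    As a rough idea of what calling a fulfillment handler locally could look like, here is a minimal sketch. Everything in it is an assumption for illustration: build_event, fulfill_locally, and the stand-in lambda_handler are hypothetical names, and the event and response shapes are only loosely modeled on the Lex V1 Lambda format, so check them against the Lex version you actually deploy.

```python
def build_event(session_state: dict, user_input: str) -> dict:
    """Build a minimal event, loosely modeled on the Lex V1 Lambda input format."""
    return {
        "invocationSource": "FulfillmentCodeHook",
        "inputTranscript": user_input,
        "currentIntent": {
            "name": session_state["currentIntent"],
            "slots": session_state["slots"],
        },
    }


def lambda_handler(event, context):
    """Hypothetical fulfillment handler; in practice you would import your real one."""
    intent = event["currentIntent"]["name"]
    content = "It is 12:00" if intent == "time" else "Sorry, I can't help with that."
    return {
        "dialogAction": {
            "type": "Close",
            "fulfillmentState": "Fulfilled",
            "message": {"contentType": "PlainText", "content": content},
        }
    }


def fulfill_locally(session_state: dict, user_input: str) -> str:
    """Call the handler in-process and pull the reply text out of its response."""
    result = lambda_handler(build_event(session_state, user_input), None)
    return result["dialogAction"]["message"]["content"]


state = {"currentIntent": "time", "slots": {}}
reply = fulfill_locally(state, "what time is it")
print(reply)
```

    The point of the design is that the handler never knows it is running outside AWS: it receives a dictionary and returns a dictionary, so the same function can later be deployed unchanged.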

    You can take a look at the full code and contribute to it over on my GitHub. If this code helps you or someone on your team, feel free to share it across your social media!

  • Building a Generative AI Workflow with AWS Bedrock

    I’ve finally been tasked with a Generative AI project to work on. I’ve done this workflow manually with ChatGPT in the past and it works quite well but, for this project, the requirement was to use Amazon Web Services’ new product “AWS Bedrock”.

    The workflow takes in some code and writes a technical document to support a clear English understanding of what the code is going to accomplish. Using AWS Bedrock, the AI will write the document and output it to an S3 bucket.

    The architecture involves uploading the initial code to an S3 bucket, which sends a request to an SQS queue and ultimately triggers a Lambda function that prompts the AI and uploads the output to a separate S3 bucket. Because this was a proof of concept, the Lambda function was a sufficient compute resource; going forward, however, I am going to look at placing this code into a Docker container so that it can scale for larger code inputs.

    Here is the architecture diagram:

    Let’s take a look at some of the important code. First is the prompt management. I wrote a function that will take input of the code as well as a parameter of “prompt_type”. This will allow the function to be scalable to accommodate other prompts in the future.

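    def return_prompt(code, prompt_type):
        if prompt_type == "testPrompt":
            # Assuming an Anthropic Claude text-completion model, which expects
            # "\n\nHuman:" and "\n\nAssistant:" turns in the prompt
            prompt1 = f"\n\nHuman: <your prompt>\n\n{code}\n\nAssistant:"
            return prompt1
        raise ValueError(f"Unknown prompt_type: {prompt_type}")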

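    The important thing to look at here is the format of the message. You have to include both the “Human:” and “Assistant:” markers (Anthropic’s text-completion format puts each on its own turn, preceded by a blank line). Without this formatting, your API call will return an error.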

    The next bit of code is what we use to prompt the Bedrock AI.

    # Caller: build the request body and hand it to the helper below
    prompt_to_send = prompts.return_prompt(report_file, "testPrompt")
    body = {
        "prompt": prompt_to_send,
        "max_tokens_to_sample": 300,
        "temperature": 0.1,
        "top_p": 0.9
    }
    accept = 'application/json'
    contentType = 'application/json'

    # Return pseudo code
    bedrock_response = h.bedrock_actions.invoke_model(json.dumps(body, indent=2).encode('utf-8'), contentType, accept, modelId=modelid)

    # Helper (bedrock_actions): needs json, boto3, and botocore's ClientError imported;
    # bedrock_runtime is assumed to be boto3.client("bedrock-runtime")
    def invoke_model(body, contentType, accept, modelId):
        print(f"Body being sent: {body}")
        try:
            response = bedrock_runtime.invoke_model(
                body=body,
                contentType=contentType,
                accept=accept,
                modelId=modelId
            )
            return response
        except ClientError as e:
            print("Failed to invoke Bedrock model")
            print(e)
            return False

    The body of our request is what configures Bedrock to run and create a response. These values can be tweaked as follows:

    max_tokens_to_sample: The maximum number of tokens to generate in the response. Amazon recommends a limit of 4,000.
    top_p: Use a lower value to ignore less probable options.
    top_k: The number of token choices the model considers when generating the next token.
    temperature: Use a lower value to decrease randomness in the response.

    You can read more about the inputs here.
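
    To make the parameters above concrete, here is a small sketch of building the request body and reading the generated text back out. It assumes an Anthropic Claude text-completion model, whose response carries the text under a "completion" key; build_body and read_completion are hypothetical helper names, the top_k default of 250 is an assumption, and the fake response only imitates the streaming body that invoke_model returns.

```python
import io
import json


def build_body(prompt: str, max_tokens: int = 300, temperature: float = 0.1,
               top_p: float = 0.9, top_k: int = 250) -> bytes:
    """Serialize the request body with the sampling parameters described above."""
    body = {
        "prompt": prompt,
        "max_tokens_to_sample": max_tokens,
        "temperature": temperature,
        "top_p": top_p,
        "top_k": top_k,
    }
    return json.dumps(body).encode("utf-8")


def read_completion(response: dict) -> str:
    """Extract the generated text; response['body'] is a file-like stream."""
    payload = json.loads(response["body"].read())
    return payload["completion"].strip()


# Offline stand-in for what invoke_model returns (assumed shape, for illustration)
fake_payload = {"completion": " This code counts files.", "stop_reason": "stop_sequence"}
fake_response = {"body": io.BytesIO(json.dumps(fake_payload).encode("utf-8"))}

request = json.loads(build_body("\n\nHuman: Explain this code.\n\nAssistant:"))
text = read_completion(fake_response)
print(request["max_tokens_to_sample"], text)
```

    Keeping the body construction in one function makes it easy to raise max_tokens_to_sample for longer documents without touching the calling code.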

    If you want to see more of this code take a look at my GitHub repository below. Feel free to use it wherever you want. If you have any questions be sure to reach out to me!

    GitHub: https://github.com/avansledright/bedrock-poc-public

  • Convert Spotify Links to Youtube Links

    In a continuation of my Discord Bot feature deployment, I found a need to convert Spotify links to YouTube links. I use YouTube Music for my music streaming needs and the rest of the Discord uses Spotify.

    With the help of ChatGPT, I created a script that converts Spotify links to YouTube links! It uses both the Spotify and YouTube APIs to grab track information and build a search query that returns a relevant YouTube link.

    The code consists of two primary functions which I have shared below. One to get the artist and track names and another to query YouTube. Combined, we can return a YouTube link to a multitude of applications.

    # sp is assumed to be a spotipy.Spotify client and youtube a
    # googleapiclient client built with build('youtube', 'v3', ...)
    def get_spotify_track_info(spotify_url):
        # sp.track accepts a track URL, URI, or ID, so one call is enough
        track_info = sp.track(spotify_url)
        return {
            'name': track_info['name'],
            'artists': [artist['name'] for artist in track_info['artists']]
        }

    def search_youtube_video(track_info):
        search_query = f"{track_info['name']} {track_info['artists'][0]} official video"
        request = youtube.search().list(q=search_query, part='snippet', type='video', maxResults=1)
        response = request.execute()
        items = response.get('items', [])
        if not items:
            # No matching video was found for this track
            return None
        video_id = items[0]['id']['videoId']
        return f"https://www.youtube.com/watch?v={video_id}"

    I took this code and incorporated it into my Discord bot so that anytime a user posts a Spotify link it will automatically convert it to a YouTube link. Here is an example:
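
    The bot-side detection step could be sketched like this. The regular expression, find_spotify_links, and convert_links are hypothetical and not taken from the repository; the pattern assumes links of the form open.spotify.com/track/<id>, optionally with a locale segment, and the converter is passed in as a callable so the logic can be tried without API keys.

```python
import re
from typing import Callable, List, Optional

# Matches open.spotify.com track links, with or without a locale segment such as /intl-de/
SPOTIFY_TRACK_RE = re.compile(
    r"https?://open\.spotify\.com/(?:intl-[a-z]{2}/)?track/[A-Za-z0-9]+"
)


def find_spotify_links(message: str) -> List[str]:
    """Return every Spotify track link found in a chat message."""
    return SPOTIFY_TRACK_RE.findall(message)


def convert_links(message: str, to_youtube: Callable[[str], Optional[str]]) -> List[str]:
    """Map each Spotify link in the message through the to_youtube callable."""
    replies = []
    for link in find_spotify_links(message):
        result = to_youtube(link)
        if result:  # skip tracks with no YouTube match
            replies.append(result)
    return replies


message = "check this https://open.spotify.com/track/4uLU6hMCjMI75M1A2tKUQC?si=abc out"
links = find_spotify_links(message)
print(links)
```

    In the bot, to_youtube would be a small function that chains get_spotify_track_info and search_youtube_video, and each returned link would be sent back to the channel.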

    If you want to use this code, check out the GitHub link below. As always, if you found this article helpful please share it across your social media.

    Github – https://github.com/avansledright/spotify-to-youtube

  • Automated Lambda Testing

    Look, I know there are a bunch of test frameworks that you could use for your Lambda functions. But what if you wanted something simple? I spent an afternoon putting together what I would want in a testing pipeline that returns a simple “Success/Fail” type response to me via email.

    An architecture diagram for your eyes:

    The idea is to create a JSON object in which each key is the name of a Lambda function and each value is the test event to pass to it. Once the file is uploaded to the S3 bucket, the pipeline is triggered and a CodeBuild job iterates through the Lambdas and their events. Each Lambda is invoked with its event and reports whether or not it succeeded. The results are then sent to an SNS topic to be distributed to the developers.
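
    A minimal sketch of the iteration step is shown below. It is an illustration under assumptions, not the packaged solution: run_tests, format_report, the function names in the test file, and the FakeLambda stand-in are all hypothetical. The real call it imitates is the Boto3 Lambda client's invoke, which reports a function-level failure through a FunctionError field in the response.

```python
import json


def run_tests(lambda_client, tests: dict) -> dict:
    """Invoke each function with its test event; map function name -> True/False."""
    results = {}
    for function_name, event in tests.items():
        try:
            response = lambda_client.invoke(
                FunctionName=function_name,
                Payload=json.dumps(event).encode("utf-8"),
            )
            # Lambda sets FunctionError when the function itself raised or timed out
            results[function_name] = (
                response["StatusCode"] == 200 and "FunctionError" not in response
            )
        except Exception as exc:  # e.g. a botocore ClientError for a missing function
            print(f"Invoke failed for {function_name}: {exc}")
            results[function_name] = False
    return results


def format_report(results: dict) -> str:
    """Render one Success/Fail line per function, sorted by name."""
    return "\n".join(
        f"{name}: {'Success' if ok else 'Fail'}" for name, ok in sorted(results.items())
    )


class FakeLambda:
    """Stand-in for boto3.client('lambda') so the loop can run offline."""

    def invoke(self, FunctionName, Payload):
        if FunctionName == "missing-fn":
            raise RuntimeError("ResourceNotFoundException")
        response = {"StatusCode": 200}
        if json.loads(Payload).get("fail"):
            response["FunctionError"] = "Unhandled"
        return response


tests = {"resize-image": {"size": 1}, "send-email": {"fail": True}, "missing-fn": {}}
report = format_report(run_tests(FakeLambda(), tests))
print(report)
```

    In the pipeline, the tests dictionary would be loaded from the JSON file in S3 and the report string would be published to the SNS topic.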

    Going forward, I hope to automate adding new Lambda functions to the JSON file so that testing can also be scheduled.

    I spent time packaging this solution up with all the appropriate Terraform files and code. If you are interested in this solution feel free to reach out and I can deliver the packaged application to you!

    Sample Code: GitHub

  • SES Monitoring

    I love AWS. But one thing they don’t do is build complete tools. SES is one of them. I recently started getting emails about high usage for one of the identities that I have set up for SES. I would assume that there was a way to track usage within CloudWatch but for the life of me I couldn’t find one. So I guess that means I need to build something.

    The idea here is pretty simple: within SES identities you can set up notifications. So, I created an SNS topic and sent all delivery notifications to it. Then I subscribed a Lambda function to the topic. The Lambda function processes the records, formats them in a usable way, and puts them into DynamoDB. I used the identity as the primary key. The result is a simple application architecture like the below image.

    Every time an email is delivered, the Lambda function processes the event and checks the DynamoDB table to see if we have an existing record. If the identity is already present in the table, it returns the “count” value so that we can increment it. The “destination” value records the destination of the email being sent. Below is a sample of the code I used to put the object into the DynamoDB table.

    import re

    from botocore.exceptions import ClientError

    def put_dynamo_object(dynamo_object):
        # dynamo_get_item is expected to return the stored count for this
        # identity, or None (or 0) when there is no record yet
        current = dynamo_get_item(dynamo_object)
        count = int(current) + 1 if current else 1
        # get email address from the long string
        source_string = dynamo_object['source']
        match = re.search(r'[\w.+-]+@[\w-]+\.[\w.-]+', source_string)
        if match is None:
            print("No email address found in", source_string)
            return False
        email = match.group(0)
        try:
            table.update_item(
                Key={
                    'identity': email
                },
                AttributeUpdates={
                    'details': {
                        'Value': {
                            'caller_identity': dynamo_object['caller_identity'],
                            'source': dynamo_object['source'],
                            'destination': dynamo_object['destination'],
                            'count': str(count)
                        }
                    }
                }
            )
            return True
        except ClientError as e:
            print("Failed to put record")
            print(e)
            return False
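
    Before put_dynamo_object runs, the Lambda has to unpack the SNS record that wraps each SES notification. A rough sketch of that step is below; extract_deliveries is a hypothetical name, the addresses are made up, and the field names follow the SES notification format as I understand it, so verify them against a real event from your own topic.

```python
import json


def extract_deliveries(event: dict) -> list:
    """Turn an SNS-triggered Lambda event into a list of delivery dicts."""
    deliveries = []
    for record in event.get("Records", []):
        # SNS wraps the SES notification as a JSON string in Sns.Message
        message = json.loads(record["Sns"]["Message"])
        if message.get("notificationType") != "Delivery":
            continue  # ignore bounces and complaints
        mail = message["mail"]
        deliveries.append({
            "caller_identity": mail.get("callerIdentity"),
            "source": mail["source"],
            "destination": mail["destination"],
        })
    return deliveries


# Hand-made event for illustration; real events carry many more fields
sample_event = {
    "Records": [
        {"Sns": {"Message": json.dumps({
            "notificationType": "Delivery",
            "mail": {
                "source": "Alerts <alerts@example.com>",
                "destination": ["user@example.org"],
                "callerIdentity": "ses-user",
            },
        })}},
        {"Sns": {"Message": json.dumps({"notificationType": "Bounce", "mail": {}})}},
    ]
}
deliveries = extract_deliveries(sample_event)
print(deliveries)
```

    Each dictionary in the returned list has the keys that put_dynamo_object reads, so the handler can simply loop over the list and store each one.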

    If you want to use this code feel free to reach out to me and I will share with you the Terraform to deploy the application and as always, reach out with questions or feedback!

  • Building a Discord Bot with Python and AWS

    I’m a member of a lot of Discord servers. The one I participate in most is one with my brothers and our friends. In this server, we joke around a lot about people posting off-topic messages in the various text channels and we give them fake “warnings”. I decided to take this a step further and create a bot where we could track the warnings and then present them in a leaderboard.

    The Discord bot API documentation is great and allowed me to quickly get a proof of concept up and running. I then relied on my Python, Terraform, and AWS skills to get the bot up and running quickly. Below is a simple architecture diagram that I started and will most likely be adding to as the members of the server request more features.

    We currently have three commands: !warning, !feature, and !leaderboard. The !warning command takes a tagged user as input. It then uses the Boto3 library for Python to add the attribute to that user in the table. Here is the code:

    # Adds an attribute to a user
    def add_warning_to_user(username, attribute):
        client = boto3.resource("dynamodb", region_name="us-west-2",
                                aws_access_key_id=os.getenv('AWS_KEY'),
                                aws_secret_access_key=os.getenv('AWS_SECRET'))
        table = client.Table(table_name)
        print("adding", attribute, "to", str(username))

        try:
            response = table.update_item(
                Key={'username': str(username)},
                AttributeUpdates={attribute: {
                    'Value': str(dynamodb.get_warning_count_of_user(username, attribute) + 1)
                    }
                }
            )
            print(response)
        except ClientError as e:
            print("Failed to update count")
            print(e)
            return False
        return True

    I have another function within this code that calls out to the DynamoDB table and gets the user’s current value so that we can increment the count.

    The !leaderboard command takes an “attribute” as input. I built it this way so that we can add future attributes to users without having to rebuild everything from scratch. To get the data, I used the DynamoDB scan function to retrieve all of the data for all the users and then filtered within the Python application on just the attribute that we are requesting the leaderboard for. I then have a function that formats the leaderboard into something that the bot can publish back to the server.

    def create_table(data, attribute):
        if attribute == "warning_count":
            attribute = "Warnings"
        rows = ["``` ", f"{attribute}: Leaderboard"]
        for key, value in data.items():
            rows.append(f"{key}: {value}")
        rows.append("``` ")
        # Each row is prefixed with a space and followed by a newline and a space
        return "".join(" " + row + "\n " for row in rows)

    This code I want to revisit to make the formatting cleaner as the list gets longer. But for now it works as intended.
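
    The scan-and-filter step that feeds create_table could look roughly like this. It is a sketch under assumptions: scan_all, build_leaderboard, and the FakeTable stand-in are hypothetical names, and it assumes counts are stored as strings, as in add_warning_to_user. The scan call and its LastEvaluatedKey and ExclusiveStartKey fields are the standard Boto3 Table ones.

```python
def scan_all(table) -> list:
    """Collect every item from a DynamoDB table, following LastEvaluatedKey pages."""
    items, kwargs = [], {}
    while True:
        page = table.scan(**kwargs)
        items.extend(page.get("Items", []))
        if "LastEvaluatedKey" not in page:
            return items
        kwargs["ExclusiveStartKey"] = page["LastEvaluatedKey"]


def build_leaderboard(items: list, attribute: str) -> dict:
    """Keep users that have the attribute, highest count first."""
    scores = {
        item["username"]: int(item[attribute]) for item in items if attribute in item
    }
    return dict(sorted(scores.items(), key=lambda pair: pair[1], reverse=True))


class FakeTable:
    """Two-page stand-in for a boto3 Table (illustration only)."""

    def scan(self, **kwargs):
        if "ExclusiveStartKey" not in kwargs:
            return {
                "Items": [{"username": "alice", "warning_count": "2"}],
                "LastEvaluatedKey": {"username": "alice"},
            }
        return {"Items": [{"username": "bob", "warning_count": "5"}, {"username": "carol"}]}


leaderboard = build_leaderboard(scan_all(FakeTable()), "warning_count")
print(leaderboard)
```

    Sorting before formatting also helps with the longer-list concern, since the top of the leaderboard stays readable even if the output is later truncated.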

    The last function I created so that the users could submit feature requests. The code is very simple and the command !feature takes the input of all text following the command and passes it to an SNS function I wrote which sends an email to me containing the user’s feature request. I have hopes that I can transition this to create some sort of Jira task or other workflow. Below is the bot’s code to handle this interaction:

    @client.command(name="feature", help="sends a feature request")
    async def send_feature_request(ctx, *, args):
        print("THIS IS THE FEATURE REQUEST", args)
        if sns.send_message(args) == True:
            await ctx.send("Your request has been sent")
        else:
            await ctx.send("Failed to send your request. Plz try again later.")
    
    

    Right now the bot is running inside a Docker container within my homelab. I need to create better logging and implement some sort of logging server so that I can better handle errors as well as monitoring in case of any outages.

    If you have questions about building Discord bots or AWS and its various components feel free to reach out to me at any time. This was a great project that I worked on over a few days and it was great to see it come together quickly!

  • Deleting MANY Lambda Function Versions

    I recently came across a challenge where I wanted to purge old Lambda Function versions. Some of the functions had over 65 versions!

    The code below iterates through a text file with one Lambda function name per line. For each function it gets a list of all the versions and deletes every version except the highest-numbered one. Versions that are attached to an alias are left in place, because Lambda refuses to delete them.

    import boto3
    from botocore.exceptions import ClientError

    client = boto3.client("lambda", region_name='us-west-2')

    def delete_function_version(function_name, version):
        # Lambda rejects deleting a version that an alias still points to;
        # that shows up here as a ClientError and the version is left in place
        try:
            client.delete_function(
                FunctionName=function_name,
                Qualifier=str(version)
            )
        except ClientError as e:
            print("Failed to delete version of", function_name, "version number", str(version))
            print(e)

    def get_function_versions(function_name):
        # list_versions_by_function returns at most 50 versions per call, so paginate
        versions = []
        try:
            paginator = client.get_paginator("list_versions_by_function")
            for page in paginator.paginate(FunctionName=function_name):
                versions.extend(page['Versions'])
        except ClientError as e:
            print('failed to get info for', function_name)
            print(e)
            return []
        return versions

    if __name__ == "__main__":
        print("Starting lambda update")
        with open("lambda_list.txt", "r") as text:
            lambda_list = text.read().splitlines()
        for lambda_function in lambda_list:
            if not lambda_function.strip():
                continue
            print("Working with lambda", lambda_function)
            lambda_versions = get_function_versions(lambda_function)
            # $LATEST is not a numbered version, so leave it out
            lambda_version_list = [
                int(version['Version'])
                for version in lambda_versions
                if version['Version'] != "$LATEST"
            ]
            if not lambda_version_list:
                continue
            newest = max(lambda_version_list)
            for lambda_version in lambda_version_list:
                if lambda_version == newest:
                    print("This is the latest version skipping", str(lambda_version))
                else:
                    print("Deleting version", lambda_version, "of", lambda_function)
                    delete_function_version(lambda_function, lambda_version)

    To use this script, first populate the text file with the Lambda function(s) that you want to evaluate and then execute the Python script.
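
    If you would rather skip alias-attached versions up front instead of letting the delete call fail, the selection could be sketched as a pure function like the one below. versions_to_delete is a hypothetical helper, not part of the script; it assumes the input lists have the shape returned by the Lambda list_versions_by_function and list_aliases calls.

```python
def versions_to_delete(versions: list, aliases: list) -> list:
    """Return numbered versions that are neither the newest nor referenced by an alias."""
    numbered = [int(v["Version"]) for v in versions if v["Version"] != "$LATEST"]
    if not numbered:
        return []
    keep = {max(numbered)}
    for alias in aliases:
        if alias["FunctionVersion"] != "$LATEST":
            keep.add(int(alias["FunctionVersion"]))
        # Weighted aliases can route traffic to a second version as well
        weights = (alias.get("RoutingConfig") or {}).get("AdditionalVersionWeights") or {}
        keep.update(int(version) for version in weights)
    return sorted(v for v in numbered if v not in keep)


# Hand-made data shaped like the API responses, for illustration
sample_versions = [{"Version": v} for v in ["$LATEST", "1", "2", "3", "4", "5"]]
sample_aliases = [
    {
        "Name": "prod",
        "FunctionVersion": "2",
        "RoutingConfig": {"AdditionalVersionWeights": {"3": 0.1}},
    }
]
doomed = versions_to_delete(sample_versions, sample_aliases)
print(doomed)
```

    Keeping the selection separate from the delete calls also makes it easy to print the list as a dry run before anything is removed.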

    I hope that this helps you or a coworker!

    EDIT: Thanks to Alex Iliev for testing and finding some bugs in this code!

    Links:
    Github
    Twitter

  • Moving AWS Cloudfront Logs to DynamoDB

    I think it’s pretty obvious that I love DynamoDB. It has become one of my favorite AWS services; I use it almost every day at work and am getting better at using it for my personal projects as well.

    I had a client approach me about getting logs from a CloudFront distribution. CloudFront has a native logging function that spits out .GZ files to an S3 bucket. My client doesn’t have any sort of log ingestion service, so rather than build one I decided we could parse the .GZ files and store the data in a DynamoDB table. To accomplish this I created a simple Lambda:

    import boto3
    import gzip
    import time
    import uuid
    from urllib.parse import unquote_plus
    from botocore.exceptions import ClientError

    TTL_DAYS = 90

    #Creates a time to live value
    def ttl_time():
        # DynamoDB TTL expects a Number holding epoch seconds
        return int(time.time()) + TTL_DAYS * 24 * 60 * 60

    #Puts the log json into dynamodb:
    def put_to_dynamo(record):
        client = boto3.resource('dynamodb', region_name='us-west-2')
        table = client.Table('YOUR_TABLE_NAME')
        try:
            response = table.put_item(
                Item=record
            )
            print(response)
        except ClientError as e:
            print("Failed to put record")
            print(e)
            return False

        return True

    def lambda_handler(event, context):
        print(event)
        # Object keys arrive URL-encoded in S3 event notifications
        s3_key = unquote_plus(event['Records'][0]['s3']['object']['key'])
        s3 = boto3.resource("s3")
        obj = s3.Object("YOUR_BUCKET", s3_key)
        with gzip.GzipFile(fileobj=obj.get()["Body"]) as gzipfile:
            content = gzipfile.read()
        lines = content.decode('utf8').splitlines()

        keys = []
        success = True
        for line in lines:
            if line.startswith("#Fields:"):
                # Header line: "#Fields: date time x-edge-location ..."
                keys = line.split()[1:]
                continue
            if line.startswith("#") or not line.strip():
                # Skip other comment lines such as "#Version: 1.0"
                continue
            # Each remaining line is one tab-separated request record,
            # stored as its own DynamoDB item
            values = line.split("\t")
            my_dict = dict(zip(keys, values))
            myuuid = str(uuid.uuid4())
            my_dict["uuid"] = myuuid
            my_dict['ttl'] = ttl_time()
            print(my_dict)
            if put_to_dynamo(my_dict):
                print("Successfully imported item", myuuid)
            else:
                print("Failed to put record")
                success = False
        return success

    This Lambda runs every time an S3 object is created. It grabs the .GZ file and parses its contents into a dictionary that can be imported into DynamoDB. One other thing to note is that I append a UUID so that I can help track down errors.

    I wrote a simple front end for the client to grab records based on date input which writes the logs to a CSV so they can parse them on their local machines. I have a feeling we will be implementing a log aggregation server soon!
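
    The export side of that front end might be sketched as follows. This is a guess at the shape of the logic rather than the client's actual tool: records_to_csv is a hypothetical function, and it assumes each record is a dictionary keyed by CloudFront field names with the date stored as a YYYY-MM-DD string.

```python
import csv
import io


def records_to_csv(records: list, start_date: str, end_date: str) -> str:
    """Return CSV text for records whose 'date' falls within [start_date, end_date]."""
    # Dates in YYYY-MM-DD form sort correctly as plain strings
    selected = [r for r in records if start_date <= r.get("date", "") <= end_date]
    fieldnames = sorted({key for record in selected for key in record})
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fieldnames, lineterminator="\n")
    writer.writeheader()
    writer.writerows(selected)
    return buffer.getvalue()


# Hand-made records for illustration
sample_records = [
    {"date": "2023-05-01", "cs-method": "GET", "sc-status": "200"},
    {"date": "2023-05-03", "cs-method": "POST", "sc-status": "500"},
    {"date": "2023-06-01", "cs-method": "GET", "sc-status": "404"},
]
csv_text = records_to_csv(sample_records, "2023-05-01", "2023-05-31")
print(csv_text)
```

    In practice the records would come from a DynamoDB query or scan, and the returned text would be written to a file for the client to download.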

    If this code helps you please share it with your friends and co-workers!

    Code on Github