Aaron VanSledright

Category: Technology

  • Convert Spotify Links to Youtube Links

    In a continuation of my Discord bot feature deployment, I found a need to convert Spotify links to YouTube links. I use YouTube Music for my music streaming needs, and the rest of the Discord server uses Spotify.

    With the help of ChatGPT, I created a script that converts Spotify links to YouTube links! It uses both the Spotify API and the YouTube Data API to grab track information and format a search query that returns a relevant YouTube link.

    The code consists of two primary functions, which I have shared below: one to get the artist and track names, and another to query YouTube. Combined, they can return a YouTube link to a multitude of applications.

    import spotipy
    from spotipy.oauth2 import SpotifyClientCredentials
    from googleapiclient.discovery import build

    # API clients (the credential values below are placeholders)
    sp = spotipy.Spotify(auth_manager=SpotifyClientCredentials(
        client_id='SPOTIFY_CLIENT_ID',
        client_secret='SPOTIFY_CLIENT_SECRET'))
    youtube = build('youtube', 'v3', developerKey='YOUTUBE_API_KEY')

    def get_spotify_track_info(spotify_url):
        # spotipy accepts a full Spotify URL, URI, or bare track ID
        track_info = sp.track(spotify_url)
        return {
            'name': track_info['name'],
            'artists': [artist['name'] for artist in track_info['artists']]
        }

    def search_youtube_video(track_info):
        # Search for "<track> <first artist> official video" and take the top result
        search_query = f"{track_info['name']} {track_info['artists'][0]} official video"
        request = youtube.search().list(q=search_query, part='snippet', type='video', maxResults=1)
        response = request.execute()
        video_id = response['items'][0]['id']['videoId']
        return f"https://www.youtube.com/watch?v={video_id}"
    

    I took this code and incorporated it into my Discord bot so that anytime a user posts a Spotify link, it will automatically convert it to a YouTube link.
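
    To give a sense of the wiring, here is a rough sketch of an on_message handler that watches for Spotify track links and replies with the converted URL. This is an illustration rather than the bot's exact code, and the link regex is an assumption:

    import re

    SPOTIFY_LINK = re.compile(r"https://open\.spotify\.com/track/\S+")

    @client.event
    async def on_message(message):
        # Ignore the bot's own messages
        if message.author == client.user:
            return
        match = SPOTIFY_LINK.search(message.content)
        if match:
            track_info = get_spotify_track_info(match.group(0))
            youtube_link = search_youtube_video(track_info)
            await message.channel.send(youtube_link)
        # Keep normal command handling working alongside this event
        await client.process_commands(message)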

    If you want to utilize this code check out the Github link below. As always, if you found this article helpful please share it across your social media.

    Github – https://github.com/avansledright/spotify-to-youtube

  • SES Monitoring

    I love AWS, but one thing they don’t do is build complete tools, and SES is one of them. I recently started getting emails about high usage for one of the identities that I have set up for SES. I assumed there would be a way to track usage within CloudWatch, but for the life of me I couldn’t find one. So I guess that means I need to build something.

    The idea here is pretty simple: within SES identities you can set up a notification. So, I created an SNS topic and subscribed all delivery notifications for the identity to it. Then I subscribed a Lambda function to the topic. The Lambda function acts as the processor for the records; it formats them in a usable way and puts them into DynamoDB, using the identity as the primary key. The result is a simple application architecture like the image below.

    Every time an email is delivered, the Lambda function processes the event and checks the DynamoDB table to see if we have an existing record. If the identity is already present in the table, it returns the “count” value so that we can increment it. The “destination” value appends the destination of the email being sent. Below is a sample of the code I used to put the object into the DynamoDB table.

    import re

    import boto3
    from botocore.exceptions import ClientError

    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('YOUR_TABLE_NAME')  # table name placeholder

    def put_dynamo_object(dynamo_object):
        # Look up the existing count for this identity (dynamo_get_item returns
        # None when no record exists yet) and increment it
        count = dynamo_get_item(dynamo_object)
        if count is None:
            count = 1
        else:
            count = int(count) + 1
        # Pull the bare email address out of the longer source string
        source_string = dynamo_object['source']
        match = re.search(r'[\w.+-]+@[\w-]+\.[\w.-]+', source_string)
        email = match.group(0)
        try:
            table.update_item(
                Key={
                    'identity': email
                },
                AttributeUpdates={
                    'details': {
                        'Value': {
                            'caller_identity': dynamo_object['caller_identity'],
                            'source': dynamo_object['source'],
                            'destination': dynamo_object['destination'],
                            'count': str(count)
                        }
                    }
                }
            )
            return True
        except ClientError as e:
            print("Failed to put record")
            print(e)
            return False
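
    The dynamo_get_item lookup referenced above isn’t shown in the post; a minimal sketch of what it might look like (my reconstruction, reusing the same table object and regex as the code above) is:

    def dynamo_get_item(dynamo_object):
        # Extract the identity the same way put_dynamo_object does
        match = re.search(r'[\w.+-]+@[\w-]+\.[\w.-]+', dynamo_object['source'])
        email = match.group(0)
        response = table.get_item(Key={'identity': email})
        if 'Item' not in response:
            # No record yet for this identity
            return None
        return response['Item']['details']['count']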

    If you want to use this code feel free to reach out to me and I will share with you the Terraform to deploy the application and as always, reach out with questions or feedback!

  • Building a Discord Bot with Python and AWS

    I’m a member of a lot of Discord servers. The one I participate in most is one with my brothers and our friends. In this server, we joke around a lot about people posting off-topic messages in the various text channels and we give them fake “warnings”. I decided to take this a step further and create a bot where we could track the warnings and then present them in a leaderboard.

    The Discord bot API documentation is great and allowed me to quickly get a proof of concept up and running. I then relied on my Python, Terraform, and AWS skills to get the bot up and running quickly. Below is a simple architecture diagram that I started and will most likely be adding to as the members of the server request more features.

    We currently have three commands: !warning, !feature, and !leaderboard. The !warning command takes a tagged user as input. It then uses the Boto3 library for Python to add the attribute to that user in the table. Here is the code:

    import os

    import boto3
    from botocore.exceptions import ClientError

    # Adds an attribute to a user
    def add_warning_to_user(username, attribute):
        client = boto3.resource("dynamodb", region_name="us-west-2",
                                aws_access_key_id=os.getenv('AWS_KEY'),
                                aws_secret_access_key=os.getenv('AWS_SECRET'))
        table = client.Table(table_name)
        print("adding", attribute, "to", str(username))

        try:
            # Increment the user's current count for this attribute
            # (get_warning_count_of_user lives in a separate dynamodb helper module)
            response = table.update_item(
                Key={'username': str(username)},
                AttributeUpdates={attribute: {
                    'Value': str(dynamodb.get_warning_count_of_user(username, attribute) + 1)
                    }
                }
            )
            print(response)
        except ClientError as e:
            print("Failed to update count")
            print(e)
            return False
        return True
    
    

    I have another function within this code that calls out to the DynamoDB table and gets the user’s current value so that we can increment the count.
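
    That helper isn’t shown in the post, but a minimal sketch of it might look like this (assuming a table object like the one above; this is my reconstruction rather than the bot’s actual code):

    def get_warning_count_of_user(username, attribute):
        # Return the user's current count for this attribute, or 0 if the
        # user or attribute doesn't exist yet
        response = table.get_item(Key={'username': str(username)})
        if 'Item' not in response or attribute not in response['Item']:
            return 0
        return int(response['Item'][attribute])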

    The !leaderboard command takes an “attribute” as input. I built it this way so that we can add future attributes to users without having to rebuild everything from scratch. To get the data, I use the DynamoDB scan operation to retrieve all of the data for all users and then filter within the Python application on just the attribute we are requesting the leaderboard for. I then have a function that formats the leaderboard into something that the bot can publish back to the server.

    def create_table(data, attribute):
        if attribute == "warning_count":
            attribute = "Warnings"
        table = ""
        rows = []
        rows.append("``` ")
        rows.append(f"{attribute}: Leaderboard")
        for key, value in data.items():
            rows.append(f"{key}: {str(value)}")
        rows.append("``` ")
        for row in rows:
            table += " " + row + "\n "
        return table

    I want to revisit this code to make the formatting cleaner as the list gets longer, but for now it works as intended.
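
    For completeness, the scan-and-filter step described above might look something like the sketch below (the function and variable names are mine, and pagination is ignored for brevity):

    def get_leaderboard_data(attribute):
        # Scan the whole table and keep only users that have the attribute
        response = table.scan()
        data = {}
        for item in response['Items']:
            if attribute in item:
                data[item['username']] = int(item[attribute])
        # Sort descending so the top "offenders" come first
        return dict(sorted(data.items(), key=lambda kv: kv[1], reverse=True))

    The resulting dictionary is what gets passed into create_table as data.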

    I created the last function so that users could submit feature requests. The code is very simple: the !feature command takes all of the text following the command and passes it to an SNS function I wrote, which sends me an email containing the user’s feature request. I have hopes that I can transition this to create some sort of Jira task or other workflow. Below is the bot’s code to handle this interaction:

    @client.command(name="feature", help="sends a feature request")
    async def send_feature_request(ctx, *, args):
        print("THIS IS THE FEATURE REQUEST", args)
        if sns.send_message(args) == True:
            await ctx.send("Your request has been sent")
        else:
            await ctx.send("Failed to send your request. Plz try again later.")
    
    

    Right now the bot is running inside a Docker container within my homelab. I need to create better logging and implement some sort of logging server so that I can better handle errors and monitor for any outages.

    If you have questions about building Discord bots or AWS and its various components feel free to reach out to me at any time. This was a great project that I worked on over a few days and it was great to see it come together quickly!

  • Deleting many files from the Linux Command Line

    I’ll admit that this post is more for me than any of my readers. I have this command that is buried in my notes and always takes me forever to dig back out. I figured I’d publish it on my blog so that I would maybe commit it to memory.

    Let’s say that you have a directory with so many files that a simple “rm *” will always fail because the shell expands the wildcard into an argument list that is too long. I’ve encountered this with many WordPress logging plugins that don’t have log purging set up.

    Enter this simple Linux command line command:

    find <path> -type f -exec rm '{}' \;

    What this will do is find all the files in your path and delete them. You can modify this command with a bunch of other flags like:

    find <path> -type f -mtime +30 -exec rm '{}' \;

    Which will only delete files that haven’t been modified in the last 30 days.

    I’m sure there are many other flags and conditions you could check to create an even more fine-grained delete script but this has been useful for me!
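
    For example, if you only want to touch a particular file pattern (the *.log pattern here is just an illustration), you can add a -name test:

    find <path> -type f -name "*.log" -mtime +30 -exec rm '{}' \;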

    If this helps you, please share this with your friends!

  • Subscribing All SES Identities to an SNS Topic

    I recently ran across an issue where I was experiencing many bounced emails on my Amazon SES account. So much so that Amazon reached out and put me on a warning notice.

    I realized that I had no logging in place to handle this. In order to create a logging mechanism I decided to send all “Bounce” notifications to a Slack channel so that I could better understand what was going on.

    To accomplish this I first had to subscribe an SNS topic to a Slack channel. There are a multitude of ways that you can do this so I won’t go into detail here. If you have questions please reach out.

    I wrote a simple function to loop through all of my identities in SES and then subscribe them to my SNS topic. Here is the code:

    import boto3
    ses = boto3.client('ses')
    response = ses.list_identities()
    
    for id in response['Identities']:
        update = ses.set_identity_notification_topic(
            Identity=id,
            NotificationType='Bounce',
            SnsTopic='<your SNS ARN here>'
        )
        print(update)

    You can see this is a pretty straightforward loop that utilizes the Boto3 library to collect all of the identities and set the bounce notification topic on each one.

    Feel free to use this code however you want and if you have any questions reach out via email or social media!

  • Setting the Starting Directory for Windows Subsystem for Linux

    I use Windows Subsystem for Linux almost every day. I run Ubuntu 20.04 for almost all of my development work. I recently re-installed Windows because I upgraded my PC after many years. One thing that has always bothered me is that when you launch WSL it doesn’t put you into your Linux user’s home directory, but rather your Windows home directory. The fix for this is really quite simple.

    First, navigate to the settings for Microsoft Terminal. I use Visual Studio Code to do the editing.

    Find the profile section that contains your WSL installation. Just below the “source” line, add the following:

    "startingDirectory": "//wsl$/Ubuntu-20.04/home/<user>",

    Replace “Ubuntu-20.04” with your distro name and “<user>” with your username.
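
    For context, the resulting profile entry in the “profiles” list of settings.json ends up looking roughly like this (the GUID and names shown here are placeholders and will differ on your machine):

    {
        "guid": "{07b52e3e-de2c-5db4-bd2d-ba144ed6c273}",
        "hidden": false,
        "name": "Ubuntu-20.04",
        "source": "Windows.Terminal.Wsl",
        "startingDirectory": "//wsl$/Ubuntu-20.04/home/<user>"
    }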

    Save and exit!

  • Pandas & NumPy with AWS Lambda

    Fun fact: Pandas and NumPy don’t work out of the box with Lambda. The libraries that you download on your development machine probably won’t work either.

    The standard Lambda Python environment is very barebones by default. There is no point in loading in a bunch of libraries if they aren’t needed. This is why we package our Lambda functions into ZIP files to be deployed.

    My first attempt at using Pandas on AWS Lambda involved concatenating Excel files. The goal was to take a multi-sheet Excel file and combine it into one sheet for ingestion into a data lake. To accomplish this I used the Pandas library to build the new sheet. To automate the process, I set up an S3 trigger on a Lambda function to execute the script every time a file was uploaded.
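
    The Pandas side of that is essentially a read_excel, concat, to_excel pipeline. Here is a minimal sketch of that step (the file paths are placeholders, and the S3 download/upload plumbing is omitted):

    import pandas as pd

    def combine_sheets(input_path, output_path):
        # Read every sheet in the workbook into a dict of DataFrames
        sheets = pd.read_excel(input_path, sheet_name=None)
        # Stack them into a single sheet and write the result back out
        combined = pd.concat(sheets.values(), ignore_index=True)
        combined.to_excel(output_path, index=False)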

    And then I ran into this error:

    [ERROR] Runtime.ImportModuleError: Unable to import module 'your_module':
    IMPORTANT: PLEASE READ THIS FOR ADVICE ON HOW TO SOLVE THIS ISSUE!
    Importing the numpy c-extensions failed.

    I had clearly added the NumPy library into my ZIP file.

    So what was the problem? Well, apparently, the version of NumPy that I downloaded on both my Macbook and my Windows desktop is not compatible with Amazon Linux.

    To resolve this issue, I first attempted to download the package files manually from PyPI. I grabbed the latest “manylinux1_x86_64” wheel file for both NumPy and Pandas, put them back into my ZIP file, and re-uploaded it. This resulted in the same error.

    THE FIX THAT WORKED:

    The way to get this to work without failure is to spin up an Amazon Linux EC2 instance. Yes, this seems excessive, and it is. Not only did I have to spin up a new instance, I also had to install Python 3.8, because Amazon Linux ships with Python 2.7 by default. But once that is installed, you can use pip to install the libraries to a directory by doing:

    pip3 install -t . <package name>

    This is useful for getting the libraries in the same location to ZIP back up for use. You can remove a lot of the files that are not needed by running:

    rm -r *.dist-info __pycache__

    After you have done the cleanup, you can ZIP up the files, move them back to your development machine, add your Lambda function code, and upload the archive to the Lambda console.
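
    For reference, the packaging steps might look roughly like this (lambda_function.py stands in for whatever your handler file is called):

    # On the EC2 instance: zip up the installed libraries
    zip -r function.zip .

    # On your development machine: add your handler to the archive
    zip -g function.zip lambda_function.py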

    Run a test! It should work as you intended now!

    If you need help with this please reach out to me on social media or leave a comment below.

  • A File Management Architecture

    This post is a continuation of my article: “A File Extraction Project”. This project has been a great learning experience for both frontend and backend application architecture and design. Below you will find a diagram and an explanation of all the pieces that make this work.

    1. The entire architecture is powered by Flask on an EC2 instance. When I move this project to production I intend to put an application load balancer in front to manage traffic. The frontend is also secured by Google Authentication. This provides authentication against the user’s existing GSuite deployment so that only individuals within the organization can access the application.
    2. The first Lambda function processes the upload functions. I am allowing for as many files as needed by the customer. The form also includes a single text field for specifying the value of the object tag. The function sends the objects into the first bucket which is object #4.
    3. The second Lambda function is the search functionality. This function allows the user to provide a tag value. The function queries all objects in bucket #4 and creates a list of objects that match the query. It then moves the objects to bucket #5 where it packages them up and presents them to the user in the form of a ZIP file.
    4. The first bucket is the storage for all of the objects. This is the bucket where all the objects are uploaded to from the first Lambda function. It is not publicly accessible.
    5. The second bucket is temporary storage for files requested by the user. Objects are moved into this bucket from the first bucket. This bucket has a deletion policy that only allows objects to live inside it for 24 hours (a sketch of such a lifecycle rule follows this list).
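
    The 24-hour cleanup can be expressed as an S3 lifecycle rule. Here is a minimal sketch using Boto3 (the bucket and rule names are placeholders, and expiration is set to the one-day minimum):

    import boto3

    s3 = boto3.client('s3')
    s3.put_bucket_lifecycle_configuration(
        Bucket='temporary-download-bucket',  # placeholder name
        LifecycleConfiguration={
            'Rules': [
                {
                    'ID': 'expire-temporary-downloads',
                    'Filter': {'Prefix': ''},
                    'Status': 'Enabled',
                    'Expiration': {'Days': 1}  # S3 expiration works in whole days
                }
            ]
        }
    )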

    Lambda Function for File Uploading:

    def upload():
        if request.method == 'POST':
            tag = request.form['tag']
            files = request.files.getlist('file')
            print(files)
            for file in files:
                print(file)
                if file:
                    # Sanitize the filename, save it locally, then push it to S3
                    filename = secure_filename(file.filename)
                    file.save(filename)
                    s3.upload_file(
                        Bucket=BUCKET_NAME,
                        Filename=filename,
                        Key=filename
                    )
                    # Tag the object; Tag1 carries the value from the form field
                    s3.put_object_tagging(
                        Bucket=BUCKET_NAME,
                        Key=filename,
                        Tagging={
                            'TagSet': [
                                {
                                    'Key': 'Tag1',
                                    'Value': tag
                                },
                                {
                                    'Key': 'Tag2',
                                    'Value': 'Tag-value'
                                },
                            ]
                        },
                    )
            msg = "Upload Done ! "

    The function lives within the Flask application. I have AWS permissions set up on my EC2 instance to allow the S3 put-object and object-tagging actions. You can assign tags as needed. The first tag references the tag variable, which is provided by the form submission.

    For Google Authentication I utilized a project I found on Github here. I modified the “auth” route that it creates to authenticate against the “hd” parameter passed back by Google. You can see how this works here:

    @app.route('/auth')
    def auth():
        token = oauth.google.authorize_access_token()
        user = oauth.google.parse_id_token(token)
        session['user'] = user
        if "hd" not in user:
            abort(403)
        elif user['hd'] != 'Your hosted domain':
            abort(403)
        else:
            return redirect('/')

    If the “hd” parameter is not passed to the function, or it doesn’t match your hosted domain, the route aborts with a 403 error.

    If you are interested in this project and want more information feel free to reach out and I can provide more code examples or package up the project for you to deploy on your own!

    If you found this article helpful please share it with your friends.

  • A File Extraction Project

    I had a client approach me regarding a set of files they had. The files were a set of certificates to support their products. They deliver these files to customers in the sales process.

    The workflow currently involves manually packaging the files up into a deliverable format. The client asked me to automate this process across their thousands of documents.

    As I started thinking through how this would work, I decided to create a serverless approach utilizing Amazon S3 for document storage, Lambda for processing, and S3 with CloudFront to serve a front end for the application.

    My current architecture involves two S3 buckets: one bucket to store the original PDF documents and one to collect the documents that we are going to package up for the client before sending.

    The idea is that we can tag each PDF file with its appropriate lot number supplied by the client. I will then use a simple form submission process to supply input into the function that will collect the required documents.
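
    Tagging an existing object with its lot number is a single call. A rough sketch (the bucket, key, and tag key here are placeholders, not the client’s real values):

    import boto3

    s3 = boto3.client('s3')
    s3.put_object_tagging(
        Bucket='source_bucket',
        Key='certificate-1234.pdf',
        Tagging={'TagSet': [{'Key': 'lot', 'Value': '1234'}]}
    )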

    Here is the code for the web frontend:

    <!DOCTYPE html>
    <html>
    <head>
        <script src="https://ajax.googleapis.com/ajax/libs/jquery/2.2.4/jquery.min.js"></script>
        <script type="text/javascript">
            $(document).ready(function() {
    
                $("#submit").click(function(e) {
                    e.preventDefault();
    
                    var lot = $("#lot").val();
    
                    $.ajax({
                        type: "POST",
                        url: 'API_URLHERE',
                        contentType: 'application/json',
                        data: JSON.stringify({
                            'body': lot,
                        }),
                        success: function(res){
                            $('#form-response').text('Query Was processed.');
                        },
                        error: function(){
                            $('#form-response').text('Error.');
                        }
                    });
    
                })
    
            });
        </script>
    </head>
    <body>
    <form>
        <label for="lot">Lot</label>
        <input id="lot">
        <button id="submit">Submit</button>
    </form>
    <div id="form-response"></div>
    </body>
    </html>

    This is a single field input form that sends a string to my Lambda function. Once the string is received we will convert it into a JSON object and then use that to find our objects within Amazon S3.

    Here is the function:

    import boto3
    import json


    def lambda_handler(event, context):
        # The form posts {"body": "<lot number>"} as a JSON string
        form_response = event['body']
        tag_list = json.loads(form_response)
        print(tag_list)
        tag_we_want = tag_list['body']

        s3 = boto3.client('s3')
        bucket = "source_bucket"
        destBucket = "destination_bucket"
        download_list = []

        # Get all the objects in the source bucket
        get_objects = s3.list_objects(
            Bucket=bucket,
        )
        object_list = get_objects['Contents']

        object_keys = []
        for object in object_list:
            object_keys.append(object['Key'])

        # Look up the first tag value on each object
        object_tags = []
        for key in object_keys:
            object_key = s3.get_object_tagging(
                Bucket=bucket,
                Key=key,
            )
            object_tags.append(
                {
                    'Key': key,
                    'tags': object_key['TagSet'][0]['Value']
                }
            )

        # Copy every object whose tag matches the requested lot number
        for tag in object_tags:
            if tag['tags'] == tag_we_want:
                object_name = tag['Key']
                s3.copy_object(
                    Bucket=destBucket,
                    CopySource={
                        'Bucket': bucket,
                        'Key': object_name,
                    },
                    Key=object_name,
                )
                download_list.append(object_name)

        return download_list, tag_we_want

    In this code, we define our source and destination buckets first. With the string from the form submission, we first gather all the objects within the bucket and then iterate over each object to find matching tags.

    Once we gather the files we want for our customers we then transfer these files to a new bucket. I return the list of files out of the function as well as the tag name.

    My next step is to package all the files required into a ZIP file for downloading. I first attempted to do this in Lambda but quickly ran into its file system limitations: everything outside of the /tmp scratch space is read-only, and that space is limited.

    Right now, I am thinking of utilizing Docker to spawn a worker which will generate the ZIP file, place it back into the bucket and provide a time-sensitive download link to the client.

    Stay tuned for more updates on this project.

  • A Self Hosted Server Health Check

    I’m not big on creating dashboards. I find that I don’t look at them enough to warrant hosting the software on an instance and having to have the browser open to the page all the time.

    Instead, I prefer to be alerted via Slack as much as possible. I had already written scripts to collect DNS records from Route53, so I decided to expand on that idea and create a scheduled job that executes at a set interval. This way my health checks are fully automated.

    Before we get into the script, you might ask me why I don’t just use Route53 health checks! The answer is fairly simple. First, the cost of health checks for HTTPS doesn’t make sense for the number of web servers that I am testing. Second, I don’t want to test Route53 or any AWS resource from within AWS. Rather, I would like to use my own network to test as it is not connected to AWS.

    You can find the code and the Lambda function hosted on GitHub. The overall program utilizes a few different AWS products:

    • Lambda
    • SNS
    • CloudWatch Logs

    It also uses Slack but that is an optional piece that I will explain. The main functions reside in “main.py”. This piece of code follows the process of:

    1. Iterating over Route53 Records
    2. Filtering out “A” records and compiling a list of domains
    3. Testing each domain and processing the response code
    4. Logging all of the results to CloudWatch Logs
    5. Sending errors to the SNS topic

    I have the script running on a CRON job every hour.
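
    For reference, an hourly crontab entry for this might look like the following (the interpreter and script paths are placeholders):

    0 * * * * /usr/bin/python3 /path/to/main.py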

    The second piece of this is the Lambda function. The function is packaged in “lambda_function.zip”, but I also added the function outside of the ZIP file for easier editing. You can modify this function to use your own Slack credentials.

    The Lambda function is subscribed to your SNS topic so that whenever a new message appears, that message is sent to your specified Slack channel.

    I have plans to test my Terraform skills to automate the deployment of the Lambda function, SNS topic, CloudWatch Logs, and the primary script in some form.

    If you have any comments on how I could improve this function, please post a comment here or raise an issue on GitHub. If you find this script helpful in any way, feel free to share it with your friends!

    Links:
    Server Health Check – GitHub

    Code – Main Function (main.py)

    import boto3
    import requests
    import os
    import time
    
    
    #aws variables
    sns = boto3.client('sns')
    aws = boto3.client('route53')
    cw = boto3.client('logs')
    paginator = aws.get_paginator('list_resource_record_sets')
    response = aws.list_hosted_zones()
    hosted_zones = response['HostedZones']
    time_now = int(round(time.time() * 1000))
    
    #create empty lists
    zone_id_to_test = []
    dns_entries = []
    zones_with_a_record = []
    #Create list of ZoneID's to get record sets from       
    for key in hosted_zones:
        zoneid = key['Id']
        final_zone_id = zoneid[12:]
        zone_id_to_test.append(final_zone_id)
    
    #Collect all of the record sets from each zone
    def getARecord(zoneid):
        for zone in zoneid:
            try:
                response = paginator.paginate(HostedZoneId=zone)
                for record_set in response:
                    dns = record_set['ResourceRecordSets']
                    dns_entries.append(dns)
    
            except Exception as error:
                print('An Error')
                print(str(error))
                raise
    #Get Records to test
    def getCNAME(entry):
        for dns_entry in entry:
            for record in dns_entry:
                if record['Type'] == 'A':
                    url = (record['Name'])
                    final_url = url[:-1]
                    zones_with_a_record.append(f"https://{final_url}")
    #Send Result to SNS                
    def sendToSNS(messages):
        message = messages
        try:
            send_message = sns.publish(
                TargetArn='YOUR_SNS_TOPIC_ARN_HERE',
                Message=message,
                )
        except:
            print("something didn't work")
    def tester(urls):
        for url in urls:
            try:
                user_agent = {'User-agent': 'Mozilla/5.0'}
                status = requests.get(url, headers=user_agent, allow_redirects=True)
                code = status.status_code
                response = f"The site {url} reports status code: {code}"
                # Redirects and auth walls (301/302/401/403) are only logged;
                # any other non-200 code also raises an SNS alert
                if code not in (200, 301, 302, 401, 403):
                    sendToSNS(f"The site {url} reports: {code}")
                writeLog(response)
            except Exception as error:
                # The request itself failed (DNS, timeout, connection refused, ...)
                sendToSNS(f"The site {url} failed testing")
                writeLog(f"The site {url} failed testing: {error}")
    
    def writeLog(message):
        # CloudWatch Logs needs the current sequence token before we can put events
        getToken = cw.describe_log_streams(
            logGroupName='YOUR_LOG_GROUP_NAME',
            )
        logInfo = getToken['logStreams']
        nextToken = logInfo[0]['uploadSequenceToken']
        response = cw.put_log_events(
            logGroupName='YOUR_LOG_GROUP_NAME',
            logStreamName='YOUR_LOG_STREAM_NAME',
            logEvents=[
                {
                    'timestamp': time_now,
                    'message': message
                },
            ],
            sequenceToken=nextToken
        )
    #Execute            
    getARecord(zone_id_to_test)
    getCNAME(dns_entries)
    tester(zones_with_a_record)
    
    

    Code: Lambda Function (lambda_function.py)

    import logging
    logging.basicConfig(level=logging.DEBUG)
    
    import os
    from slack import WebClient
    from slack.errors import SlackApiError
    
    
    slack_token = os.environ["slackBot"]
    client = WebClient(token=slack_token)
    
    def lambda_handler(event, context):
        detail = event['Records'][0]['Sns']['Message']
        response_string = f"{detail}"
        try:
            response = client.chat_postMessage(
                channel="YOUR CHANNEL HERE",
                text="SERVER DOWN",
                blocks = [{"type": "section", "text": {"type": "plain_text", "text": response_string}}]
            )   
    
        except SlackApiError as e:
            assert e.response["error"]
        return