Tag: lambda
-
An AI Fantasy Football Draft Assistant
Last year I attempted to program a Fantasy Football draft assistant which took live data from ESPN’s Fantasy Platform. Boy was that a mistake…
First of all, shame on ESPN for not having an API for their Fantasy sports applications. The reverse-engineered methods were neither fast enough nor reliable. So, this year I took a new approach to building out a system for getting draft pick recommendations for my team.
I also wanted to put the example architecture and code I wrote the other day for the Strands SDK to work, so I used it to build an API that leverages the AWS Bedrock platform to analyze data and ultimately return the best possible picks.
Here is a simple workflow of how the tool works (I generated the diagram with Claude AI; it is pretty OK).

The first problem I encountered was getting data. I needed two things:
1. Historical data for players
2. Projected fantasy data for the upcoming season
The historical data provides information about each player's past seasons, and the projections, obviously, cover the upcoming season. The projections are especially useful for accounting for incoming rookies.
In the repository I link below I included scripts to scrape FantasyPros for both the historical and projected data. They store the results in separate files in case you want to use them in a different way. There is also a script to combine them into one data source and ultimately load it into a DynamoDB table.
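If you just want to see the shape of that final load step, here is a minimal sketch using boto3's batch writer (the file name, table name, and key attribute are assumptions for illustration, not the repo's exact values):

import json
from decimal import Decimal

import boto3

dynamodb = boto3.resource("dynamodb", region_name="us-east-1")
table = dynamodb.Table("fantasy-players")  # hypothetical table name

# Hypothetical combined data file produced by the merge script.
# DynamoDB rejects floats, so parse numeric stats as Decimal.
with open("combined_players.json") as f:
    players = json.load(f, parse_float=Decimal)

# batch_writer handles batching and retries for us
with table.batch_writer() as batch:
    for player in players:
        batch.put_item(Item=player)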
The most important piece of the puzzle was actually simulating the draft. I needed to create a program that could track the other teams' draft picks as well as give me suggestions and track my own team's picks. This is the heart of the repository, and I will be using it to get suggestions and track the draft for this coming season.

When you issue the "next" command, the application sends a request to the API with the current state of the draft. The payload looks like this:
payload = {
    "team_needs": team_needs,
    "your_roster": your_roster,
    "already_drafted": all_drafted_players,
    "scoring_format": self.session.scoring_format if self.session else "ppr",
    "league_size": self.session.league_size if self.session else 12
}
The "team_needs" key represents the number of players still needed at each position. The "your_roster" key is all of the current players on my team. The other important key is "already_drafted"; it sends all of the drafted players to the AI agent so it knows who NOT to recommend.
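For context, sending that payload is a plain HTTP POST from the draft tracker. Here is a hedged sketch of what the call can look like (the endpoint URL and response shape are placeholders, not the repository's actual values):

import requests

# Placeholder endpoint; the real one comes from the deployed API
API_URL = "https://example.execute-api.us-east-1.amazonaws.com/prod/recommend"

response = requests.post(API_URL, json=payload, timeout=60)
response.raise_for_status()

# The agent's pick recommendations come back as JSON
recommendations = response.json()
print(recommendations)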
The application walks through all of the picks, and you manually enter each of the other teams' picks until the draft is complete.
I’ll post an update after my draft on August 24th with the team I end up with! I still will probably lose in my league but this was fun to build. I hope to add in some sort of week-to-week management of my team as well as a trade analysis tool in the future. It would also be cool to add in some sort of analysis that could send updates to my Slack or Discord.
If you have other ideas message me on any platform you can find me on!
GitHub: https://github.com/avansledright/fantasy-football-agent
-
Deploying a Strands Agent on AWS Lambda using Terraform
Recently I’ve been exploring the AI space a lot more as I’m sure a lot of you are doing as well. I’ve been looking at the Strands Agent SDK. I see this SDK as being very helpful in building out agents in the future (follow the blog to see what I come up with!).
One thing that is not included in the SDK is the ability to deploy with Terraform. The SDK includes examples of how to package and deploy with the AWS CDK, so I adapted that to use Terraform.
I took my adaptation a step further and added an API Gateway layer so that you have the beginnings of a very simple AI agent deployed with the Strands SDK.
Check out the code here: https://github.com/avansledright/terraform-strands-agent-api
The code in the repository is fairly simple and includes everything you need to build an API Gateway, Lambda function, and some other useful resources just to help out.
The key to all of this is packaging the required dependencies inside the Lambda layer. Without it, the function will not work.
File structure:

terraform-strands-agent-api/
├── lambda_code/
│   ├── lambda_function.py   # Your Strands agent logic
│   └── requirements.txt     # strands-agents + dependencies
├── api_gateway.tf           # API Gateway configuration
├── iam.tf                   # IAM roles and policies
├── lambda.tf                # Lambda function setup
├── locals.tf                # Environment variables
├── logs.tf                  # CloudWatch logging
├── s3.tf                    # Deployment artifacts
├── variables.tf             # Configurable inputs
└── outputs.tf               # API endpoints and resource IDs

You shouldn't have to change much in any of these files until you want to fully start customizing the actual functionality of the agent.
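For a rough idea of what lives in lambda_code/lambda_function.py, here is a hedged sketch of a minimal handler. The Strands usage follows the SDK's basic Agent pattern, but treat the details as assumptions rather than the repository's exact code:

import json

from strands import Agent  # assumes the strands-agents package from the layer

# Default agent; the SDK's defaults target a Bedrock model
agent = Agent()


def lambda_handler(event, context):
    # API Gateway proxy integration delivers the request body as a JSON string
    body = json.loads(event.get("body") or "{}")
    prompt = body.get("prompt", "Hello!")

    result = agent(prompt)

    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({"response": str(result)}),
    }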
To get started follow the instructions below!
git clone https://github.com/avansledright/terraform-strands-agent-api
cd terraform-strands-agent-api

# Configure your settings. Add other values as needed
echo 'aws_region = "us-west-2"' > terraform.tfvars

# Deploy everything
terraform init
terraform plan
terraform apply
If everything goes as planned, you should see output that includes a curl command you can use to test the demo code.
If you run into any issues feel free to let me know! I’d be happy to help you get this up and running.
If this has helped you in any way, please share it on your social media and with any of your friends!
-
Building out a reusable Terraform framework for Flask Applications
I find myself using the same architecture for deploying demo applications built with the great Python library Flask. I've been using the same Terraform files over and over again to build out the infrastructure.
Last weekend I decided it was time to build a reusable framework for deploying these applications. So, I began building out the repository. The purpose of this repository is to give myself a jumping off point to quickly deploy applications for demonstrations or live environments.
Let’s take a look at the features:
- Customizable Environments within Terraform for managing the infrastructure across your development and production environments
- Modules for:
- Application Load Balancer
- Elastic Container Registry
- Elastic Container Service
- VPC & Networking components
- Dockerfile and Docker Compose file for launching and building the application
- Demo code for the Flask application
- Automated build and deploy for the container upon code changes
This module is built for any developer who wants to get started quickly and deploy applications fast. Using this framework will allow you to speed up your development time by being able to focus solely on the application rather than the infrastructure.
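To give a sense of scale, the demo Flask application doesn't need to be more than a few lines. Here is a bare-bones stand-in (not the repository's actual demo code; the route and port are assumptions):

from flask import Flask, jsonify

app = Flask(__name__)


@app.route("/")
def index():
    # Simple response the load balancer health check can hit
    return jsonify(status="ok", message="Hello from the Flask framework demo")


if __name__ == "__main__":
    # Containerized deployments should bind to 0.0.0.0
    app.run(host="0.0.0.0", port=5000)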
Upcoming features:
- CI/CD features using either GitHub Actions or AWS services like CodePipeline and CodeBuild
- Custom Domain Name support for your application
If there are other features you would like to see me add shoot me a message anytime!
Check out the repository here:
https://github.com/avansledright/terraform-flask-module
-
API For Pre-signed URLs
Pre-signed URLs are used for downloading objects from AWS S3 buckets. I've used them many times in the past for various reasons, but this idea was a new one: a proof of concept for an API that creates the pre-signed URL and returns it to the user.
This solution uses an API Gateway and an AWS Lambda function. The API Gateway takes two parameters, "key" and "expiration". Ultimately, you could add another parameter for "bucket" if you wanted the gateway to be able to get objects from multiple buckets.
I used Terraform to create the infrastructure and Python to program the Lambda.
Take a look at the Lambda code below:
import boto3
import json
import os
from botocore.exceptions import ClientError

def lambda_handler(event, context):
    # Get the query parameters
    query_params = event.get('queryStringParameters', {})

    if not query_params or 'key' not in query_params:
        return {
            'statusCode': 400,
            'body': json.dumps({'error': 'Missing required parameter: key'})
        }

    object_key = query_params['key']
    expiration = int(query_params.get('expiration', 3600))  # Default 1 hour

    # Initialize S3 client
    s3_client = boto3.client('s3')
    bucket_name = os.environ['BUCKET_NAME']

    try:
        # Generate presigned URL
        url = s3_client.generate_presigned_url(
            'get_object',
            Params={
                'Bucket': bucket_name,
                'Key': object_key
            },
            ExpiresIn=expiration
        )

        return {
            'statusCode': 200,
            'headers': {
                'Access-Control-Allow-Origin': '*',
                'Content-Type': 'application/json'
            },
            'body': json.dumps({
                'url': url,
                'expires_in': expiration
            })
        }
    except ClientError as e:
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }
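To show how a client might consume this, here is a hedged usage sketch. The invoke URL is a placeholder; in practice it comes from the Terraform outputs or the generated Postman collection, and the "url" key matches what the Lambda above returns:

import requests

# Placeholder invoke URL for the deployed API Gateway stage
API_URL = "https://example.execute-api.us-west-2.amazonaws.com/prod/presign"

# Ask the API for a URL that is valid for 5 minutes
resp = requests.get(API_URL, params={"key": "reports/demo.pdf", "expiration": 300})
resp.raise_for_status()
presigned_url = resp.json()["url"]

# Download the object directly from S3 using the pre-signed URL
download = requests.get(presigned_url)
with open("demo.pdf", "wb") as f:
    f.write(download.content)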
The Terraform will also output a Postman collection JSON file so that you can immediately import it for testing. If this code and pattern are useful to you, check it out on my GitHub below.
-
Building a Generative AI Workflow with AWS Bedrock
I’ve finally been tasked with a Generative AI project to work on. I’ve done this workflow manually with ChatGPT in the past and it works quite well but, for this project, the requirement was to use Amazon Web Services’ new product “AWS Bedrock”.
The workflow takes in some code and writes a technical document to support a clear English understanding of what the code is going to accomplish. Using AWS Bedrock, the AI will write the document and output it to an S3 bucket.
The architecture involves uploading the initial code to an S3 bucket, which sends a message to an SQS queue and ultimately triggers a Lambda that prompts the AI and uploads the output to a separate S3 bucket. Because this was a proof of concept, the Lambda function was given significant compute resources; going forward, I am going to look at placing this code into a Docker container so that it can scale for larger code inputs.
Here is the architecture diagram:
Let's take a look at some of the important code. First is the prompt management. I wrote a function that takes the code as input along with a "prompt_type" parameter. This allows the function to scale to accommodate other prompts in the future.
def return_prompt(code, prompt_type):
    if prompt_type == "testPrompt":
        prompt1 = f"Human: <your prompt>. Assistant:"
        return prompt1
The important thing to look at here is the format of the message. You have to include the “Human:” and the “Assistant:”. Without this formatting, your API call will error.
The next bit of code is what we use to prompt the Bedrock AI.
prompt_to_send = prompts.return_prompt(report_file, "testPrompt")
body = {
    "prompt": prompt_to_send,
    "max_tokens_to_sample": 300,
    "temperature": 0.1,
    "top_p": 0.9
}
accept = 'application/json'
contentType = 'application/json'

# Return pseudo code
bedrock_response = h.bedrock_actions.invoke_model(
    json.dumps(body, indent=2).encode('utf-8'),
    contentType,
    accept,
    modelId=modelid
)
def invoke_model(body, contentType, accept, modelId):
    print(f"Body being sent: {body}")
    try:
        response = bedrock_runtime.invoke_model(
            body=body,
            contentType=contentType,
            accept=accept,
            modelId=modelId
        )
        return response
    except ClientError as e:
        print("Failed to invoke Bedrock model")
        print(e)
        return False
The body of our request is what configures Bedrock to run and create a response. These values can be tweaked as follows:
max_tokens_to_sample: The maximum number of tokens to generate in the response. Amazon recommends setting this to 4000.
top_p: Use a lower value to ignore less probable options.
top_k: The number of token choices the model uses to generate the next token.
temperature: Use a lower value to decrease randomness in the response.
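One step the snippets above don't show is reading the generated text back out of the response. For the Anthropic Claude text-completion models that use max_tokens_to_sample, the body JSON carries the output under a "completion" key; here is a minimal sketch (adjust the key for other model families):

import json

if bedrock_response:
    # The response body is a streaming object; read and parse it as JSON
    response_body = json.loads(bedrock_response["body"].read())
    generated_document = response_body.get("completion", "")
    # The document can then be written to the output S3 bucket
    print(generated_document)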
You can read more about the inputs here. If you want to see more of this code take a look at my GitHub repository below. Feel free to use it wherever you want. If you have any questions be sure to reach out to me!
-
Automated Lambda Testing
Look, I know there are a bunch of test frameworks that you could use for your Lambda functions. But what if you wanted something simple? I spent an afternoon putting together what I would want in a testing pipeline that returns a simple “Success/Fail” type response to me via Email.
An architecture diagram for your eyes:
The idea is to create a JSON object whose keys are Lambda function names and whose values are the test events to pass to them. Once the file is uploaded to the S3 bucket, the pipeline is triggered and a CodeBuild job iterates through the Lambdas and their events. Each Lambda is invoked with its test event and reports whether or not it succeeded. The results are then sent to an SNS topic to be distributed to the developers.
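To make that concrete, here is a rough sketch of what the CodeBuild test step can boil down to (the tests.json name and shape are assumptions for illustration, not the packaged solution's exact code):

import json

import boto3

lambda_client = boto3.client("lambda", region_name="us-west-2")

# Assumed shape: {"my-function-name": {"some": "test event"}, ...}
with open("tests.json") as f:
    tests = json.load(f)

results = {}
for function_name, test_event in tests.items():
    response = lambda_client.invoke(
        FunctionName=function_name,
        InvocationType="RequestResponse",
        Payload=json.dumps(test_event),
    )
    # "FunctionError" only appears when the invocation failed inside the code
    passed = response["StatusCode"] == 200 and "FunctionError" not in response
    results[function_name] = "Success" if passed else "Fail"

print(json.dumps(results, indent=2))
# In the pipeline these results get published to the SNS topic for email delivery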
Going forward, I hope to automate adding new Lambda functions to the JSON file so that testing can also be scheduled.
I spent time packaging this solution up with all the appropriate Terraform files and code. If you are interested in this solution feel free to reach out and I can deliver the packaged application to you!
Sample Code: GitHub
-
SES Monitoring
I love AWS. But one thing they don’t do is build complete tools. SES is one of them. I recently started getting emails about high usage for one of the identities that I have set up for SES. I would assume that there was a way to track usage within CloudWatch but for the life of me I couldn’t find one. So I guess that means I need to build something.
The idea here is pretty simple: within SES identities you can set up notifications. I created an SNS topic and subscribed all delivery notifications to it, then subscribed a Lambda function to the topic. The Lambda function acts as the processor for the records, formats them in a usable way, and puts them into DynamoDB. I used the identity as the primary key. The result is a simple application architecture like the image below.
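For context, the Lambda receives the SES notification wrapped inside the SNS message body. Here is a simplified sketch of pulling out the useful fields (the field names follow the SES delivery notification format; the exact mapping to my keys is an assumption, not the deployed code):

import json


def lambda_handler(event, context):
    # SNS wraps the SES notification as a JSON string in the message body
    message = json.loads(event["Records"][0]["Sns"]["Message"])

    mail = message.get("mail", {})
    record = {
        "caller_identity": mail.get("callerIdentity", ""),
        "source": mail.get("source", ""),
        "destination": mail.get("destination", []),
    }
    # "record" is what gets handed to put_dynamo_object below
    return record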
Every time an email is delivered, the Lambda function processes the event and checks the DynamoDB table for an existing record. If the identity is already present in the table, it returns the "count" value so that we can increment it. The "destination" value records the destination of the email being sent. Below is a sample of the code I used to put the object into the DynamoDB table.
def put_dynamo_object(dynamo_object):
    # Look up the existing count for this identity (helper and table defined elsewhere)
    count = dynamo_get_item(dynamo_object)
    if count is None or count == 0:
        count = 1
    else:
        count = int(count) + 1

    # Pull the email address out of the longer source string
    source_string = dynamo_object['source']
    match = re.search(r'[\w.+-]+@[\w-]+\.[\w.-]+', source_string)
    email = match.group(0)

    try:
        table.update_item(
            Key={
                'identity': email
            },
            AttributeUpdates={
                'details': {
                    'Value': {
                        'caller_identity': dynamo_object['caller_identity'],
                        'source': dynamo_object['source'],
                        'destination': dynamo_object['destination'],
                        'count': str(count)
                    }
                }
            }
        )
        return True
    except ClientError as e:
        print("Failed to put record")
        print(e)
        return False
If you want to use this code feel free to reach out to me and I will share with you the Terraform to deploy the application and as always, reach out with questions or feedback!
-
Deleting MANY Lambda Function Versions
I recently came across a challenge where I wanted to purge old Lambda Function versions. Some of the functions had over 65 versions!
The code below will iterate through a text file with a Lambda function defined on each line. It gets a list of all the versions and deletes every version except the highest one and any versions attached to an alias.
import boto3
from botocore.exceptions import ClientError

client = boto3.client("lambda", region_name='us-west-2')


def delete_function_version(function_name, version):
    try:
        client.delete_function(
            FunctionName=function_name,
            Qualifier=str(version)
        )
    except ClientError as e:
        print("Failed to delete version of", function_name, "version number", str(version))
        print(e)


def get_function_versions(function_name):
    # Lists every published version of the function (plus $LATEST)
    try:
        response = client.list_versions_by_function(
            FunctionName=function_name
        )
        return response['Versions']
    except ClientError as e:
        print('failed to get info for', function_name)
        print(e)


if __name__ == "__main__":
    print("Starting lambda update")
    with open("lambda_list.txt", "r") as text:
        lambda_list = text.read().splitlines()

    for lambda_function in lambda_list:
        print("Working with lambda", lambda_function)
        lambda_versions = get_function_versions(lambda_function)

        # Collect the numeric version numbers, skipping $LATEST
        lambda_version_list = []
        for version in lambda_versions:
            version_number = version['Version']
            if version_number == "$LATEST":
                continue
            lambda_version_list.append(int(version_number))

        for lambda_version in lambda_version_list:
            if lambda_version == max(lambda_version_list):
                print("This is the latest version, skipping", str(lambda_version))
                continue
            print("Deleting version", lambda_version, "of", lambda_function)
            delete_function_version(lambda_function, lambda_version)
To use this script, first populate the text file with the Lambda function(s) you want to evaluate and then execute the Python script.
I hope that this helps you or a coworker!
EDIT: Thanks to Alex Iliev for testing and finding some bugs in this code!
-
Moving AWS Cloudfront Logs to DynamoDB
I think it's pretty obvious that I love DynamoDB. It has become one of my favorite AWS services; I use it almost every day at work and am getting better at using it for my personal projects as well.
I had a client approach me about getting logs from a CloudFront distribution. CloudFront has a native logging function that spits out .GZ files to an S3 bucket. My client doesn't have any sort of log ingestion service, so rather than build one I decided we could parse the .GZ files and store the data in a DynamoDB table. To accomplish this I created a simple Lambda:
import boto3
import gzip
import uuid
from datetime import datetime
from datetime import timedelta
import time
from botocore.exceptions import ClientError


# Creates a time to live value (90 days out)
def ttl_time():
    now = datetime.now()
    ttl_date = now + timedelta(90)
    final = str(time.mktime(ttl_date.timetuple()))
    return final


# Puts the log json into dynamodb
def put_to_dynamo(record):
    client = boto3.resource('dynamodb', region_name='us-west-2')
    table = client.Table('YOUR_TABLE_NAME')
    try:
        response = table.put_item(
            Item=record
        )
        print(response)
    except ClientError as e:
        print("Failed to put record")
        print(e)
        return False
    return True


def lambda_handler(event, context):
    print(event)
    s3_key = event['Records'][0]['s3']['object']['key']
    s3 = boto3.resource("s3")
    obj = s3.Object("YOUR_BUCKET", s3_key)

    with gzip.GzipFile(fileobj=obj.get()["Body"]) as gzipfile:
        content = gzipfile.read()

    my_json = content.decode('utf8').splitlines()
    my_dict = {}
    for x in my_json:
        if x.startswith("#Fields:"):
            keys = x.split(" ")
        else:
            values = x.split("\t")

    # Map each field name from the "#Fields:" header to its value in the log line
    i = 0
    for item in keys:
        if item == "#Fields:":
            continue
        my_dict[item] = values[i]
        i += 1

    print('- ' * 20)
    myuuid = str(uuid.uuid4())
    print(myuuid)
    my_dict["uuid"] = myuuid
    my_dict['ttl'] = ttl_time()
    print(my_dict)

    if put_to_dynamo(my_dict) == True:
        print("Successfully imported item")
        return True
    else:
        print("Failed to put record")
        return False
This Lambda runs every time an S3 object is created. It grabs the .GZ file and parses it into a dictionary that can be imported into DynamoDB. One other thing to note is that I append a UUID to each record so that I can track down errors.
I wrote a simple front end for the client that grabs records based on a date input and writes the logs to a CSV so they can parse them on their local machines. I have a feeling we will be implementing a log aggregation server soon!
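As a rough illustration of that kind of export (not the client's actual tool), a date-based scan and CSV dump can look like this, assuming the items keep the CloudFront "date" field from the log header:

import csv

import boto3
from boto3.dynamodb.conditions import Attr

table = boto3.resource("dynamodb", region_name="us-west-2").Table("YOUR_TABLE_NAME")

target_date = "2023-05-01"
# Pagination is omitted here to keep the sketch short
response = table.scan(FilterExpression=Attr("date").eq(target_date))
items = response.get("Items", [])

if items:
    with open(f"cloudfront-logs-{target_date}.csv", "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=sorted(items[0].keys()))
        writer.writeheader()
        writer.writerows(items)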
If this code helps you please share it with your friends and co-workers!
-
Updating AWS Managed Prefix Lists
I was working with a customer the other day trying to come up with a way to import a bunch of IP addresses into a white list on AWS. We came up with the approach of using Managed Prefix Lists in VPC. I wrote some Python to grab the addresses from an API and automatically put them into a prefix list.
The code takes input from an API that is managed by a third party. We parse the returned values into meaningful lists, then pass each IP to a function that checks whether the entry already exists in the prefix list. If it does, the IP is skipped; if it doesn't, it is automatically added.
import requests
import json
import os
import boto3
from botocore.exceptions import ClientError
import ipaddress


def check_for_existing(list_id, ip):
    client = boto3.client("ec2", region_name="us-west-2")
    try:
        response = client.get_managed_prefix_list_entries(
            PrefixListId=list_id,
            MaxResults=100,
        )
        for entry in response['Entries']:
            if entry['Cidr'] == ip:
                return True
        return False
    except ClientError as e:
        print(e)


def get_prefix_list_id(list_name):
    client = boto3.client("ec2", region_name="us-west-2")
    response = client.describe_managed_prefix_lists(
        MaxResults=100,
        Filters=[
            {
                "Name": "prefix-list-name",
                "Values": [list_name]
            }
        ]
    )
    for p_list in response['PrefixLists']:
        return {"ID": p_list['PrefixListId'], "VERSION": p_list['Version']}


def update_managed_prefix_list(list_name, ip):
    client = boto3.client("ec2", region_name="us-west-2")
    prefix_list = get_prefix_list_id(list_name)
    if check_for_existing(prefix_list['ID'], ip) == True:
        print("Rule already exists")
        return False
    try:
        response = client.modify_managed_prefix_list(
            DryRun=False,
            PrefixListId=prefix_list['ID'],
            CurrentVersion=prefix_list['VERSION'],
            AddEntries=[
                {
                    "Cidr": ip
                }
            ]
        )
        return True
    except ClientError as e:
        print(e)
        print("Failed to update list")


if __name__ == "__main__":
    url = "https://<my IP address URL>"
    headers = {}
    r = requests.get(url, headers=headers)
    json_ips = json.loads(r.content)

    ip = ""
    list_name = ""
    result = update_managed_prefix_list(list_name, ip)
    if result == True:
        print("Successfully updated lists")
    else:
        print("Failed to update lists")
If you are going to use this code it will need some modifications. I ultimately did not deploy this code but I had plans to run it as a Lambda function on a schedule so the lists would always be up to date.
If this code is helpful to you please share it with your friends!
Github