Collecting DNS Records from Route53

I had an idea the other day to query my web servers for their response codes and alert me based on the result. Ideally, I would get a Slack notification for any response that is not “200”. I was thinking that I could run this on a cron schedule so that I have near-real-time monitoring from within my own network rather than relying on an outside service.

I broke this down into a few stages based on how the information needed to be pulled together to generate the result.

  1. Gather all the ZoneId values
  2. Pass each ZoneId to Route53 to get all of its DNS records
  3. Filter out just the “A” records
  4. Use the Requests library to get a response from the server
  5. Send a message to Slack based on the response

To start off we need to add in all our libraries and get our list of ZoneIds. You can see the code here:

import boto3
import requests
import sys
import os
from slack import WebClient
from slack.errors import SlackApiError
aws = boto3.client('route53')
paginator = aws.get_paginator('list_resource_record_sets')

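# Build the list of zone IDs whose record sets we want to fetch.
# Note: list_hosted_zones returns a single page of zones; paginate it if you have many.
response = aws.list_hosted_zones()
hosted_zones = response['HostedZones']
zone_id_to_test = []
dns_entries = []
zones_with_a_record = []
for zone in hosted_zones:
    # Id looks like "/hostedzone/ZXXXXXX"; drop the 12-character prefix
    zoneid = zone['Id']
    final_zone_id = zoneid[12:]
    zone_id_to_test.append(final_zone_id)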

You’ll notice that for final_zone_id we have to slice the string to keep just the ID portion. By default, the Id value includes a path prefix, such as “/hostedzone/ZXXXXXX”, and the slice [12:] drops the 12 characters of “/hostedzone/”.
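The fixed-length slice works as long as the prefix never changes. As a minimal sketch of an alternative, the ID can be taken as whatever follows the last slash; extract_zone_id is a hypothetical helper name and is not part of the original script.

```python
def extract_zone_id(raw_id: str) -> str:
    """Return the bare zone ID from a value such as '/hostedzone/ZXXXXXX'.

    Hypothetical helper: keeps everything after the last '/', so it also
    accepts an ID that has no prefix at all.
    """
    return raw_id.rsplit("/", 1)[-1]


print(extract_zone_id("/hostedzone/ZXXXXXX"))
```

Inside the loop above, this could stand in for the zoneid[12:] slice.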

Once we have our list of zones we want to test we can pass it through a function to get all the records. I took the results and stored them into a list.

def getAllRecords(zoneid):
    for zone in zoneid:
        try:
            response = paginator.paginate(HostedZoneId=zone)
            for record_set in response:
                dns = record_set['ResourceRecordSets']
                dns_entries.append(dns)

        except Exception as error:
            print('An Error')
            print(str(error))
            raise

This code pages through the DNS records for each zone and appends each page of records to the list dns_entries, so the result is a list of lists. From there we can use another function to filter out just the “A” records that we need to test.

def getARecords(entry):
    for dns_entry in entry:
        for record in dns_entry:
            if record['Type'] == 'A':
                url = (record['Name'])
                final_url = url[:-1]
                zones_with_a_record.append(f"https://{final_url}")

You’ll notice we are appending to the list zones_with_a_record. What we actually append is the name of each “A” record, turned into a URL string. Route53 returns fully qualified names that end with a trailing dot, which is why url[:-1] strips the last character. This is so that in our final function we can send the URL to the Requests library for testing.
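The filtering step can also be written as a function that returns its result instead of appending to a global list, which makes it easy to try against sample data. The following is only a sketch under assumptions: a_record_urls and sample_pages are hypothetical names, and the sample records are made up to mimic the shape of the pages stored in dns_entries.

```python
def a_record_urls(pages):
    """Return https:// URLs for every 'A' record found in a list of record pages."""
    urls = []
    for page in pages:
        for record in page:
            if record.get("Type") == "A":
                # Record names are fully qualified; drop the trailing dot
                name = record["Name"].rstrip(".")
                urls.append(f"https://{name}")
    return urls


# Made-up sample data shaped like two pages of ResourceRecordSets
sample_pages = [
    [
        {"Name": "example.com.", "Type": "A"},
        {"Name": "example.com.", "Type": "MX"},
    ],
    [
        {"Name": "www.example.com.", "Type": "A"},
    ],
]

print(a_record_urls(sample_pages))
```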

This final section is two functions: one that notifies Slack on any response that is not “200”, and one that notifies Slack when we fail to get a response from the server at all. The second is important in the event that a server does not respond or the request times out.

def status_collector(urls):
    for url in urls:
        try:
            slack_token = "XXXXXXXXXXXXXX"
            client = WebClient(token=slack_token)   
            user_agent = {'User-agent': 'Mozilla/5.0'}
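            # timeout (10 seconds here, an arbitrary choice) keeps a dead server from hanging the loop
            status = requests.get(url, headers=user_agent, allow_redirects=True, timeout=10)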
            code = (status.status_code)
            if code != 200:
                try:
                    response_string = f"The site {url} is down. Status code: {code}"
                    response = client.chat_postMessage(
                        channel="XXXXXX",
                        text="SERVER DOWN",
                        blocks = [{"type": "section", "text": {"type": "plain_text", "text": response_string}}]
                            )

                except SlackApiError as e:
                    print(e.response["error"])

            else:
                print(f"Server: {url} reports: {code}")
        except Exception:  # request failed (timeout, connection error, etc.), so report it
            slackFailure(url)
            
def slackFailure(url):
    try:
        slack_token = "XXXXXXXXXXX"
        client = WebClient(token=slack_token)
        response_string = f"The site {url} testing failed"
        response = client.chat_postMessage(
            channel="XXXXXXXXX",
            text="SERVER DOWN",
            blocks = [{"type": "section", "text": {"type": "plain_text", "text": response_string}}]
                )
    except SlackApiError as e:
        print(e.response["error"])
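
Both functions build almost the same Slack payload. One possible way to keep the message format in one place is a small helper like the sketch below; build_alert is a hypothetical name and is not part of the original script or the Slack library.

```python
def build_alert(url, code=None):
    """Build the text and blocks for a Slack alert.

    Hypothetical helper: pass a status code for a non-200 response,
    or leave code as None when no response was received at all.
    """
    if code is None:
        message = f"The site {url} testing failed"
    else:
        message = f"The site {url} is down. Status code: {code}"
    return {
        "text": "SERVER DOWN",
        "blocks": [
            {"type": "section", "text": {"type": "plain_text", "text": message}}
        ],
    }


print(build_alert("https://example.com", 503))
```

The result could then be passed along as client.chat_postMessage(channel="XXXXXX", **build_alert(url, code)).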

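To run the whole check, call the functions in order: getAllRecords(zone_id_to_test), then getARecords(dns_entries), then status_collector(zones_with_a_record). If you re-use this code you will need to add in your own Slack token and channel string. It is also important to set a User-Agent header on your requests. Without one, some servers and firewalls reject the default python-requests user agent, and you will see repeated errors.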
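
Rather than pasting the Slack token into the script, it could be read from the environment. This is a sketch under assumptions: the variable names SLACK_BOT_TOKEN and SLACK_CHANNEL and the helper load_slack_settings are hypothetical, so use whatever names fit your setup.

```python
import os


def load_slack_settings(env=None):
    """Read the Slack token and channel from environment variables.

    SLACK_BOT_TOKEN and SLACK_CHANNEL are assumed names, not something
    the Slack library requires.
    """
    env = os.environ if env is None else env
    token = env.get("SLACK_BOT_TOKEN")
    channel = env.get("SLACK_CHANNEL")
    if not token or not channel:
        raise RuntimeError("Set SLACK_BOT_TOKEN and SLACK_CHANNEL before running")
    return token, channel
```

The returned pair can replace the hard-coded slack_token and channel values in both functions.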

If you found this code helpful please share it with your friends. If you have suggestions on how to make it better please shoot me a message or a comment!

Boto3 Library
Requests Library

