"""Client helpers for syncing local DNS zone files with a remote DNS API.

Zone files are read from ``ZONE_DIR``, requests are POSTed to
``https://DNS_AUTHORITY_SERVER/api/<function>``, and the API key is read
from ``CONFIG_DIR/key``.
"""

import json
import re
from datetime import datetime

import requests

CONFIG_DIR = "/opt/imh-cwp-dns/config"
DNS_AUTHORITY_SERVER = "ash-sys-pro-dns4.inmotionhosting.com"
API_DEBUG = False
ZONE_DIR = "/var/named"

if API_DEBUG:
    # Turn on wire-level logging for every HTTP request made via requests.
    import logging
    import http.client as http_client

    http_client.HTTPConnection.debuglevel = 1
    logging.basicConfig()
    logging.getLogger().setLevel(logging.DEBUG)
    requests_log = logging.getLogger("requests.packages.urllib3")
    requests_log.setLevel(logging.DEBUG)
    requests_log.propagate = True

# A zone-file token: any run of characters that is not whitespace or a paren.
_TOKEN_RE = re.compile(r"[^\s()]+")
# A zone-file comment: from ';' to the end of the line.
_COMMENT_RE = re.compile(r";.*")
_DIGITS_RE = re.compile(r"[0-9]+")


def getAPIAuth():
    """Return the API key: the first line of ``CONFIG_DIR/key``, stripped."""
    with open(f"{CONFIG_DIR}/key", "r") as f:
        return f.readline().strip()


def imh_api_call(function, data):
    """POST ``data`` as JSON to the ``function`` endpoint of the DNS API.

    Args:
        function: Endpoint name appended to ``/api/``.
        data: JSON-serialisable payload.

    Returns:
        The raw response body (``str``) on success, or a
        ``{"status": 1, "message": ...}`` dict when the request fails
        (connection error or non-success HTTP status).
    """
    url = f"https://{DNS_AUTHORITY_SERVER}/api/{function}"
    headers = {
        "Accept": "application/json",
        "Content-type": "application/json",
        "API-Auth": getAPIAuth(),
    }
    if API_DEBUG:
        print(f"API Request to {url} data {data}")

    try:
        # The context manager releases pooled connections when done.
        with requests.Session() as session:
            session.mount(
                "https://", requests.adapters.HTTPAdapter(max_retries=10)
            )
            result = session.post(url, json=data, headers=headers)
            result.raise_for_status()  # Raise if the HTTP status is an error
            body = result.text
    except requests.RequestException as exc:
        # A connection-level failure has no response object, so the status
        # code is only reported when the server actually answered.
        response = getattr(exc, "response", None)
        if response is not None:
            reason = f"code {response.status_code}"
        else:
            reason = f"error {exc!r}"
        message = (
            f"Failed API request to endpoint {function} data {data} "
            f"failed with {reason}"
        )
        if API_DEBUG:
            print(message)
        return {"status": 1, "message": message}

    if API_DEBUG:
        print(f"Got API response {body}")
    return body


def _api_json(function, data):
    """Call the API and always return a dict.

    ``imh_api_call`` returns a dict on failure and JSON text on success;
    this normalises both, and maps undecodable or non-object JSON to a
    ``{"status": 1, ...}`` failure dict.
    """
    raw = imh_api_call(function, data)
    if isinstance(raw, dict):
        return raw
    try:
        decoded = json.loads(raw)
    except ValueError:
        return {
            "status": 1,
            "message": f"Invalid JSON in response from endpoint {function}",
        }
    if not isinstance(decoded, dict):
        return {
            "status": 1,
            "message": f"Unexpected response from endpoint {function}",
        }
    return decoded


def removeZone(zone):
    """Send a ``remove_zone`` request for ``zone``; the reply is ignored."""
    imh_api_call("remove_zone", {"zone": zone})


def _locate_serial(data):
    """Return the ``(start, end)`` offsets of the SOA serial in ``data``.

    Returns ``None`` when no SOA record with a numeric serial is found.
    """
    # Blank out comments with spaces of equal length so that offsets in the
    # masked text are valid offsets in the original text. This hacky removal
    # of comments may also blank data inside a TXT record, which does not
    # matter for locating the SOA.
    masked = _COMMENT_RE.sub(lambda m: " " * len(m.group(0)), data)

    line_start = 0
    for line in masked.splitlines(keepends=True):
        tokens = list(_TOKEN_RE.finditer(line))
        soa = next((t for t in tokens[1:4] if t.group(0) == "SOA"), None)
        if soa is not None:
            # SOA RDATA is: mname rname serial ...; parentheses are only
            # grouping, so the serial is the third token after "SOA" whether
            # or not the record is wrapped over several lines.
            following = _TOKEN_RE.finditer(masked, line_start + soa.end())
            rdata = [match for _, match in zip(range(3), following)]
            if len(rdata) == 3 and _DIGITS_RE.fullmatch(rdata[2].group(0)):
                return rdata[2].span()
        line_start += len(line)
    return None


def parse_serial(data):
    """Parse the SOA record out of zone file text.

    Args:
        data: Full text of a zone file.

    Returns:
        The serial number as a string, or ``None`` if no SOA record with a
        numeric serial is present.
    """
    span = _locate_serial(data)
    if span is None:
        return None
    start, end = span
    return data[start:end]


def _next_serial(current, now=None):
    """Return the serial that follows ``current`` in the YYYYMMDDnn scheme.

    Args:
        current: The zone's current serial as an int.
        now: ``datetime`` used for today's date; defaults to the local time.
    """
    today = now if now is not None else datetime.now()
    # Calculate the 'base' number for today.
    min_serial = int(today.strftime("%Y%m%d") + "00")
    serial_boundary = min_serial + 99

    if current < min_serial:  # 2024010203 < 2024020200
        return min_serial
    # NOTE(review): the next two branches return a value lower than
    # ``current``, so the serial can move backwards; confirm consumers of the
    # zone do not rely on a strictly increasing serial.
    if current > serial_boundary:  # 2024010203 > 2024020299
        return serial_boundary
    if current == serial_boundary:  # 2024010299 == 2024010299
        return min_serial  # rotate back to min_serial
    return current + 1  # serial is in range


def updateZone(zone):
    """Bump the SOA serial of ``zone`` on disk and push the zone to the API.

    Reads ``ZONE_DIR/<zone>.db``, replaces the SOA serial with the next
    serial, writes the file back, then sends the new contents in an
    ``update_zone`` request.

    Raises:
        ValueError: If the zone file has no SOA record with a numeric serial.
    """
    zone_file_path = f"{ZONE_DIR}/{zone}.db"

    # Read the zone file
    with open(zone_file_path, "r") as f:
        data = f.read()

    span = _locate_serial(data)
    if span is None:
        raise ValueError(f"No SOA serial found in {zone_file_path}")
    start, end = span
    new_serial = _next_serial(int(data[start:end]))

    # Splice only the SOA serial; a textual substitution over the whole file
    # would also rewrite any other record containing the same digits.
    data = f"{data[:start]}{new_serial}{data[end:]}"

    # Write the updated data back to the zone file
    with open(zone_file_path, "w") as f:
        f.write(data)

    # Make the API call
    imh_api_call("update_zone", {"zone": zone, "data": data})


def addZone(zone):
    """Push ``zone`` via ``updateZone`` if the API reports it available."""
    if zoneAvailable(zone):
        updateZone(zone)


def zoneAvailable(zone):
    """Return ``True`` only if the API answers status 0 and available true.

    Any failed request, malformed reply or missing field yields ``False``.
    """
    result = _api_json("is_zone_available", {"zone": zone})
    if result.get("status") != 0:
        return False
    # ``== True`` (not ``is True``) is deliberate: it keeps accepting a JSON
    # ``1`` as the original comparison did.
    return result.get("available") == True  # noqa: E712


def getZone(zone):
    """Return the ``zone_data`` the API holds for ``zone``, or ``-1``.

    ``-1`` is returned when the request fails, the status is not 0, or the
    reply carries no ``zone_data`` field.
    """
    result = _api_json("get_zone", {"zone": zone})
    if result.get("status") == 0 and "zone_data" in result:
        return result["zone_data"]
    return -1