#!/opt/imh-python/bin/python3 """ Module used for authenticated cwp frontend actions """ import sys import requests import urllib3 import re import subprocess import secrets import socket from contextlib import contextmanager urllib3.disable_warnings() def run(cmd): """Runs a command in a subprocess""" result = subprocess.run( cmd, shell=True, capture_output=True, encoding="utf-8" ) return result def get_hash(): """get root pw hash""" get_hash = run("getent shadow root") if get_hash.returncode != 0: print("Failed to fetch root hash") print(get_hash.stderr) sys.exit(1) assert "root:" in get_hash.stdout return get_hash.stdout.split(":")[1] def at_job(cmd): result = subprocess.run( args=["at", "-M", "now", "+", "5", "minutes"], input=cmd, capture_output=True, encoding="utf-8", ) if result.returncode == 0: # at returns jid in std for line in result.stderr.splitlines(): if line.startswith("job"): jid = int(line.split()[1]) return int(jid) else: print("Warning: unable to schedule password restore.") return None def temp_root(): """ Temporarily change root password. Pause LFD. Configure 'at' job to resume in case of script failure """ backup = get_hash() new_pass = secrets.token_urlsafe(24) run(f"echo 'root:{new_pass}' | chpasswd -c SHA512 -s 0") new_hash = get_hash() restore_cmd = f"(echo 'root:{backup}' | chpasswd -e); (pgrep lfd | xargs kill -SIGCONT)" # grep for new hash in case the user changed it, we don't want to overwrite that. at_jid = at_job(f"grep -qF '{new_hash}' && {restore_cmd}") if isinstance(at_jid, int): restore_cmd = f"at -r {at_jid}; {restore_cmd}" # pause LFD to disable alerts run("(pgrep lfd | xargs kill -SIGSTOP)") return {"pass": new_pass, "restore": restore_cmd, "jid": at_jid} @contextmanager def login(): """ Temporarily change root password. Pause LFD. Configure at job to resume in case of script failure Return authenticated requests.Session, admin_url in tuple. """ temproot = temp_root() password = temproot["pass"] try: sess = requests.Session() sess.verify = False url = f"https://{socket.gethostname()}:2087" response = sess.post( f"{url}/login/index.php", data=dict(username="root", password=password, commit="Login"), ) if response.status_code != 200: print(f"Failed to login: HTTP {response.status_code} returned") return None if "/admin" not in response.url: raise Exception("Session login didn't return admin URL") cpsess_url = re.search(r"^(.+/cwp_[^/]+/admin)", response.url) if not cpsess_url: raise Exception("Admin url doesn't look valid") yield (sess, cpsess_url.group(1)) finally: run(temproot["restore"]) print("Restored original password")