#!/opt/imh-python/bin/python3
"""Create CWP user"""
import base64
import http.client as http_client
import json
import logging
import secrets
import socket
import string
from pathlib import Path
from time import sleep

import yaml
import rads
from provision import run_cmd, localdir, cwp_mysql
import click
import requests

# When True, HTTP requests are traced at wire level (see
# _configure_http_logging). Toggled by ``--debug`` in main().
CWP_API_DEBUG = False
# Number of attempts made for each HTTP request, and the pause (seconds)
# between attempts.
CWP_API_RETRY_COUNT = 10
CWP_API_RETRY_DELAY = 10
PASSWORD_ALLOWED_CHARS = string.digits + string.ascii_letters


def get_logger():
    """Get the 'provision' logger"""
    return logging.getLogger("provision")


def _configure_http_logging(url: str, data: dict) -> None:
    """Enable wire-level HTTP tracing in debug mode, else silence warnings.

    In debug mode the request payload is logged as-is, which includes any
    credentials it carries. Outside debug mode urllib3 warnings are disabled
    because requests are sent with certificate verification turned off.
    """
    if CWP_API_DEBUG:
        http_client.HTTPConnection.debuglevel = 1
        logging.basicConfig()
        logging.getLogger().setLevel(logging.DEBUG)
        requests_log = logging.getLogger("requests.packages.urllib3")
        requests_log.setLevel(logging.DEBUG)
        requests_log.propagate = True
        get_logger().info("API Request to %s data %s", url, data)
    elif hasattr(requests.packages, 'urllib3'):
        requests.packages.urllib3.disable_warnings()


def _post_with_retries(url: str, data: dict, *, raise_on_failure: bool) -> None:
    """POST ``data`` to ``url``, retrying on any ``requests`` error.

    Up to CWP_API_RETRY_COUNT attempts are made, sleeping
    CWP_API_RETRY_DELAY seconds between them. A non-2xx status counts as a
    failure.

    Args:
        url: full URL to post to
        data: form fields to send
        raise_on_failure: when True the last ``requests.RequestException`` is
            re-raised once all attempts are used up; when False the failure
            is only logged.
    """
    logger = get_logger()
    _configure_http_logging(url, data)
    for attempt in range(1, CWP_API_RETRY_COUNT + 1):
        try:
            # A fresh session per attempt, closed deterministically so the
            # connection pool is not leaked between retries.
            with requests.Session() as session:
                session.mount(
                    "https://", requests.adapters.HTTPAdapter(max_retries=10)
                )
                result = session.post(url, data=data, verify=False)
                # Throw an exception if the status is invalid
                result.raise_for_status()
        except requests.RequestException:
            if attempt < CWP_API_RETRY_COUNT:
                logger.warning(
                    "API call to %s failed %s times, sleeping for %s seconds "
                    "then trying again.",
                    url,
                    attempt,
                    CWP_API_RETRY_DELAY,
                )
                sleep(CWP_API_RETRY_DELAY)
                continue
            if raise_on_failure:
                raise
            # Best-effort caller: do not crash, but do not stay silent either.
            logger.error(
                "API call to %s failed %s times, giving up.", url, attempt
            )
        return


def cwp_api_request(host: str, endpoint: str, data: dict[str, str]):
    """make a request to cwp api v1

    Posts ``data`` to ``https://{host}:2304/v1/{endpoint}``. If every retry
    fails the error is logged and the function returns normally; it does not
    raise.
    """
    url = f"https://{host}:2304/v1/{endpoint}"
    _post_with_retries(url, data, raise_on_failure=False)


def cwp_login_request(host: str, username: str, password: str):
    """Perform a login request

    Posts the credentials to the user panel on port 2083 with the password
    base64-encoded.

    Raises:
        requests.RequestException: if the request still fails after all
            retries.
        UnicodeEncodeError: if ``password`` contains non-ASCII characters.
    """
    url = f"https://{host}:2083/index.php?acc=validate"
    data = {
        "username": username,
        "password": base64.b64encode(bytes(password, "ascii")).decode("utf-8"),
        "sessioning": 0,
        "userlang": "",
    }
    _post_with_retries(url, data, raise_on_failure=True)


def write_line_to_file(path: str, line: str, truncate: bool = True):
    """write or append to a file

    Parent directories are created as needed and the file is restricted to
    mode 0600 before anything is written.

    Args:
        path: file to write
        line: text to write, written verbatim (no newline is added)
        truncate: overwrite the file when True, append when False
    """
    target = Path(path)
    target.parent.mkdir(exist_ok=True, parents=True)
    # Create the file owner-only from the start so secrets are never briefly
    # readable under the default umask; chmod covers pre-existing files.
    target.touch(mode=0o600, exist_ok=True)
    target.chmod(0o600)
    with target.open("w" if truncate else "a", encoding='utf-8') as file:
        file.write(line)


def random_password(length: int) -> str:
    """Generate a password of ``length`` random letters and digits"""
    return "".join(
        secrets.choice(PASSWORD_ALLOWED_CHARS) for _ in range(length)
    )


def update_mysql_pw() -> str:
    """Change MySQL root password

    Returns:
        the newly generated password, which is fed to CWP's
        ``mysql_pwd_reset`` script on stdin
    """
    mysqlpw = random_password(16)
    run_cmd(
        "/usr/local/cwpsrv/htdocs/resources/scripts/mysql_pwd_reset",
        input_str=mysqlpw,
    )
    return mysqlpw


def update_root_pw() -> str:
    """Change root password

    Returns:
        the newly generated password, which is fed to ``passwd --stdin``
    """
    root_pw = random_password(16)
    run_cmd("/usr/bin/passwd", "root", "--stdin", input_str=root_pw)
    return root_pw


def add_api_key() -> str:
    """Setup root api keys

    Generates a 72-character key, writes it to the CWP key files, runs
    ``/scripts/install_api`` and appends the key to ``/opt/cwp/cwpapikeys``.

    Returns:
        the generated API key
    """
    apikey = random_password(72)
    apientry = CWP_API_TOKEN.format(apikey)
    # write_line_to_file creates missing parent directories itself.
    write_line_to_file("/root/.conf/apikey", apientry)
    write_line_to_file("/usr/local/cwp/.conf/api_key.conf", apikey)
    # Install the API httpd config into cwpsrv
    run_cmd("/scripts/install_api")
    # Appended, not truncated: earlier keys in this file are kept.
    write_line_to_file("/opt/cwp/cwpapikeys", f"{apikey}\n", truncate=False)
    return apikey


def mail_user(email: str, hostname: str, username: str, password: str):
    """Send user username/password info in an email

    A failure to send that surfaces as ``OSError`` is logged as a warning
    and otherwise ignored.
    """
    # NOTE(review): the body is a single line in this view of the file;
    # confirm whether line breaks were intended between the sentences.
    mailbody = (
        f"Hello {username}, Your CWP installation has completed. "
        f"You can log in to the user panel at https://{hostname}:2083 "
        f"Your username is: {username} Your password is: {password}"
    )
    try:
        rads.send_email(
            to_addr=email,
            subject=f"CWP Setup on {hostname}",
            body=mailbody,
            errs=True,
        )
    except OSError as exc:
        get_logger().warning("Failed to send email: %s", exc)


def update_backups():
    """Setup account backups"""
    # set default backups settings
    cwp_mysql(QUERIES['update_backups'])


def create_user(
    hostname: str,
    apikey: str,
    domain: str,
    username: str,
    userpw: str,
    email: str,
    ip: str,
):
    """Create CWP user

    Adds the account through the CWP API (addressed by ``ip``), performs a
    login request as the new user against ``hostname``, flags user id 1 as
    a reseller and sets the account's shell to bash.
    """
    # https://docs.control-webpanel.com/docs/developer-tools/api-manager/functions/function-api-account/add
    data = {
        "key": apikey,
        "action": "add",
        "domain": domain,
        "user": username,
        "pass": userpw,
        "email": email,
        "package": "Primary Reseller",
        "inode": "0",
        "limit_nproc": "450",
        "limit_nofile": "150",
        "server_ips": ip,
    }
    cwp_api_request(ip, "account", data)
    cwp_login_request(hostname, username, userpw)
    cwp_mysql("UPDATE user SET reseller='1' WHERE id=1")
    run_cmd("/usr/bin/chsh", "-s", "/bin/bash", username)


def _sed_escape_replacement(text: str, delimiter: str = "#") -> str:
    """Escape ``text`` for literal use in the replacement part of sed ``s``."""
    # Backslash first so the escapes added below are not themselves escaped.
    escaped = text.replace("\\", "\\\\").replace("&", "\\&")
    return escaped.replace(delimiter, f"\\{delimiter}")


def update_email_alerts(email: str, hostname: str):
    """Set the admin alert email address

    Updates the ``root_email`` setting in the CWP database, fills in an
    empty ``LF_ALERT_TO`` in ``/etc/csf/csf.conf`` and rewrites the CWP
    notification ``config.ini`` and ``template.tpl``.
    """
    cwp_mysql("UPDATE settings set root_email=%s", (email,))
    # The address is escaped so characters special to sed (&, \ and the #
    # delimiter) cannot corrupt csf.conf.
    sed_email = _sed_escape_replacement(email)
    run_cmd(
        "sed",
        "-i",
        f's#LF_ALERT_TO = ""#LF_ALERT_TO = "{sed_email}"#',
        "/etc/csf/csf.conf",
    )
    notif_dir = Path(
        "/usr/local/cwpsrv/htdocs/resources/admin/include/libs/notifications/"
    )
    notif_config = notif_dir / "config.ini"
    notif_config.write_text(
        "\n".join(
            [
                f"email={email}",
                f"from_email=notifications@{hostname}",
                "email_info=0",
                "email_warning=0",
                "email_danger=1",
            ]
        )
    )
    notif_template = notif_dir / "template.tpl"
    # NOTE(review): the template is a single line in this view of the file;
    # confirm whether line breaks were intended between the placeholders.
    notif_template.write_text(
        "You've received a new %level% notification: "
        "%subject% %message% %url"
    )


@click.command()
@click.option(
    "--vps", default=False, is_flag=True, help="True if provisioning on VPS"
)
@click.option("--debug", default=0, type=int, help="Enable debug")
@click.option("--username", required=True, help="Username to provision")
@click.option("--domain", required=True, help="Domain to provision")
@click.option(
    "--email", required=True, help="Email address of the user to provision"
)
def main(debug: int, username: str, domain: str, email: str, vps: bool):
    """CWP provisioning script to generate first user"""
    # The docstring above is shown by --help, so it is kept short.
    # HTTP tracing is only switched on for debug levels above 1.
    if debug > 1:
        global CWP_API_DEBUG  # pylint:disable=global-statement
        CWP_API_DEBUG = True
    domain = domain.lower()
    username = username.lower()
    apikey = add_api_key()
    userpw = random_password(16)
    mysqlpw = update_mysql_pw()
    rootpw = update_root_pw()
    hostname = socket.gethostname()
    ip = run_cmd("/usr/bin/hostname", "-I").replace("127.0.0.1", "").strip()
    reseller_pkg_ct = cwp_mysql(QUERIES['check_reseller_package'])[0][0]
    if reseller_pkg_ct < 1:
        raise RuntimeError(
            "The Primary Reseller package wasn't found. "
            "/opt/cwprads/post-provision-settings.py normally sets this up. "
            "This user will have a bad time if that hasn't run"
        )
    update_backups()
    create_user(
        hostname,
        apikey,
        domain,
        username,
        userpw,
        email,
        ip,
    )
    update_email_alerts(email, hostname)
    # The credentials email is skipped when --vps is given.
    if not vps:
        mail_user(email, hostname, username, userpw)
    ret = {
        "username": username,
        "userpw": userpw,
        "rootpw": rootpw,
        "mysqlpw": mysqlpw,
        "ip": ip,
        "hostname": hostname,
        "panelurl": f"https://{hostname}:2087/",
    }
    # The result is emitted as JSON on stdout.
    print(json.dumps(ret, indent=2))


def read_constants() -> tuple[str, dict[str, str]]:
    """Read constants.yaml

    Returns:
        a tuple of the API token template (with one ``{}`` placeholder for
        the key) and the ``queries`` mapping from the file

    Raises:
        TypeError: if the top level of constants.yaml is not a mapping
    """
    with open(localdir / "constants.yaml", encoding='utf-8') as file:
        data = yaml.load(file, Loader=yaml.SafeLoader)
    # Explicit check rather than assert, which is stripped under ``-O``.
    if not isinstance(data, dict):
        raise TypeError("constants.yaml must contain a mapping at top level")
    token_perms = ",".join(data["api_perms"])
    token = f"rootaccess|@|%|@|{{}}|@|{token_perms}|@|JSON|@|1|@|root"
    return token, data['queries']


CWP_API_TOKEN, QUERIES = read_constants()

if __name__ == "__main__":
    # @click handles args
    main()  # pylint:disable=no-value-for-parameter