#!/opt/imh-python/bin/python3
"""Provision a CWP server"""
import platform
import shlex
from subprocess import run, CalledProcessError
import logging
import socket
from pathlib import Path
import json
from typing import Union
import click
import pymysql

# pylint:disable=line-too-long

# Subjects of CWP admin notifications that get removed during provisioning.
NOTIFICATION_REMOVE_LIST = [
    "CSF/LFD Firewall - SECURITY ISSUE",
    "Firewall - SECURITY ISSUE",
    "Hidden Processes - SECURITY ISSUE",
    "CWP Secure Kernel - SECURITY ISSUE",
]
NOTIFICATION_JSON_PATH = (
    "/usr/local/cwpsrv/htdocs/resources/admin"
    "/include/libs/notifications/notifications.json"
)
LOGGER_NAME = "provision"
LOG_DIR = "/var/log/"
# Joined with pathlib so the trailing slash on LOG_DIR does not produce "//".
LOG_FILE = str(Path(LOG_DIR) / "cwp_provision.log")
LOG_FORMAT = "%(asctime)s:%(levelname)s:%(message)s"
# CSS rule appended to CWP's custom.css to hide the 'Terminal' menu entry.
TERMINAL_HIDE_RULE = "li.dropdown { display: none !important; }"

# Directory containing this script; helper shell scripts live next to it.
localdir = Path(__file__).parent.absolute()


def setup_logging():
    """Setup the "provision" named logger

    Logs go to LOG_FILE and to stderr at INFO level. Calling this more than
    once is a no-op, so handlers are never attached twice.
    """
    logger = logging.getLogger(LOGGER_NAME)
    if logger.handlers:
        return
    loghandler = logging.FileHandler(LOG_FILE)
    loghandler.setFormatter(logging.Formatter(LOG_FORMAT))
    logger.addHandler(loghandler)
    # stderr
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(logging.INFO)


def run_cmd(
    *args, input_str: Union[str, None] = None, required: bool = True
) -> str:
    """run a subprocess, logging any errors

    Args:
        *args: the command and its arguments; each item is passed through
            ``str()`` before execution.
        input_str: optional text sent to the process on stdin.
        required: when True a non-zero exit status re-raises the
            CalledProcessError after logging it; when False the failure is
            logged and the captured stdout is returned.

    Returns:
        The captured stdout of the process.

    Raises:
        CalledProcessError: the command exited non-zero and ``required`` is
            True.
    """
    try:
        ran = run(
            list(map(str, args)),
            capture_output=True,
            input=input_str,
            encoding='utf-8',
            errors='replace',
            # check=True is what makes run() raise CalledProcessError; without
            # it the handler below (and ``required``) would never take effect.
            check=True,
        )
    except CalledProcessError as exc:
        command = shlex.join(exc.cmd)
        logger = logging.getLogger(LOGGER_NAME)
        logger.warning("Process call failed with error code %d", exc.returncode)
        logger.warning("Command: %s", command)
        logger.warning("stdout: %s", exc.stdout)
        logger.warning("stderr: %s", exc.stderr)
        if required:
            raise
        logger.warning("Command %s failed, but we will continue", command)
        return exc.stdout
    return ran.stdout


def _csf_allow_entries(text: str) -> set:
    """Return the first token of every non-comment line of a csf.allow file"""
    entries = set()
    for line in text.splitlines():
        # Anything after '#' is a comment; commented-out IPs do not count.
        entry = line.split("#", 1)[0].strip()
        if entry:
            entries.add(entry.split()[0])
    return entries


def configure_firewall(vps: bool):
    """Setup CSF

    Enables CSF (best effort), appends any of our IPs that are not already
    listed in /etc/csf/csf.allow, and removes /etc/csf/csf.error if present.

    Args:
        vps: True when provisioning a VPS; dedicated servers get one
            additional allowed IP.
    """
    run_cmd("/usr/sbin/csf", "-e", required=False)
    ips = [
        "198.46.90.8",  # cpjump
        "144.208.77.66",  # jumpstation
        "70.39.232.170",  # sys4
        "198.46.90.10",  # kwx
        "209.182.214.106",  # ash-sys-pro-kwx4
        "198.46.90.7",  # js2
        "198.46.92.44",  # js3
        "173.247.250.69",  # lax-sys-pro-dedchecker2
        "70.39.146.168",  # ash-sys-pro-dedchecker2
        "209.182.212.187",  # ash-sys-pro-dedchecker1
        "173.247.250.216",  # lax-sys-pro-dedchecker1
        "213.165.240.142",  # ams-sys-pro-dedchecker1
        "213.165.240.144",  # ams-sys-pro-dedchecker2
    ]
    if not vps:
        ips.append("198.46.81.133")
    allow_file = Path("/etc/csf/csf.allow")
    allow_file_text = allow_file.read_text(encoding='utf-8')
    # Compare whole entries: a substring test would treat "198.46.90.8" as
    # present when only "198.46.90.80" (or a commented-out line) is listed.
    present = _csf_allow_entries(allow_file_text)
    lines = [f"{ip} # inmotion" for ip in ips if ip not in present]
    if lines:
        # Only add a separator when the file does not already end in a newline
        needs_newline = bool(allow_file_text) and not allow_file_text.endswith(
            "\n"
        )
        with allow_file.open("a", encoding='utf-8') as file:
            file.write(
                ("\n" if needs_newline else "") + "\n".join(lines) + "\n"
            )
    csf_err = Path("/etc/csf/csf.error")
    if csf_err.is_file():
        csf_err.unlink()


def install_cwp(version: str):
    """Run install_cwp.sh (located next to this script) with ``version``"""
    run_cmd(localdir / "install_cwp.sh", version, required=True)


def cwp_mysql(query: str, args: Union[tuple, None] = None):
    """Runs a MySQL query on the root_cwp database

    Args:
        query: SQL text, using ``%s`` placeholders for parameters.
        args: parameters bound to the placeholders, or None.

    Returns:
        All rows fetched from the cursor after executing the query.
    """
    with pymysql.connect(
        read_default_file='/root/.my.cnf', database='root_cwp'
    ) as conn:
        with conn.cursor() as cur:
            cur.execute(query, args)
            rows = cur.fetchall()
        # PyMySQL does not autocommit by default and leaving the ``with``
        # block only closes the connection, so writes must be committed here.
        conn.commit()
    return rows


def set_ns(ns1: str, ns2: str):
    """Setup nameservers in the root_cwp database

    Resolves both hostnames to IPv4 addresses and stores names and addresses
    in the ``nameserver`` table.

    Args:
        ns1: hostname of the first nameserver.
        ns2: hostname of the second nameserver.
    """
    ns1_ip = socket.gethostbyname(ns1)
    ns2_ip = socket.gethostbyname(ns2)
    cwp_mysql(
        "UPDATE nameserver SET ns1_name=%s, ns1_ip=%s, ns2_name=%s, ns2_ip=%s",
        (ns1, ns1_ip, ns2, ns2_ip),
    )


def brand_install(ns1: str, ns2: str):
    """Setup branding

    Sets the default nameservers, installs the imh-cwp-dns package and hides
    the 'Terminal' option in the CWP admin panel via custom.css.

    Args:
        ns1: hostname of the first nameserver.
        ns2: hostname of the second nameserver.
    """
    set_ns(ns1, ns2)
    # install dnsadmin
    run_cmd("/usr/bin/yum", "install", "imh-cwp-dns", "-y")
    # disable terminal
    css_path = Path("/usr/local/cwpsrv/htdocs/admin/design/css/custom.css")
    if css_path.is_file():
        existing = css_path.read_text(encoding='utf-8', errors='replace')
    else:
        existing = ""
    # Skip if the rule is already there so re-runs do not keep appending it.
    if TERMINAL_HIDE_RULE in existing:
        return
    with css_path.open("a", encoding='utf-8') as file:
        # Write a real CSS comment; shell redirection syntax and '#' comments
        # are not valid CSS and would corrupt the stylesheet.
        file.write(
            "\n/* Disable 'Terminal' option in cwp */\n"
            f"{TERMINAL_HIDE_RULE}\n"
        )


def remove_notifications():
    """Remove items in NOTIFICATION_REMOVE_LIST FROM NOTIFICATION_JSON_PATH

    The JSON file is rewritten in place, keeping only the notifications whose
    "subject" is not in NOTIFICATION_REMOVE_LIST.
    """
    with open(NOTIFICATION_JSON_PATH, "r+", encoding='utf-8') as file:
        notifications = json.load(file)
        kept = [
            notification
            for notification in notifications
            if notification.get("subject") not in NOTIFICATION_REMOVE_LIST
        ]
        # Rewrite from the start and cut off whatever is left of the old,
        # longer content.
        file.seek(0)
        file.write(json.dumps(kept))
        file.truncate()


def fix_sendmail_symlink():
    """Symlink /usr/sbin/sendmail to /usr/sbin/sendmail.postfix"""
    # exists() is also False for a dangling symlink; -f lets ln replace it
    # instead of failing with "File exists".
    if not Path("/usr/sbin/sendmail").exists():
        run_cmd(
            "ln", "-sf", "/usr/sbin/sendmail.postfix", "/usr/sbin/sendmail"
        )


def renew_service_ssl(hostname: str):
    """Setup Service SSL

    Installs acme.sh, issues a certificate for ``hostname`` and restarts
    cwpsrv. A failure of any step is logged as a warning and not re-raised.

    Args:
        hostname: the name the certificate is issued for.
    """
    try:
        run_cmd("/scripts/install_acme")
        run_cmd("/scripts/generate_hostname_ssl")
        # fmt: off
        run_cmd(
            "/usr/bin/bash", "/root/.acme.sh/acme.sh", "--issue",
            "--cert-home", "/root/.acme.sh/cwp_certs",
            "-d", hostname,
            "-w", "/usr/local/apache/autossl_tmp/",
            "--certpath", "/etc/pki/tls/certs/hostname.cert",
            "--keypath", "/etc/pki/tls/private/hostname.key",
            "--fullchainpath", "/etc/pki/tls/certs/hostname.bundle",
            "--keylength", "2048",
            "--force",
            "--renew-hook", "/scripts/hostname_ssl_restart_services",
            "--log",
        )
        # fmt: on
        run_cmd("/usr/bin/systemctl", "restart", "cwpsrv")
    except CalledProcessError:
        logger = logging.getLogger(LOGGER_NAME)
        logger.warning("Setting Service SSL did not immediately succeed.")


def enable_monit():
    """Run setup_monit.sh"""
    run_cmd("/usr/bin/bash", "/opt/cwprads/setup_monit.sh")


def tune_config(vps: bool, version: str):
    """Run tune-config.sh

    Args:
        vps: selects the "vps" platform argument when True, otherwise
            "dedicated".
        version: passed through to tune-config.sh as its second argument.
    """
    plat = "vps" if vps else "dedicated"
    # make extra adjustments to system config before handoff
    run_cmd("/usr/bin/bash", str(localdir / "tune-config.sh"), plat, version)


def install_account_hooks_package():
    """Install imh-cwp-account-hooks"""
    # install imh-cwp-account-hooks
    run_cmd("/usr/bin/yum", "install", "imh-cwp-account-hooks", "-y")


@click.group()
def cli():
    """Placeholder"""


@cli.command()
@click.option(
    "--vps", default=False, is_flag=True, help="True if provisioning on VPS"
)
@click.option("--version", required=True)
@click.option("--ns1", default="ns1.inmotionhosting.com", help="nameserver 1")
@click.option("--ns2", default="ns2.inmotionhosting.com", help="nameserver 2")
@click.option(
    "do_service_ssl",
    "--service-ssl/--no-service-ssl",
    default=True,
    is_flag=True,
    help="Renew service SSL (default)",
)
def cwp(vps: bool, version: str, ns1: str, ns2: str, do_service_ssl: bool):
    """Setup CWP"""
    setup_logging()
    install_cwp(version)
    configure_firewall(vps=vps)
    brand_install(ns1, ns2)
    install_account_hooks_package()
    enable_monit()
    fix_sendmail_symlink()
    remove_notifications()
    if do_service_ssl:
        renew_service_ssl(platform.node())
    tune_config(vps, version)


@cli.command(help="Renew service SSL")
def service_ssl():
    """Renew service SSL"""
    # Configure logging here too so command failures reach the log file.
    setup_logging()
    renew_service_ssl(platform.node())


@cli.command(help="Set default nameservers")
@click.option("--ns1", default="ns1.inmotionhosting.com", help="nameserver 1")
@click.option("--ns2", default="ns2.inmotionhosting.com", help="nameserver 2")
def ns(ns1: str, ns2: str):
    """Set default nameservers"""
    set_ns(ns1, ns2)


if __name__ == "__main__":
    cli()