"""Library for interacting with Control Web Panel .. data:: cwp.HAS_CWP: Whether CWP is installed *(bool)* .. data:: cwp.MYSQL_SOCK: MySQL socket path *(str)* """ import os from pathlib import Path import pwd from typing import Literal, Union, overload import pymysql from pymysql.cursors import Cursor HAS_CWP = ( not Path('/usr/local/cpanel/cpsrvd').is_file() and Path('/usr/local/cwpsrv/bin/cwpsrv').is_file() ) if Path('/etc/redhat-release').exists(): MYSQL_SOCK = '/var/lib/mysql/mysql.sock' else: MYSQL_SOCK = '/run/mysqld/mysqld.sock' def all_domains(user: str) -> set[str]: """Get all domains for a CWP user Args: user (str): CWP username Raises: pymysql.Error: any issue querying the root_cwp database Returns: set[str]: domain names for this user (primary + domains + subdomains) """ assert HAS_CWP sql = """SELECT `domain` FROM `user` WHERE `username`=%s UNION SELECT `domain` FROM `domains` WHERE `user`=%s UNION SELECT CONCAT(`subdomain`, '.', `domain`) FROM subdomains WHERE user=%s""" args = (user, user, user) with root_cwp() as conn, conn.cursor() as cur: cur: Cursor cur.execute(sql, args) return {x[0] for x in cur.fetchall()} def get_docroots(user: str) -> dict[str, str]: """Get site document roots for a CWP user Args: user (str): CWP username Raises: pymysql.Error: any issue querying the root_cwp database KeyError: if the requested user does not exist Returns: dict[str, str]: domains and document root paths """ assert HAS_CWP public_html = os.path.join(pwd.getpwnam(user).pw_dir, 'public_html') sql = """SELECT `domain`, NULL FROM `user` WHERE `username`=%s UNION SELECT `domain`, `path` FROM `domains` WHERE `user`=%s UNION SELECT CONCAT(`subdomain`, '.', `domain`), `path` FROM subdomains WHERE user=%s""" args = (user, user, user) docroots = {} with root_cwp() as conn, conn.cursor() as cur: cur: Cursor cur.execute(sql, args) rows = cur.fetchall() for row in rows: dom, path = row if path is None: # path is NOT NULL in the subdomain and domains schemas, so this # NULL from the primary domain in the user table docroots[dom] = public_html else: docroots[dom] = path return docroots @overload def all_users(owners: Literal[False] = False) -> list[str]: ... @overload def all_users(owners: Literal[True] = True) -> dict[str, str]: ... def all_users(owners: bool = False) -> Union[dict[str, str], list[str]]: """Returns CWP users Args: owners: whether to return users as a dict with owners as the values Raises: pymysql.Error: any issue querying the root_cwp database Returns: either a list of all users, or a dict of users (keys) to owners (vals) """ with root_cwp() as conn, conn.cursor() as cur: cur: Cursor cur.execute('SELECT username, reseller FROM user') data = dict(cur.fetchall()) if owners: return { user: owner if owner in data else 'root' for user, owner in data.items() if user != 'root' } return [x for x in data if x != 'root'] def root_cwp(**kwargs) -> pymysql.Connection: """Open and return a MySQL connection to the root_cwp database Returns: pymysql.Connection: Connection to the root_cwp database """ return pymysql.connect( database='root_cwp', read_default_file='/root/.my.cnf', unix_socket=MYSQL_SOCK, **kwargs, ) def vmail_paths(user: str, check_exists: bool = False) -> list[Path]: """Get /etc/vmail/{domain} paths for a user Args: user (str): CWP username check_exists (bool): if True, only return paths that exist. Paths are only created when there's at least one email account for that domain Raises: pymysql.Error: any issue querying the root_cwp database Returns: list[Path]: path objects for each vmail directory """ paths = [] for domain in all_domains(user): if domain.startswith('..') or domain.startswith('/'): # Shouldn't ever happen - .. and / are not valid in domains. # Check anyways in case of malicious injection. raise ValueError(f"{domain=}") path = Path('/var/vmail', domain) if not check_exists or path.exists(): paths.append(path) return paths