"""Functions for fetching basic info on user accounts""" from pathlib import Path import pwd import grp import re import os import tarfile from typing import Literal, Optional, Union, overload import yaml from ._yaml import DumbYamlLoader from . import SYS_USERS, STAFF_GROUPS, OUR_RESELLERS class CpuserError(Exception): """Raised when there's something wrong collecting cPanel user info""" __module__ = 'rads' def get_login() -> str: """Obtain which user ran this script Returns: username """ try: blame = os.getlogin() except OSError: blame = pwd.getpwuid(os.geteuid()).pw_name return blame get_login.__module__ = 'rads' def is_cpuser(user: str) -> bool: """Checks if a user is a valid cPanel user. Warning: This only checks if the user exists and will also be true for restricted cPanel users. Use ``cpuser_safe`` instead if you need to check for those Args: user: cPanel username to check Returns: Whether the cPanel user exists """ try: homedir = pwd.getpwnam(user).pw_dir except KeyError: return False return all( ( os.path.isdir(homedir), os.path.isfile(os.path.join('/var/cpanel/users', user)), os.path.isdir(os.path.join('/var/cpanel/userdata', user)), ) ) is_cpuser.__module__ = 'rads' @overload def all_cpusers(owners: Literal[False] = False) -> list[str]: ... @overload def all_cpusers(owners: Literal[True] = True) -> dict[str, str]: ... def all_cpusers(owners: bool = False) -> Union[dict[str, str], list[str]]: """Returns cPanel users from /etc/trueuserowners Args: owners: whether to return users as a dict with owners as the values Raises: CpuserError: if /etc/trueuserowners is invalid Returns: either a list of all users, or a dict of users (keys) to owners (vals) """ with open('/etc/trueuserowners', encoding='utf-8') as userowners: userdict = yaml.load(userowners, DumbYamlLoader) if not isinstance(userdict, dict): raise CpuserError('/etc/trueuserowners is invalid') if owners: return userdict return list(userdict.keys()) all_cpusers.__module__ = 'rads' def main_cpusers() -> list: """Get a all non-child, non-system, "main" cPanel users Raises: CpuserError: if /etc/trueuserowners is invalid""" return [ user for user, owner in all_cpusers(owners=True).items() if owner in OUR_RESELLERS or owner == user ] main_cpusers.__module__ = 'rads' def get_owner(user: str) -> str: """Get a user's owner (even if the account has reseller ownership of itself) Warning: the owner may be root, which is not a cPanel user Hint: If looking this up for multiple users, use ``get_cpusers(owners=True)`` instead Args: user: cPanel username to find the owner for Raises: CpuserError: if /etc/trueuserowners is invalid or the requested user is not defined in there """ try: return all_cpusers(owners=True)[user] except KeyError as exc: raise CpuserError(f'{user} is not in /etc/trueuserowners') from exc get_owner.__module__ = 'rads' def is_child(user: str) -> bool: """Check if a cPanel user is not self-owned and not owned by a system user Args: user: cPanel username to check Raises: CpuserError: if /etc/trueuserowners is invalid or the requested user is not defined in there """ owner = get_owner(user) return owner not in OUR_RESELLERS and owner != user is_child.__module__ = 'rads' def get_children(owner: str) -> list[str]: """Get a list of child accounts for a reseller Args: owner: cPanel username to lookup Returns: all child accounts of a reseller, excluding itself Raises: CpuserError: if /etc/trueuserowners is invalid """ return [ usr for usr, own in all_cpusers(owners=True).items() if own == owner and usr != own ] get_children.__module__ = 'rads' def cpuser_safe(user: str) -> bool: """Checks whether the user is safe for support to operate on - The user exists and is a valid cPanel user - The user is not a reserved account - The user is not in a staff group Args: user: cPanel username to check """ # SYS_USERS includes SECURE_USER if user in SYS_USERS or user in OUR_RESELLERS or not is_cpuser(user): return False for group in [x.gr_name for x in grp.getgrall() if user in x.gr_mem]: if group in STAFF_GROUPS: return False return True cpuser_safe.__module__ = 'rads' def cpuser_suspended(user: str) -> bool: """Check if a user is currently suspended Warning: This does not check for pending suspensions Args: user: cPanel username to check """ return os.path.exists(os.path.join('/var/cpanel/suspended', user)) cpuser_suspended.__module__ = 'rads' def get_homedir(user: str): """Get home directory path for a cPanel user Args: user: cPanel username to check Raises: CpuserError: if the user does not exist or the home directory path found does not match the expected pattern """ try: homedir = pwd.getpwnam(user).pw_dir except KeyError as exc: raise CpuserError(f'{user}: no such user') from exc if re.match(r'/home[0-9]*/\w+', homedir) is None: # Even though we fetched the homedir successfully from /etc/passwd, # treat this as an error due to unexpected output. If the result was # '/' for example, some calling programs might misbehave or even # rm -rf / depending on what it's being used for raise CpuserError(f'{user!r} does not match expected pattern') return homedir get_homedir.__module__ = 'rads' def get_primary_domain(user: str) -> str: """Get primary domain from cpanel userdata Args: user: cPanel username to check Raises: CpuserError: if cpanel userdata cannot be read or main_domain is missing """ userdata_path = os.path.join('/var/cpanel/userdata', user, 'main') try: with open(userdata_path, encoding='utf-8') as userdata_filehandle: return yaml.safe_load(userdata_filehandle)['main_domain'] except (yaml.YAMLError, KeyError, OSError) as exc: raise CpuserError(exc) from exc get_primary_domain.__module__ = 'rads' def whoowns(domain: str) -> str: """ Get the cPanel username that owns a domain Args: domain: Domain name to look up Returns: The name of a cPanel user that owns the domain name, or None on failure """ try: with open('/etc/userdomains', encoding='utf-8') as file: match = next(x for x in file if x.startswith(f'{domain}: ')) return match.rstrip().split(': ')[1] except (OSError, FileNotFoundError, StopIteration): return None whoowns.__module__ = 'rads' def get_plan(user: str) -> Optional[str]: """ Retrieves the hosting plan name for a given cPanel user. This function reads the user's configuration file from /var/cpanel/users and extracts the value assigned to the PLAN variable, if present. Parameters: user (str): The cPanel username. Returns: Optional[str]: The plan name if found, otherwise None. """ path = Path(f"/var/cpanel/users/{user}") if not path.exists(): return None for line in path.read_text(encoding="utf-8").splitlines(): if line.startswith("PLAN="): return line.split("=", 1)[1].strip() return None get_plan.__module__ = 'rads' class UserData: """Object representing the data parsed from userdata Args: user: cPanel username to read cPanel userdata for. Required if pkgacct is not set. data_dir: override this to read /var/cpanel/userdata from some other directory, such as from a restored backup. Ignored if pkgacct is set all_subs: if True, list all subdomains, even those which have were created so an addon domain can be parked on them pkgacct: Don't set this manually. See UserData.from_pkgacct instead. tar: Don't set this manually. See UserData.from_pkgacct instead. Raises: CpuserError: if cPanel userdata is invalid Attributes: user (str): username primary (UserDomain): UserDomain object for the main domain addons (list): UserDomain objects for addon domains parked (list): UserDomain objects for parked domains subs (list): UserDomain objects for subdomains Hint: Use vars() to view this ``UserData`` object as a dict """ user: str primary: 'UserDomain' addons: list['UserDomain'] parked: list['UserDomain'] subs: list['UserDomain'] __module__ = 'rads' def __init__( self, user: Union[str, None] = None, data_dir: str = '/var/cpanel/userdata', all_subs: bool = False, pkgacct: Union[str, None] = None, tar: Union[tarfile.TarFile, None] = None, ): """Initializes a UserData object given a cPanel username""" self.pkgacct = pkgacct if user is None and pkgacct is None: raise TypeError("either user or pkgacct must be set") if user is not None and pkgacct is not None: raise TypeError("user cannot be set if pkgacct is set") if pkgacct is not None and tar is None: raise TypeError( "tar must be set if pkgacct is set; " "use the UserData.from_pkgacct alias instead" ) if pkgacct: filename = Path(pkgacct).name file_re = re.compile(r'(?:cpmove|pkgacct)-(.*).tar.gz$') if match := file_re.match(filename): self.user = match.group(1) else: raise CpuserError( f"{filename} does not follow the expected cpmove/pkgacct " "filename pattern." ) else: self.user = user main_data = self._read_userdata( user=self.user, data_dir=data_dir, pkgacct=pkgacct, domfile='main', required={ 'main_domain': str, 'addon_domains': dict, 'parked_domains': list, 'sub_domains': list, }, tar=tar, ) dom_data = self._read_userdata( user=self.user, domfile=main_data['main_domain'], required={'documentroot': str}, data_dir=data_dir, pkgacct=pkgacct, tar=tar, ) # populate primary domain self.primary = UserDomain( domain=main_data['main_domain'], has_ssl=dom_data['has_ssl'], docroot=dom_data['documentroot'], ) # populate addon domains self.addons = [] addon_subs = set() for addon, addon_file in main_data['addon_domains'].items(): addon_subs.add(addon_file) addon_data = self._read_userdata( user=self.user, domfile=addon_file, required={'documentroot': str}, data_dir=data_dir, pkgacct=pkgacct, tar=tar, ) self.addons.append( UserDomain( subdom=addon_file, domain=addon, has_ssl=addon_data['has_ssl'], docroot=addon_data['documentroot'], ) ) # populate parked domains self.parked = [] for parked in main_data['parked_domains']: self.parked.append( UserDomain( domain=parked, has_ssl=False, docroot=self.primary.docroot ) ) # populate subdomains self.subs = [] for sub in main_data['sub_domains']: if all_subs or sub not in addon_subs: sub_data = self._read_userdata( user=self.user, domfile=sub, required={'documentroot': str}, data_dir=data_dir, pkgacct=pkgacct, tar=tar, ) self.subs.append( UserDomain( domain=sub, has_ssl=sub_data['has_ssl'], docroot=sub_data['documentroot'], ) ) @staticmethod def from_pkgacct(path: str) -> 'UserData': """Alternate constructor to read userdata from a pkgacct/cpmove file""" try: with tarfile.open(path, 'r:gz') as tar: return UserData(pkgacct=path, tar=tar) except FileNotFoundError as exc: raise CpuserError(exc) from exc @classmethod def _read_userdata( cls, user: str, data_dir: str, pkgacct: Union[dict, None], domfile: str, required: dict, tar: Union[tarfile.TarFile, None], ): if pkgacct: return cls._read_from_pkgacct(pkgacct, domfile, required, tar) return cls._read_userdata_file(user, domfile, required, data_dir) @staticmethod def _tar_extract(tar: tarfile.TarFile, path: str): # docs say non-file members return None and missing files raise KeyError # This makes it return None in both error cases try: return tar.extractfile(path) except KeyError: return None @classmethod def _read_from_pkgacct( cls, tar_path: str, domfile: str, required: dict, tar: tarfile.TarFile ) -> dict: prefix = Path(tar_path).name[:-7] path = f"{prefix}/userdata/{domfile}" contents = cls._tar_extract(tar, path).read() has_ssl = cls._tar_extract(tar, f"{path}_SSL") is not None if not contents: raise CpuserError( f"{path} was not a file in the contents of {tar_path}" ) try: data = yaml.load(str(contents, 'utf-8'), Loader=yaml.SafeLoader) if not isinstance(data, dict): raise ValueError except ValueError as exc: raise CpuserError( f'{path} inside {tar_path} could not be parsed' ) from exc for key, req_type in required.items(): if key not in data: raise CpuserError(f'{path} is missing {key!r}') if not isinstance(data[key], req_type): raise CpuserError(f'{path} contains invalid data for {key!r}') data['has_ssl'] = has_ssl return data def __repr__(self): if self.pkgacct: return f'UserData(pkgacct={self.pkgacct!r})' return f'UserData({self.user!r})' @property def __dict__(self): return { 'user': self.user, 'primary': vars(self.primary), 'addons': [vars(x) for x in self.addons], 'parked': [vars(x) for x in self.parked], 'subs': [vars(x) for x in self.subs], } @property def all_roots(self) -> list[str]: """All site document roots (list)""" all_dirs = {self.primary.docroot} all_dirs.update([x.docroot for x in self.subs]) all_dirs.update([x.docroot for x in self.addons]) return list(all_dirs) @property def merged_roots(self) -> list[str]: """Merged, top-level document roots for a user (list)""" merged = [] for test_path in sorted(self.all_roots): head, tail = os.path.split(test_path) while head and tail: if head in merged: break head, tail = os.path.split(head) else: if test_path not in merged: merged.append(test_path) return merged @staticmethod def _read_userdata_file( user: str, domfile: str, required: dict, data_dir: str ) -> dict: """Internal helper function for UserData to strictly parse YAML files""" path = os.path.join(data_dir, user, domfile) try: with open(path, encoding='utf-8') as handle: data = yaml.load(handle, Loader=yaml.SafeLoader) if not isinstance(data, dict): raise ValueError except OSError as exc: raise CpuserError(f'{path} could not be opened') from exc except ValueError as exc: raise CpuserError(f'{path} could not be parsed') from exc for key, req_type in required.items(): if key not in data: raise CpuserError(f'{path} is missing {key!r}') if not isinstance(data[key], req_type): raise CpuserError(f'{path} contains invalid data for {key!r}') data['has_ssl'] = os.path.isfile(f'{path}_SSL') return data class UserDomain: """Object representing a cPanel domain in ``rads.UserData()`` Attributes: domain (str): domain name has_ssl (bool): True/False if the domain has ssl docroot (str): document root on the disk subdom (str|None): if this is an addon domain, this is the subdomain it's parked on which is also its config's filename Hint: vars() can be run on this object to convert it into a dict """ __module__ = 'rads' def __init__( self, domain: str, has_ssl: bool, docroot: str, subdom: Union[str, None] = None, ): self.domain = domain self.has_ssl = has_ssl self.docroot = docroot self.subdom = subdom def __repr__(self): if self.subdom: return ( f"UserDomain(domain={self.domain!r}, has_ssl={self.has_ssl!r}, " f"docroot={self.docroot!r}, subdom={self.subdom!r})" ) return ( f"UserDomain(domain={self.domain!r}, has_ssl={self.has_ssl!r}, " f"docroot={self.docroot!r})" ) @property def __dict__(self): myvars = {} for attr in ('domain', 'has_ssl', 'docroot'): myvars[attr] = getattr(self, attr) if self.subdom is not None: myvars['subdom'] = self.subdom return myvars