"""VZ / HA functions"""

import enum
import json
import shlex
import subprocess
from pathlib import Path
from typing import Optional, TypedDict, Union
from collections.abc import Iterable

import distro


class CT:
    """Create a CT object representing a VZ Container

    The constructor runs ``vzlist -jo <fields> <ctid>`` and stores every
    requested field as an instance attribute of the same name.

    Args:
        ctid (str): container ID to collect information from. All
            vzctl/prlctl actions will interact with this container.

    Raises:
        VZError: if vzlist fails to run, exits nonzero, or prints output
            that cannot be parsed as a JSON list holding one container
    """

    def __init__(self, ctid: str):
        self.ctid = str(ctid)
        # List of options for vzlist
        opts = [
            "cpulimit",
            "cpus",
            "cpuunits",
            "ctid",
            "description",
            "device",
            "disabled",
            "diskinodes",
            "diskspace",
            "hostname",
            "ip",
            "laverage",
            "name",
            "netfilter",
            "numiptent",
            "numproc",
            "onboot",
            "ostemplate",
            "physpages",
            "private",
            "root",
            "status",
            "swappages",
            "uptime",
            "uuid",
            "veid",
        ]
        # Build vzlist command
        command = ["vzlist", "-jo", ",".join(opts), self.ctid]
        # Execute vzlist command
        try:
            result = subprocess.run(
                command, capture_output=True, encoding='utf-8', check=True
            )
        except subprocess.CalledProcessError as exc:
            raise VZError(f"{exc.stderr}") from exc
        except OSError as exc:
            raise VZError(str(exc)) from exc
        # Parse vzlist command result. Malformed or empty output is reported
        # as VZError so callers only need to handle one exception type.
        try:
            data = json.loads(result.stdout)[0]
        except (ValueError, LookupError, TypeError) as exc:
            raise VZError(
                f"Could not parse vzlist output for {self.ctid}: {exc}"
            ) from exc
        if not isinstance(data, dict):
            raise VZError(
                f"Unexpected vzlist output for {self.ctid}: {data!r}"
            )
        self.cpulimit: int = data.get("cpulimit", 0)
        self.cpus: int = data.get("cpus", 0)
        self.cpuunits: int = data.get("cpuunits", 0)
        self.ctid: Union[int, str] = data.get("ctid", "")
        self.description: str = data.get("description", "")
        self.device: str = data.get("device", "")
        self.disabled: bool = data.get("disabled", False)
        self.diskinodes: dict = data.get("diskinodes", {})
        self.diskspace: dict = data.get("diskspace", {})
        self.hostname: str = data.get("hostname", "")
        self.ip: list[str] = data.get("ip", [])
        self.laverage: list[float] = data.get("laverage", [])
        self.name: str = data.get("name", "")
        self.netfilter: str = data.get("netfilter", "")
        self.numiptent: dict = data.get("numiptent", {})
        self.numproc: dict = data.get("numproc", {})
        self.onboot: bool = data.get("onboot", False)
        self.ostemplate: str = data.get("ostemplate", "")
        self.physpages: dict = data.get("physpages", {})
        self.private: str = data.get("private", "")
        self.root: str = data.get("root", "")
        self.status: str = data.get("status", "")
        self.swappages: dict = data.get("swappages", {})
        self.uptime: float = data.get("uptime", 0.0)
        self.uuid: str = data.get("uuid", "")
        self.veid: str = data.get("veid", "")

    def __repr__(self):
        attrs = ", ".join(
            f"{key}={value!r}" for key, value in vars(self).items()
        )
        return f"CT({attrs})"

    @staticmethod
    def _run(command: list[str]) -> subprocess.CompletedProcess:
        """Run a command, capturing stdout/stderr as text.

        A nonzero exit status does not raise; callers inspect the returned
        object's ``returncode``.
        """
        return subprocess.run(
            command, capture_output=True, encoding='utf-8', check=False
        )

    def start(self) -> subprocess.CompletedProcess:
        """Starts the container using 'vzctl start'

        Returns:
            subprocess.CompletedProcess: result of the 'vzctl start' command
        """
        # self.ctid may be an int (it is read back from vzlist's JSON), and
        # subprocess only accepts str arguments, hence the str() here and in
        # every other command built from it.
        return self._run(["vzctl", "start", str(self.ctid)])

    def stop(self) -> subprocess.CompletedProcess:
        """Stops the container using 'vzctl stop'

        Returns:
            subprocess.CompletedProcess: result of the 'vzctl stop' command
        """
        return self._run(["vzctl", "stop", str(self.ctid)])

    def restart(self) -> subprocess.CompletedProcess:
        """Restarts the container using 'vzctl restart'

        Returns:
            subprocess.CompletedProcess: result of the 'vzctl restart' command
        """
        return self._run(["vzctl", "restart", str(self.ctid)])

    def mount(self) -> subprocess.CompletedProcess:
        """Mounts the container's filesystem using the 'vzctl mount' command

        Returns:
            subprocess.CompletedProcess: result of the 'vzctl mount' command
        """
        return self._run(["vzctl", "mount", str(self.ctid)])

    def umount(self) -> subprocess.CompletedProcess:
        """Unmounts the container's filesystem using the 'vzctl umount' command

        Returns:
            subprocess.CompletedProcess: result of the 'vzctl umount' command
        """
        return self._run(["vzctl", "umount", str(self.ctid)])

    def destroy(self) -> subprocess.CompletedProcess:
        """Destroys the container using the 'vzctl destroy' command

        Returns:
            subprocess.CompletedProcess: result of the 'vzctl destroy' command
        """
        return self._run(["vzctl", "destroy", str(self.ctid)])

    def register(self, dst: str) -> subprocess.CompletedProcess:
        """Registers the container at the specified destination using the
        'vzctl register' command

        Args:
            dst (str): The destination path where the container is to be
                registered

        Returns:
            subprocess.CompletedProcess: result of the 'vzctl register' command
        """
        return self._run(["vzctl", "register", dst, str(self.ctid)])

    def unregister(self) -> subprocess.CompletedProcess:
        """Unregisters the container using the 'vzctl unregister' command

        Returns:
            subprocess.CompletedProcess: result of the 'vzctl unregister'
            command
        """
        return self._run(["vzctl", "unregister", str(self.ctid)])

    def clone(self, name: str) -> subprocess.CompletedProcess:
        """Clones the container with the specified name using 'prlctl clone'

        Args:
            name (str): The name for the new cloned container

        Returns:
            subprocess.CompletedProcess: result of the 'prlctl clone' command
        """
        return self._run(
            ["prlctl", "clone", str(self.ctid), "--name", str(name)]
        )

    def exec2(self, cmd: str) -> subprocess.CompletedProcess:
        """Executes the specified command within the container using the
        'vzctl exec2' command

        Args:
            cmd (str): The command to execute within the container. It is
                split into arguments with shlex.split. If args for the
                command come from user input, you may want to use shlex.join
                or shlex.quote on them

        Returns:
            subprocess.CompletedProcess: result of the 'vzctl exec2' command
        """
        return self._run(
            ["vzctl", "exec2", str(self.ctid), *shlex.split(cmd)]
        )

    def set(self, **kwargs: str) -> subprocess.CompletedProcess:
        """Sets various configuration options for the container

        This method constructs a command to modify the container's
        configuration using the `vzctl set` command. It accepts keyword
        arguments where each key-value pair represents an option and its
        corresponding value to be set. The method appends "--save" to the
        command to save the changes permanently.

        After running the command, the method reinitializes the object to
        reflect the updated configuration.

        Args:
            **kwargs: Arbitrary keyword arguments representing configuration
                options and values. ``key=value`` becomes ``--key value``.

        Returns:
            subprocess.CompletedProcess: The result of the `subprocess.run`
            call, containing information about the execution of the command

        Raises:
            VZError: if re-reading the container with vzlist fails afterwards
        """
        arg_list = []
        for key, value in kwargs.items():
            arg_list.extend([f"--{key}", f"{value}"])
        arg_list.append("--save")
        result = self._run(["vzctl", "set", str(self.ctid), *arg_list])
        # re-initialize attributes
        self.__init__(self.ctid)  # pylint:disable=unnecessary-dunder-call
        return result

    def fix_ctid(self, start: bool = True) -> None:
        """A tool for offline fixing CTID fields of VZ7 containers.
        Re-registers the container while offline.

        This method stops the container, unregisters it, renames the
        container directory from the current CTID to the container name,
        updates the instance attributes accordingly, and re-registers the
        container with the new details. Optionally, it can restart the
        container if the `start` parameter is set to True.

        Args:
            start (bool): If True, the container will be restarted after the
                CTID fix is applied. Defaults to True

        Raises:
            VZError: if the container has no name to rename to
            OSError: if renaming the private directory fails

        Returns:
            None
        """
        # Validate before touching the container: without a name the rename
        # target would be /vz/private itself, and the failure would only be
        # noticed after the container was already stopped and unregistered.
        if not self.name:
            raise VZError(
                f"Container {self.ctid} has no name; cannot fix its CTID"
            )
        self.stop()
        self.unregister()
        src = Path(f"/vz/private/{self.ctid}")
        dst = Path(f"/vz/private/{self.name}")
        src.rename(dst)
        self.private = str(dst)
        self.root = f"/vz/root/{self.name}"
        self.ctid = self.veid = self.name
        self.register(self.private)
        if start:
            self.start()


class ListCmd(enum.Enum):
    """vz list base commands"""

    VZLIST = ["/usr/sbin/vzlist", "-H"]
    PRLCTL = ["/usr/bin/prlctl", "list", "-H"]


class VZError(Exception):
    """Raised for errors with VZ and OpenVZ"""


def is_vz() -> bool:
    """Checks if host is a Virtuozzo node"""
    return "Virtuozzo" in distro.name()


def is_vz7() -> bool:
    """Check if host is a Virtuozzo 7 node"""
    return is_vz() and distro.major_version() == "7"


def is_vps() -> bool:
    """Check if host is a Virtuozzo container

    Returns:
        True if /proc/vz/veinfo holds exactly one line whose first field is
        either a nonzero integer or not an integer at all (a UUID). False
        if the file is unreadable, empty, or has more than one line.
    """
    try:
        with open("/proc/vz/veinfo", encoding="ascii") as handle:
            ve_data = handle.read().strip()
    except OSError:
        return False  # if veinfo doesn't exist this can't be a vps
    if "\n" in ve_data:
        return False  # if veinfo contains >1 line, this is a CL or VZ node
    fields = ve_data.split()
    if not fields:
        return False  # empty veinfo: nothing identifies this as a container
    try:
        veid = int(fields[0])
    except ValueError:
        return True  # veinfo contains a UUID
    return veid != 0


class VeInfoDict(TypedDict):
    """Values used in the return type of rads.vz.veinfo(). procs is the
    number of processes in the container; ips is a list of ip addresses"""

    procs: int
    ips: list[str]


def veinfo() -> dict[str, VeInfoDict]:
    """Read running containers from /proc/vz/veinfo. VMs are not listed.

    Returns:
        dict[str, VeInfoDict]: container IDs mapped to dicts. Each dict
        contains "procs" (the number of processes in the container) and "ips"
        (all ip addresses assigned to the container)

    Raises:
        OSError: if /proc/vz/veinfo cannot be opened
        ValueError: if a non-blank line has fewer than three fields or a
            non-integer process count
    """
    ret: dict[str, VeInfoDict] = {}
    # VZ Docs say the format of each line is
    # CT_ID reserved number_of_processes IP_address [IP_address ...]
    # where reserved isn't used and will always be 0
    with open("/proc/vz/veinfo", encoding="ascii") as handle:
        for line in handle:
            fields = line.split()
            if not fields:
                continue  # blank line
            ctid, _, procs, *ips = fields
            if ctid == '0':
                continue  # skip the CTID 0 entry; only containers are listed
            ret[ctid] = {'procs': int(procs), 'ips': ips}
    return ret


def _exec(cmd: Iterable) -> subprocess.CompletedProcess[str]:
    """For executing prlctl or vzlist

    Args:
        cmd: the command and its arguments

    Returns:
        the completed process, with stdout/stderr captured as text

    Raises:
        VZError: if the command cannot be started or exits nonzero
    """
    try:
        ret = subprocess.run(
            cmd, capture_output=True, encoding="utf-8", check=False
        )
    except OSError as exc:
        # covers a missing binary as well as e.g. permission errors
        raise VZError(exc) from exc
    if ret.returncode:  # nonzero
        raise VZError(f"Error running {cmd!r}. stderr={ret.stderr!r}")
    return ret


def _first_field(ret: subprocess.CompletedProcess) -> str:
    """Return the first whitespace-delimited field of a command's stdout

    Raises:
        VZError: if the command printed nothing
    """
    fields = ret.stdout.split()
    if not fields:
        raise VZError(f"No output from {ret.args!r}")
    return fields[0]


def is_ct_running(ctid: Union[str, int]) -> bool:
    """Checks if a container is running

    Args:
        ctid: container ID to check

    Returns:
        True if the container is running on this node, False if it isn't or
        if some other error occurs
    """
    try:
        ret = subprocess.run(
            ["/usr/bin/prlctl", "list", "-H", "-o", "status", str(ctid)],
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            encoding="utf-8",
            check=True,
        )
    except FileNotFoundError:
        pass  # retry with vzlist
    except subprocess.CalledProcessError:
        return False
    else:
        # slice instead of [0] so empty output is False, not IndexError
        return ret.stdout.split()[:1] == ["running"]
    try:
        ret = _exec(["/usr/sbin/vzlist", "-H", "-o", "status", str(ctid)])
    except VZError:  # CTID probably doesn't exist
        return False
    return ret.stdout.split()[:1] == ["running"]


def uuid2ctid(uuid: str) -> str:
    """get the legacy CTID of a container

    Args:
        uuid: VZ UUID to find the legacy CTID for

    Raises:
        VZError: if the prlctl command fails or prints nothing
    """
    ret = _exec(["/usr/bin/prlctl", "list", "-H", "-o", "name", uuid])
    return _first_field(ret)


def ctid2uuid(ctid: Union[int, str]) -> str:
    """Obtain the UUID of a container from its legacy CTID

    Warning:
        This does not work on VZ4

    Args:
        ctid: Legacy CTID to get the UUID for

    Raises:
        VZError: if the prlctl command fails or prints nothing
    """
    ret = _exec(["/usr/bin/prlctl", "list", "-H", "-o", "uuid", str(ctid)])
    # prlctl wraps the UUID in curly braces
    return _first_field(ret).strip(r"{}")


def get_envid(ctid: Union[int, str]) -> str:
    """Obtain the EnvID of a container

    Note:
        This determines what the subdirectory of /vz/root and /vz/private will
        be. This also has to run on VZ4 which lacks the envid field or prlctl,
        so we just return the CTID

    Args:
        ctid: legacy CTID to find the envid for

    Raises:
        VZError: if the prlctl command fails or prints nothing, or
            /etc/virtuozzo-release is missing
    """
    try:
        with open("/etc/virtuozzo-release", encoding="utf-8") as handle:
            if "Virtuozzo release 4" in handle.read():
                return str(ctid)
    except FileNotFoundError as exc:
        raise VZError(exc) from exc
    ret = _exec(["/usr/bin/prlctl", "list", "-H", "-o", "envid", str(ctid)])
    return _first_field(ret)


def _list_cmd(
    opts: list, args: Optional[list], list_cmd: Optional[ListCmd]
) -> tuple[ListCmd, list[str]]:
    """Determines the cmd to run based on VZ version for get_cts()

    Args:
        opts: items to send into ``-o/--output``
        args: optional params to send such as ``--all``
        list_cmd (ListCmd): set this to ListCmd.VZLIST to switch to that
            command. ``None`` selects ListCmd.PRLCTL.

    Returns:
        the ListCmd actually used and the full argument list to execute
    """
    if list_cmd is None:
        list_cmd = ListCmd.PRLCTL
    if list_cmd == ListCmd.VZLIST:
        # vzlist refers to 'envid' as 'veid'
        renames = {"envid": "veid"}
    else:
        # prlctl refers to 'ctid' as 'name'
        renames = {"ctid": "name"}
    cmd = list_cmd.value.copy()
    if args is not None:
        cmd.extend(args)
    # keep the output columns in the same order as opts so rows can be
    # matched back to the requested names positionally
    cmd.extend(["-o", ",".join(renames.get(x, x) for x in opts)])
    return list_cmd, cmd


def _read_row(
    list_cmd: ListCmd, cmd: list[str], row: list[str], opts: list[str]
) -> dict[str, str]:
    """Map one whitespace-split output row onto the requested option names

    Args:
        list_cmd: which list command produced the row
        cmd: the full command that was run (only used in the error message)
        row: the columns of a single output line
        opts: the option names requested, in output order

    Returns:
        a dict of option name -> column value

    Raises:
        VZError: if the number of columns does not match ``opts``
    """
    # if number of columns matches requested options, return normally
    if len(row) == len(opts):
        return dict(zip(opts, row))
    # handle an edge case: prlctl can print missing ostemplates as '' while
    # vzlist prints it as '-', making the prlctl one harder to parse
    if (
        list_cmd == ListCmd.PRLCTL
        and len(row) == len(opts) - 1
        and "ostemplate" in opts
    ):
        present = opts.copy()
        present.remove("ostemplate")
        ret = dict(zip(present, row))
        ret["ostemplate"] = "-"
        return ret
    raise VZError(
        f"{shlex.join(cmd)} expected {len(opts)} columns,"
        f" but got {len(row)}: {row}"
    )


def get_cts(
    opts: Optional[list] = None,
    args: Optional[list] = None,
    list_cmd: ListCmd = ListCmd.PRLCTL,
) -> list[dict[str, str]]:
    """Returns containers according to platform as a list of dicts

    Args:
        opts: items to send into -o/--output (will default to ['ctid'] if None)
        args: optional params to send such as --all
        list_cmd (ListCmd): set this to ListCmd.VZLIST force using that
            command. Otherwise, we use prlctl.

    Raises:
        VZError: if the prlctl or vzlist command fails
    """
    if not opts:
        opts = ["ctid"]
    if not args and list_cmd == ListCmd.VZLIST and opts == ['ctid']:
        # if requesting just running ctids with vzlist, we can get that faster
        # from /proc/vz/veinfo
        try:
            return [{'ctid': ctid} for ctid in veinfo()]
        except OSError:
            pass  # veinfo unreadable; fall back to running vzlist
    # process each line as a dict where keys are the arg and vals are the result
    ret = []
    list_cmd, cmd = _list_cmd(opts, args, list_cmd)
    for line in _exec(cmd).stdout.splitlines():
        columns = line.split()
        if not columns:
            continue  # blank line
        ret.append(_read_row(list_cmd, cmd, columns, opts))
    return ret