"""Restic dataclasses""" from typing import Literal, Optional, Union, TypedDict, TYPE_CHECKING import os from urllib.parse import unquote as url_unquote import shlex from pathlib import PurePath from collections.abc import Generator from datetime import datetime from dataclasses import dataclass from subprocess import CompletedProcess, run as s_run import sys from cproc import Proc if sys.version_info[0] == 3 and sys.version_info[1] < 11: import dateutil ISOFORMAT_USE_DATEUTIL = True else: ISOFORMAT_USE_DATEUTIL = False if TYPE_CHECKING: from . import Restic class SQLBackupGroupDict(TypedDict): """Return format of SQLBackupGroup.serialize()""" time: int # unix time user: str type: Literal["mysql", "pgsql"] failover: bool fin: str # snapshot ID dbs: dict[str, str] # dbname -> snapshot ID class BackupDict(TypedDict): """Return format of Backup.serialize()""" failover: bool snap: str # snapshot ID time: int # unix time type: Literal["homedir", "dirs", "pkgacct"] user: str @dataclass(init=True) class ResticRepo: """Dataclass holding restic/S3 keys and bucket name Args: bucket(str): bucket name restic_pass(str): restic password access_key(str): S3 access key secret_key(str): S3 access key """ __module__ = 'restic' bucket: str restic_pass: str access_key: str secret_key: str class Snapshot: """Represents a restic snapshot Attributes: restic (Restic): restic instance this snapshot was found in id (str): short snapshot ID tags (list[str]): tags supplied when the snapshot was created datetime (datetime.datetime): backup creation time timestamp (int): backup creation time paths (list[str]): top level paths the snapshot contains """ __module__ = 'restic' def __init__(self, *, restic: 'Restic', data: dict): self.restic = restic self.id = str(data['id']) self.tags: list[str] = [ _tag_quote(x, url_unquote) for x in data.get('tags', []) ] # time uses RFC3339 / ISO8601 format # ex: 2023-01-22T11:07:33.30417099-05:00 if ISOFORMAT_USE_DATEUTIL: self.datetime: datetime = dateutil.parser.isoparse(data['time']) else: self.datetime: datetime = datetime.fromisoformat(data['time']) self.timestamp = int(self.datetime.timestamp()) self.paths: list[str] = list(data['paths']) def __repr__(self): return f'Snapshot<{self.id}>' def listdir( self, path: Union[str, PurePath] ) -> list[Union['SnapFile', 'SnapDir']]: """Returns a list of files/dirs in this snapshot Args: path (str | PurePath): full path inside the snapshot to list the contents of Raises: ValueError: requested path was not a full path ResticError: Any error listing snapshot contents from restic Returns: list[SnapFile | SnapDir]: files or directories """ return self.restic.listdir(snap=self, path=path) def scandir( self, path: Union[str, PurePath] ) -> Generator[Union['SnapFile', 'SnapDir'], None, None]: """Iterates files/dirs in this snapshot Args: path (str | PurePath): full path inside the snapshot to list the contents of Raises: ValueError: requested path was not a full path ResticError: Any error listing snapshot contents from restic Yields: Generator[SnapFile | SnapDir, None, None]: files or directories """ return self.restic.scandir(snap=self, path=path) def restore( self, *, includes: Optional[list[Union[str, PurePath]]] = None, excludes: Optional[list[Union[str, PurePath]]] = None, target: Union[str, PurePath] = '/', ) -> 'ResticCmd': """Calls a ResticCmd to restore from this snapshot Args: includes (list[str | PurePath], optional): --include paths excludes (list[str | PurePath], optional): --exclude paths target (str | PurePath): base directory prefix to restore to. Defaults to '/', which restores to the original path Returns: ResticCmd: restic command executor """ return self.restic.restore( self, includes=includes, excludes=excludes, target=target ) def dump(self, filename: Union[str, PurePath]) -> 'ResticCmd': """Crafts a ResticCmd to dump a file from this snapshot Args: filename (str | PurePath): filename to retrieve Returns: ResticCmd: restic command executor """ return self.restic.dump(self, filename) def forget(self, *, prune: bool = False, no_lim: bool = True): """Run restic forget on this snapshot Args: prune (bool): whether to automatically run prune if at least one snapshot was removed. Defaults to False no_lim (bool): do not CPU limit the command as it runs regardless of the lim arg in Restic Raises: ResticError: if the restic forget command fails """ return self.restic.forget(self.id, prune=prune, no_lim=no_lim) def _tag_quote(tag: str, func) -> str: if ':' in tag[1:-1]: prefix, suffix = tag.split(':', maxsplit=1) return f"{prefix}:{func(suffix)}" return tag class ResticCmd: """Return type of Restic's build() function. Can be cast to a str() to get a shell-escaped representation of the command. Attributes: .cmd (list[str]): Popen arguments """ __module__ = 'restic' def __init__( self, cmd, restic: 'Restic', ): self.cmd = cmd self.restic = restic def __str__(self) -> str: return shlex.join(self.cmd) def run( self, *, stdout: Union[int, None] = Proc.DEVNULL, stderr: Union[int, None] = Proc.PIPE, stdin: Union[int, None] = None, input: Union[str, None] = None, # pylint: disable=redefined-builtin check: bool = False, timeout: Union[int, float, None] = None, no_lim: bool = False, **kwargs, ) -> CompletedProcess: """Execute the restic command and return a CompletedProcess Args: stdout (int | None): stdout redirection. Defaults to DEVNULL stderr (int | None): stderr redirection. Defaults to PIPE (because ResticError will need this if you raise it using the result) stdin (int | None): stdin redirection. Defaults to None input (bytes | str | None): text to send to stdin. Do not use the stdin= kwarg if you use input= timeout (int | float | None): optional command timeout check (bool): if set True, raise CalledProcessError on non-zero exit codes. Defaults to False no_lim (bool): do not CPU limit the command as it runs regardless of the lim arg in Restic Raises: CalledProcessError: program exited with a non-zero exit code and check=True was specified TimeoutExpired: program took too long to execute and timeout= was specified Returns: CompletedProcess: process results """ kwargs.update( { 'encoding': 'UTF-8', 'env': self.restic.env, 'stdout': stdout, 'stderr': stderr, 'check': check, 'shell': False, 'timeout': timeout, } ) if input is None: kwargs['stdin'] = stdin else: kwargs['input'] = input if no_lim or self.restic.lim is None: # pylint: disable=subprocess-run-check return s_run(self.cmd, **kwargs) return Proc.run(self.cmd, lim=self.restic.lim, **kwargs) def execv(self): """runs os.execv with the given restic args/env, replacing the current process with restic""" os.environ.update(self.restic.env) os.execv(self.cmd[0], self.cmd) def proc( self, *, stdout: Union[int, None] = Proc.DEVNULL, stderr: Union[int, None] = Proc.PIPE, stdin: Union[int, None] = None, no_lim: bool = False, **kwargs, ) -> Proc: """Start and return the command Args: stdout (int | None): stdout redirection. Defaults to DEVNULL stderr (int | None): stderr redirection. Defaults to PIPE (because ResticError will need this if you raise it using the result) stdin (int | None): stdin redirection. Defaults to None no_lim (bool): do not CPU limit the command as it runs regardless of the lim arg in Restic Returns: cproc.Proc: Running process """ kwargs.update( { 'encoding': 'UTF-8', 'env': self.restic.env, 'stdout': stdout, 'stderr': stderr, 'stdin': stdin, 'shell': False, } ) if no_lim or self.restic.lim is None: # pylint: disable=subprocess-run-check,consider-using-with return Proc(self.cmd, lim=None, **kwargs) return Proc(self.cmd, lim=self.restic.lim, **kwargs) @dataclass(init=True) class SnapPath: """Base class for a remote path in a restic snapshot. When Restic instantiates this object, it'll be returned as one of its subclasses, ``SnapDir`` or ``SnapFile``""" __module__ = 'restic' # SnapPath intentionally does not subclass os.PathLike or os.DirEntry # because the path isn't mounted anywhere that normal filesystem ops # will work against it snapshot: Snapshot restic: 'Restic' name: str type: str path: str uid: int gid: int mode: Union[int, None] permissions: Union[str, None] def __new__(cls, type: str, **_): # pylint: disable=redefined-builtin if type == 'dir': return object.__new__(SnapDir) return object.__new__(SnapFile) def __str__(self): return self.path class SnapFile(SnapPath): """A remote file in a restic snapshot Attributes: snapshot (Snapshot): snapshot instance this file was found in restic (Restic): restic instance this file was found in name (str): base filename type (str): "file" path (str): full path uid (int): original UID of the file when backed up gid (int): original GID of the file when backed up mode (int | None): original file mode when backed up permissions (str | None): original file permissions when backed up """ __module__ = 'restic' def dump(self) -> 'ResticCmd': """Crafts a ResticCmd to dump this file's contents Returns: ResticCmd: restic command executor """ return self.restic.dump(snap=self.snapshot, filename=self.path) class SnapDir(SnapPath): """A remote directory in a restic snapshot Attributes: snapshot (Snapshot): snapshot instance this directory was found in restic (Restic): restic instance this directory was found in name (str): base directory name type (str): "dir" path (str): full path uid (int): original UID of the directory when backed up gid (int): original GID of the directory when backed up mode (int): original directory mode when backed up permissions (str | None): original directory permissions when backed up """ __module__ = 'restic' def listdir(self) -> list[Union['SnapFile', 'SnapDir']]: """List files/dirs found in this path Raises: ResticError: Any error listing snapshot contents from restic Returns: list[SnapFile | SnapDir]: files or directories """ return self.restic.listdir(snap=self.snapshot, path=self.path) def scandir(self) -> Generator[Union['SnapFile', 'SnapDir'], None, None]: """Iterates over files/dirs found in this path Raises: ResticError: Any error listing snapshot contents from restic Yields: Generator[SnapFile | SnapDir, None, None]: files or directories """ return self.restic.scandir(snap=self.snapshot, path=self.path) class Backup: """Object representing a restic snapshot formatted in a way specific to backups3.x's backup-runner. Args: snap (Snapshot): snapshot from Restic.snapshots() Raises: KeyError: snapshot did not have required tags (created manually?) ValueError: __new__() tried to return a sql subclass, but its start: timestamp tag was invalid Returns: (Backup, SQLBackupGroup, SQLBackupItem): depending on the type of backups 3.x snapshot Attributes: snap (Snapshot): snapshot object failover (bool): whether this is a failover copy user (str): cPanel username or root type (str): mysql or pgsql time (int): creation timestamp """ __module__ = 'restic' def __new__(cls, snap: Snapshot): if Backup.get_label(snap, 'type') in ('mysql', 'pgsql'): if 'finished' in snap.tags: return object.__new__(SQLBackupGroup) return object.__new__(SQLBackupItem) return object.__new__(cls) # normal Backup() def __init__(self, snap: Snapshot): self.snap = snap self.failover = 'failover' in snap.tags self.user = self.get_label(snap, 'user') self.type = self.get_label(snap, 'type') if self.type in ('mysql', 'pgsql'): self.time = int(self.get_label(snap, 'start')) else: self.time = snap.timestamp @staticmethod def get_label(snap: Snapshot, name: str) -> str: """Search for the value of a tag if read in the format 'name:value' Args: snap (Snapshot): snapshot object name (str): name to search for Raises: KeyError: if the label was not found Returns: str: the value portion of the 'name:value' tag """ prefix = f'{name}:' for tag in snap.tags: if tag.startswith(prefix): return tag.split(':', 1)[1] raise KeyError(name) def serialize(self) -> dict: """Used internally by Restic.get_backups() if serialize=True""" ret = { 'snap': self.snap.id, 'time': self.time, 'user': self.user, 'type': self.type, 'failover': self.failover, } return ret class SQLBackupGroup(Backup): """Holds a group of SQL snapshots created by imh-backup-client, representing one sql backup run. Backups 3.x stores each database in its own snapshot, then artificially groups them together as one. Attributes: snap (Snapshot): snapshot object for the snapshot signifying the backup's completion. This snapshot contains no SQL data. See the ``.dbs`` attribute instead for that failover (bool): whether this is a failover copy user (str): cPanel username or root type (str): mysql or pgsql time (int): creation timestamp dbs (dict[str, SQLBackupItem]): database names mapped to SQLBackupItems """ __module__ = 'restic' def __init__(self, snap: Snapshot): super().__init__(snap) self.dbs: dict[str, SQLBackupItem] = {} def serialize(self) -> SQLBackupGroupDict: """Used internally by Restic.get_backups(serialize=True)""" ret = super().serialize() ret['fin'] = ret.pop('snap') ret['dbs'] = {k: v.serialize() for k, v in self.dbs.items()} return ret class SQLBackupItem(Backup): """Represents one SQL snapshot in a ``SQLBackupGroup``. Attributes: snap (Snapshot): snapshot object failover (bool): whether this is a failover copy user (str): cPanel username or root type (str): mysql or pgsql time (int): creation timestamp dbname (str): database name """ __module__ = 'restic' def __init__(self, snap: Snapshot): super().__init__(snap) self.dbname = Backup.get_label(snap, 'dbname') def dump(self) -> ResticCmd: """Crafts a ResticCmd to dump this file's contents Returns: ResticCmd: restic command executor """ return self.snap.restic.dump(snap=self.snap, filename=self.dbname) def serialize(self) -> dict: """Used internally by Restic.get_backups(serialize=True)""" return self.snap.id BakTypeStr = Literal["mysql", "pgsql", "homedir", "dirs", "pkgacct"] UserBackupDicts = dict[ BakTypeStr, Union[list[BackupDict], list[SQLBackupGroupDict]] ] UserBackups = dict[BakTypeStr, Union[list[Backup], list[SQLBackupGroup]]]