"""SA data functions""" import os import gzip import re from datetime import datetime, timedelta class SaUsage: """Holds a record from sadatarunner.sh or sa -cm as an object with attributes for each column Args: line (str): line from sa -cm to parse Hint: Use vars() on this to get a dict of the columns Attributes: user (str): username procs (int): num of processes executed perc_procs (float): percent of total processes real (float): real execution time perc_real (float): percent of real execution time from total cpu (float): cpu time in cp avg_iops (int): average iops per process iops (int): total iops (units of 1000) """ __module__ = 'rads' sa_re = re.compile( r'(?P[^\s]+)\s+' r'(?P\d+)\s+' r'(?P\d+\.?\d*)%\s+' r'(?P\d+\.?\d*)re\s+' r'(?P\d+\.?\d*)%\s+' r'(?P\d+\.?\d*)cp\s+' r'(?P\d+\.?\d*)%\s+' r'(?P\d+)avio\s+' r'(?P\d+)k\s*$' ) def __init__(self, line: str): match = SaUsage.sa_re.match(line) if not match: raise ValueError('Invalid sa line') # could be the header line gdict = match.groupdict() self.user = gdict['user'] self.procs = int(gdict['procs']) self.perc_procs = float(gdict['perc_procs']) self.real = float(gdict['real']) self.perc_real = float(gdict['perc_real']) self.cpu = float(gdict['cpu']) self.perc_cp = float(gdict['perc_cp']) self.avg_iops = int(gdict['avg_iops']) self.iops = int(gdict['iops']) def get_cp_usage(offset: str = 0) -> dict[str, SaUsage]: """Returns archived usage data collected by RADS in sadatarunner.sh Args: offset: number of days ago (default of 0 means today) Returns: mapping of usernames to ``SaUsage`` objects """ offset += 1 # get the time offset days ago # month needs to be the short string representation of the month (i.e. Jun) when = datetime.now() - timedelta(days=offset) month, day = when.strftime('%b %d').split() sadatadir = os.path.join('/var/log/sa/rads', month, day) try: histfiles = [ os.path.join(sadatadir, x) for x in os.listdir(sadatadir) # may raise FileNotFoundError if x.endswith('-avg.gz') ] except FileNotFoundError: return {} # nothing recorded for this day try: # get the latest histfile = sorted(histfiles, key=os.path.getmtime, reverse=True)[0] except IndexError: return {} # sadatadir contained no items try: with gzip.open(histfile, 'rt', encoding='ascii') as filehandle: contents = filehandle.read().splitlines() except OSError: return {} # invalid file? ret = {} for line in contents: try: usage = SaUsage(line) except ValueError: continue ret[usage.user] = usage return ret get_cp_usage.__module__ = 'rads'