# Copyright (c) 2014 Kontron Europe GmbH
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

from array import array

from .errors import CompletionCodeError, DecodingError
from .utils import check_completion_code, ByteBuffer
from .msgs import create_request_by_name
from .msgs import constants
from .event import EVENT_ASSERTION, EVENT_DEASSERTION
from .helper import clear_repository_helper
from .state import State


class Sel(object):
    """System Event Log (SEL) commands.

    The methods call ``self.send_message_with_name`` and
    ``self.send_message``, neither of which is defined in this class;
    they have to be supplied by the class this one is combined with.
    """

    def get_sel_entries_count(self):
        """Return the ``entries`` value of the 'GetSelInfo' response."""
        info = SelInfo(self.send_message_with_name('GetSelInfo'))
        return info.entries

    def get_sel_reservation_id(self):
        """Send 'ReserveSel' and return the reservation id of the response."""
        rsp = self.send_message_with_name('ReserveSel')
        return rsp.reservation_id

    def _clear_sel(self, cmd, reservation):
        """Send 'ClearSel' and return the response's erase-in-progress status.

        :param cmd: value passed through as the ``cmd`` field of the request
        :param reservation: reservation id to send with the request
        """
        rsp = self.send_message_with_name('ClearSel',
                                          reservation_id=reservation,
                                          cmd=cmd)
        return rsp.status.erase_in_progress

    def clear_sel(self, retry=5):
        """Clear the SEL by delegating to ``clear_repository_helper``.

        :param retry: forwarded unchanged to the helper
        """
        clear_repository_helper(self.get_sel_reservation_id,
                                self._clear_sel, retry)

    def delete_sel_entry(self, record_id, reservation=0):
        """Send 'DeleteSelEntry' and return the record id of the response.

        :param record_id: id of the record to delete
        :param reservation: reservation id to send with the request
        """
        rsp = self.send_message_with_name('DeleteSelEntry',
                                          reservation_id=reservation,
                                          record_id=record_id)
        return rsp.record_id

    def get_and_clear_sel_entry(self, record_id):
        """Atomically gets and clears the specified SEL record

        A fresh reservation is taken and the record is read and then
        deleted under it.  If either step fails with completion code
        ``CC_RES_CANCELED`` the whole sequence is restarted with a new
        reservation; any other ``CompletionCodeError`` is re-raised.

        :param record_id: id of the record to fetch and delete
        :return: the ``SelEntry`` that was read
        """
        while True:
            reservation = self.get_sel_reservation_id()

            try:
                sel_entry, _ = self.get_sel_entry(record_id, reservation)
            except CompletionCodeError as e:
                if e.cc == constants.CC_RES_CANCELED:
                    continue
                raise

            try:
                self.delete_sel_entry(record_id, reservation)
            except CompletionCodeError as e:
                if e.cc == constants.CC_RES_CANCELED:
                    continue
                raise

            return sel_entry

    def get_sel_entry(self, record_id, reservation=0):
        """Read one SEL record, falling back to partial reads if needed.

        The first request asks for the entire record (length ``0xff``).
        When the response carries ``CC_CANT_RET_NUM_REQ_BYTES`` the
        request length is reduced to 16 and afterwards by one byte per
        refused attempt, and the read is retried from the current offset.

        The length currently in use is kept in ``self.max_req_len`` and is
        reset at the start of every call.

        :param record_id: id of the record to read
        :param reservation: reservation id to send with the request
        :return: tuple ``(SelEntry, next_record_id)``
        :raises: whatever ``check_completion_code`` raises for a
                 completion code it rejects; this now includes
                 ``CC_CANT_RET_NUM_REQ_BYTES`` once the request length
                 cannot be reduced any further.
        """
        ENTIRE_RECORD = 0xff
        RECORD_LENGTH = 16

        req = create_request_by_name('GetSelEntry')
        req.reservation_id = reservation
        req.record_id = record_id
        req.offset = 0
        self.max_req_len = ENTIRE_RECORD

        record_data = ByteBuffer()

        while True:
            req.length = self.max_req_len
            # In partial-read mode never ask beyond the end of the record.
            if (self.max_req_len != ENTIRE_RECORD
                    and (req.offset + req.length) > RECORD_LENGTH):
                req.length = RECORD_LENGTH - req.offset

            rsp = self.send_message(req)

            # Shrink the request only while there is something left to
            # shrink.  Once a single-byte read is refused as well, fall
            # through to check_completion_code() so the failure is reported
            # instead of retrying with a zero or negative length.
            if (rsp.completion_code == constants.CC_CANT_RET_NUM_REQ_BYTES
                    and self.max_req_len > 1):
                if self.max_req_len == ENTIRE_RECORD:
                    self.max_req_len = RECORD_LENGTH
                else:
                    self.max_req_len -= 1
                continue

            check_completion_code(rsp.completion_code)

            record_data.extend(rsp.record_data)
            req.offset = len(record_data)

            if len(record_data) >= RECORD_LENGTH:
                break

        return (SelEntry(record_data), rsp.next_record_id)

    def sel_entries(self):
        """Generator which returns all SEL entries.

        Yields nothing when ``get_sel_entries_count()`` reports 0.
        Otherwise a single reservation is taken and records are read
        starting at record id 0, following ``next_record_id`` until it
        equals 0xffff.
        """
        START_SEL_RECORD_ID = 0
        END_SEL_RECORD_ID = 0xffff

        if self.get_sel_entries_count() == 0:
            return

        reservation_id = self.get_sel_reservation_id()
        next_record_id = START_SEL_RECORD_ID
        while True:
            (sel_entry, next_record_id) = self.get_sel_entry(next_record_id,
                                                             reservation_id)
            yield sel_entry
            if next_record_id == END_SEL_RECORD_ID:
                break

    def get_sel_entries(self):
        """Return all SEL entries as a list."""
        return list(self.sel_entries())


class SelInfo(State):
    """Values copied from a 'GetSelInfo' response."""

    # Flag attributes of ``rsp.operation_support``, in reporting order.
    _OPERATION_SUPPORT_FLAGS = (
        'get_sel_allocation_info',
        'reserve_sel',
        'partial_add_sel_entry',
        'delete_sel',
        'overflow_flag',
    )

    def _from_response(self, rsp):
        """Populate the instance from the response object ``rsp``.

        ``operation_support`` becomes a list holding the name of every
        flag of ``rsp.operation_support`` that is set.
        """
        self.version = rsp.version
        self.entries = rsp.entries
        self.free_bytes = rsp.free_bytes
        self.most_recent_addition = rsp.most_recent_addition
        self.most_recent_erase = rsp.most_recent_erase

        self.operation_support = []
        for flag in self._OPERATION_SUPPORT_FLAGS:
            if getattr(rsp.operation_support, flag):
                self.operation_support.append(flag)


class SelEntry(State):
    """A single decoded 16-byte SEL record."""

    TYPE_SYSTEM_EVENT = 0x02
    TYPE_OEM_TIMESTAMPED_RANGE = list(range(0xc0, 0xe0))
    TYPE_OEM_NON_TIMESTAMPED_RANGE = list(range(0xe0, 0x100))

    def __str__(self):
        """Return a multi-line, human-readable dump of the record."""
        raw = '[%s]' % (' '.join(['0x%02x' % b for b in self.data]))
        string = []
        string.append('SEL Record ID 0x%04x' % self.record_id)
        string.append(' Raw: %s' % raw)
        string.append(' Type: %d' % self.type)
        string.append(' Timestamp: %d' % self.timestamp)
        string.append(' Generator: %d' % self.generator_id)
        string.append(' EvM rev: %d' % self.evm_rev)
        string.append(' Sensor Type: 0x%02x' % self.sensor_type)
        string.append(' Sensor Number: %d' % self.sensor_number)
        string.append(' Event Direction: %d' % self.event_direction)
        string.append(' Event Type: 0x%02x' % self.event_type)
        string.append(' Event Data: %s' % array('B', self.event_data).tolist())
        return "\n".join(string)

    @staticmethod
    def type_to_string(entry_type):
        """Return a description of ``entry_type``, or None if unknown."""
        string = None
        if entry_type == SelEntry.TYPE_SYSTEM_EVENT:
            string = 'System Event'
        elif entry_type in SelEntry.TYPE_OEM_TIMESTAMPED_RANGE:
            string = 'OEM timestamped (0x%02x)' % entry_type
        elif entry_type in SelEntry.TYPE_OEM_NON_TIMESTAMPED_RANGE:
            string = 'OEM non-timestamped (0x%02x)' % entry_type
        return string

    def _from_response(self, data):
        """Decode the raw record bytes in ``data``.

        Every accepted record type is decoded with the same field layout.

        :param data: the 16 raw bytes of the record
        :raises DecodingError: if ``data`` is not exactly 16 bytes long or
                               the record type is not one of the known
                               types.
        """
        if len(data) != 16:
            raise DecodingError('Invalid SEL record length (%d)' % len(data))

        self.data = data

        # pop will change data, therefore copy it
        buffer = ByteBuffer(data)

        self.record_id = buffer.pop_unsigned_int(2)
        self.type = buffer.pop_unsigned_int(1)
        if (self.type != self.TYPE_SYSTEM_EVENT
                and self.type not in self.TYPE_OEM_TIMESTAMPED_RANGE
                and self.type not in self.TYPE_OEM_NON_TIMESTAMPED_RANGE):
            raise DecodingError('Unknown SEL type (0x%02x)' % self.type)
        self.timestamp = buffer.pop_unsigned_int(4)
        self.generator_id = buffer.pop_unsigned_int(2)
        self.evm_rev = buffer.pop_unsigned_int(1)
        self.sensor_type = buffer.pop_unsigned_int(1)
        self.sensor_number = buffer.pop_unsigned_int(1)

        # Bit 7 selects the event direction, bits 6..0 hold the event type.
        event_desc = buffer.pop_unsigned_int(1)
        if event_desc & 0x80:
            self.event_direction = EVENT_DEASSERTION
        else:
            self.event_direction = EVENT_ASSERTION
        self.event_type = event_desc & 0x7f

        self.event_data = [buffer.pop_unsigned_int(1) for _ in range(3)]