# Copyright (c) 2014 Kontron Europe GmbH # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA from __future__ import print_function import os import codecs import struct import collections import hashlib import time from array import array from .errors import CompletionCodeError, HpmError, IpmiTimeoutError from .msgs import create_request_by_name from .msgs import constants from .utils import check_completion_code, bcd_search, chunks from .utils import py3dec_unic_bytes_fix, py3_array_tobytes from .state import State from .fields import VersionField PROPERTY_GENERAL_PROPERTIES = 0 PROPERTY_CURRENT_VERSION = 1 PROPERTY_DESCRIPTION_STRING = 2 PROPERTY_ROLLBACK_VERSION = 3 PROPERTY_DEFERRED_VERSION = 4 PROPERTY_OEM = list(range(192, 255)) ACTION_BACKUP_COMPONENT = 0x00 ACTION_PREPARE_COMPONENT = 0x01 ACTION_UPLOAD_FOR_UPGRADE = 0x02 ACTION_UPLOAD_FOR_COMPARE = 0x03 CC_LONG_DURATION_CMD_IN_PROGRESS = 0x80 CC_GET_COMP_PROP_UPGRADE_NOT_SUPPORTED_OVER_INTF = 0x81 CC_GET_COMP_PROP_INVALID_COMPONENT = 0x82 CC_GET_COMP_PROP_INVALID_PROPERTIES_SELECTOR = 0x83 CC_INITIATE_UPGRADE_CMD_IN_PROGRESS = 0x80 CC_INITIATE_UPGRADE_INVALID_COMPONENT = 0x81 CC_QUERY_SELFTEST_COMPLETED = 0x00 CC_QUERY_SELFTEST_IN_PROGRESS = 0x80 CC_QUERY_SELFTEST_UPGRADE_NOT_SUPPORTED_OVER_INTF = 0x81 CC_QUERY_SELFTEST_NO_RESULTS_AVAILABLE = 0xD5 CC_ABORT_UPGRADE_CANNOT_ABORT = 0x80 CC_ABORT_UPGRADE_CANNOT_RESUME_OPERATION = 0x81 class Hpm(object): @staticmethod def _get_component_count(components): """Return the number of components.""" return bin(components).count('1') def get_target_upgrade_capabilities(self): rsp = self.send_message_with_name('GetTargetUpgradeCapabilities') return TargetUpgradeCapabilities(rsp) def get_component_property(self, component_id, property_id): rsp = self.send_message_with_name('GetComponentProperties', id=component_id, selector=property_id) return ComponentProperty.from_data(property_id, rsp.data) def get_component_properties(self, component_id): properties = [] for p in (PROPERTY_GENERAL_PROPERTIES, PROPERTY_CURRENT_VERSION, PROPERTY_DESCRIPTION_STRING, PROPERTY_ROLLBACK_VERSION, PROPERTY_DEFERRED_VERSION): try: prop = self.get_component_property(component_id, p) if prop is not None: properties.append(prop) except CompletionCodeError as e: if e.cc == CC_GET_COMP_PROP_INVALID_PROPERTIES_SELECTOR: continue return properties def find_component_id_by_descriptor(self, descriptor): caps = self.get_target_upgrade_capabilities() for component_id in caps.components: prop = self.get_component_property(component_id, PROPERTY_DESCRIPTION_STRING) if prop is not None: if prop.description == descriptor: return component_id return None def abort_firmware_upgrade(self): self.send_message_with_name('AbortFirmwareUpgrade') def initiate_upgrade_action(self, components_mask, action): """Initiate Upgrade Action. components: action: ACTION_BACKUP_COMPONENT = 0x00 ACTION_PREPARE_COMPONENT = 0x01 ACTION_UPLOAD_FOR_UPGRADE = 0x02 ACTION_UPLOAD_FOR_COMPARE = 0x03 """ if action in (ACTION_UPLOAD_FOR_UPGRADE, ACTION_UPLOAD_FOR_COMPARE): if self._get_component_count(components_mask) != 1: raise HpmError("more than 1 component not support for action") self.send_message_with_name('InitiateUpgradeAction', components=components_mask, action=action) def initiate_upgrade_action_and_wait(self, components_mask, action, timeout=2, interval=0.1): """Initiate Upgrade Action and wait for long running command.""" try: self.initiate_upgrade_action(components_mask, action) except CompletionCodeError as e: if e.cc == CC_LONG_DURATION_CMD_IN_PROGRESS: self.wait_for_long_duration_command( constants.CMDID_HPM_INITIATE_UPGRADE_ACTION, timeout, interval) else: raise HpmError('initiate_upgrade_action CC=0x%02x' % e.cc) def upload_firmware_block(self, block_number, data): if isinstance(data, str): data = [ord(c) for c in data] self.send_message_with_name('UploadFirmwareBlock', number=block_number, data=data) @staticmethod def _determine_max_block_size(): return 22 def upload_binary(self, binary, timeout=2, interval=0.1, retry=3): """Upload all firmware blocks from a binary.""" block_number = 0 block_size = self._determine_max_block_size() for chunk in chunks(binary, block_size): try: self.upload_firmware_block(block_number, chunk) except CompletionCodeError as e: if e.cc == CC_LONG_DURATION_CMD_IN_PROGRESS: self.wait_for_long_duration_command( constants.CMDID_HPM_UPLOAD_FIRMWARE_BLOCK, timeout, interval) else: raise HpmError('upload_firmware_block CC=0x%02x' % e.cc) except IpmiTimeoutError: retry -= 1 if retry == 0: raise IpmiTimeoutError() block_number += 1 block_number &= 0xff def finish_firmware_upload(self, component, length): return self.send_message_with_name('FinishFirmwareUpload', component_id=component, image_length=length) def finish_upload_and_wait(self, component, length, timeout=2, interval=0.1): """Finish, upload and for the firmware.""" try: rsp = self.finish_firmware_upload(component, length) check_completion_code(rsp.completion_code) except CompletionCodeError as e: if e.cc == CC_LONG_DURATION_CMD_IN_PROGRESS: self.wait_for_long_duration_command( constants.CMDID_HPM_FINISH_FIRMWARE_UPLOAD, timeout, interval) else: raise HpmError('finish_firmware_upload CC=0x%02x' % e.cc) def get_upgrade_status(self): return UpgradeStatus(self.send_message_with_name('GetUpgradeStatus')) def wait_for_long_duration_command(self, expected_cmd, timeout, interval): start_time = time.time() while time.time() < start_time + timeout: try: status = self.get_upgrade_status() if status.command_in_progress is not expected_cmd \ and status.command_in_progress != 0x34: pass if status.last_completion_code \ == CC_LONG_DURATION_CMD_IN_PROGRESS: time.sleep(interval) else: return except IpmiTimeoutError: time.sleep(interval) except IOError: time.sleep(interval) def activate_firmware(self, rollback_override=None): req = create_request_by_name('ActivateFirmware') if rollback_override is not None: req.rollback_override_policy = rollback_override rsp = self.send_message(req) check_completion_code(rsp.completion_code) def activate_firmware_and_wait(self, rollback_override=None, timeout=2, interval=1): """Activate and wait for the new uploaded firmware.""" try: self.activate_firmware(rollback_override) except CompletionCodeError as e: if e.cc == CC_LONG_DURATION_CMD_IN_PROGRESS: self.wait_for_long_duration_command( constants.CMDID_HPM_ACTIVATE_FIRMWARE, timeout, interval) else: raise HpmError('activate_firmware CC=0x%02x' % e.cc) except IpmiTimeoutError: # controller is in reset and flashed new firmware pass def query_selftest_results(self): return SelfTestResult( self.send_message_with_name('QuerySelftestResults')) def query_rollback_status(self): return RollbackStatus( self.send_message_with_name('QueryRollbackStatus')) def initiate_manual_rollback(self): return RollbackStatus( self.send_message_with_name('InitiateManualRollback')) def initiate_manual_rollback_and_wait(self, timeout=2, interval=0.1): try: self.initiate_manual_rollback() except CompletionCodeError as e: if e.cc == CC_LONG_DURATION_CMD_IN_PROGRESS: self.wait_for_long_duration_command( constants.CMDID_HPM_INITIATE_MANUAL_ROLLBACK, 60, interval) else: raise HpmError('activate_firmware CC=0x%02x' % e.cc) except IpmiTimeoutError: # controller is in reset and flashed new firmware pass @staticmethod def open_upgrade_image(filename): return UpgradeImage(filename) @staticmethod def get_upgrade_version_from_file(filename): image = UpgradeImage(filename) for action in image.actions: if isinstance(action, UpgradeActionRecordUploadForUpgrade): return action.firmware_version return None @staticmethod def _do_upgrade_action_backup(image): for action in image.actions: if isinstance(action, UpgradeActionRecordBackup): pass @staticmethod def _do_upgrade_action_prepare(image): for action in image.actions: if isinstance(action, UpgradeActionRecordPrepare): print("do ACTION_PREPARE_COMPONENT") @staticmethod def _do_upgrade_action_upload(image): for action in image.actions: if isinstance(action, UpgradeActionRecordUploadForUpgrade): print("do ACTION_UPLOAD_FOR_UPGRADE") def preparation_stage(self, image): #################################################### # match device ID, manfuacturer ID, etc. device_id = self.get_device_id() header = image.header if header.device_id != device_id.device_id: raise HpmError('Device ID: image=0x%x device=0x%x' % (header.device_id, device_id.device_id)) if header.manufacturer_id != device_id.manufacturer_id: raise HpmError('Manufacturer ID: image=0x%x device=0x%x' % (header.manufacturer_id, device_id.manufacturer_id)) if header.product_id != device_id.product_id: raise HpmError('Product ID: image=0x%x device=0x%x' % (header.product_id, device_id.product_id)) # tbd check version #################################################### # compare current revision with upgrade image earlist comp rev targetCap = self.get_target_upgrade_capabilities() # tbd check version #################################################### # Match IPM Controller capabilities with Upgrade Image capabilities support = False for imageComponent in header.components: if imageComponent in targetCap.components: support = True if support is not True: raise HpmError('no supported component in image') def upgrade_stage(self, image, component): for action in image.actions: if action.components & (1 << component) == 0: continue self.initiate_upgrade_action_and_wait(1 << component, action.action_type) if isinstance(action, UpgradeActionRecordUploadForUpgrade): self.upload_binary(action.firmware_image_data) self.finish_upload_and_wait(component, action.firmware_length) def _activation_state_do_self_testing(self): pass def wait_until_new_firmware_comes_up(self, timeout, interval): start_time = time.time() while time.time() < start_time + timeout: try: self.get_upgrade_status() self.get_device_id() except IpmiTimeoutError: time.sleep(interval) except IOError: time.sleep(interval) time.sleep(5) def activation_stage(self, image, component): self.activate_firmware_and_wait( image.header.inaccessibility_timeout, 1) self.wait_until_new_firmware_comes_up( image.header.inaccessibility_timeout, 1) self._activation_state_do_self_testing() def install_component_from_image(self, image, component): self.abort_firmware_upgrade() if component not in image.header.components: raise HpmError('component=%d not in image' % component) self.preparation_stage(image) self.upgrade_stage(image, component) self.activation_stage(image, component) def install_component_from_file(self, filename, component): image = UpgradeImage(filename) self.install_component_from_image(image, component) class UpgradeStatus(State): def _from_response(self, rsp): self.command_in_progress = rsp.command_in_progress self.last_completion_code = rsp.last_completion_code def __str__(self): string = [] string.append("cmd=0x%02x cc=0x%02x" % (self.command_in_progress, self.last_completion_code)) return "\n".join(string) class TargetUpgradeCapabilities(State): def _from_response(self, rsp): self.version = rsp.hpm_1_version self.components = [] for i in range(8): if rsp.component_present & (1 << i): self.components.append(i) def __str__(self): string = [] string.append("Target Upgrade Capabilities") string.append(" HPM.1 version: %s" % self.version) string.append(" Components: %s" % self.components) return "\n".join(string) codecs.register(bcd_search) class ComponentProperty(object): def __init__(self, data=None): if (data): self._from_rsp_data(data) @staticmethod def from_data(component_id, data): if isinstance(data, str): data = [ord(c) for c in data] if component_id is PROPERTY_GENERAL_PROPERTIES: return ComponentPropertyGeneral(data) elif component_id is PROPERTY_CURRENT_VERSION: return ComponentPropertyCurrentVersion(data) elif component_id is PROPERTY_DESCRIPTION_STRING: return ComponentPropertyDescriptionString(data) elif component_id is PROPERTY_ROLLBACK_VERSION: return ComponentPropertyRollbackVersion(data) elif component_id is PROPERTY_DEFERRED_VERSION: return ComponentPropertyDeferredVersion(data) elif component_id in PROPERTY_OEM: raise NotImplementedError class ComponentPropertyGeneral(ComponentProperty): ROLLBACK_SUPPORT_MASK = 0x03 PREPARATION_SUPPORT_MASK = 0x04 COMPARISON_SUPPORT_MASK = 0x08 DEFERRED_ACTIVATION_SUPPORT_MASK = 0x10 PAYLOAD_COLD_RESET_REQ_SUPPORT_MASK = 0x20 def _from_rsp_data(self, data): support = [] cap = data[0] if cap & self.ROLLBACK_SUPPORT_MASK == 0: support.append('rollback_backup_not_supported') elif cap & self.ROLLBACK_SUPPORT_MASK == 1: support.append('rollback_is_supported') elif cap & self.ROLLBACK_SUPPORT_MASK == 2: support.append('rollback_is_supported') elif cap & self.ROLLBACK_SUPPORT_MASK == 3: support.append('reserved') if cap & self.PREPARATION_SUPPORT_MASK: support.append('prepartion') if cap & self.COMPARISON_SUPPORT_MASK: support.append('comparison') if cap & self.DEFERRED_ACTIVATION_SUPPORT_MASK: support.append('deferred_activation') if cap & self.PAYLOAD_COLD_RESET_REQ_SUPPORT_MASK: support.append('payload_cold_reset_required') self.general = support class ComponentPropertyCurrentVersion(ComponentProperty): def _from_rsp_data(self, data): self.version = VersionField(data) class ComponentPropertyDescriptionString(ComponentProperty): def _from_rsp_data(self, data): descr = py3_array_tobytes(array('B', data)) descr = py3dec_unic_bytes_fix(descr) # strip '\x00' descr = descr.replace('\0', '') self.description = descr class ComponentPropertyRollbackVersion(ComponentProperty): def _from_rsp_data(self, data): self.version = VersionField(data) class ComponentPropertyDeferredVersion(ComponentProperty): def _from_rsp_data(self, data): self.version = VersionField(data) class ComponentPropertyOem(ComponentProperty): def _from_rsp_data(self, data): self.oem_data = data class SelfTestResult(State): CORRUPTED_OR_INACCESSIBLE_DATA_OR_DEVICES = 0x57 def _from_response(self, rsp): self.status = rsp.selftest_result_1 result2 = rsp.selftest_result_2 if self.status != self.CORRUPTED_OR_INACCESSIBLE_DATA_OR_DEVICES: self.fail_sel = (result2 & 0x80) >> 7 self.fail_sdrr = (result2 & 0x40) >> 6 self.fail_bmc_fru = (result2 & 0x20) >> 5 self.fail_ipmb = (result2 & 0x10) >> 4 self.fail_sdrr_empty = (result2 & 0x08) >> 3 self.fail_bmc_fru_interanl_area = (result2 & 0x04) >> 2 self.fail_bootblock = (result2 & 0x02) >> 1 self.fail_mc = (result2 & 0x01) >> 0 class RollbackStatus(object): def __init__(self, rsp=None): if rsp: self._from_rsp(rsp) def _from_rsp(self, rsp): if rsp.completion_estimate: self.percent_complete = rsp.completion_estimate image_header = collections.namedtuple('image_header', ['field_name', 'format', 'start', 'len']) class UpgradeImageHeaderRecord(object): FORMAT = [ image_header('format_version', 'B', 8, 1), image_header('device_id', 'B', 9, 1), image_header('product_id', '