# Copyright (c) 2015  Kontron Europe GmbH
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

from array import array

from ..logger import log
from ..msgs import (create_message, create_request_by_name,
                    encode_message, decode_message, constants)
from ..utils import check_completion_code
from ..utils import py3_array_tobytes, py3_array_frombytes

# Number of bytes in an encoded IPMB header (see IpmbHeaderReq.encode).
_HEADER_LENGTH = 6


def checksum(data):
    """Calculate the checksum.

    data: iterable of byte values (integers)

    Returns the value in the range 0..255 which, added to the sum of all
    values in data, yields zero modulo 256.
    """
    return -sum(data) % 256


def _fmt_hex(value):
    """Format a header field as two-digit hex, tolerating unset fields."""
    return 'None' if value is None else f'0x{value:02x}'


class IpmbHeader(object):
    """Representation of the IPMI message header.

    Request:
    *-------*--------------*----------*-------*---------------*-------*
    | rs_sa | netfn/rs_lun | checksum | rq_sa | rq_seq/rq_lun | cmdid |
    *-------*--------------*----------*-------*---------------*-------*

    Response:
    *-------*--------------*----------*-------*---------------*-------*
    | rq_sa | netfn/rq_lun | checksum | rs_sa | rq_seq/rs_lun | cmdid |
    *-------*--------------*----------*-------*---------------*-------*

    Subclasses provide encode() and decode() for the respective layout.
    """

    rs_sa = None
    rs_lun = None
    rq_sa = None
    rq_lun = None
    rq_seq = None
    netfn = None
    cmdid = None
    checksum = None

    def __init__(self, data=None):
        """Create a header, decoding it from data if data is non-empty.

        data: the encoded header (or a whole message) as bytestring
        """
        if data:
            self.decode(data)

    def __str__(self):
        # Fields stay None until they are assigned or decoded; a plain
        # ':02x' format would raise TypeError for those.
        return f'rs_sa={_fmt_hex(self.rs_sa)}, rs_lun={self.rs_lun}, ' \
               f'rq_sa={_fmt_hex(self.rq_sa)}, rq_lun={self.rq_lun}, ' \
               f'rq_seq={self.rq_seq}, ' \
               f'netfn={_fmt_hex(self.netfn)}, ' \
               f'cmdid={_fmt_hex(self.cmdid)}'


class IpmbHeaderReq(IpmbHeader):
    """Representation of the IPMI request message header."""

    def encode(self):
        """Encode the header.

        Returns the six header bytes as bytestring.
        """
        data = array('B')
        data.append(self.rs_sa)
        data.append(self.netfn << 2 | self.rs_lun)
        # header checksum covers the first two bytes
        data.append(checksum((self.rs_sa, data[1])))
        data.append(self.rq_sa)
        data.append(self.rq_seq << 2 | self.rq_lun)
        data.append(self.cmdid)
        return py3_array_tobytes(data)

    def decode(self, data):
        """Decode the header.

        data: the encoded message as bytestring; the first six bytes are
              evaluated. The stored checksum is not verified here.
        """
        msg = array('B')
        py3_array_frombytes(msg, data)
        self.rs_sa = msg[0]
        self.netfn = msg[1] >> 2
        self.rs_lun = msg[1] & 3
        self.checksum = msg[2]
        self.rq_sa = msg[3]
        self.rq_seq = msg[4] >> 2
        self.rq_lun = msg[4] & 3
        self.cmdid = msg[5]


class IpmbHeaderRsp(IpmbHeader):
    """Representation of the IPMI response message header."""

    def encode(self):
        """Encode the header.

        Returns the six header bytes as bytestring.
        """
        data = array('B')
        data.append(self.rq_sa)
        data.append(self.netfn << 2 | self.rq_lun)
        # header checksum covers the first two bytes
        data.append(checksum((self.rq_sa, data[1])))
        data.append(self.rs_sa)
        data.append(self.rq_seq << 2 | self.rs_lun)
        data.append(self.cmdid)
        return py3_array_tobytes(data)

    def decode(self, data):
        """Decode the header.

        data: the encoded message as bytestring; the first six bytes are
              evaluated. The stored checksum is not verified here.
        """
        data = array('B', data)
        self.rq_sa = data[0]
        self.netfn = data[1] >> 2
        self.rq_lun = data[1] & 3
        self.checksum = data[2]
        self.rs_sa = data[3]
        self.rq_seq = data[4] >> 2
        self.rs_lun = data[4] & 3
        self.cmdid = data[5]

    def from_req_header(self, req_header):
        """Fill this header from a request header.

        The requester and responder address/LUN fields are swapped, the
        sequence number, NetFn and command id are copied unchanged.

        req_header: the header object to take the values from
        """
        self.rs_lun = req_header.rq_lun
        self.rs_sa = req_header.rq_sa
        self.rq_seq = req_header.rq_seq
        self.rq_lun = req_header.rs_lun
        self.rq_sa = req_header.rs_sa
        self.netfn = req_header.netfn
        self.cmdid = req_header.cmdid


def encode_ipmb_msg(header, data):
    """Encode an IPMB message.

    header: IPMB header object
    data: IPMI message data as bytestring (may be None)

    Returns the message as bytestring.
    """
    msg = array('B')
    py3_array_frombytes(msg, header.encode())

    if data is not None:
        a = array('B')
        py3_array_frombytes(a, data)
        msg.extend(a)

    # trailing checksum covers everything after the header checksum byte
    msg.append(checksum(msg[3:]))
    return py3_array_tobytes(msg)


def encode_send_message(payload, rq_sa, rs_sa, channel, seq, tracking=1):
    """Encode a send message command and embed the message to be sent.

    payload: the message to be sent as bytestring
    rq_sa: the requester source address
    rs_sa: the responder source address
    channel: the channel
    seq: the sequence number
    tracking: tracking

    Returns an encoded send message as bytestring
    """
    req = create_request_by_name('SendMessage')
    req.channel.number = channel
    req.channel.tracking = tracking
    data = encode_message(req)

    header = IpmbHeaderReq()
    header.netfn = req.__netfn__
    header.rs_lun = 0
    header.rs_sa = rs_sa
    header.rq_seq = seq
    header.rq_lun = 0
    header.rq_sa = rq_sa
    header.cmdid = req.__cmdid__

    return encode_ipmb_msg(header, data + payload)


def encode_bridged_message(routing, header, payload, seq):
    """Encode a (multi-)bridged command and embed the message to be sent.

    routing: sequence of routing entries providing rq_sa, rs_sa and
             channel attributes; the last entry supplies the addresses
             of the innermost message, every preceding entry adds one
             send message wrapper
    header: IPMB header object of the innermost message; its rq_sa and
            rs_sa attributes are overwritten
    payload: the message to be sent as bytestring
    seq: the sequence number

    Returns the encoded send message as bytestring
    """
    # change header requester addresses for bridging
    header.rq_sa = routing[-1].rq_sa
    header.rs_sa = routing[-1].rs_sa
    tx_data = encode_ipmb_msg(header, payload)

    # wrap from the innermost hop outwards
    for bridge in reversed(routing[:-1]):
        tx_data = encode_send_message(tx_data,
                                      rq_sa=bridge.rq_sa,
                                      rs_sa=bridge.rs_sa,
                                      channel=bridge.channel,
                                      seq=seq)

    return tx_data


def decode_bridged_message(rx_data):
    """Decode a (multi-)bridged command.

    rx_data: the received message as bytestring

    Returns the decoded message as bytestring. Data too short to hold an
    IPMB header is returned unchanged.
    """
    # Unwrap as long as there is a complete header whose command id marks
    # a send message response.
    while (len(rx_data) >= _HEADER_LENGTH
           and array('B', rx_data)[5] == constants.CMDID_SEND_MESSAGE):
        rsp = create_message(constants.NETFN_APP + 1,
                             constants.CMDID_SEND_MESSAGE, None)
        decode_message(rsp, rx_data[6:])
        check_completion_code(rsp.completion_code)
        # drop the header plus one more leading byte and the trailing byte
        rx_data = rx_data[7:-1]

    return rx_data


def rx_filter(header, data, rq_sa=False, rs_sa=False, rq_lun=False,
              rs_lun=True, rq_seq=True):
    """Check if the message in data matches the information in header.

    The following checks are always done:
        - Header checksum
        - Payload checksum
        - NetFn matching
        - Command Id matching

    The following checks are done if the respective argument is true:
        - rq_sa: requester address matching
        - rs_sa: responder address matching
        - rq_lun: requester LUN matching
        - rs_lun: responder LUN matching
        - rq_seq: sequence number matching

    header: the header to compare with
    data: the received message as bytestring

    Returns True if all checks passed, otherwise False. Data too short to
    hold an IPMB header never matches.
    """
    if len(data) < _HEADER_LENGTH:
        log().debug('message too short: {:d} bytes'.format(len(data)))
        return False

    rsp_header = IpmbHeaderRsp(data=data)
    data = array('B', data)

    checks = [
        (checksum(data[0:3]), 0, 'Header checksum failed'),
        (checksum(data[3:]), 0, 'payload checksum failed'),
        # the response NetFn is the request NetFn with bit 0 set
        (rsp_header.netfn, header.netfn | 1, 'NetFn mismatch'),
        (rsp_header.cmdid, header.cmdid, 'command id mismatch'),
    ]

    # optional checks
    if rq_sa:
        checks.append((rsp_header.rq_sa, header.rq_sa,
                       'slave address mismatch'))
    if rs_sa:
        checks.append((rsp_header.rs_sa, header.rs_sa,
                       'target address mismatch'))
    if rq_lun:
        checks.append((rsp_header.rq_lun, header.rq_lun,
                       'request LUN mismatch'))
    if rs_lun:
        checks.append((rsp_header.rs_lun, header.rs_lun,
                       'responder LUN mismatch'))
    if rq_seq:
        checks.append((rsp_header.rq_seq, header.rq_seq,
                       'sequence number mismatch'))

    match = True

    # evaluate every check so that all mismatches show up in the log
    for left, right, msg in checks:
        if left != right:
            log().debug('{:s}: {} {}'.format(msg, left, right))
            match = False

    return match