Source code for tropicsquare.transports

"""L1 Transport Layer for TROPIC01

This module provides the base class for L1 transport implementations.
"""

from time import sleep

from tropicsquare.constants.chip_status import CHIP_STATUS_NOT_READY, CHIP_STATUS_BUSY, CHIP_STATUS_ALARM
from tropicsquare.constants.l1 import REQ_ID_GET_RESPONSE, MAX_RETRIES
from tropicsquare.constants.rsp_status import RSP_STATUS_RES_CONT
from tropicsquare.exceptions import TropicSquareAlarmError, TropicSquareCRCError, TropicSquareTimeoutError
from tropicsquare.crc import CRC
from tropicsquare.error_mapping import raise_for_response_status


[docs] class L1Transport(): """Base class for L1 transport layer. Platform-specific classes implement only abstract low-level methods. """
[docs] def send_request(self, request_data: bytes) -> bytes: """Send request to chip and return response bytes. :param request_data: Complete request frame (with CRC) :returns: Response bytes """ self._cs_low() rx_data = self._transfer(request_data) self._cs_high() return rx_data
[docs] def get_response(self) -> bytes: """Get response from chip with automatic retry logic. :returns: Response data from chip :rtype: bytes :raises TropicSquareAlarmError: If chip is in alarm state :raises TropicSquareCRCError: If CRC validation fails :raises TropicSquareTimeoutError: If chip remains busy after max retries :raises TropicSquareError: On other communication errors """ chip_status = CHIP_STATUS_NOT_READY for _ in range(MAX_RETRIES): data = bytearray() data.extend(bytes(REQ_ID_GET_RESPONSE)) self._cs_low() data[:] = self._transfer(data) chip_status = data[0] if chip_status in [CHIP_STATUS_NOT_READY, CHIP_STATUS_BUSY]: self._cs_high() sleep(0.025) continue if chip_status & CHIP_STATUS_ALARM: self._cs_high() raise TropicSquareAlarmError("Chip is in alarm state") response = self._read(2) response_status = response[0] response_length = response[1] if response_status == CHIP_STATUS_BUSY: self._cs_high() sleep(0.025) continue if response_length > 0: data = self._read(response_length) else: data = None calccrc = CRC.crc16(response + (data or b'')) respcrc = self._read(2) self._cs_high() raise_for_response_status(response_status) if respcrc != calccrc: raise TropicSquareCRCError( f"CRC mismatch ({calccrc.hex()}<!=>{respcrc.hex()})" ) if response_status == RSP_STATUS_RES_CONT: data += self.get_response() return data raise TropicSquareTimeoutError("Chip communication timeout - chip remains busy")
def _transfer(self, tx_data: bytes) -> bytes: """SPI bidirectional transfer. Corresponds to SPI write_readinto operation. :param tx_data: Data to transmit :returns: Received data (same length as tx_data) """ raise NotImplementedError("_transfer() method not implemented in L1Transport subclass") def _read(self, length: int) -> bytes: """SPI read operation. Corresponds to SPI read operation. :param length: Number of bytes to read :returns: Read data """ raise NotImplementedError("_read() method not implemented in L1Transport subclass") def _cs_low(self) -> None: """Activate chip select (CS to logic 0).""" pass def _cs_high(self) -> None: """Deactivate chip select (CS to logic 1).""" pass