Coverage for tropicsquare / transports / __init__.py: 93%
56 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-27 21:24 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-27 21:24 +0000
"""L1 Transport Layer for TROPIC01

This module provides the base class for L1 transport implementations.
"""
6from time import sleep
8from tropicsquare.constants.chip_status import CHIP_STATUS_NOT_READY, CHIP_STATUS_BUSY, CHIP_STATUS_ALARM
9from tropicsquare.constants.l1 import REQ_ID_GET_RESPONSE, MAX_RETRIES
10from tropicsquare.constants.rsp_status import RSP_STATUS_RES_CONT
11from tropicsquare.exceptions import TropicSquareAlarmError, TropicSquareCRCError, TropicSquareTimeoutError
12from tropicsquare.crc import CRC
13from tropicsquare.error_mapping import raise_for_response_status
class L1Transport():
    """Base class for L1 transport layer.

    Platform-specific classes implement only abstract low-level methods:
    ``_transfer`` and ``_read`` must be provided by the subclass, while
    ``_cs_low`` / ``_cs_high`` do nothing unless overridden.
    """

    def send_request(self, request_data: bytes) -> bytes:
        """Send request to chip and return response bytes.

        Chip select is asserted for the duration of the transfer and is
        released even when the transfer raises.

        :param request_data: Complete request frame (with CRC)

        :returns: Response bytes
        """
        self._cs_low()
        try:
            return self._transfer(request_data)
        finally:
            # Never leave the chip selected if the bus transfer fails.
            self._cs_high()

    def get_response(self) -> bytes:
        """Get response from chip with automatic retry logic.

        Polls the chip up to ``MAX_RETRIES`` times, sleeping 25 ms between
        attempts while it reports not-ready/busy. When the response status
        is ``RSP_STATUS_RES_CONT`` the following chunk is fetched
        recursively and appended to this one.

        :returns: Response data from chip; ``None`` when a non-continued
            response carries a zero-length payload
        :rtype: bytes
        :raises TropicSquareAlarmError: If chip is in alarm state
        :raises TropicSquareCRCError: If CRC validation fails
        :raises TropicSquareTimeoutError: If chip remains busy after max retries
        :raises TropicSquareError: On other communication errors
        """
        for _ in range(MAX_RETRIES):
            frame = self._read_response_frame()
            if frame is None:
                # Chip not ready yet; chip select is already released here,
                # so back off and poll again.
                sleep(0.025)
                continue

            response_status, payload, calccrc, respcrc = frame

            # Status errors are reported before the CRC comparison.
            raise_for_response_status(response_status)

            if respcrc != calccrc:
                raise TropicSquareCRCError(
                    f"CRC mismatch ({calccrc.hex()}<!=>{respcrc.hex()})"
                )

            if response_status == RSP_STATUS_RES_CONT:
                # Either chunk may legitimately be empty (None); treat a
                # missing payload as empty bytes instead of raising
                # TypeError on concatenation.
                remainder = self.get_response()
                return (payload or b'') + (remainder or b'')

            return payload

        raise TropicSquareTimeoutError("Chip communication timeout - chip remains busy")

    def _read_response_frame(self):
        """Run one Get_Response transaction under chip select.

        Chip select is always released before returning or raising.

        :returns: ``None`` when the chip reports not-ready/busy, otherwise
            a tuple ``(response_status, payload, calculated_crc,
            received_crc)`` where ``payload`` is ``None`` for a zero-length
            response
        :raises TropicSquareAlarmError: If chip is in alarm state
        """
        # A mutable buffer is handed to _transfer, as the polling request
        # has always been built as a bytearray.
        request = bytearray(bytes(REQ_ID_GET_RESPONSE))

        self._cs_low()
        try:
            chip_status = self._transfer(request)[0]

            if chip_status in (CHIP_STATUS_NOT_READY, CHIP_STATUS_BUSY):
                return None

            if chip_status & CHIP_STATUS_ALARM:
                raise TropicSquareAlarmError("Chip is in alarm state")

            # Two header bytes: response status, then payload length.
            header = self._read(2)
            response_status = header[0]
            response_length = header[1]

            # NOTE(review): the response status byte is compared against a
            # CHIP_STATUS_* constant here - confirm this is intended.
            if response_status == CHIP_STATUS_BUSY:
                return None

            payload = self._read(response_length) if response_length > 0 else None

            # CRC covers the header plus the payload (if any).
            calccrc = CRC.crc16(header + (payload or b''))
            respcrc = self._read(2)

            return response_status, payload, calccrc, respcrc
        finally:
            self._cs_high()

    def _transfer(self, tx_data: bytes) -> bytes:
        """SPI bidirectional transfer.

        Corresponds to SPI write_readinto operation.

        :param tx_data: Data to transmit

        :returns: Received data (same length as tx_data)
        :raises NotImplementedError: Always, unless overridden by a subclass
        """
        raise NotImplementedError("_transfer() method not implemented in L1Transport subclass")

    def _read(self, length: int) -> bytes:
        """SPI read operation.

        Corresponds to SPI read operation.

        :param length: Number of bytes to read

        :returns: Read data
        :raises NotImplementedError: Always, unless overridden by a subclass
        """
        raise NotImplementedError("_read() method not implemented in L1Transport subclass")

    def _cs_low(self) -> None:
        """Activate chip select (CS to logic 0).

        The base implementation does nothing.
        """

    def _cs_high(self) -> None:
        """Deactivate chip select (CS to logic 1).

        The base implementation does nothing.
        """