Coverage for tropicsquare / transports / __init__.py: 93%

56 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-27 21:24 +0000

1"""L1 Transport Layer for TROPIC01 

2 

3This module provides the base class for L1 transport implementations. 

4""" 

5 

6from time import sleep 

7 

8from tropicsquare.constants.chip_status import CHIP_STATUS_NOT_READY, CHIP_STATUS_BUSY, CHIP_STATUS_ALARM 

9from tropicsquare.constants.l1 import REQ_ID_GET_RESPONSE, MAX_RETRIES 

10from tropicsquare.constants.rsp_status import RSP_STATUS_RES_CONT 

11from tropicsquare.exceptions import TropicSquareAlarmError, TropicSquareCRCError, TropicSquareTimeoutError 

12from tropicsquare.crc import CRC 

13from tropicsquare.error_mapping import raise_for_response_status 

14 

15 

16class L1Transport(): 

17 """Base class for L1 transport layer. 

18 

19 Platform-specific classes implement only abstract low-level methods. 

20 """ 

21 

22 def send_request(self, request_data: bytes) -> bytes: 

23 """Send request to chip and return response bytes. 

24 

25 :param request_data: Complete request frame (with CRC) 

26 

27 :returns: Response bytes 

28 """ 

29 

30 self._cs_low() 

31 rx_data = self._transfer(request_data) 

32 self._cs_high() 

33 

34 return rx_data 

35 

36 

37 def get_response(self) -> bytes: 

38 """Get response from chip with automatic retry logic. 

39 

40 :returns: Response data from chip 

41 :rtype: bytes 

42 :raises TropicSquareAlarmError: If chip is in alarm state 

43 :raises TropicSquareCRCError: If CRC validation fails 

44 :raises TropicSquareTimeoutError: If chip remains busy after max retries 

45 :raises TropicSquareError: On other communication errors 

46 """ 

47 

48 chip_status = CHIP_STATUS_NOT_READY 

49 

50 for _ in range(MAX_RETRIES): 

51 data = bytearray() 

52 data.extend(bytes(REQ_ID_GET_RESPONSE)) 

53 

54 self._cs_low() 

55 data[:] = self._transfer(data) 

56 chip_status = data[0] 

57 

58 if chip_status in [CHIP_STATUS_NOT_READY, CHIP_STATUS_BUSY]: 

59 self._cs_high() 

60 sleep(0.025) 

61 continue 

62 

63 if chip_status & CHIP_STATUS_ALARM: 

64 self._cs_high() 

65 raise TropicSquareAlarmError("Chip is in alarm state") 

66 

67 response = self._read(2) 

68 

69 response_status = response[0] 

70 response_length = response[1] 

71 

72 if response_status == CHIP_STATUS_BUSY: 

73 self._cs_high() 

74 sleep(0.025) 

75 continue 

76 

77 if response_length > 0: 

78 data = self._read(response_length) 

79 else: 

80 data = None 

81 

82 calccrc = CRC.crc16(response + (data or b'')) 

83 respcrc = self._read(2) 

84 

85 self._cs_high() 

86 

87 raise_for_response_status(response_status) 

88 

89 if respcrc != calccrc: 

90 raise TropicSquareCRCError( 

91 f"CRC mismatch ({calccrc.hex()}<!=>{respcrc.hex()})" 

92 ) 

93 

94 if response_status == RSP_STATUS_RES_CONT: 

95 data += self.get_response() 

96 

97 return data 

98 

99 raise TropicSquareTimeoutError("Chip communication timeout - chip remains busy") 

100 

101 

102 def _transfer(self, tx_data: bytes) -> bytes: 

103 """SPI bidirectional transfer. 

104 

105 Corresponds to SPI write_readinto operation. 

106 

107 :param tx_data: Data to transmit 

108 

109 :returns: Received data (same length as tx_data) 

110 """ 

111 raise NotImplementedError("_transfer() method not implemented in L1Transport subclass") 

112 

113 

114 def _read(self, length: int) -> bytes: 

115 """SPI read operation. 

116 

117 Corresponds to SPI read operation. 

118 

119 :param length: Number of bytes to read 

120 

121 :returns: Read data 

122 """ 

123 raise NotImplementedError("_read() method not implemented in L1Transport subclass") 

124 

125 

126 def _cs_low(self) -> None: 

127 """Activate chip select (CS to logic 0).""" 

128 pass 

129 

130 

131 def _cs_high(self) -> None: 

132 """Deactivate chip select (CS to logic 1).""" 

133 pass