Coverage for tropicsquare / l2_protocol.py: 100%

60 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-27 21:24 +0000

1"""L2 Protocol Layer for TROPIC01 Secure Element 

2 

3 This module implements the L2 (Link Layer) protocol for communication 

4 with the TROPIC01 chip. It handles low-level SPI communication, CRC 

5 validation, retry logic, and message framing. 

6 

7 The L2 layer is responsible for: 

8 

9 - SPI bus communication and chip select management 

10 - Request/response framing and CRC validation 

11 - Chip status monitoring and retry logic 

12 - Encrypted command transmission 

13 - Session handshake protocol 

14 

15 The L2 layer does NOT handle: 

16 

17 - Cryptographic operations (delegated to parent) 

18 - Command parsing/building (done by L3 layer) 

19 - Session state management (done by TropicSquare) 

20""" 

21 

22from tropicsquare.crc import CRC 

23from tropicsquare.transports import L1Transport 

24from tropicsquare.constants.l2 import * 

25from tropicsquare.constants.get_info_req import GET_INFO_DATA_CHUNK_0_127 

26from tropicsquare.exceptions import TropicSquareResponseError 

27 

28 

29class L2Protocol: 

30 """L2 protocol layer implementation. 

31 

32 Provides low-level chip communication primitives for the TROPIC01 

33 secure element. This class handles SPI communication, framing, 

34 CRC validation, and chip state management. 

35 """ 

36 

37 def __init__(self, transport: L1Transport) -> None: 

38 """Initialize L2 protocol layer. 

39 

40 :param transport: Transport instance 

41 """ 

42 self._transport = transport 

43 

44 

45 def get_info_req(self, object_id: int, req_data_chunk: int = GET_INFO_DATA_CHUNK_0_127) -> bytes: 

46 """Request information object from chip. 

47 

48 Sends GET_INFO request to retrieve chip information like certificate, 

49 chip ID, firmware version, etc. 

50 

51 :param object_id: Information object type to retrieve 

52 :param req_data_chunk: Data chunk selector (for objects > 128 bytes) 

53 

54 :returns: Raw information data 

55 :rtype: bytes 

56 

57 :raises TropicSquareError: If chip status is not ready 

58 """ 

59 payload = bytes([object_id, req_data_chunk]) 

60 return self._send_and_get_response(REQ_ID_GET_INFO_REQ, payload) 

61 

62 

63 def handshake_req(self, ehpub: bytes, p_keyslot: int) -> tuple: 

64 """Perform secure session handshake. 

65 

66 Sends ephemeral public key to chip and receives chip's ephemeral 

67 public key and authentication tag. 

68 

69 :param ehpub: Ephemeral public key (32 bytes) 

70 :param p_keyslot: Pairing key slot index (0-3) 

71 

72 :returns: (chip_ephemeral_pubkey, chip_auth_tag) 

73 :rtype: tuple 

74 

75 :raises TropicSquareError: If chip status is not ready 

76 """ 

77 payload = ehpub + bytes([p_keyslot]) 

78 data = self._send_and_get_response(REQ_ID_HANDSHARE_REQ, payload) 

79 

80 tsehpub = data[0:32] 

81 tsauth = data[32:48] 

82 

83 return (tsehpub, tsauth) 

84 

85 

86 def get_log(self) -> bytes: 

87 """Retrieve firmware logs from chip. 

88 

89 :returns: Raw log data 

90 :rtype: bytes 

91 

92 :raises TropicSquareError: If chip status is not ready 

93 """ 

94 return self._send_and_get_response(REQ_ID_GET_LOG_REQ) 

95 

96 

97 def encrypted_command(self, command_size: int, command_ciphertext: bytes, command_tag: bytes) -> tuple: 

98 """Send encrypted L3 command to chip. 

99 

100 Handles chunking of large commands (> 128 bytes) and sends them 

101 to the chip. Returns encrypted response. 

102 

103 :param command_size: Size of command ciphertext 

104 :param command_ciphertext: Encrypted command data 

105 :param command_tag: AES-GCM authentication tag (16 bytes) 

106 

107 :returns: (response_ciphertext, response_tag) 

108 :rtype: tuple 

109 

110 :raises TropicSquareError: If chip status is not ready 

111 :raises TropicSquareResponseError: If response size mismatch 

112 """ 

113 def _chunk_data(data, chunk_size=128): 

114 for i in range(0, len(data), chunk_size): 

115 yield (data[i:i+chunk_size]) 

116 

117 # L3 Data to chunk 

118 l3data = bytearray() 

119 l3data.extend(command_size.to_bytes(COMMAND_SIZE_LEN, "little")) 

120 l3data.extend(command_ciphertext) 

121 l3data.extend(command_tag) 

122 

123 # Send all chunks 

124 for chunk in _chunk_data(l3data): 

125 payload = bytes([len(chunk)]) + chunk 

126 request = self._build_request(REQ_ID_ENCRYPTED_CMD_REQ, payload) 

127 self._transport.send_request(request) 

128 # Get ACK response for this chunk 

129 self._transport.get_response() 

130 

131 # Get final response 

132 data = self._transport.get_response() 

133 

134 command_size = int.from_bytes(data[0:2], "little") 

135 command_ciphertext = data[2:-16] 

136 command_tag = data[-16:] 

137 

138 if command_size != len(command_ciphertext): 

139 raise TropicSquareResponseError("Command size mismatch in response") 

140 

141 return (command_ciphertext, command_tag) 

142 

143 

144 def encrypted_session_abt(self) -> bool: 

145 """Abort encrypted session. 

146 

147 Terminates the current secure session with the chip. 

148 

149 :returns: True on success 

150 :rtype: bool 

151 

152 :raises TropicSquareError: If chip status is not ready 

153 """ 

154 self._send_and_get_response(REQ_ID_ENCRYPTED_SESSION_ABT) 

155 return True 

156 

157 

158 def sleep_req(self, sleep_mode: int) -> bool: 

159 """Put chip to sleep. 

160 

161 :param sleep_mode: Sleep mode (SLEEP_MODE_SLEEP or SLEEP_MODE_DEEP_SLEEP) 

162 

163 :returns: True on success 

164 :rtype: bool 

165 

166 :raises ValueError: If invalid sleep mode 

167 :raises TropicSquareError: If chip status is not ready 

168 """ 

169 

170 payload = bytes([sleep_mode]) 

171 self._send_and_get_response(REQ_ID_SLEEP_REQ, payload) 

172 return True 

173 

174 

175 def startup_req(self, startup_id: int) -> bool: 

176 """Startup/reboot chip. 

177 

178 :param startup_id: Startup mode (STARTUP_REBOOT or STARTUP_MAINTENANCE_REBOOT) 

179 

180 :returns: True on success 

181 :rtype: bool 

182 

183 :raises ValueError: If invalid startup mode 

184 :raises TropicSquareError: If chip status is not ready 

185 """ 

186 

187 payload = bytes([startup_id]) 

188 self._send_and_get_response(REQ_ID_STARTUP_REQ, payload) 

189 return True 

190 

191 

192 # === Private helper methods for reducing code duplication === 

193 

194 def _build_request(self, req_id, payload=b''): 

195 """Build request frame with CRC. 

196 

197 :param req_id: Request ID bytes (e.g., REQ_ID_GET_INFO_REQ) 

198 :param payload: Optional payload bytes 

199 

200 :returns: Complete request with CRC 

201 :rtype: bytearray 

202 """ 

203 data = bytearray() 

204 data.extend(bytes(req_id)) 

205 data.extend(payload) 

206 data.extend(CRC.crc16(data)) 

207 return data 

208 

209 

210 def _send_and_get_response(self, req_id, payload=b''): 

211 """Build request, send it, check status, and get response. 

212 

213 Convenience method that combines common pattern of: 

214 1. Build request with CRC 

215 2. Send via transport 

216 3. Get response 

217 

218 :param req_id: Request ID bytes 

219 :param payload: Optional payload bytes 

220 

221 :returns: Response data from chip 

222 :rtype: bytes 

223 

224 :raises TropicSquareError: If chip is not ready 

225 """ 

226 request = self._build_request(req_id, payload) 

227 self._transport.send_request(request) 

228 return self._transport.get_response()