Coverage for tropicsquare / l2_protocol.py: 100%
60 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-27 21:24 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-27 21:24 +0000
1"""L2 Protocol Layer for TROPIC01 Secure Element
3 This module implements the L2 (Link Layer) protocol for communication
4 with the TROPIC01 chip. It handles low-level SPI communication, CRC
5 validation, retry logic, and message framing.
7 The L2 layer is responsible for:
9 - SPI bus communication and chip select management
10 - Request/response framing and CRC validation
11 - Chip status monitoring and retry logic
12 - Encrypted command transmission
13 - Session handshake protocol
15 The L2 layer does NOT handle:
17 - Cryptographic operations (delegated to parent)
18 - Command parsing/building (done by L3 layer)
19 - Session state management (done by TropicSquare)
20"""
22from tropicsquare.crc import CRC
23from tropicsquare.transports import L1Transport
24from tropicsquare.constants.l2 import *
25from tropicsquare.constants.get_info_req import GET_INFO_DATA_CHUNK_0_127
26from tropicsquare.exceptions import TropicSquareResponseError
class L2Protocol:
    """L2 protocol layer implementation.

    Provides low-level chip communication primitives for the TROPIC01
    secure element. Every request is framed as request ID + payload +
    CRC16 and exchanged with the chip through the supplied L1 transport.
    """

    # Maximum number of L3 bytes carried by a single encrypted-command chunk.
    _L3_CHUNK_SIZE = 128
    # Width of the little-endian size field at the start of an L3 response.
    _L3_RESPONSE_SIZE_LEN = 2
    # Length of the authentication tag at the end of an L3 response.
    _L3_TAG_LEN = 16

    def __init__(self, transport: L1Transport) -> None:
        """Initialize L2 protocol layer.

        :param transport: Transport instance used to send requests and
            read responses
        """
        self._transport = transport

    def get_info_req(self, object_id: int, req_data_chunk: int = GET_INFO_DATA_CHUNK_0_127) -> bytes:
        """Request information object from chip.

        Sends GET_INFO request to retrieve chip information like certificate,
        chip ID, firmware version, etc.

        :param object_id: Information object type to retrieve
        :param req_data_chunk: Data chunk selector (for objects > 128 bytes)

        :returns: Raw information data
        :rtype: bytes

        :raises ValueError: If either argument does not fit in one byte (0-255)
        :raises TropicSquareError: If chip status is not ready (not raised
            here directly; presumably propagated from the transport)
        """
        payload = bytes([object_id, req_data_chunk])
        return self._send_and_get_response(REQ_ID_GET_INFO_REQ, payload)

    def handshake_req(self, ehpub: bytes, p_keyslot: int) -> tuple:
        """Perform secure session handshake.

        Sends ephemeral public key to chip and receives chip's ephemeral
        public key and authentication tag.

        :param ehpub: Ephemeral public key (32 bytes)
        :param p_keyslot: Pairing key slot index (0-3)

        :returns: (chip_ephemeral_pubkey, chip_auth_tag)
        :rtype: tuple

        :raises TropicSquareError: If chip status is not ready (not raised
            here directly; presumably propagated from the transport)
        """
        payload = ehpub + bytes([p_keyslot])
        data = self._send_and_get_response(REQ_ID_HANDSHARE_REQ, payload)

        # Response layout: bytes 0-31 are the chip's ephemeral public key,
        # bytes 32-47 the authentication tag.
        # NOTE(review): the response length is not validated here, so a
        # short response yields truncated values - confirm that callers
        # or the transport guard against this.
        tsehpub = data[0:32]
        tsauth = data[32:48]

        return (tsehpub, tsauth)

    def get_log(self) -> bytes:
        """Retrieve firmware logs from chip.

        :returns: Raw log data
        :rtype: bytes

        :raises TropicSquareError: If chip status is not ready (not raised
            here directly; presumably propagated from the transport)
        """
        return self._send_and_get_response(REQ_ID_GET_LOG_REQ)

    def encrypted_command(self, command_size: int, command_ciphertext: bytes, command_tag: bytes) -> tuple:
        """Send encrypted L3 command to chip.

        Builds the L3 frame (size field + ciphertext + tag), splits it into
        chunks of at most 128 bytes, sends every chunk as its own request
        and finally reads and parses the encrypted response.

        :param command_size: Size of command ciphertext
        :param command_ciphertext: Encrypted command data
        :param command_tag: AES-GCM authentication tag (16 bytes)

        :returns: (response_ciphertext, response_tag)
        :rtype: tuple

        :raises TropicSquareError: If chip status is not ready (not raised
            here directly; presumably propagated from the transport)
        :raises TropicSquareResponseError: If the response is too short to
            hold the size field and the tag, or if the size field does not
            match the length of the received ciphertext
        """
        # L3 frame to be chunked: size field, ciphertext, tag.
        l3_frame = bytearray(command_size.to_bytes(COMMAND_SIZE_LEN, "little"))
        l3_frame.extend(command_ciphertext)
        l3_frame.extend(command_tag)

        # Send all chunks; each chunk is prefixed with its own length byte.
        for offset in range(0, len(l3_frame), self._L3_CHUNK_SIZE):
            chunk = l3_frame[offset:offset + self._L3_CHUNK_SIZE]
            payload = bytes([len(chunk)]) + chunk
            request = self._build_request(REQ_ID_ENCRYPTED_CMD_REQ, payload)
            self._transport.send_request(request)
            # Read (and discard) the acknowledgement for this chunk.
            self._transport.get_response()

        # Final response carrying the encrypted result.
        response = self._transport.get_response()

        # Without this guard the slices below would silently produce a
        # truncated tag (or an empty result for an empty response).
        if len(response) < self._L3_RESPONSE_SIZE_LEN + self._L3_TAG_LEN:
            raise TropicSquareResponseError("Encrypted response too short")

        response_size = int.from_bytes(response[0:self._L3_RESPONSE_SIZE_LEN], "little")
        response_ciphertext = response[self._L3_RESPONSE_SIZE_LEN:-self._L3_TAG_LEN]
        response_tag = response[-self._L3_TAG_LEN:]

        if response_size != len(response_ciphertext):
            raise TropicSquareResponseError("Command size mismatch in response")

        return (response_ciphertext, response_tag)

    def encrypted_session_abt(self) -> bool:
        """Abort encrypted session.

        Terminates the current secure session with the chip.

        :returns: True on success
        :rtype: bool

        :raises TropicSquareError: If chip status is not ready (not raised
            here directly; presumably propagated from the transport)
        """
        self._send_and_get_response(REQ_ID_ENCRYPTED_SESSION_ABT)
        return True

    def sleep_req(self, sleep_mode: int) -> bool:
        """Put chip to sleep.

        :param sleep_mode: Sleep mode (SLEEP_MODE_SLEEP or SLEEP_MODE_DEEP_SLEEP)

        :returns: True on success
        :rtype: bool

        :raises ValueError: If sleep_mode does not fit in one byte (0-255);
            the mode itself is not validated by this method
        :raises TropicSquareError: If chip status is not ready (not raised
            here directly; presumably propagated from the transport)
        """
        payload = bytes([sleep_mode])
        self._send_and_get_response(REQ_ID_SLEEP_REQ, payload)
        return True

    def startup_req(self, startup_id: int) -> bool:
        """Startup/reboot chip.

        :param startup_id: Startup mode (STARTUP_REBOOT or STARTUP_MAINTENANCE_REBOOT)

        :returns: True on success
        :rtype: bool

        :raises ValueError: If startup_id does not fit in one byte (0-255);
            the mode itself is not validated by this method
        :raises TropicSquareError: If chip status is not ready (not raised
            here directly; presumably propagated from the transport)
        """
        payload = bytes([startup_id])
        self._send_and_get_response(REQ_ID_STARTUP_REQ, payload)
        return True

    # === Private helper methods for reducing code duplication ===

    def _build_request(self, req_id, payload=b''):
        """Build request frame with CRC.

        The frame is the request ID followed by the payload, followed by
        the CRC16 computed over both.

        :param req_id: Request ID bytes (e.g., REQ_ID_GET_INFO_REQ)
        :param payload: Optional payload bytes

        :returns: Complete request with CRC
        :rtype: bytearray
        """
        frame = bytearray(bytes(req_id))
        frame.extend(payload)
        # The CRC covers everything assembled so far (ID + payload).
        frame.extend(CRC.crc16(frame))
        return frame

    def _send_and_get_response(self, req_id, payload=b''):
        """Build request, send it, and get response.

        Convenience method that combines common pattern of:
        1. Build request with CRC
        2. Send via transport
        3. Get response

        No status check is performed in this method itself; any such
        handling is left to the transport.

        :param req_id: Request ID bytes
        :param payload: Optional payload bytes

        :returns: Response data from chip, as returned by the transport
        :rtype: bytes

        :raises TropicSquareError: If chip is not ready (not raised here
            directly; presumably propagated from the transport)
        """
        request = self._build_request(req_id, payload)
        self._transport.send_request(request)
        return self._transport.get_response()