Source code for tropicsquare.l2_protocol

"""L2 Protocol Layer for TROPIC01 Secure Element

    This module implements the L2 (Link Layer) protocol for communication
    with the TROPIC01 chip. It handles low-level SPI communication, CRC
    validation, retry logic, and message framing.

    The L2 layer is responsible for:

        - SPI bus communication and chip select management
        - Request/response framing and CRC validation
        - Chip status monitoring and retry logic
        - Encrypted command transmission
        - Session handshake protocol

    The L2 layer does NOT handle:

        - Cryptographic operations (delegated to parent)
        - Command parsing/building (done by L3 layer)
        - Session state management (done by TropicSquare)
"""

from tropicsquare.crc import CRC
from tropicsquare.transports import L1Transport
from tropicsquare.constants.l2 import *
from tropicsquare.constants.get_info_req import GET_INFO_DATA_CHUNK_0_127
from tropicsquare.exceptions import TropicSquareResponseError


[docs] class L2Protocol: """L2 protocol layer implementation. Provides low-level chip communication primitives for the TROPIC01 secure element. This class handles SPI communication, framing, CRC validation, and chip state management. """
[docs] def __init__(self, transport: L1Transport) -> None: """Initialize L2 protocol layer. :param transport: Transport instance """ self._transport = transport
[docs] def get_info_req(self, object_id: int, req_data_chunk: int = GET_INFO_DATA_CHUNK_0_127) -> bytes: """Request information object from chip. Sends GET_INFO request to retrieve chip information like certificate, chip ID, firmware version, etc. :param object_id: Information object type to retrieve :param req_data_chunk: Data chunk selector (for objects > 128 bytes) :returns: Raw information data :rtype: bytes :raises TropicSquareError: If chip status is not ready """ payload = bytes([object_id, req_data_chunk]) return self._send_and_get_response(REQ_ID_GET_INFO_REQ, payload)
[docs] def handshake_req(self, ehpub: bytes, p_keyslot: int) -> tuple: """Perform secure session handshake. Sends ephemeral public key to chip and receives chip's ephemeral public key and authentication tag. :param ehpub: Ephemeral public key (32 bytes) :param p_keyslot: Pairing key slot index (0-3) :returns: (chip_ephemeral_pubkey, chip_auth_tag) :rtype: tuple :raises TropicSquareError: If chip status is not ready """ payload = ehpub + bytes([p_keyslot]) data = self._send_and_get_response(REQ_ID_HANDSHARE_REQ, payload) tsehpub = data[0:32] tsauth = data[32:48] return (tsehpub, tsauth)
[docs] def get_log(self) -> bytes: """Retrieve firmware logs from chip. :returns: Raw log data :rtype: bytes :raises TropicSquareError: If chip status is not ready """ return self._send_and_get_response(REQ_ID_GET_LOG_REQ)
[docs] def encrypted_command(self, command_size: int, command_ciphertext: bytes, command_tag: bytes) -> tuple: """Send encrypted L3 command to chip. Handles chunking of large commands (> 128 bytes) and sends them to the chip. Returns encrypted response. :param command_size: Size of command ciphertext :param command_ciphertext: Encrypted command data :param command_tag: AES-GCM authentication tag (16 bytes) :returns: (response_ciphertext, response_tag) :rtype: tuple :raises TropicSquareError: If chip status is not ready :raises TropicSquareResponseError: If response size mismatch """ def _chunk_data(data, chunk_size=128): for i in range(0, len(data), chunk_size): yield (data[i:i+chunk_size]) # L3 Data to chunk l3data = bytearray() l3data.extend(command_size.to_bytes(COMMAND_SIZE_LEN, "little")) l3data.extend(command_ciphertext) l3data.extend(command_tag) # Send all chunks for chunk in _chunk_data(l3data): payload = bytes([len(chunk)]) + chunk request = self._build_request(REQ_ID_ENCRYPTED_CMD_REQ, payload) self._transport.send_request(request) # Get ACK response for this chunk self._transport.get_response() # Get final response data = self._transport.get_response() command_size = int.from_bytes(data[0:2], "little") command_ciphertext = data[2:-16] command_tag = data[-16:] if command_size != len(command_ciphertext): raise TropicSquareResponseError("Command size mismatch in response") return (command_ciphertext, command_tag)
[docs] def encrypted_session_abt(self) -> bool: """Abort encrypted session. Terminates the current secure session with the chip. :returns: True on success :rtype: bool :raises TropicSquareError: If chip status is not ready """ self._send_and_get_response(REQ_ID_ENCRYPTED_SESSION_ABT) return True
[docs] def sleep_req(self, sleep_mode: int) -> bool: """Put chip to sleep. :param sleep_mode: Sleep mode (SLEEP_MODE_SLEEP or SLEEP_MODE_DEEP_SLEEP) :returns: True on success :rtype: bool :raises ValueError: If invalid sleep mode :raises TropicSquareError: If chip status is not ready """ payload = bytes([sleep_mode]) self._send_and_get_response(REQ_ID_SLEEP_REQ, payload) return True
[docs] def startup_req(self, startup_id: int) -> bool: """Startup/reboot chip. :param startup_id: Startup mode (STARTUP_REBOOT or STARTUP_MAINTENANCE_REBOOT) :returns: True on success :rtype: bool :raises ValueError: If invalid startup mode :raises TropicSquareError: If chip status is not ready """ payload = bytes([startup_id]) self._send_and_get_response(REQ_ID_STARTUP_REQ, payload) return True
# === Private helper methods for reducing code duplication === def _build_request(self, req_id, payload=b''): """Build request frame with CRC. :param req_id: Request ID bytes (e.g., REQ_ID_GET_INFO_REQ) :param payload: Optional payload bytes :returns: Complete request with CRC :rtype: bytearray """ data = bytearray() data.extend(bytes(req_id)) data.extend(payload) data.extend(CRC.crc16(data)) return data def _send_and_get_response(self, req_id, payload=b''): """Build request, send it, check status, and get response. Convenience method that combines common pattern of: 1. Build request with CRC 2. Send via transport 3. Get response :param req_id: Request ID bytes :param payload: Optional payload bytes :returns: Response data from chip :rtype: bytes :raises TropicSquareError: If chip is not ready """ request = self._build_request(req_id, payload) self._transport.send_request(request) return self._transport.get_response()