Source code for tropicsquare

# by Petr Kracik (c) 2026

__version__ = "0.0.3"
__license__ = "MIT"


from tropicsquare.constants.l2 import SLEEP_MODE_DEEP_SLEEP, SLEEP_MODE_SLEEP, STARTUP_REBOOT, STARTUP_MAINTENANCE_REBOOT
from tropicsquare.l2_protocol import L2Protocol
from tropicsquare.transports import L1Transport
from tropicsquare.constants import *
from tropicsquare.constants.ecc import ECC_MAX_KEYS, ECC_CURVE_P256, ECC_CURVE_ED25519
from tropicsquare.constants.get_info_req import *
from tropicsquare.exceptions import *
from tropicsquare.error_mapping import raise_for_cmd_result
from tropicsquare.chip_id import ChipId
from tropicsquare.config import parse_config
from tropicsquare.config.base import BaseConfig
from tropicsquare.ecc import EccKeyInfo
from tropicsquare.ecc.signature import EcdsaSignature, EddsaSignature

from hashlib import sha256


[docs] class TropicSquare:
[docs] def __new__(cls, *args, **kwargs): """Factory method that returns platform-specific implementation. When instantiating TropicSquare directly, automatically returns either TropicSquareCPython or TropicSquareMicroPython based on the detected platform. This allows users to write platform-agnostic code: from tropicsquare import TropicSquare ts = TropicSquare(transport) """ if cls is not TropicSquare: return super().__new__(cls) # Only do platform detection when instantiating base class directly import sys if sys.implementation.name == 'micropython': from tropicsquare.ports.micropython import TropicSquareMicroPython return TropicSquareMicroPython(*args, **kwargs) if sys.implementation.name == 'cpython': from tropicsquare.ports.cpython import TropicSquareCPython return TropicSquareCPython(*args, **kwargs) raise TropicSquareError("Unsupported Python implementation: {}".format(sys.implementation.name))
[docs] def __init__(self, transport: L1Transport) -> None: """Initialize TropicSquare base class. :param transport: L1Transport instance """ self._secure_session = None self._certificate = None # Create L2 protocol layer with transport self._l2 = L2Protocol(transport)
@property def certificate(self) -> bytes: """Get X509 certificate from the chip :returns: X509 certificate :rtype: bytes """ if self._certificate: return self._certificate data = self._l2.get_info_req(GET_INFO_X509_CERT, GET_INFO_DATA_CHUNK_0_127) data += self._l2.get_info_req(GET_INFO_X509_CERT, GET_INFO_DATA_CHUNK_128_255) data += self._l2.get_info_req(GET_INFO_X509_CERT, GET_INFO_DATA_CHUNK_256_383) data += self._l2.get_info_req(GET_INFO_X509_CERT, GET_INFO_DATA_CHUNK_384_511) # TODO: Figure out what are that 10 bytes at the beginning # 2 bytes: unknown # 2 bytes (big-endian): length of the certificate # 6 bytes: unknown length = int.from_bytes(data[2:4], "big") self._certificate = data[10:10+length] return self._certificate @property def public_key(self) -> bytes: """Get public key from the X509 certificate In case certificate is not loaded before, it will load also certificate :returns: Public key :rtype: bytes """ if self._certificate is None: cert = self.certificate else : cert = self._certificate # Find signature for X25519 public key # 0x65, 0x6e, 0x03 and 0x21 for i in range(len(cert)): if cert[i] == 0x65: if cert[i+1] == 0x6e and \ cert[i+2] == 0x03 and \ cert[i+3] == 0x21: # Found it # Plus 5 bytes to skip the signature return cert[i+5:i+5+32] return None @property def chip_id(self) -> ChipId: """Get parsed chip ID structure :returns: Parsed chip ID object with all fields :rtype: ChipId """ raw_data = self._l2.get_info_req(GET_INFO_CHIPID) return ChipId(raw_data) @property def riscv_fw_version(self) -> tuple: """Get RISCV firmware version :returns: Firmware version (major, minor, patch, release) :rtype: tuple """ data = self._l2.get_info_req(GET_INFO_RISCV_FW_VERSION) return (data[3], data[2], data[1], data[0]) @property def spect_fw_version(self) -> tuple: """Get SPECT firmware version :returns: Firmware version (major, minor, patch, release) :rtype: tuple """ data = self._l2.get_info_req(GET_INFO_SPECT_FW_VERSION) return (data[3], data[2], data[1], data[0]) @property def fw_bank(self) -> bytes: """Get firmware bank information. :returns: Firmware bank data :rtype: bytes """ return self._l2.get_info_req(GET_INFO_FW_BANK)
[docs] def start_secure_session(self, pkey_index : int, shpriv : bytes, shpub : bytes) -> bool: """Initialize secure session for L3 commands :param phkey_index: Pairing key index :param shpriv: Pairing private key :param shpub: Pairing public key :returns: True if secure session was established :rtype: bool :raises TropicSquareError: If secure session handshake failed """ if not 0 <= pkey_index <= PAIRING_KEY_MAX: raise ValueError( f"Pairing key slot must be in range 0-{PAIRING_KEY_MAX}, got {pkey_index}" ) ehpriv, ehpub = self._get_ephemeral_keypair() # Handshake request tsehpub, tsauth = self._l2.handshake_req(ehpub, pkey_index) # Calculation magic sha256hash = sha256() sha256hash.update(PROTOCOL_NAME) sha256hash = sha256(sha256hash.digest()) sha256hash.update(shpub) sha256hash = sha256(sha256hash.digest()) sha256hash.update(self.public_key) sha256hash = sha256(sha256hash.digest()) sha256hash.update(ehpub) sha256hash = sha256(sha256hash.digest()) sha256hash.update(pkey_index.to_bytes(1, "little")) sha256hash = sha256(sha256hash.digest()) sha256hash.update(tsehpub) hash = sha256hash.digest() shared_secret_eh_tseh = self._x25519_exchange(ehpriv, tsehpub) shared_secret_sh_tseh = self._x25519_exchange(shpriv, tsehpub) shared_secret_eh_st = self._x25519_exchange(ehpriv, self.public_key) ck_hkdf_eh_tseh = self._hkdf(PROTOCOL_NAME, shared_secret_eh_tseh) ck_hkdf_sh_tseh = self._hkdf(ck_hkdf_eh_tseh, shared_secret_sh_tseh) ck_hkdf_cmdres, kauth = self._hkdf(ck_hkdf_sh_tseh, shared_secret_eh_st, 2) kcmd, kres = self._hkdf(ck_hkdf_cmdres, b'', 2) ciphertext_with_tag = self._aesgcm(kauth).encrypt(nonce=b'\x00'*12, data=b'', associated_data=hash) tag = ciphertext_with_tag[-16:] # Clear hanshake data shared_secret_eh_tseh = None shared_secret_sh_tseh = None shared_secret_eh_st = None ck_hkdf_eh_tseh = None ck_hkdf_sh_tseh = None ck_hkdf_cmdres = None kauth = None if tag != tsauth: raise TropicSquareHandshakeError("Authentication tag mismatch - handshake failed") encrypt_key = self._aesgcm(kcmd) decrypt_key = self._aesgcm(kres) self._secure_session = [ encrypt_key, decrypt_key, 0 ] return True
[docs] def abort_secure_session(self) -> bool: """Abort secure session :returns: True if secure session was aborted :rtype: bool """ if self._l2.encrypted_session_abt(): self._secure_session = None return True return False
[docs] def reboot(self, mode: int) -> bool: """Startup/reboot chip :param mode: Startup mode (STARTUP_REBOOT or STARTUP_MAINTENANCE_REBOOT) :returns: True if startup request was sent :rtype: bool :raises ValueError: If invalid startup mode :raises TropicSquareError: If startup request failed """ if mode not in [STARTUP_REBOOT, STARTUP_MAINTENANCE_REBOOT]: raise ValueError("Invalid startup mode") return self._l2.startup_req(mode)
[docs] def sleep(self, mode: int) -> bool: """Put chip to sleep :param mode: Sleep mode (SLEEP_MODE_SLEEP or SLEEP_MODE_DEEP_SLEEP) :returns: True if sleep request was sent :rtype: bool :raises ValueError: If invalid sleep mode :raises TropicSquareError: If sleep request failed """ if mode not in [SLEEP_MODE_SLEEP, SLEEP_MODE_DEEP_SLEEP]: raise ValueError("Invalid sleep mode") return self._l2.sleep_req(mode)
[docs] def get_log(self) -> str: """Get log from the RISC Firmware :returns: Log message :rtype: str """ log = b'' while True: part = self._l2.get_log() if not part: break log += part return log.decode("utf-8")
############### # L3 Commands # ###############
[docs] def ping(self, data : bytes) -> bytes: """Returns data back :param data: Data to send :returns: Data from input :rtype: bytes """ request_data = bytearray() request_data.append(CMD_ID_PING) request_data.extend(data) result = self._call_command(request_data) return result
[docs] def random(self, nbytes : int) -> bytes: """Get random bytes :param nbytes: Number of bytes to generate :returns: Random bytes :rtype: bytes """ request_data = bytearray() request_data.append(CMD_ID_RANDOM_VALUE) request_data.extend(nbytes.to_bytes(1, "little")) result = self._call_command(request_data) return result[3:]
[docs] def r_config_read(self, address: int): """Read and parse R-CONFIG register. :param address: Register address (use CFG_* constants from tropicsquare.constants.config) :returns: Parsed config object (StartUpConfig, SensorsConfig, etc.) :rtype: BaseConfig Example:: from tropicsquare.constants.config import CFG_START_UP config = ts.r_config_read(CFG_START_UP) print(config.mbist_dis) """ data = self._config_read_raw(CMD_ID_R_CFG_READ, address) return parse_config(address, data)
[docs] def i_config_read(self, address: int): """Read and parse I-CONFIG register. :param address: Register address (use CFG_* constants from tropicsquare.constants.config) :returns: Parsed config object (StartUpConfig, SensorsConfig, etc.) :rtype: BaseConfig Example:: from tropicsquare.constants.config import CFG_START_UP config = ts.i_config_read(CFG_START_UP) print(config.mbist_dis) """ data = self._config_read_raw(CMD_ID_I_CFG_READ, address) return parse_config(address, data)
[docs] def r_config_write(self, address: int, value) -> bool: """Write single R-CONFIG register. :param address: Register address (use CFG_* constants from tropicsquare.constants.config) :param value: 32-bit register value or BaseConfig object :returns: True if write succeeded :rtype: bool """ self._validate_config_address(address) value_bytes = self._config_value_to_bytes(value) request_data = bytearray() request_data.append(CMD_ID_R_CFG_WRITE) request_data.extend(address.to_bytes(CFG_ADDRESS_SIZE, "little")) request_data.extend(b'M') # Padding dummy data request_data.extend(value_bytes) self._call_command(request_data) return True
[docs] def i_config_write(self, address: int, bit_index: int) -> bool: """Clear a single I-CONFIG bit (1->0 transition only). :param address: Register address (use CFG_* constants from tropicsquare.constants.config) :param bit_index: Bit index to clear (0-31) :returns: True if write succeeded :rtype: bool """ self._validate_config_address(address) if not isinstance(bit_index, int): raise TypeError("I-CONFIG bit index must be integer") if not 0 <= bit_index <= 31: raise ValueError("I-CONFIG bit index must be in range 0-31") request_data = bytearray() request_data.append(CMD_ID_I_CFG_WRITE) request_data.extend(address.to_bytes(CFG_ADDRESS_SIZE, "little")) request_data.append(bit_index) self._call_command(request_data) return True
[docs] def r_config_erase(self) -> bool: """Erase whole R-CONFIG (sets all bits of all COs to 1). :returns: True if erase succeeded :rtype: bool """ request_data = bytearray() request_data.append(CMD_ID_R_CFG_ERASE) self._call_command(request_data) return True
def _config_read_raw(self, cmd_id: int, address: int) -> bytes: """Read raw 4-byte config value payload for a single CO.""" self._validate_config_address(address) request_data = bytearray() request_data.append(cmd_id) request_data.extend(address.to_bytes(CFG_ADDRESS_SIZE, "little")) result = self._call_command(request_data) return result[3:] def _config_value_to_bytes(self, value) -> bytes: """Convert config value input to 4-byte wire format.""" if isinstance(value, BaseConfig): return value.to_bytes() if not isinstance(value, int): raise TypeError("value must be int or BaseConfig") if not 0 <= value <= 0xFFFFFFFF: raise ValueError("Config value must be 32-bit unsigned integer") return value.to_bytes(4, "little") def _validate_config_address(self, address: int) -> None: """Validate 16-bit config CO address.""" if not isinstance(address, int): raise TypeError("Config address must be integer") if not 0 <= address <= 0xFFFF: raise ValueError("Config address must be 16-bit (0x0000-0xFFFF)")
[docs] def mem_data_read(self, slot : int) -> bytes: """Read data from memory slot :param slot: Memory slot :returns: Data from memory slot :rtype: bytes """ request_data = bytearray() request_data.append(CMD_ID_R_MEMDATA_READ) request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little")) result = self._call_command(request_data) return result[3:]
[docs] def mem_data_write(self, data : bytes, slot : int) -> bool: """Write data to memory slot :param data: Data to write (Maximum 444 bytes) :param slot: Memory slot :returns: True if data was written :rtype: bool :raises ValueError: If data size is larger than 444 """ if len(data) > MEM_DATA_MAX_SIZE: raise ValueError(f"Data size ({len(data)} bytes) exceeds maximum allowed size ({MEM_DATA_MAX_SIZE} bytes)") request_data = bytearray() request_data.append(CMD_ID_R_MEMDATA_WRITE) request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little")) request_data.extend(b'M') # Padding dummy data request_data.extend(data) self._call_command(request_data) return True
[docs] def mem_data_erase(self, slot : int) -> bool: """Erase memory slot :param slot: Memory slot :returns: True if data was erased :rtype: bool """ request_data = bytearray() request_data.append(CMD_ID_R_MEMDATA_ERASE) request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little")) self._call_command(request_data) return True
[docs] def ecc_key_generate(self, slot : int, curve : int) -> bool: """Generate ECC key :param slot: Slot for key :param curve: Curve (ECC_CURVE_P256 or ECC_CURVE_ED25519) :returns: True if key was generated :rtype: bool :raises ValueError: If slot is larger than ECC_MAX_KEYS or curve is invalid """ if slot > ECC_MAX_KEYS: raise ValueError("Slot is larger than ECC_MAX_KEYS") if curve not in [ECC_CURVE_P256, ECC_CURVE_ED25519]: raise ValueError("Invalid curve") request_data = bytearray() request_data.append(CMD_ID_ECC_KEY_GENERATE) request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little")) request_data.append(curve) self._call_command(request_data) return True
[docs] def ecc_key_store(self, slot : int, curve : int, key : bytes) -> bytes: """Store own ECC key :param slot: Slot for key :param curve: Curve (ECC_CURVE_P256 or ECC_CURVE_ED25519) :param key: Private key :returns: True if key was stored :rtype: bool :raises ValueError: If slot is larger than ECC_MAX_KEYS or curve is invalid """ if slot > ECC_MAX_KEYS: raise ValueError("Slot is larger than ECC_MAX_KEYS") if curve not in [ECC_CURVE_P256, ECC_CURVE_ED25519]: raise ValueError("Invalid curve") request_data = bytearray() request_data.append(CMD_ID_ECC_KEY_STORE) request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little")) request_data.append(curve) request_data.extend(b'\x00'*12) # Padding dummy data (maybe do random?) request_data.extend(key) self._call_command(request_data) return True
[docs] def ecc_key_read(self, slot : int) -> EccKeyInfo: """Read ECC key information from slot :param slot: Slot for key :returns: Key information with curve, origin, and public_key :rtype: EccKeyInfo :raises ValueError: If slot is larger than ECC_MAX_KEYS Example:: key_info = ts.ecc_key_read(0) if key_info.curve == ECC_CURVE_ED25519: print("Ed25519 key") print(key_info.public_key.hex()) """ if slot > ECC_MAX_KEYS: raise ValueError("Slot is larger than ECC_MAX_KEYS") request_data = bytearray() request_data.append(CMD_ID_ECC_KEY_READ) request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little")) result = self._call_command(request_data) curve = result[0] origin = result[1] pubkey = result[15:] return EccKeyInfo(curve, origin, pubkey)
[docs] def ecc_key_erase(self, slot : int) -> bool: """Erase ECC key :param slot: Slot for key :returns: True if key was erased :rtype: bool :raises ValueError: If slot is larger than ECC_MAX_KEYS """ if slot > ECC_MAX_KEYS: raise ValueError("Slot is larger than ECC_MAX_KEYS") request_data = bytearray() request_data.append(CMD_ID_ECC_KEY_ERASE) request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little")) self._call_command(request_data) return True
[docs] def ecdsa_sign(self, slot : int, hash : bytes) -> EcdsaSignature: """Sign hash with ECDSA using P256 key :param slot: Slot with P256 ECC key :param hash: Hash to sign (32 bytes) :returns: ECDSA signature :rtype: EcdsaSignature :raises ValueError: If slot is larger than ECC_MAX_KEYS Example:: import hashlib message_hash = hashlib.sha256(b"Hello").digest() signature = ts.ecdsa_sign(1, message_hash) print(signature.r.hex()) print(signature.s.hex()) """ if slot > ECC_MAX_KEYS: raise ValueError("Slot is larger than ECC_MAX_KEYS") request_data = bytearray() request_data.append(CMD_ID_ECDSA_SIGN) request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little")) request_data.extend(b'\x00'*13) # Padding dummy data (maybe do random?) request_data.extend(hash) result = self._call_command(request_data) sign_r = result[15:47] sign_s = result[47:] return EcdsaSignature(sign_r, sign_s)
[docs] def eddsa_sign(self, slot : int, message : bytes) -> EddsaSignature: """Sign message with EdDSA using Ed25519 key :param slot: Slot with Ed25519 ECC key :param message: Message to sign :returns: EdDSA signature :rtype: EddsaSignature Example:: signature = ts.eddsa_sign(0, message) print(signature.r.hex()) print(signature.s.hex()) """ if slot > ECC_MAX_KEYS: raise ValueError("Slot is larger than ECC_MAX_KEYS") request_data = bytearray() request_data.append(CMD_ID_EDDSA_SIGN) request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little")) request_data.extend(b'\x00'*13) # Padding dummy data (maybe do random?) request_data.extend(message) result = self._call_command(request_data) sign_r = result[15:47] sign_s = result[47:] return EddsaSignature(sign_r, sign_s)
[docs] def mcounter_init(self, index : int, value : int) -> bool: """Initialize monotonic counter :param index: Counter index :param value: Initial value :returns: True if counter was initialized :rtype: bool """ if index > MCOUNTER_MAX: raise ValueError("Index is larger than MCOUNTER_MAX") request_data = bytearray() request_data.append(CMD_ID_MCOUNTER_INIT) request_data.extend(index.to_bytes(2, "little")) request_data.extend(b'A') # Padding dummy data request_data.extend(value.to_bytes(4, "little")) self._call_command(request_data) return True
[docs] def mcounter_update(self, index : int) -> bool: """Decrement monotonic counter :param index: Counter index :returns: True if counter was updated :rtype: bool """ if index > MCOUNTER_MAX: raise ValueError("Index is larger than MCOUNTER_MAX") request_data = bytearray() request_data.append(CMD_ID_MCOUNTER_UPDATE) request_data.extend(index.to_bytes(2, "little")) self._call_command(request_data) return True
[docs] def mcounter_get(self, index : int) -> int: """Get monotonic counter value :param index: Counter index :returns: Counter value :rtype: int """ if index > MCOUNTER_MAX: raise ValueError("Index is larger than MCOUNTER_MAX") request_data = bytearray() request_data.append(CMD_ID_MCOUNTER_GET) request_data.extend(index.to_bytes(2, "little")) result = self._call_command(request_data) return int.from_bytes(result[3:], "little")
[docs] def mac_and_destroy(self, slot: int, data: bytes) -> bytes: """MAC and destroy operation for atomic PIN verification. This command executes atomic PIN verification using Keccak-based MAC. The operation reads a slot from the MAC-and-Destroy partition (128 slots, 0-127), performs MAC calculation, and destroys/erases the slot data. The MAC-and-Destroy partition is separate from User Data partition and uses Keccak engines with PUF-based per-chip unique keys (K_FXA, K_FXB). :param slot: Slot index in MAC-and-Destroy partition (0-127) :param data: Data to MAC (must be exactly 32 bytes) :returns: MAC result (32 bytes) :raises ValueError: If slot exceeds maximum (127) or data length is not 32 bytes :raises TropicSquareNoSession: If secure session is not established .. note:: Requires active secure session via :meth:`start_secure_session`. .. seealso:: TROPIC01 User API v1.1.2, Table 37: MAC_And_Destroy command specification Example:: # Start secure session first ts.start_secure_session( FACTORY_PAIRING_KEY_INDEX, FACTORY_PAIRING_PRIVATE_KEY_PROD0, FACTORY_PAIRING_PUBLIC_KEY_PROD0 ) # Perform MAC and destroy on slot 0 pin_data = b'my_32_byte_pin_data_here_000' # Exactly 32 bytes mac_result = ts.mac_and_destroy(0, pin_data) print(f"MAC: {mac_result.hex()}") # Returns 32-byte MAC """ if slot > MAC_AND_DESTROY_MAX: raise ValueError(f"Slot {slot} exceeds maximum MAC_AND_DESTROY_MAX ({MAC_AND_DESTROY_MAX})") # Validate data length - must be exactly 32 bytes per API specification if len(data) != MAC_AND_DESTROY_DATA_SIZE: raise ValueError( f"Data must be exactly {MAC_AND_DESTROY_DATA_SIZE} bytes " f"(got {len(data)} bytes). See TROPIC01 User API Table 37." ) request_data = bytearray() request_data.append(CMD_ID_MAC_AND_DESTROY) request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little")) request_data.extend(b'M') # Padding dummy data request_data.extend(data) result = self._call_command(request_data) return result[3:]
[docs] def pairing_key_read(self, slot: int) -> bytes: """Read pairing key information from slot. :param slot: Pairing key slot index (0-3) :returns: Pairing key information (32 bytes) :rtype: bytes :raises ValueError: If slot exceeds maximum (3) """ if not 0 <= slot <= PAIRING_KEY_MAX: raise ValueError( f"Pairing key slot must be in range 0-{PAIRING_KEY_MAX}, got {slot}" ) request_data = bytearray() request_data.append(CMD_ID_PAIRING_KEY_READ) request_data.extend(slot.to_bytes(PAIRING_ADDRESS_SIZE, "little")) result = self._call_command(request_data) return result[3:]
[docs] def pairing_key_write(self, slot: int, key: bytes) -> bool: """Write pairing key information to slot. :param slot: Pairing key slot index (0-3) :param key: Pairing key data (32 bytes) :returns: True if write succeeded :rtype: bool :raises ValueError: If slot exceeds maximum (3) or key length is not 32 bytes """ if not 0 <= slot <= PAIRING_KEY_MAX: raise ValueError( f"Pairing key slot must be in range 0-{PAIRING_KEY_MAX}, got {slot}" ) if len(key) != PAIRING_KEY_SIZE: raise ValueError(f"Key must be exactly {PAIRING_KEY_SIZE} bytes") request_data = bytearray() request_data.append(CMD_ID_PAIRING_KEY_WRITE) request_data.extend(slot.to_bytes(PAIRING_ADDRESS_SIZE, "little")) request_data.extend(b'M') # Padding dummy data request_data.extend(key) result = self._call_command(request_data) return True
[docs] def pairing_key_invalidate(self, slot: int) -> bool: """Invalidate pairing key in slot. :param slot: Pairing key slot index (0-3) :returns: True if successful :rtype: bool :raises ValueError: If slot exceeds maximum (3) """ if not 0 <= slot <= PAIRING_KEY_MAX: raise ValueError( f"Pairing key slot must be in range 0-{PAIRING_KEY_MAX}, got {slot}" ) request_data = bytearray() request_data.append(CMD_ID_PAIRING_KEY_INVALIDATE) request_data.extend(slot.to_bytes(PAIRING_ADDRESS_SIZE, "little")) self._call_command(request_data) return True
def _call_command(self, data): if self._secure_session is None: raise TropicSquareNoSession("Secure session not started") nonce = self._secure_session[2].to_bytes(12, "little") data = bytes(data) enc = self._secure_session[0].encrypt(nonce=nonce, data=data, associated_data=b'') ciphertext = enc[:-16] tag = enc[-16:] result_cipher, result_tag = self._l2.encrypted_command(len(ciphertext), ciphertext, tag) decrypted = self._secure_session[1].decrypt(nonce=nonce, data=result_cipher+result_tag, associated_data=b'') self._secure_session[2] += 1 raise_for_cmd_result(decrypted[0]) return decrypted[1:] def _get_ephemeral_keypair(self): raise NotImplementedError("Not implemented") def _hkdf(self, salt, shared_secret, length=1): raise NotImplementedError("Not implemented") def _x25519_exchange(self, private_bytes, public_bytes): raise NotImplementedError("Not implemented") def _aesgcm(self, key): raise NotImplementedError("Not implemented")