Coverage for tropicsquare / __init__.py: 94%
370 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-27 21:24 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-27 21:24 +0000
1# by Petr Kracik (c) 2026
3__version__ = "0.0.3"
4__license__ = "MIT"
7from tropicsquare.constants.l2 import SLEEP_MODE_DEEP_SLEEP, SLEEP_MODE_SLEEP, STARTUP_REBOOT, STARTUP_MAINTENANCE_REBOOT
8from tropicsquare.l2_protocol import L2Protocol
9from tropicsquare.transports import L1Transport
10from tropicsquare.constants import *
11from tropicsquare.constants.ecc import ECC_MAX_KEYS, ECC_CURVE_P256, ECC_CURVE_ED25519
12from tropicsquare.constants.get_info_req import *
13from tropicsquare.exceptions import *
14from tropicsquare.error_mapping import raise_for_cmd_result
15from tropicsquare.chip_id import ChipId
16from tropicsquare.config import parse_config
17from tropicsquare.config.base import BaseConfig
18from tropicsquare.ecc import EccKeyInfo
19from tropicsquare.ecc.signature import EcdsaSignature, EddsaSignature
21from hashlib import sha256
class TropicSquare:
    """Platform-independent driver built on top of an L1 transport.

    The class wraps an :class:`L2Protocol` instance and exposes:

    * unencrypted L2 queries (certificate, chip ID, firmware versions, log),
    * secure session setup/teardown,
    * L3 commands, which are encrypted and sent through ``_call_command``.

    The cryptographic primitives (``_get_ephemeral_keypair``, ``_hkdf``,
    ``_x25519_exchange``, ``_aesgcm``) raise ``NotImplementedError`` here and
    must be provided by a platform-specific subclass.
    """

    def __new__(cls, *args, **kwargs):
        """Factory method that returns platform-specific implementation.

        When instantiating TropicSquare directly, automatically returns
        either TropicSquareCPython or TropicSquareMicroPython based on
        the detected platform.

        This allows users to write platform-agnostic code:
            from tropicsquare import TropicSquare
            ts = TropicSquare(transport)

        :raises TropicSquareError: If the Python implementation is neither
            MicroPython nor CPython
        """
        if cls is not TropicSquare:
            return super().__new__(cls)

        # Only do platform detection when instantiating base class directly
        import sys
        if sys.implementation.name == 'micropython':
            from tropicsquare.ports.micropython import TropicSquareMicroPython
            return TropicSquareMicroPython(*args, **kwargs)

        if sys.implementation.name == 'cpython':
            from tropicsquare.ports.cpython import TropicSquareCPython
            return TropicSquareCPython(*args, **kwargs)

        raise TropicSquareError("Unsupported Python implementation: {}".format(sys.implementation.name))


    def __init__(self, transport: L1Transport) -> None:
        """Initialize TropicSquare base class.

        :param transport: L1Transport instance
        """
        # [encrypt_cipher, decrypt_cipher, nonce_counter] once a session is
        # established, None otherwise (see start_secure_session).
        self._secure_session = None
        # Cached X509 certificate, filled lazily by the ``certificate`` property.
        self._certificate = None

        # Create L2 protocol layer with transport
        self._l2 = L2Protocol(transport)


    @property
    def certificate(self) -> bytes:
        """Get X509 certificate from the chip

        The certificate is read in four chunks on first access and cached
        afterwards.

        :returns: X509 certificate
        :rtype: bytes
        """
        if self._certificate:
            return self._certificate

        data = self._l2.get_info_req(GET_INFO_X509_CERT, GET_INFO_DATA_CHUNK_0_127)
        data += self._l2.get_info_req(GET_INFO_X509_CERT, GET_INFO_DATA_CHUNK_128_255)
        data += self._l2.get_info_req(GET_INFO_X509_CERT, GET_INFO_DATA_CHUNK_256_383)
        data += self._l2.get_info_req(GET_INFO_X509_CERT, GET_INFO_DATA_CHUNK_384_511)

        # TODO: Figure out what are that 10 bytes at the beginning
        # 2 bytes: unknown
        # 2 bytes (big-endian): length of the certificate
        # 6 bytes: unknown
        length = int.from_bytes(data[2:4], "big")
        self._certificate = data[10:10+length]
        return self._certificate


    @property
    def public_key(self) -> bytes:
        """Get public key from the X509 certificate

        In case certificate is not loaded before, it will load also certificate

        :returns: Public key (up to 32 bytes following the marker), or None
            when the X25519 marker is not present in the certificate
        :rtype: bytes
        """
        if self._certificate is None:
            cert = self.certificate
        else:
            cert = self._certificate

        # Marker preceding the X25519 public key: 0x65 0x6e is the tail of the
        # X25519 OID (1.3.101.110) and 0x03 0x21 opens a 33-byte BIT STRING.
        # Using find() instead of a manual index scan avoids reading past the
        # end of the certificate when 0x65 is one of its last bytes.
        marker = b'\x65\x6e\x03\x21'
        position = cert.find(marker)
        if position < 0:
            return None

        # Plus 5 bytes to skip the marker and the byte that follows it
        start = position + 5
        return cert[start:start+32]


    @property
    def chip_id(self) -> ChipId:
        """Get parsed chip ID structure

        :returns: Parsed chip ID object with all fields
        :rtype: ChipId
        """
        raw_data = self._l2.get_info_req(GET_INFO_CHIPID)
        return ChipId(raw_data)


    @property
    def riscv_fw_version(self) -> tuple:
        """Get RISCV firmware version

        :returns: Firmware version (major, minor, patch, release)
        :rtype: tuple
        """
        data = self._l2.get_info_req(GET_INFO_RISCV_FW_VERSION)
        # The four version bytes are returned in reverse order
        return (data[3], data[2], data[1], data[0])


    @property
    def spect_fw_version(self) -> tuple:
        """Get SPECT firmware version

        :returns: Firmware version (major, minor, patch, release)
        :rtype: tuple
        """
        data = self._l2.get_info_req(GET_INFO_SPECT_FW_VERSION)
        # The four version bytes are returned in reverse order
        return (data[3], data[2], data[1], data[0])


    @property
    def fw_bank(self) -> bytes:
        """Get firmware bank information.

        :returns: Firmware bank data
        :rtype: bytes
        """
        return self._l2.get_info_req(GET_INFO_FW_BANK)


    def start_secure_session(self, pkey_index : int, shpriv : bytes, shpub : bytes) -> bool:
        """Initialize secure session for L3 commands

        :param pkey_index: Pairing key index
        :param shpriv: Pairing private key
        :param shpub: Pairing public key

        :returns: True if secure session was established
        :rtype: bool

        :raises ValueError: If pairing key index is out of range
        :raises TropicSquareError: If secure session handshake failed or the
            chip public key cannot be found in its certificate
        """
        if not 0 <= pkey_index <= PAIRING_KEY_MAX:
            raise ValueError(
                f"Pairing key slot must be in range 0-{PAIRING_KEY_MAX}, got {pkey_index}"
            )

        ehpriv, ehpub = self._get_ephemeral_keypair()

        # Handshake request
        tsehpub, tsauth = self._l2.handshake_req(ehpub, pkey_index)

        # Resolve the chip public key once; it is needed for both the
        # transcript hash and the key exchange below.
        stpub = self.public_key
        if stpub is None:
            raise TropicSquareError("Unable to locate X25519 public key in chip certificate")

        # Transcript hash: h = SHA256(PROTOCOL_NAME), then for every handshake
        # element h = SHA256(h || element), in this exact order.
        handshake_hash = sha256(PROTOCOL_NAME).digest()
        for element in (shpub, stpub, ehpub, pkey_index.to_bytes(1, "little"), tsehpub):
            hasher = sha256(handshake_hash)
            hasher.update(element)
            handshake_hash = hasher.digest()

        shared_secret_eh_tseh = self._x25519_exchange(ehpriv, tsehpub)
        shared_secret_sh_tseh = self._x25519_exchange(shpriv, tsehpub)
        shared_secret_eh_st = self._x25519_exchange(ehpriv, stpub)

        ck_hkdf_eh_tseh = self._hkdf(PROTOCOL_NAME, shared_secret_eh_tseh)
        ck_hkdf_sh_tseh = self._hkdf(ck_hkdf_eh_tseh, shared_secret_sh_tseh)
        ck_hkdf_cmdres, kauth = self._hkdf(ck_hkdf_sh_tseh, shared_secret_eh_st, 2)
        kcmd, kres = self._hkdf(ck_hkdf_cmdres, b'', 2)

        # Encrypting empty data leaves only the 16-byte authentication tag,
        # which must match the tag sent by the chip.
        ciphertext_with_tag = self._aesgcm(kauth).encrypt(nonce=b'\x00'*12, data=b'', associated_data=handshake_hash)
        tag = ciphertext_with_tag[-16:]

        # Clear handshake data
        shared_secret_eh_tseh = None
        shared_secret_sh_tseh = None
        shared_secret_eh_st = None

        ck_hkdf_eh_tseh = None
        ck_hkdf_sh_tseh = None
        ck_hkdf_cmdres = None
        kauth = None

        if tag != tsauth:
            raise TropicSquareHandshakeError("Authentication tag mismatch - handshake failed")

        encrypt_key = self._aesgcm(kcmd)
        decrypt_key = self._aesgcm(kres)

        # Third element is the command counter used as nonce in _call_command
        self._secure_session = [ encrypt_key, decrypt_key, 0 ]

        return True


    def abort_secure_session(self) -> bool:
        """Abort secure session

        The local session state is dropped only when the L2 layer reports
        that the abort succeeded.

        :returns: True if secure session was aborted
        :rtype: bool
        """
        if self._l2.encrypted_session_abt():
            self._secure_session = None
            return True

        return False


    def reboot(self, mode: int) -> bool:
        """Startup/reboot chip

        :param mode: Startup mode (STARTUP_REBOOT or STARTUP_MAINTENANCE_REBOOT)

        :returns: True if startup request was sent
        :rtype: bool

        :raises ValueError: If invalid startup mode
        :raises TropicSquareError: If startup request failed
        """
        if mode not in [STARTUP_REBOOT, STARTUP_MAINTENANCE_REBOOT]:
            raise ValueError("Invalid startup mode")

        return self._l2.startup_req(mode)


    def sleep(self, mode: int) -> bool:
        """Put chip to sleep

        :param mode: Sleep mode (SLEEP_MODE_SLEEP or SLEEP_MODE_DEEP_SLEEP)

        :returns: True if sleep request was sent
        :rtype: bool

        :raises ValueError: If invalid sleep mode
        :raises TropicSquareError: If sleep request failed
        """
        if mode not in [SLEEP_MODE_SLEEP, SLEEP_MODE_DEEP_SLEEP]:
            raise ValueError("Invalid sleep mode")

        return self._l2.sleep_req(mode)


    def get_log(self) -> str:
        """Get log from the RISC Firmware

        Log parts are requested repeatedly until an empty part is returned.

        :returns: Log message
        :rtype: str
        """
        log = b''
        while True:
            part = self._l2.get_log()
            if not part:
                break

            log += part

        return log.decode("utf-8")


    ###############
    # L3 Commands #
    ###############

    def ping(self, data : bytes) -> bytes:
        """Returns data back

        :param data: Data to send

        :returns: Data from input
        :rtype: bytes

        :raises TropicSquareNoSession: If secure session is not established
        """
        request_data = bytearray()
        request_data.append(CMD_ID_PING)
        request_data.extend(data)

        result = self._call_command(request_data)

        return result


    def random(self, nbytes : int) -> bytes:
        """Get random bytes

        :param nbytes: Number of bytes to generate (sent as a single byte)

        :returns: Random bytes
        :rtype: bytes

        :raises TropicSquareNoSession: If secure session is not established
        """
        request_data = bytearray()
        request_data.append(CMD_ID_RANDOM_VALUE)
        request_data.extend(nbytes.to_bytes(1, "little"))

        result = self._call_command(request_data)

        # The first three payload bytes are skipped (presumably padding)
        return result[3:]


    def r_config_read(self, address: int):
        """Read and parse R-CONFIG register.

        :param address: Register address (use CFG_* constants from tropicsquare.constants.config)

        :returns: Parsed config object (StartUpConfig, SensorsConfig, etc.)
        :rtype: BaseConfig

        Example::

            from tropicsquare.constants.config import CFG_START_UP

            config = ts.r_config_read(CFG_START_UP)
            print(config.mbist_dis)
        """
        data = self._config_read_raw(CMD_ID_R_CFG_READ, address)
        return parse_config(address, data)


    def i_config_read(self, address: int):
        """Read and parse I-CONFIG register.

        :param address: Register address (use CFG_* constants from tropicsquare.constants.config)

        :returns: Parsed config object (StartUpConfig, SensorsConfig, etc.)
        :rtype: BaseConfig

        Example::

            from tropicsquare.constants.config import CFG_START_UP

            config = ts.i_config_read(CFG_START_UP)
            print(config.mbist_dis)
        """
        data = self._config_read_raw(CMD_ID_I_CFG_READ, address)
        return parse_config(address, data)


    def r_config_write(self, address: int, value) -> bool:
        """Write single R-CONFIG register.

        :param address: Register address (use CFG_* constants from tropicsquare.constants.config)
        :param value: 32-bit register value or BaseConfig object

        :returns: True if write succeeded
        :rtype: bool

        :raises TypeError: If address is not an integer or value has an unsupported type
        :raises ValueError: If address or value is out of range
        """
        self._validate_config_address(address)
        value_bytes = self._config_value_to_bytes(value)

        request_data = bytearray()
        request_data.append(CMD_ID_R_CFG_WRITE)
        request_data.extend(address.to_bytes(CFG_ADDRESS_SIZE, "little"))
        request_data.extend(b'M') # Padding dummy data
        request_data.extend(value_bytes)
        self._call_command(request_data)
        return True


    def i_config_write(self, address: int, bit_index: int) -> bool:
        """Clear a single I-CONFIG bit (1->0 transition only).

        :param address: Register address (use CFG_* constants from tropicsquare.constants.config)
        :param bit_index: Bit index to clear (0-31)

        :returns: True if write succeeded
        :rtype: bool

        :raises TypeError: If address or bit index is not an integer
        :raises ValueError: If address or bit index is out of range
        """
        self._validate_config_address(address)

        if not isinstance(bit_index, int):
            raise TypeError("I-CONFIG bit index must be integer")

        if not 0 <= bit_index <= 31:
            raise ValueError("I-CONFIG bit index must be in range 0-31")

        request_data = bytearray()
        request_data.append(CMD_ID_I_CFG_WRITE)
        request_data.extend(address.to_bytes(CFG_ADDRESS_SIZE, "little"))
        request_data.append(bit_index)
        self._call_command(request_data)

        return True


    def r_config_erase(self) -> bool:
        """Erase whole R-CONFIG (sets all bits of all COs to 1).

        :returns: True if erase succeeded
        :rtype: bool
        """
        request_data = bytearray()
        request_data.append(CMD_ID_R_CFG_ERASE)
        self._call_command(request_data)
        return True


    def _config_read_raw(self, cmd_id: int, address: int) -> bytes:
        """Read raw 4-byte config value payload for a single CO.

        :param cmd_id: Command ID of the read command to issue
        :param address: Config object address, validated before sending

        :returns: Command payload with the first three bytes removed
        """
        self._validate_config_address(address)

        request_data = bytearray()
        request_data.append(cmd_id)
        request_data.extend(address.to_bytes(CFG_ADDRESS_SIZE, "little"))
        result = self._call_command(request_data)
        return result[3:]


    def _config_value_to_bytes(self, value) -> bytes:
        """Convert config value input to 4-byte wire format.

        :param value: BaseConfig object or unsigned 32-bit integer

        :raises TypeError: If value is neither int nor BaseConfig
        :raises ValueError: If integer value does not fit into 32 bits
        """
        if isinstance(value, BaseConfig):
            return value.to_bytes()

        if not isinstance(value, int):
            raise TypeError("value must be int or BaseConfig")

        if not 0 <= value <= 0xFFFFFFFF:
            raise ValueError("Config value must be 32-bit unsigned integer")

        return value.to_bytes(4, "little")


    def _validate_config_address(self, address: int) -> None:
        """Validate 16-bit config CO address.

        :raises TypeError: If address is not an integer
        :raises ValueError: If address is outside 0x0000-0xFFFF
        """
        if not isinstance(address, int):
            raise TypeError("Config address must be integer")
        if not 0 <= address <= 0xFFFF:
            raise ValueError("Config address must be 16-bit (0x0000-0xFFFF)")


    def mem_data_read(self, slot : int) -> bytes:
        """Read data from memory slot

        :param slot: Memory slot

        :returns: Data from memory slot
        :rtype: bytes
        """
        request_data = bytearray()
        request_data.append(CMD_ID_R_MEMDATA_READ)
        request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little"))

        result = self._call_command(request_data)

        # The first three payload bytes are skipped (presumably padding)
        return result[3:]


    def mem_data_write(self, data : bytes, slot : int) -> bool:
        """Write data to memory slot

        :param data: Data to write (Maximum 444 bytes)
        :param slot: Memory slot

        :returns: True if data was written
        :rtype: bool

        :raises ValueError: If data size is larger than 444
        """
        if len(data) > MEM_DATA_MAX_SIZE:
            raise ValueError(f"Data size ({len(data)} bytes) exceeds maximum allowed size ({MEM_DATA_MAX_SIZE} bytes)")

        request_data = bytearray()
        request_data.append(CMD_ID_R_MEMDATA_WRITE)
        request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little"))
        request_data.extend(b'M') # Padding dummy data
        request_data.extend(data)

        self._call_command(request_data)

        return True


    def mem_data_erase(self, slot : int) -> bool:
        """Erase memory slot

        :param slot: Memory slot

        :returns: True if data was erased
        :rtype: bool
        """
        request_data = bytearray()
        request_data.append(CMD_ID_R_MEMDATA_ERASE)
        request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little"))

        self._call_command(request_data)

        return True


    def ecc_key_generate(self, slot : int, curve : int) -> bool:
        """Generate ECC key

        :param slot: Slot for key
        :param curve: Curve (ECC_CURVE_P256 or ECC_CURVE_ED25519)

        :returns: True if key was generated
        :rtype: bool

        :raises ValueError: If slot is larger than ECC_MAX_KEYS or curve is invalid
        """
        if slot > ECC_MAX_KEYS:
            raise ValueError("Slot is larger than ECC_MAX_KEYS")

        if curve not in [ECC_CURVE_P256, ECC_CURVE_ED25519]:
            raise ValueError("Invalid curve")

        request_data = bytearray()
        request_data.append(CMD_ID_ECC_KEY_GENERATE)
        request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little"))
        request_data.append(curve)

        self._call_command(request_data)

        return True


    def ecc_key_store(self, slot : int, curve : int, key : bytes) -> bool:
        """Store own ECC key

        :param slot: Slot for key
        :param curve: Curve (ECC_CURVE_P256 or ECC_CURVE_ED25519)
        :param key: Private key

        :returns: True if key was stored
        :rtype: bool

        :raises ValueError: If slot is larger than ECC_MAX_KEYS or curve is invalid
        """
        if slot > ECC_MAX_KEYS:
            raise ValueError("Slot is larger than ECC_MAX_KEYS")

        if curve not in [ECC_CURVE_P256, ECC_CURVE_ED25519]:
            raise ValueError("Invalid curve")

        request_data = bytearray()
        request_data.append(CMD_ID_ECC_KEY_STORE)
        request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little"))
        request_data.append(curve)
        request_data.extend(b'\x00'*12) # Padding dummy data (maybe do random?)
        request_data.extend(key)

        self._call_command(request_data)

        return True


    def ecc_key_read(self, slot : int) -> EccKeyInfo:
        """Read ECC key information from slot

        :param slot: Slot for key

        :returns: Key information with curve, origin, and public_key
        :rtype: EccKeyInfo

        :raises ValueError: If slot is larger than ECC_MAX_KEYS

        Example::

            key_info = ts.ecc_key_read(0)
            if key_info.curve == ECC_CURVE_ED25519:
                print("Ed25519 key")
            print(key_info.public_key.hex())
        """
        if slot > ECC_MAX_KEYS:
            raise ValueError("Slot is larger than ECC_MAX_KEYS")

        request_data = bytearray()
        request_data.append(CMD_ID_ECC_KEY_READ)
        request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little"))

        result = self._call_command(request_data)

        # Payload layout as consumed here: byte 0 = curve, byte 1 = origin,
        # public key starts at offset 15.
        curve = result[0]
        origin = result[1]
        pubkey = result[15:]

        return EccKeyInfo(curve, origin, pubkey)


    def ecc_key_erase(self, slot : int) -> bool:
        """Erase ECC key

        :param slot: Slot for key

        :returns: True if key was erased
        :rtype: bool

        :raises ValueError: If slot is larger than ECC_MAX_KEYS
        """
        if slot > ECC_MAX_KEYS:
            raise ValueError("Slot is larger than ECC_MAX_KEYS")

        request_data = bytearray()
        request_data.append(CMD_ID_ECC_KEY_ERASE)
        request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little"))

        self._call_command(request_data)

        return True


    def ecdsa_sign(self, slot : int, hash : bytes) -> EcdsaSignature:
        """Sign hash with ECDSA using P256 key

        :param slot: Slot with P256 ECC key
        :param hash: Hash to sign (32 bytes)

        :returns: ECDSA signature
        :rtype: EcdsaSignature

        :raises ValueError: If slot is larger than ECC_MAX_KEYS

        Example::

            import hashlib
            message_hash = hashlib.sha256(b"Hello").digest()
            signature = ts.ecdsa_sign(1, message_hash)
            print(signature.r.hex())
            print(signature.s.hex())
        """
        if slot > ECC_MAX_KEYS:
            raise ValueError("Slot is larger than ECC_MAX_KEYS")

        request_data = bytearray()
        request_data.append(CMD_ID_ECDSA_SIGN)
        request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little"))
        request_data.extend(b'\x00'*13) # Padding dummy data (maybe do random?)
        request_data.extend(hash)

        result = self._call_command(request_data)

        # R occupies payload bytes 15..46, S is everything after it
        sign_r = result[15:47]
        sign_s = result[47:]

        return EcdsaSignature(sign_r, sign_s)


    def eddsa_sign(self, slot : int, message : bytes) -> EddsaSignature:
        """Sign message with EdDSA using Ed25519 key

        :param slot: Slot with Ed25519 ECC key
        :param message: Message to sign

        :returns: EdDSA signature
        :rtype: EddsaSignature

        :raises ValueError: If slot is larger than ECC_MAX_KEYS

        Example::

            signature = ts.eddsa_sign(0, message)
            print(signature.r.hex())
            print(signature.s.hex())
        """
        if slot > ECC_MAX_KEYS:
            raise ValueError("Slot is larger than ECC_MAX_KEYS")

        request_data = bytearray()
        request_data.append(CMD_ID_EDDSA_SIGN)
        request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little"))
        request_data.extend(b'\x00'*13) # Padding dummy data (maybe do random?)
        request_data.extend(message)

        result = self._call_command(request_data)

        # R occupies payload bytes 15..46, S is everything after it
        sign_r = result[15:47]
        sign_s = result[47:]

        return EddsaSignature(sign_r, sign_s)


    def mcounter_init(self, index : int, value : int) -> bool:
        """Initialize monotonic counter

        :param index: Counter index
        :param value: Initial value (sent as 4 bytes, little-endian)

        :returns: True if counter was initialized
        :rtype: bool

        :raises ValueError: If index is larger than MCOUNTER_MAX
        """
        if index > MCOUNTER_MAX:
            raise ValueError("Index is larger than MCOUNTER_MAX")

        request_data = bytearray()
        request_data.append(CMD_ID_MCOUNTER_INIT)
        request_data.extend(index.to_bytes(2, "little"))
        request_data.extend(b'A') # Padding dummy data
        request_data.extend(value.to_bytes(4, "little"))

        self._call_command(request_data)

        return True


    def mcounter_update(self, index : int) -> bool:
        """Decrement monotonic counter

        :param index: Counter index

        :returns: True if counter was updated
        :rtype: bool

        :raises ValueError: If index is larger than MCOUNTER_MAX
        """
        if index > MCOUNTER_MAX:
            raise ValueError("Index is larger than MCOUNTER_MAX")

        request_data = bytearray()
        request_data.append(CMD_ID_MCOUNTER_UPDATE)
        request_data.extend(index.to_bytes(2, "little"))

        self._call_command(request_data)

        return True


    def mcounter_get(self, index : int) -> int:
        """Get monotonic counter value

        :param index: Counter index

        :returns: Counter value
        :rtype: int

        :raises ValueError: If index is larger than MCOUNTER_MAX
        """
        if index > MCOUNTER_MAX:
            raise ValueError("Index is larger than MCOUNTER_MAX")

        request_data = bytearray()
        request_data.append(CMD_ID_MCOUNTER_GET)
        request_data.extend(index.to_bytes(2, "little"))

        result = self._call_command(request_data)

        # Value follows the first three payload bytes, little-endian
        return int.from_bytes(result[3:], "little")


    def mac_and_destroy(self, slot: int, data: bytes) -> bytes:
        """MAC and destroy operation for atomic PIN verification.

        This command executes atomic PIN verification using Keccak-based MAC.
        The operation reads a slot from the MAC-and-Destroy partition (128 slots, 0-127),
        performs MAC calculation, and destroys/erases the slot data.

        The MAC-and-Destroy partition is separate from User Data partition and
        uses Keccak engines with PUF-based per-chip unique keys (K_FXA, K_FXB).

        :param slot: Slot index in MAC-and-Destroy partition (0-127)
        :param data: Data to MAC (must be exactly 32 bytes)

        :returns: MAC result (32 bytes)

        :raises ValueError: If slot exceeds maximum (127) or data length is not 32 bytes
        :raises TropicSquareNoSession: If secure session is not established

        .. note::
            Requires active secure session via :meth:`start_secure_session`.

        .. seealso::
            TROPIC01 User API v1.1.2, Table 37: MAC_And_Destroy command specification

        Example::

            # Start secure session first
            ts.start_secure_session(
                FACTORY_PAIRING_KEY_INDEX,
                FACTORY_PAIRING_PRIVATE_KEY_PROD0,
                FACTORY_PAIRING_PUBLIC_KEY_PROD0
            )

            # Perform MAC and destroy on slot 0
            pin_data = b'my_32_byte_pin_data_here_0000000'  # Exactly 32 bytes
            mac_result = ts.mac_and_destroy(0, pin_data)
            print(f"MAC: {mac_result.hex()}")  # Returns 32-byte MAC
        """
        if slot > MAC_AND_DESTROY_MAX:
            raise ValueError(f"Slot {slot} exceeds maximum MAC_AND_DESTROY_MAX ({MAC_AND_DESTROY_MAX})")

        # Validate data length - must be exactly 32 bytes per API specification
        if len(data) != MAC_AND_DESTROY_DATA_SIZE:
            raise ValueError(
                f"Data must be exactly {MAC_AND_DESTROY_DATA_SIZE} bytes "
                f"(got {len(data)} bytes). See TROPIC01 User API Table 37."
            )

        request_data = bytearray()
        request_data.append(CMD_ID_MAC_AND_DESTROY)
        request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little"))
        request_data.extend(b'M') # Padding dummy data
        request_data.extend(data)

        result = self._call_command(request_data)

        return result[3:]


    def pairing_key_read(self, slot: int) -> bytes:
        """Read pairing key information from slot.

        :param slot: Pairing key slot index (0-3)

        :returns: Pairing key information (32 bytes)
        :rtype: bytes

        :raises ValueError: If slot exceeds maximum (3)
        """
        if not 0 <= slot <= PAIRING_KEY_MAX:
            raise ValueError(
                f"Pairing key slot must be in range 0-{PAIRING_KEY_MAX}, got {slot}"
            )

        request_data = bytearray()
        request_data.append(CMD_ID_PAIRING_KEY_READ)
        request_data.extend(slot.to_bytes(PAIRING_ADDRESS_SIZE, "little"))
        result = self._call_command(request_data)

        return result[3:]


    def pairing_key_write(self, slot: int, key: bytes) -> bool:
        """Write pairing key information to slot.

        :param slot: Pairing key slot index (0-3)
        :param key: Pairing key data (32 bytes)

        :returns: True if write succeeded
        :rtype: bool

        :raises ValueError: If slot exceeds maximum (3) or key length is not 32 bytes
        """
        if not 0 <= slot <= PAIRING_KEY_MAX:
            raise ValueError(
                f"Pairing key slot must be in range 0-{PAIRING_KEY_MAX}, got {slot}"
            )

        if len(key) != PAIRING_KEY_SIZE:
            raise ValueError(f"Key must be exactly {PAIRING_KEY_SIZE} bytes")

        request_data = bytearray()
        request_data.append(CMD_ID_PAIRING_KEY_WRITE)
        request_data.extend(slot.to_bytes(PAIRING_ADDRESS_SIZE, "little"))
        request_data.extend(b'M') # Padding dummy data
        request_data.extend(key)

        # The response payload is not needed; failures surface as exceptions
        self._call_command(request_data)

        return True


    def pairing_key_invalidate(self, slot: int) -> bool:
        """Invalidate pairing key in slot.

        :param slot: Pairing key slot index (0-3)

        :returns: True if successful
        :rtype: bool

        :raises ValueError: If slot exceeds maximum (3)
        """
        if not 0 <= slot <= PAIRING_KEY_MAX:
            raise ValueError(
                f"Pairing key slot must be in range 0-{PAIRING_KEY_MAX}, got {slot}"
            )

        request_data = bytearray()
        request_data.append(CMD_ID_PAIRING_KEY_INVALIDATE)
        request_data.extend(slot.to_bytes(PAIRING_ADDRESS_SIZE, "little"))

        self._call_command(request_data)

        return True


    def _call_command(self, data):
        """Encrypt an L3 command, send it and return the decrypted payload.

        :param data: Plaintext command (command ID followed by arguments)

        :returns: Decrypted response without its leading result-code byte

        :raises TropicSquareNoSession: If secure session is not established
        """
        if self._secure_session is None:
            raise TropicSquareNoSession("Secure session not started")

        # The session counter, encoded as 12 little-endian bytes, is the nonce
        # for both the request and its response.
        nonce = self._secure_session[2].to_bytes(12, "little")
        data = bytes(data)

        enc = self._secure_session[0].encrypt(nonce=nonce, data=data, associated_data=b'')
        # The encrypt output ends with the 16-byte authentication tag
        ciphertext = enc[:-16]
        tag = enc[-16:]

        result_cipher, result_tag = self._l2.encrypted_command(len(ciphertext), ciphertext, tag)
        decrypted = self._secure_session[1].decrypt(nonce=nonce, data=result_cipher+result_tag, associated_data=b'')

        # Counter advances only after the response was decrypted
        self._secure_session[2] += 1

        # First byte is the command result code; raises on error codes
        raise_for_cmd_result(decrypted[0])

        return decrypted[1:]


    def _get_ephemeral_keypair(self):
        """Return an ephemeral (private, public) key pair; provided by subclasses."""
        raise NotImplementedError("Not implemented")


    def _hkdf(self, salt, shared_secret, length=1):
        """Derive key material from salt and shared secret; provided by subclasses."""
        raise NotImplementedError("Not implemented")


    def _x25519_exchange(self, private_bytes, public_bytes):
        """Compute an X25519 shared secret; provided by subclasses."""
        raise NotImplementedError("Not implemented")


    def _aesgcm(self, key):
        """Return an AES-GCM cipher object for the key; provided by subclasses."""
        raise NotImplementedError("Not implemented")