Coverage for tropicsquare / __init__.py: 94%

370 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-27 21:24 +0000

1# by Petr Kracik (c) 2026 

2 

3__version__ = "0.0.3" 

4__license__ = "MIT" 

5 

6 

7from tropicsquare.constants.l2 import SLEEP_MODE_DEEP_SLEEP, SLEEP_MODE_SLEEP, STARTUP_REBOOT, STARTUP_MAINTENANCE_REBOOT 

8from tropicsquare.l2_protocol import L2Protocol 

9from tropicsquare.transports import L1Transport 

10from tropicsquare.constants import * 

11from tropicsquare.constants.ecc import ECC_MAX_KEYS, ECC_CURVE_P256, ECC_CURVE_ED25519 

12from tropicsquare.constants.get_info_req import * 

13from tropicsquare.exceptions import * 

14from tropicsquare.error_mapping import raise_for_cmd_result 

15from tropicsquare.chip_id import ChipId 

16from tropicsquare.config import parse_config 

17from tropicsquare.config.base import BaseConfig 

18from tropicsquare.ecc import EccKeyInfo 

19from tropicsquare.ecc.signature import EcdsaSignature, EddsaSignature 

20 

21from hashlib import sha256 

22 

23 

24class TropicSquare: 

25 def __new__(cls, *args, **kwargs): 

26 """Factory method that returns platform-specific implementation. 

27 

28 When instantiating TropicSquare directly, automatically returns 

29 either TropicSquareCPython or TropicSquareMicroPython based on 

30 the detected platform. 

31 

32 This allows users to write platform-agnostic code: 

33 from tropicsquare import TropicSquare 

34 ts = TropicSquare(transport) 

35 """ 

36 if cls is not TropicSquare: 

37 return super().__new__(cls) 

38 

39 # Only do platform detection when instantiating base class directly 

40 import sys 

41 if sys.implementation.name == 'micropython': 

42 from tropicsquare.ports.micropython import TropicSquareMicroPython 

43 return TropicSquareMicroPython(*args, **kwargs) 

44 

45 if sys.implementation.name == 'cpython': 

46 from tropicsquare.ports.cpython import TropicSquareCPython 

47 return TropicSquareCPython(*args, **kwargs) 

48 

49 raise TropicSquareError("Unsupported Python implementation: {}".format(sys.implementation.name)) 

50 

51 

52 def __init__(self, transport: L1Transport) -> None: 

53 """Initialize TropicSquare base class. 

54 

55 :param transport: L1Transport instance 

56 """ 

57 self._secure_session = None 

58 self._certificate = None 

59 

60 # Create L2 protocol layer with transport 

61 self._l2 = L2Protocol(transport) 

62 

63 

64 @property 

65 def certificate(self) -> bytes: 

66 """Get X509 certificate from the chip 

67 

68 :returns: X509 certificate 

69 :rtype: bytes 

70 """ 

71 if self._certificate: 

72 return self._certificate 

73 

74 data = self._l2.get_info_req(GET_INFO_X509_CERT, GET_INFO_DATA_CHUNK_0_127) 

75 data += self._l2.get_info_req(GET_INFO_X509_CERT, GET_INFO_DATA_CHUNK_128_255) 

76 data += self._l2.get_info_req(GET_INFO_X509_CERT, GET_INFO_DATA_CHUNK_256_383) 

77 data += self._l2.get_info_req(GET_INFO_X509_CERT, GET_INFO_DATA_CHUNK_384_511) 

78 

79 # TODO: Figure out what are that 10 bytes at the beginning 

80 # 2 bytes: unknown 

81 # 2 bytes (big-endian): length of the certificate 

82 # 6 bytes: unknown 

83 length = int.from_bytes(data[2:4], "big") 

84 self._certificate = data[10:10+length] 

85 return self._certificate 

86 

87 

88 @property 

89 def public_key(self) -> bytes: 

90 """Get public key from the X509 certificate 

91 

92 In case certificate is not loaded before, it will load also certificate 

93 

94 :returns: Public key 

95 :rtype: bytes 

96 """ 

97 if self._certificate is None: 

98 cert = self.certificate 

99 else : 

100 cert = self._certificate 

101 

102 # Find signature for X25519 public key 

103 # 0x65, 0x6e, 0x03 and 0x21 

104 for i in range(len(cert)): 

105 if cert[i] == 0x65: 

106 if cert[i+1] == 0x6e and \ 

107 cert[i+2] == 0x03 and \ 

108 cert[i+3] == 0x21: 

109 # Found it 

110 # Plus 5 bytes to skip the signature 

111 return cert[i+5:i+5+32] 

112 

113 return None 

114 

115 

116 @property 

117 def chip_id(self) -> ChipId: 

118 """Get parsed chip ID structure 

119 

120 :returns: Parsed chip ID object with all fields 

121 :rtype: ChipId 

122 """ 

123 raw_data = self._l2.get_info_req(GET_INFO_CHIPID) 

124 return ChipId(raw_data) 

125 

126 

127 @property 

128 def riscv_fw_version(self) -> tuple: 

129 """Get RISCV firmware version 

130 

131 :returns: Firmware version (major, minor, patch, release) 

132 :rtype: tuple 

133 """ 

134 data = self._l2.get_info_req(GET_INFO_RISCV_FW_VERSION) 

135 return (data[3], data[2], data[1], data[0]) 

136 

137 

138 @property 

139 def spect_fw_version(self) -> tuple: 

140 """Get SPECT firmware version 

141 

142 :returns: Firmware version (major, minor, patch, release) 

143 :rtype: tuple 

144 """ 

145 data = self._l2.get_info_req(GET_INFO_SPECT_FW_VERSION) 

146 return (data[3], data[2], data[1], data[0]) 

147 

148 

149 @property 

150 def fw_bank(self) -> bytes: 

151 """Get firmware bank information. 

152 

153 :returns: Firmware bank data 

154 :rtype: bytes 

155 """ 

156 return self._l2.get_info_req(GET_INFO_FW_BANK) 

157 

158 

159 def start_secure_session(self, pkey_index : int, shpriv : bytes, shpub : bytes) -> bool: 

160 """Initialize secure session for L3 commands 

161 

162 :param phkey_index: Pairing key index 

163 :param shpriv: Pairing private key 

164 :param shpub: Pairing public key 

165 

166 :returns: True if secure session was established 

167 :rtype: bool 

168 

169 :raises TropicSquareError: If secure session handshake failed 

170 """ 

171 if not 0 <= pkey_index <= PAIRING_KEY_MAX: 

172 raise ValueError( 

173 f"Pairing key slot must be in range 0-{PAIRING_KEY_MAX}, got {pkey_index}" 

174 ) 

175 

176 ehpriv, ehpub = self._get_ephemeral_keypair() 

177 

178 # Handshake request 

179 tsehpub, tsauth = self._l2.handshake_req(ehpub, pkey_index) 

180 

181 # Calculation magic 

182 sha256hash = sha256() 

183 sha256hash.update(PROTOCOL_NAME) 

184 

185 sha256hash = sha256(sha256hash.digest()) 

186 sha256hash.update(shpub) 

187 

188 sha256hash = sha256(sha256hash.digest()) 

189 sha256hash.update(self.public_key) 

190 

191 sha256hash = sha256(sha256hash.digest()) 

192 sha256hash.update(ehpub) 

193 

194 sha256hash = sha256(sha256hash.digest()) 

195 sha256hash.update(pkey_index.to_bytes(1, "little")) 

196 

197 sha256hash = sha256(sha256hash.digest()) 

198 sha256hash.update(tsehpub) 

199 

200 hash = sha256hash.digest() 

201 

202 shared_secret_eh_tseh = self._x25519_exchange(ehpriv, tsehpub) 

203 shared_secret_sh_tseh = self._x25519_exchange(shpriv, tsehpub) 

204 shared_secret_eh_st = self._x25519_exchange(ehpriv, self.public_key) 

205 

206 ck_hkdf_eh_tseh = self._hkdf(PROTOCOL_NAME, shared_secret_eh_tseh) 

207 ck_hkdf_sh_tseh = self._hkdf(ck_hkdf_eh_tseh, shared_secret_sh_tseh) 

208 ck_hkdf_cmdres, kauth = self._hkdf(ck_hkdf_sh_tseh, shared_secret_eh_st, 2) 

209 kcmd, kres = self._hkdf(ck_hkdf_cmdres, b'', 2) 

210 

211 ciphertext_with_tag = self._aesgcm(kauth).encrypt(nonce=b'\x00'*12, data=b'', associated_data=hash) 

212 tag = ciphertext_with_tag[-16:] 

213 

214 # Clear hanshake data 

215 shared_secret_eh_tseh = None 

216 shared_secret_sh_tseh = None 

217 shared_secret_eh_st = None 

218 

219 ck_hkdf_eh_tseh = None 

220 ck_hkdf_sh_tseh = None 

221 ck_hkdf_cmdres = None 

222 kauth = None 

223 

224 if tag != tsauth: 

225 raise TropicSquareHandshakeError("Authentication tag mismatch - handshake failed") 

226 

227 encrypt_key = self._aesgcm(kcmd) 

228 decrypt_key = self._aesgcm(kres) 

229 

230 self._secure_session = [ encrypt_key, decrypt_key, 0 ] 

231 

232 return True 

233 

234 

235 def abort_secure_session(self) -> bool: 

236 """Abort secure session 

237 

238 :returns: True if secure session was aborted 

239 :rtype: bool 

240 """ 

241 if self._l2.encrypted_session_abt(): 

242 self._secure_session = None 

243 return True 

244 

245 return False 

246 

247 def reboot(self, mode: int) -> bool: 

248 """Startup/reboot chip 

249 

250 :param mode: Startup mode (STARTUP_REBOOT or STARTUP_MAINTENANCE_REBOOT) 

251 

252 :returns: True if startup request was sent 

253 :rtype: bool 

254 

255 :raises ValueError: If invalid startup mode 

256 :raises TropicSquareError: If startup request failed 

257 """ 

258 if mode not in [STARTUP_REBOOT, STARTUP_MAINTENANCE_REBOOT]: 

259 raise ValueError("Invalid startup mode") 

260 

261 return self._l2.startup_req(mode) 

262 

263 

264 def sleep(self, mode: int) -> bool: 

265 """Put chip to sleep 

266 

267 :param mode: Sleep mode (SLEEP_MODE_SLEEP or SLEEP_MODE_DEEP_SLEEP) 

268 

269 :returns: True if sleep request was sent 

270 :rtype: bool 

271 

272 :raises ValueError: If invalid sleep mode 

273 :raises TropicSquareError: If sleep request failed 

274 """ 

275 if mode not in [SLEEP_MODE_SLEEP, SLEEP_MODE_DEEP_SLEEP]: 

276 raise ValueError("Invalid sleep mode") 

277 

278 return self._l2.sleep_req(mode) 

279 

280 

281 def get_log(self) -> str: 

282 """Get log from the RISC Firmware 

283 

284 :returns: Log message 

285 :rtype: str 

286 """ 

287 log = b'' 

288 while True: 

289 part = self._l2.get_log() 

290 if not part: 

291 break 

292 

293 log += part 

294 

295 return log.decode("utf-8") 

296 

297 ############### 

298 # L3 Commands # 

299 ############### 

300 

301 def ping(self, data : bytes) -> bytes: 

302 """Returns data back 

303 

304 :param data: Data to send 

305 

306 :returns: Data from input 

307 :rtype: bytes 

308 """ 

309 request_data = bytearray() 

310 request_data.append(CMD_ID_PING) 

311 request_data.extend(data) 

312 

313 result = self._call_command(request_data) 

314 

315 return result 

316 

317 

318 def random(self, nbytes : int) -> bytes: 

319 """Get random bytes 

320 

321 :param nbytes: Number of bytes to generate 

322 

323 :returns: Random bytes 

324 :rtype: bytes 

325 """ 

326 request_data = bytearray() 

327 request_data.append(CMD_ID_RANDOM_VALUE) 

328 request_data.extend(nbytes.to_bytes(1, "little")) 

329 

330 result = self._call_command(request_data) 

331 

332 return result[3:] 

333 

334 

335 def r_config_read(self, address: int): 

336 """Read and parse R-CONFIG register. 

337 

338 :param address: Register address (use CFG_* constants from tropicsquare.constants.config) 

339 

340 :returns: Parsed config object (StartUpConfig, SensorsConfig, etc.) 

341 :rtype: BaseConfig 

342 

343 Example:: 

344 

345 from tropicsquare.constants.config import CFG_START_UP 

346 

347 config = ts.r_config_read(CFG_START_UP) 

348 print(config.mbist_dis) 

349 """ 

350 data = self._config_read_raw(CMD_ID_R_CFG_READ, address) 

351 return parse_config(address, data) 

352 

353 

354 def i_config_read(self, address: int): 

355 """Read and parse I-CONFIG register. 

356 

357 :param address: Register address (use CFG_* constants from tropicsquare.constants.config) 

358 

359 :returns: Parsed config object (StartUpConfig, SensorsConfig, etc.) 

360 :rtype: BaseConfig 

361 

362 Example:: 

363 

364 from tropicsquare.constants.config import CFG_START_UP 

365 

366 config = ts.i_config_read(CFG_START_UP) 

367 print(config.mbist_dis) 

368 """ 

369 data = self._config_read_raw(CMD_ID_I_CFG_READ, address) 

370 return parse_config(address, data) 

371 

372 

373 def r_config_write(self, address: int, value) -> bool: 

374 """Write single R-CONFIG register. 

375 

376 :param address: Register address (use CFG_* constants from tropicsquare.constants.config) 

377 :param value: 32-bit register value or BaseConfig object 

378 

379 :returns: True if write succeeded 

380 :rtype: bool 

381 """ 

382 self._validate_config_address(address) 

383 value_bytes = self._config_value_to_bytes(value) 

384 

385 request_data = bytearray() 

386 request_data.append(CMD_ID_R_CFG_WRITE) 

387 request_data.extend(address.to_bytes(CFG_ADDRESS_SIZE, "little")) 

388 request_data.extend(b'M') # Padding dummy data 

389 request_data.extend(value_bytes) 

390 self._call_command(request_data) 

391 return True 

392 

393 

394 def i_config_write(self, address: int, bit_index: int) -> bool: 

395 """Clear a single I-CONFIG bit (1->0 transition only). 

396 

397 :param address: Register address (use CFG_* constants from tropicsquare.constants.config) 

398 :param bit_index: Bit index to clear (0-31) 

399 

400 :returns: True if write succeeded 

401 :rtype: bool 

402 """ 

403 self._validate_config_address(address) 

404 

405 if not isinstance(bit_index, int): 

406 raise TypeError("I-CONFIG bit index must be integer") 

407 

408 if not 0 <= bit_index <= 31: 

409 raise ValueError("I-CONFIG bit index must be in range 0-31") 

410 

411 request_data = bytearray() 

412 request_data.append(CMD_ID_I_CFG_WRITE) 

413 request_data.extend(address.to_bytes(CFG_ADDRESS_SIZE, "little")) 

414 request_data.append(bit_index) 

415 self._call_command(request_data) 

416 

417 return True 

418 

419 

420 def r_config_erase(self) -> bool: 

421 """Erase whole R-CONFIG (sets all bits of all COs to 1). 

422 

423 :returns: True if erase succeeded 

424 :rtype: bool 

425 """ 

426 request_data = bytearray() 

427 request_data.append(CMD_ID_R_CFG_ERASE) 

428 self._call_command(request_data) 

429 return True 

430 

431 

432 def _config_read_raw(self, cmd_id: int, address: int) -> bytes: 

433 """Read raw 4-byte config value payload for a single CO.""" 

434 self._validate_config_address(address) 

435 

436 request_data = bytearray() 

437 request_data.append(cmd_id) 

438 request_data.extend(address.to_bytes(CFG_ADDRESS_SIZE, "little")) 

439 result = self._call_command(request_data) 

440 return result[3:] 

441 

442 

443 def _config_value_to_bytes(self, value) -> bytes: 

444 """Convert config value input to 4-byte wire format.""" 

445 if isinstance(value, BaseConfig): 

446 return value.to_bytes() 

447 

448 if not isinstance(value, int): 

449 raise TypeError("value must be int or BaseConfig") 

450 

451 if not 0 <= value <= 0xFFFFFFFF: 

452 raise ValueError("Config value must be 32-bit unsigned integer") 

453 

454 return value.to_bytes(4, "little") 

455 

456 

457 def _validate_config_address(self, address: int) -> None: 

458 """Validate 16-bit config CO address.""" 

459 if not isinstance(address, int): 

460 raise TypeError("Config address must be integer") 

461 if not 0 <= address <= 0xFFFF: 

462 raise ValueError("Config address must be 16-bit (0x0000-0xFFFF)") 

463 

464 

465 def mem_data_read(self, slot : int) -> bytes: 

466 """Read data from memory slot 

467 

468 :param slot: Memory slot 

469 

470 :returns: Data from memory slot 

471 :rtype: bytes 

472 """ 

473 request_data = bytearray() 

474 request_data.append(CMD_ID_R_MEMDATA_READ) 

475 request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little")) 

476 

477 result = self._call_command(request_data) 

478 

479 return result[3:] 

480 

481 

482 def mem_data_write(self, data : bytes, slot : int) -> bool: 

483 """Write data to memory slot 

484 

485 :param data: Data to write (Maximum 444 bytes) 

486 :param slot: Memory slot 

487 

488 :returns: True if data was written 

489 :rtype: bool 

490 

491 :raises ValueError: If data size is larger than 444 

492 """ 

493 if len(data) > MEM_DATA_MAX_SIZE: 

494 raise ValueError(f"Data size ({len(data)} bytes) exceeds maximum allowed size ({MEM_DATA_MAX_SIZE} bytes)") 

495 

496 request_data = bytearray() 

497 request_data.append(CMD_ID_R_MEMDATA_WRITE) 

498 request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little")) 

499 request_data.extend(b'M') # Padding dummy data 

500 request_data.extend(data) 

501 

502 self._call_command(request_data) 

503 

504 return True 

505 

506 

507 def mem_data_erase(self, slot : int) -> bool: 

508 """Erase memory slot 

509 

510 :param slot: Memory slot 

511 

512 :returns: True if data was erased 

513 :rtype: bool 

514 """ 

515 request_data = bytearray() 

516 request_data.append(CMD_ID_R_MEMDATA_ERASE) 

517 request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little")) 

518 

519 self._call_command(request_data) 

520 

521 return True 

522 

523 

524 def ecc_key_generate(self, slot : int, curve : int) -> bool: 

525 """Generate ECC key 

526 

527 :param slot: Slot for key 

528 :param curve: Curve (ECC_CURVE_P256 or ECC_CURVE_ED25519) 

529 

530 :returns: True if key was generated 

531 :rtype: bool 

532 

533 :raises ValueError: If slot is larger than ECC_MAX_KEYS or curve is invalid 

534 """ 

535 if slot > ECC_MAX_KEYS: 

536 raise ValueError("Slot is larger than ECC_MAX_KEYS") 

537 

538 if curve not in [ECC_CURVE_P256, ECC_CURVE_ED25519]: 

539 raise ValueError("Invalid curve") 

540 

541 

542 request_data = bytearray() 

543 request_data.append(CMD_ID_ECC_KEY_GENERATE) 

544 request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little")) 

545 request_data.append(curve) 

546 

547 self._call_command(request_data) 

548 

549 return True 

550 

551 

552 def ecc_key_store(self, slot : int, curve : int, key : bytes) -> bytes: 

553 """Store own ECC key 

554 

555 :param slot: Slot for key 

556 :param curve: Curve (ECC_CURVE_P256 or ECC_CURVE_ED25519) 

557 :param key: Private key 

558 

559 :returns: True if key was stored 

560 :rtype: bool 

561 

562 :raises ValueError: If slot is larger than ECC_MAX_KEYS or curve is invalid 

563 """ 

564 if slot > ECC_MAX_KEYS: 

565 raise ValueError("Slot is larger than ECC_MAX_KEYS") 

566 

567 if curve not in [ECC_CURVE_P256, ECC_CURVE_ED25519]: 

568 raise ValueError("Invalid curve") 

569 

570 request_data = bytearray() 

571 request_data.append(CMD_ID_ECC_KEY_STORE) 

572 request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little")) 

573 request_data.append(curve) 

574 request_data.extend(b'\x00'*12) # Padding dummy data (maybe do random?) 

575 request_data.extend(key) 

576 

577 self._call_command(request_data) 

578 

579 return True 

580 

581 

582 def ecc_key_read(self, slot : int) -> EccKeyInfo: 

583 """Read ECC key information from slot 

584 

585 :param slot: Slot for key 

586 

587 :returns: Key information with curve, origin, and public_key 

588 :rtype: EccKeyInfo 

589 

590 :raises ValueError: If slot is larger than ECC_MAX_KEYS 

591 

592 Example:: 

593 

594 key_info = ts.ecc_key_read(0) 

595 if key_info.curve == ECC_CURVE_ED25519: 

596 print("Ed25519 key") 

597 print(key_info.public_key.hex()) 

598 """ 

599 if slot > ECC_MAX_KEYS: 

600 raise ValueError("Slot is larger than ECC_MAX_KEYS") 

601 

602 request_data = bytearray() 

603 request_data.append(CMD_ID_ECC_KEY_READ) 

604 request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little")) 

605 

606 result = self._call_command(request_data) 

607 

608 curve = result[0] 

609 origin = result[1] 

610 pubkey = result[15:] 

611 

612 return EccKeyInfo(curve, origin, pubkey) 

613 

614 

615 def ecc_key_erase(self, slot : int) -> bool: 

616 """Erase ECC key 

617 

618 :param slot: Slot for key 

619 

620 :returns: True if key was erased 

621 :rtype: bool 

622 

623 :raises ValueError: If slot is larger than ECC_MAX_KEYS 

624 """ 

625 if slot > ECC_MAX_KEYS: 

626 raise ValueError("Slot is larger than ECC_MAX_KEYS") 

627 

628 request_data = bytearray() 

629 request_data.append(CMD_ID_ECC_KEY_ERASE) 

630 request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little")) 

631 

632 self._call_command(request_data) 

633 

634 return True 

635 

636 

637 def ecdsa_sign(self, slot : int, hash : bytes) -> EcdsaSignature: 

638 """Sign hash with ECDSA using P256 key 

639 

640 :param slot: Slot with P256 ECC key 

641 :param hash: Hash to sign (32 bytes) 

642 

643 :returns: ECDSA signature 

644 :rtype: EcdsaSignature 

645 

646 :raises ValueError: If slot is larger than ECC_MAX_KEYS 

647 

648 Example:: 

649 

650 import hashlib 

651 message_hash = hashlib.sha256(b"Hello").digest() 

652 signature = ts.ecdsa_sign(1, message_hash) 

653 print(signature.r.hex()) 

654 print(signature.s.hex()) 

655 """ 

656 if slot > ECC_MAX_KEYS: 

657 raise ValueError("Slot is larger than ECC_MAX_KEYS") 

658 

659 request_data = bytearray() 

660 request_data.append(CMD_ID_ECDSA_SIGN) 

661 request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little")) 

662 request_data.extend(b'\x00'*13) # Padding dummy data (maybe do random?) 

663 request_data.extend(hash) 

664 

665 result = self._call_command(request_data) 

666 

667 sign_r = result[15:47] 

668 sign_s = result[47:] 

669 

670 return EcdsaSignature(sign_r, sign_s) 

671 

672 

673 def eddsa_sign(self, slot : int, message : bytes) -> EddsaSignature: 

674 """Sign message with EdDSA using Ed25519 key 

675 

676 :param slot: Slot with Ed25519 ECC key 

677 :param message: Message to sign 

678 

679 :returns: EdDSA signature 

680 :rtype: EddsaSignature 

681 

682 Example:: 

683 

684 signature = ts.eddsa_sign(0, message) 

685 print(signature.r.hex()) 

686 print(signature.s.hex()) 

687 """ 

688 if slot > ECC_MAX_KEYS: 

689 raise ValueError("Slot is larger than ECC_MAX_KEYS") 

690 

691 request_data = bytearray() 

692 request_data.append(CMD_ID_EDDSA_SIGN) 

693 request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little")) 

694 request_data.extend(b'\x00'*13) # Padding dummy data (maybe do random?) 

695 request_data.extend(message) 

696 

697 result = self._call_command(request_data) 

698 

699 sign_r = result[15:47] 

700 sign_s = result[47:] 

701 

702 return EddsaSignature(sign_r, sign_s) 

703 

704 

705 def mcounter_init(self, index : int, value : int) -> bool: 

706 """Initialize monotonic counter 

707 

708 :param index: Counter index 

709 :param value: Initial value 

710 

711 :returns: True if counter was initialized 

712 :rtype: bool 

713 """ 

714 if index > MCOUNTER_MAX: 

715 raise ValueError("Index is larger than MCOUNTER_MAX") 

716 

717 request_data = bytearray() 

718 request_data.append(CMD_ID_MCOUNTER_INIT) 

719 request_data.extend(index.to_bytes(2, "little")) 

720 request_data.extend(b'A') # Padding dummy data 

721 request_data.extend(value.to_bytes(4, "little")) 

722 

723 self._call_command(request_data) 

724 

725 return True 

726 

727 

728 def mcounter_update(self, index : int) -> bool: 

729 """Decrement monotonic counter 

730 

731 :param index: Counter index 

732 

733 :returns: True if counter was updated 

734 :rtype: bool 

735 """ 

736 if index > MCOUNTER_MAX: 

737 raise ValueError("Index is larger than MCOUNTER_MAX") 

738 

739 request_data = bytearray() 

740 request_data.append(CMD_ID_MCOUNTER_UPDATE) 

741 request_data.extend(index.to_bytes(2, "little")) 

742 

743 self._call_command(request_data) 

744 

745 return True 

746 

747 

748 def mcounter_get(self, index : int) -> int: 

749 """Get monotonic counter value 

750 

751 :param index: Counter index 

752 

753 :returns: Counter value 

754 :rtype: int 

755 """ 

756 if index > MCOUNTER_MAX: 

757 raise ValueError("Index is larger than MCOUNTER_MAX") 

758 

759 request_data = bytearray() 

760 request_data.append(CMD_ID_MCOUNTER_GET) 

761 request_data.extend(index.to_bytes(2, "little")) 

762 

763 result = self._call_command(request_data) 

764 

765 return int.from_bytes(result[3:], "little") 

766 

767 

768 def mac_and_destroy(self, slot: int, data: bytes) -> bytes: 

769 """MAC and destroy operation for atomic PIN verification. 

770 

771 This command executes atomic PIN verification using Keccak-based MAC. 

772 The operation reads a slot from the MAC-and-Destroy partition (128 slots, 0-127), 

773 performs MAC calculation, and destroys/erases the slot data. 

774 

775 The MAC-and-Destroy partition is separate from User Data partition and 

776 uses Keccak engines with PUF-based per-chip unique keys (K_FXA, K_FXB). 

777 

778 :param slot: Slot index in MAC-and-Destroy partition (0-127) 

779 :param data: Data to MAC (must be exactly 32 bytes) 

780 

781 :returns: MAC result (32 bytes) 

782 

783 :raises ValueError: If slot exceeds maximum (127) or data length is not 32 bytes 

784 :raises TropicSquareNoSession: If secure session is not established 

785 

786 .. note:: 

787 Requires active secure session via :meth:`start_secure_session`. 

788 

789 .. seealso:: 

790 TROPIC01 User API v1.1.2, Table 37: MAC_And_Destroy command specification 

791 

792 Example:: 

793 

794 # Start secure session first 

795 ts.start_secure_session( 

796 FACTORY_PAIRING_KEY_INDEX, 

797 FACTORY_PAIRING_PRIVATE_KEY_PROD0, 

798 FACTORY_PAIRING_PUBLIC_KEY_PROD0 

799 ) 

800 

801 # Perform MAC and destroy on slot 0 

802 pin_data = b'my_32_byte_pin_data_here_000' # Exactly 32 bytes 

803 mac_result = ts.mac_and_destroy(0, pin_data) 

804 print(f"MAC: {mac_result.hex()}") # Returns 32-byte MAC 

805 """ 

806 if slot > MAC_AND_DESTROY_MAX: 

807 raise ValueError(f"Slot {slot} exceeds maximum MAC_AND_DESTROY_MAX ({MAC_AND_DESTROY_MAX})") 

808 

809 # Validate data length - must be exactly 32 bytes per API specification 

810 if len(data) != MAC_AND_DESTROY_DATA_SIZE: 

811 raise ValueError( 

812 f"Data must be exactly {MAC_AND_DESTROY_DATA_SIZE} bytes " 

813 f"(got {len(data)} bytes). See TROPIC01 User API Table 37." 

814 ) 

815 

816 request_data = bytearray() 

817 request_data.append(CMD_ID_MAC_AND_DESTROY) 

818 request_data.extend(slot.to_bytes(MEM_ADDRESS_SIZE, "little")) 

819 request_data.extend(b'M') # Padding dummy data 

820 request_data.extend(data) 

821 

822 result = self._call_command(request_data) 

823 

824 return result[3:] 

825 

826 

827 def pairing_key_read(self, slot: int) -> bytes: 

828 """Read pairing key information from slot. 

829 

830 :param slot: Pairing key slot index (0-3) 

831 

832 :returns: Pairing key information (32 bytes) 

833 :rtype: bytes 

834 

835 :raises ValueError: If slot exceeds maximum (3) 

836 """ 

837 if not 0 <= slot <= PAIRING_KEY_MAX: 

838 raise ValueError( 

839 f"Pairing key slot must be in range 0-{PAIRING_KEY_MAX}, got {slot}" 

840 ) 

841 

842 request_data = bytearray() 

843 request_data.append(CMD_ID_PAIRING_KEY_READ) 

844 request_data.extend(slot.to_bytes(PAIRING_ADDRESS_SIZE, "little")) 

845 result = self._call_command(request_data) 

846 

847 return result[3:] 

848 

849 

850 def pairing_key_write(self, slot: int, key: bytes) -> bool: 

851 """Write pairing key information to slot. 

852 

853 :param slot: Pairing key slot index (0-3) 

854 :param key: Pairing key data (32 bytes) 

855 

856 :returns: True if write succeeded 

857 :rtype: bool 

858 

859 :raises ValueError: If slot exceeds maximum (3) or key length is not 32 bytes 

860 """ 

861 if not 0 <= slot <= PAIRING_KEY_MAX: 

862 raise ValueError( 

863 f"Pairing key slot must be in range 0-{PAIRING_KEY_MAX}, got {slot}" 

864 ) 

865 

866 if len(key) != PAIRING_KEY_SIZE: 

867 raise ValueError(f"Key must be exactly {PAIRING_KEY_SIZE} bytes") 

868 

869 request_data = bytearray() 

870 request_data.append(CMD_ID_PAIRING_KEY_WRITE) 

871 request_data.extend(slot.to_bytes(PAIRING_ADDRESS_SIZE, "little")) 

872 request_data.extend(b'M') # Padding dummy data 

873 request_data.extend(key) 

874 

875 result = self._call_command(request_data) 

876 

877 return True 

878 

879 

880 def pairing_key_invalidate(self, slot: int) -> bool: 

881 """Invalidate pairing key in slot. 

882 

883 :param slot: Pairing key slot index (0-3) 

884 

885 :returns: True if successful 

886 :rtype: bool 

887 

888 :raises ValueError: If slot exceeds maximum (3) 

889 """ 

890 if not 0 <= slot <= PAIRING_KEY_MAX: 

891 raise ValueError( 

892 f"Pairing key slot must be in range 0-{PAIRING_KEY_MAX}, got {slot}" 

893 ) 

894 

895 request_data = bytearray() 

896 request_data.append(CMD_ID_PAIRING_KEY_INVALIDATE) 

897 request_data.extend(slot.to_bytes(PAIRING_ADDRESS_SIZE, "little")) 

898 

899 self._call_command(request_data) 

900 

901 return True 

902 

903 def _call_command(self, data): 

904 if self._secure_session is None: 

905 raise TropicSquareNoSession("Secure session not started") 

906 

907 nonce = self._secure_session[2].to_bytes(12, "little") 

908 data = bytes(data) 

909 

910 enc = self._secure_session[0].encrypt(nonce=nonce, data=data, associated_data=b'') 

911 ciphertext = enc[:-16] 

912 tag = enc[-16:] 

913 

914 result_cipher, result_tag = self._l2.encrypted_command(len(ciphertext), ciphertext, tag) 

915 decrypted = self._secure_session[1].decrypt(nonce=nonce, data=result_cipher+result_tag, associated_data=b'') 

916 

917 self._secure_session[2] += 1 

918 

919 raise_for_cmd_result(decrypted[0]) 

920 

921 return decrypted[1:] 

922 

923 

924 def _get_ephemeral_keypair(self): 

925 raise NotImplementedError("Not implemented") 

926 

927 

928 def _hkdf(self, salt, shared_secret, length=1): 

929 raise NotImplementedError("Not implemented") 

930 

931 

932 def _x25519_exchange(self, private_bytes, public_bytes): 

933 raise NotImplementedError("Not implemented") 

934 

935 

936 def _aesgcm(self, key): 

937 raise NotImplementedError("Not implemented")