"""TCP Transport for libtropic Model/Simulator
This module provides a TCP transport implementation that communicates
with the TROPIC01 model/simulator server using the libtropic tagged protocol.
The transport enables pytropicsquare to work with the same model/simulator
infrastructure that libtropic uses, making it useful for:
- Hardware-in-the-loop testing
- Chip simulation and development
- Cross-platform testing (CPython and MicroPython)
Protocol Details:
The transport implements the libtropic TCP protocol with tagged messages:
- Buffer format:
- tag (1)
- length (2 LE)
- payload (0-256)
- Request-response pattern with tag validation
- Automatic retry logic for network operations
Example::
from tropicsquare import TropicSquare
from tropicsquare.transports.tcp import TcpTransport
# Connect to model server
transport = TcpTransport("127.0.0.1")
ts = TropicSquare(transport)
# Use chip normally
print(ts.chip_id)
:note: Server must be running libtropic-compatible model/simulator from https://github.com/tropicsquare/ts-tvl/
"""
import socket
from tropicsquare.transports import L1Transport
from tropicsquare.exceptions import TropicSquareError, TropicSquareTimeoutError
[docs]
class TcpTransport(L1Transport):
"""L1 transport for TCP connection to libtropic model/simulator.
Implements the libtropic tagged protocol for communicating with
the TROPIC01 model/simulator server via TCP socket.
The transport maps L1 SPI operations to tagged TCP commands:
- ``_cs_low()`` → TAG_CSN_LOW (0x01)
- ``_cs_high()`` → TAG_CSN_HIGH (0x02)
- ``_transfer()`` → TAG_SPI_SEND (0x03)
- ``_read()`` → TAG_SPI_SEND (0x03) with dummy bytes
:param host: Hostname or IP address of the model server
:param port: Port number for the TCP connection (default: 28992)
:param timeout: Socket timeout in seconds (default: 5.0)
:raises TropicSquareError: If connection fails
Example::
transport = TcpTransport("127.0.0.1")
ts = TropicSquare(transport)
print(ts.chip_id)
"""
# Protocol tags
TAG_CSN_LOW = 0x01
TAG_CSN_HIGH = 0x02
TAG_SPI_SEND = 0x03
TAG_WAIT = 0x06
TAG_INVALID = 0xfd
TAG_UNSUPPORTED = 0xfe
# Protocol limits
MAX_PAYLOAD_LEN = 256
MAX_BUFFER_LEN = 259 # 3 (tag + length) + 256 (payload)
TX_ATTEMPTS = 3
RX_ATTEMPTS = 3
@staticmethod
def _is_timeout_exception(error: Exception) -> bool:
if isinstance(error, OSError):
if getattr(error, "errno", None) in (110, 60):
return True
return "timed out" in str(error).lower()
[docs]
def __init__(
self,
host: str,
port: int = 28992,
timeout: float = 5.0,
connect_timeout: float = 1.0,
):
"""Initialize TCP transport.
:param host: Hostname or IP address of the model server
:param port: Port number for the TCP connection (default: 28992)
:param timeout: Socket I/O timeout in seconds (default: 5.0)
:param connect_timeout: Connect timeout per resolved address in seconds (default: 1.0)
:raises TropicSquareError: If connection fails
"""
self._sock = None
try:
addrinfos = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM, 0)
errors = []
for family, socktype, proto, _, sockaddr in addrinfos:
sock = None
try:
sock = socket.socket(family, socktype, proto)
sock.settimeout(connect_timeout)
sock.connect(sockaddr)
sock.settimeout(timeout)
self._sock = sock
break
except Exception as e:
errors.append(f"{sockaddr}: {e}")
if sock is not None:
sock.close()
if self._sock is None:
if errors:
summary = "; ".join(errors)
raise OSError(summary)
raise OSError("No resolved addresses")
except Exception as e:
if self._sock is not None:
self._sock.close()
raise TropicSquareError(
f"Failed to connect to {host}:{port}: {e}"
)
def _transfer(self, tx_data: bytes) -> bytes:
"""SPI bidirectional transfer.
Corresponds to SPI write_readinto operation.
Sends tx_data and receives same number of bytes back.
:param tx_data: Data to transmit
:returns: Received data (same length as tx_data)
:raises TropicSquareError: If transfer length mismatch occurs
"""
rx_data = self._communicate(self.TAG_SPI_SEND, tx_data)
if len(rx_data) != len(tx_data):
raise TropicSquareError(
f"Transfer length mismatch: sent {len(tx_data)}, "
f"received {len(rx_data)}"
)
return rx_data
def _read(self, length: int) -> bytes:
"""SPI read operation.
Corresponds to SPI read operation.
Sends dummy bytes (all zeros) and reads response.
:param length: Number of bytes to read
:returns: Read data
"""
# Send dummy bytes (all zeros) to clock out data
return self._communicate(self.TAG_SPI_SEND, bytes(length))
def _cs_low(self) -> None:
"""Activate chip select (CS to logic 0).
Sends TAG_CSN_LOW command to the model server.
"""
self._communicate(self.TAG_CSN_LOW)
def _cs_high(self) -> None:
"""Deactivate chip select (CS to logic 1).
Sends TAG_CSN_HIGH command to the model server.
"""
self._communicate(self.TAG_CSN_HIGH)
def _send_all(self, data: bytes) -> None:
"""Send all data with retry logic.
:param data: Data to send
:raises TropicSquareError: If send fails
:raises TropicSquareTimeoutError: If send times out after retries
"""
total_sent = 0
attempts = 0
while total_sent < len(data) and attempts < self.TX_ATTEMPTS:
try:
sent = self._sock.send(data[total_sent:])
if sent == 0:
raise TropicSquareError("Connection lost during send")
total_sent += sent
except Exception as e:
if self._is_timeout_exception(e):
attempts += 1
if attempts >= self.TX_ATTEMPTS:
raise TropicSquareTimeoutError(
f"Send timeout after {attempts} attempts"
)
continue
raise TropicSquareError(f"Send failed: {e}")
if total_sent < len(data):
raise TropicSquareError(
f"Sent {total_sent}/{len(data)} bytes after {attempts} attempts"
)
def _recv_exact(self, num_bytes: int) -> bytes:
"""Receive exactly num_bytes with retry logic.
:param num_bytes: Number of bytes to receive
:returns: Received data
:raises TropicSquareError: If receive fails or connection lost
:raises TropicSquareTimeoutError: If receive times out after retries
"""
received = bytearray()
attempts = 0
while len(received) < num_bytes and attempts < self.RX_ATTEMPTS:
try:
chunk = self._sock.recv(num_bytes - len(received))
if not chunk:
raise TropicSquareError("Connection lost during receive")
received.extend(chunk)
except Exception as e:
if self._is_timeout_exception(e):
attempts += 1
if attempts >= self.RX_ATTEMPTS:
raise TropicSquareTimeoutError(
f"Receive timeout after {attempts} attempts"
)
continue
raise TropicSquareError(f"Receive failed: {e}")
if len(received) < num_bytes:
raise TropicSquareError(
f"Received {len(received)}/{num_bytes} bytes"
)
return bytes(received)
def _communicate(self, tag: int, tx_payload: bytes = None) -> bytes:
"""Send tagged request and receive response.
Implements the libtropic TCP protocol:
1. Build TX buffer: [tag (1)][length (2 LE)][payload (0-256)]
2. Send all bytes with retry logic
3. Receive response header (tag + length)
4. Validate tag matches or is error tag
5. Receive payload if length > 0
6. Return payload data
:param tag: Protocol tag (TAG_CSN_LOW, TAG_CSN_HIGH, TAG_SPI_SEND, etc.)
:param tx_payload: Optional payload data (max 256 bytes)
:returns: Response payload (or empty bytes if no payload)
:raises TropicSquareError: On protocol or network errors
:raises TropicSquareTimeoutError: On timeout
"""
# Build TX buffer: [tag][length LE][payload]
payload_len = len(tx_payload) if tx_payload else 0
if payload_len > self.MAX_PAYLOAD_LEN:
raise TropicSquareError(
f"Payload too large: {payload_len} > {self.MAX_PAYLOAD_LEN}"
)
tx_buffer = bytearray(self.MAX_BUFFER_LEN)
tx_buffer[0] = tag
tx_buffer[1] = payload_len & 0xFF # Little-endian low byte
tx_buffer[2] = (payload_len >> 8) & 0xFF # Little-endian high byte
if tx_payload:
tx_buffer[3:3 + payload_len] = tx_payload
tx_size = 3 + payload_len
# Send request
self._send_all(bytes(tx_buffer[:tx_size]))
# Receive response header (tag + length)
rx_header = self._recv_exact(3)
rx_tag = rx_header[0]
rx_len = rx_header[1] | (rx_header[2] << 8) # Little-endian
# Validate tag
if rx_tag == self.TAG_INVALID:
raise TropicSquareError(
f"Server doesn't recognize tag {tag:#04x}"
)
elif rx_tag == self.TAG_UNSUPPORTED:
raise TropicSquareError(
f"Server doesn't support tag {tag:#04x}"
)
elif rx_tag != tag:
raise TropicSquareError(
f"Tag mismatch: sent {tag:#04x}, received {rx_tag:#04x}"
)
# Validate payload length
if rx_len > self.MAX_PAYLOAD_LEN:
raise TropicSquareError(
f"Invalid payload length: {rx_len} > {self.MAX_PAYLOAD_LEN}"
)
# Receive payload if present
if rx_len > 0:
rx_payload = self._recv_exact(rx_len)
else:
rx_payload = b''
return rx_payload