Coverage for tropicsquare / transports / network.py: 0%
69 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-27 21:24 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-27 21:24 +0000
1"""Network SPI Transport Implementation
3This module provides a network-based SPI transport implementation.
4for https://github.com/petrkr/netbridge32 SPI bridge
5"""
7from tropicsquare.transports import L1Transport
8from tropicsquare.exceptions import TropicSquareError
10import socket
12class NetworkSpiTransport(L1Transport):
13 """L1 transport for network-based SPI bridge.
15 :param host: Hostname or IP address of the SPI bridge
16 :param port: Port number for the SPI connection (default: 12345)
17 :param timeout: Socket timeout in seconds (default: 5.0)
18 """
20 COMMAND_READ = b'\x01'
21 COMMAND_WRITE_READINTO = b'\x08'
23 COMMAND_CS_LOW = b'\x10'
24 COMMAND_CS_HIGH = b'\x20'
26 def __init__(
27 self,
28 host: str,
29 port: int = 12345,
30 timeout: float = 5.0,
31 connect_timeout: float = 1.0,
32 ):
33 """Initialize Network SPI transport.
35 :param host: Hostname or IP address of the SPI bridge
36 :param port: Port number for the SPI connection (default: 12345)
37 :param timeout: Socket I/O timeout in seconds (default: 5.0)
38 :param connect_timeout: Connect timeout per resolved address in seconds (default: 1.0)
39 """
40 self._sock = None
41 try:
42 addrinfos = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM, 0)
43 errors = []
44 for family, socktype, proto, _, sockaddr in addrinfos:
45 sock = None
46 try:
47 sock = socket.socket(family, socktype, proto)
48 sock.settimeout(connect_timeout)
49 sock.connect(sockaddr)
50 sock.settimeout(timeout)
51 self._sock = sock
52 break
53 except Exception as e:
54 errors.append(f"{sockaddr}: {e}")
55 if sock is not None:
56 sock.close()
57 if self._sock is None:
58 if errors:
59 summary = "; ".join(errors)
60 raise OSError(summary)
61 raise OSError("No resolved addresses")
62 except Exception as e:
63 if self._sock is not None:
64 self._sock.close()
65 raise TropicSquareError(
66 f"Failed to connect to {host}:{port}: {e}"
67 )
69 def _transfer(self, write_buf: bytes) -> bytes:
70 command = self.COMMAND_WRITE_READINTO
71 data = bytes(write_buf)
72 length = len(data)
73 packet = command + length.to_bytes(4, 'big') + data
74 self._sock.send(packet)
76 received = b''
77 while len(received) < length:
78 chunk = self._sock.recv(length - len(received))
79 if not chunk:
80 raise RuntimeError("Connection lost during SPI transfer")
81 received += chunk
83 return received
86 def _read(self, length: int) -> bytes:
87 command = self.COMMAND_READ
88 packet = command + length.to_bytes(4, 'big')
89 self._sock.send(packet)
91 received = b''
92 while len(received) < length:
93 chunk = self._sock.recv(length - len(received))
94 if not chunk:
95 raise Exception("Connection lost during SPI transfer")
96 received += chunk
98 return received
101 def _set_cs(self, state: bool):
102 """Sends a command (0x01) to set the chip select state.
103 The state is sent as 1 byte (0 for low, 1 for high)."""
104 command = self.COMMAND_CS_HIGH if state else self.COMMAND_CS_LOW
105 self._sock.send(command)
107 ack = self._sock.recv(1)
108 if ack != b'\x00':
109 raise RuntimeError("Chip select command failed, ack: " + str(ack))
112 def _cs_low(self) -> None:
113 self._set_cs(False)
116 def _cs_high(self) -> None:
117 self._set_cs(True)