Coverage for tropicsquare / transports / network.py: 0%

69 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-27 21:24 +0000

1"""Network SPI Transport Implementation 

2 

3This module provides a network-based SPI transport implementation. 

4for https://github.com/petrkr/netbridge32 SPI bridge 

5""" 

6 

7from tropicsquare.transports import L1Transport 

8from tropicsquare.exceptions import TropicSquareError 

9 

10import socket 

11 

12class NetworkSpiTransport(L1Transport): 

13 """L1 transport for network-based SPI bridge. 

14 

15 :param host: Hostname or IP address of the SPI bridge 

16 :param port: Port number for the SPI connection (default: 12345) 

17 :param timeout: Socket timeout in seconds (default: 5.0) 

18 """ 

19 

20 COMMAND_READ = b'\x01' 

21 COMMAND_WRITE_READINTO = b'\x08' 

22 

23 COMMAND_CS_LOW = b'\x10' 

24 COMMAND_CS_HIGH = b'\x20' 

25 

26 def __init__( 

27 self, 

28 host: str, 

29 port: int = 12345, 

30 timeout: float = 5.0, 

31 connect_timeout: float = 1.0, 

32 ): 

33 """Initialize Network SPI transport. 

34 

35 :param host: Hostname or IP address of the SPI bridge 

36 :param port: Port number for the SPI connection (default: 12345) 

37 :param timeout: Socket I/O timeout in seconds (default: 5.0) 

38 :param connect_timeout: Connect timeout per resolved address in seconds (default: 1.0) 

39 """ 

40 self._sock = None 

41 try: 

42 addrinfos = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM, 0) 

43 errors = [] 

44 for family, socktype, proto, _, sockaddr in addrinfos: 

45 sock = None 

46 try: 

47 sock = socket.socket(family, socktype, proto) 

48 sock.settimeout(connect_timeout) 

49 sock.connect(sockaddr) 

50 sock.settimeout(timeout) 

51 self._sock = sock 

52 break 

53 except Exception as e: 

54 errors.append(f"{sockaddr}: {e}") 

55 if sock is not None: 

56 sock.close() 

57 if self._sock is None: 

58 if errors: 

59 summary = "; ".join(errors) 

60 raise OSError(summary) 

61 raise OSError("No resolved addresses") 

62 except Exception as e: 

63 if self._sock is not None: 

64 self._sock.close() 

65 raise TropicSquareError( 

66 f"Failed to connect to {host}:{port}: {e}" 

67 ) 

68 

69 def _transfer(self, write_buf: bytes) -> bytes: 

70 command = self.COMMAND_WRITE_READINTO 

71 data = bytes(write_buf) 

72 length = len(data) 

73 packet = command + length.to_bytes(4, 'big') + data 

74 self._sock.send(packet) 

75 

76 received = b'' 

77 while len(received) < length: 

78 chunk = self._sock.recv(length - len(received)) 

79 if not chunk: 

80 raise RuntimeError("Connection lost during SPI transfer") 

81 received += chunk 

82 

83 return received 

84 

85 

86 def _read(self, length: int) -> bytes: 

87 command = self.COMMAND_READ 

88 packet = command + length.to_bytes(4, 'big') 

89 self._sock.send(packet) 

90 

91 received = b'' 

92 while len(received) < length: 

93 chunk = self._sock.recv(length - len(received)) 

94 if not chunk: 

95 raise Exception("Connection lost during SPI transfer") 

96 received += chunk 

97 

98 return received 

99 

100 

101 def _set_cs(self, state: bool): 

102 """Sends a command (0x01) to set the chip select state. 

103 The state is sent as 1 byte (0 for low, 1 for high).""" 

104 command = self.COMMAND_CS_HIGH if state else self.COMMAND_CS_LOW 

105 self._sock.send(command) 

106 

107 ack = self._sock.recv(1) 

108 if ack != b'\x00': 

109 raise RuntimeError("Chip select command failed, ack: " + str(ack)) 

110 

111 

112 def _cs_low(self) -> None: 

113 self._set_cs(False) 

114 

115 

116 def _cs_high(self) -> None: 

117 self._set_cs(True)