Coverage for tropicsquare / transports / uart.py: 0%

42 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-27 21:24 +0000

1"""UART SPI Transport Implementation 

2 

3""" 

4 

5import sys 

6from tropicsquare.transports import L1Transport 

7 

8 

9class UartTransport(L1Transport): 

10 """L1 transport for UART 

11 

12 :param uart: UART interface object (e.g., machine.UART instance) 

13 """ 

14 

15 

16 def __init__(self, port: str, baudrate: int = 115200): 

17 """Initialize UART transport. 

18 

19 :param port: UART port name (e.g. /dev/ttyACM0) 

20 :param baudrate: Baud rate for UART communication 

21 """ 

22 

23 if sys.implementation.name == "micropython": 

24 if sys.platform == "linux": 

25 import os 

26 import termios 

27 # For micropython on linux open serial as file 

28 fd = os.open(port, os.O_RDWR | os.O_NOCTTY) 

29 termios.setraw(fd) # Set RAW 

30 os.close(fd) 

31 self._port = open(port, "r+b", buffering=0) # Reopen serial device 

32 self._flush = False # Micropython file does not supports flush 

33 else: 

34 raise RuntimeError("Unsupported platform for Micropython: {}".format(sys.platform)) 

35 

36 elif sys.implementation.name == "cpython": 

37 from serial import Serial 

38 self._port = Serial(port, baudrate) 

39 self._flush = True 

40 else: 

41 raise RuntimeError("Unsupported Python implementation: {}".format(sys.implementation.name)) 

42 

43 

44 def _transfer(self, tx_data: bytes) -> bytes: 

45 """SPI transfer using write_readinto. 

46 

47 :param tx_data: Data to transmit 

48 

49 :returns: Received data 

50 """ 

51 # Write data 

52 hex_data = tx_data.hex().upper() + "x\n" 

53 self._port.write(hex_data.encode()) 

54 if self._flush: self._port.flush() 

55 

56 # Read data 

57 hex_line = self._port.readline().decode().strip() 

58 rx_buffer = bytes.fromhex(hex_line) 

59 

60 return rx_buffer 

61 

62 

63 def _read(self, length: int) -> bytes: 

64 """SPI read operation. 

65 

66 :param length: Number of bytes to read 

67 

68 :returns: Read data 

69 """ 

70 # Send read command with length of dummy bytes 

71 self._port.write(b"00" * length + b"x\n") 

72 if self._flush: self._port.flush() 

73 

74 # Read data 

75 hex_line = self._port.readline().decode().strip() 

76 data = bytes.fromhex(hex_line) 

77 return data 

78 

79 

80 def _set_cs(self, state: bool): 

81 self._port.write("CS={}\n".format("1" if state else "0").encode()) 

82 if self._flush: self._port.flush() 

83 self._port.readline() # read OK 

84 

85 

86 def _cs_low(self) -> None: 

87 self._set_cs(False) 

88 

89 

90 def _cs_high(self) -> None: 

91 self._set_cs(True) 

92 

93 

94 def _close(self): 

95 self._port.close()