Coverage for tropicsquare / transports / uart.py: 0%
42 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-27 21:24 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-27 21:24 +0000
1"""UART SPI Transport Implementation
3"""
5import sys
6from tropicsquare.transports import L1Transport
class UartTransport(L1Transport):
    """L1 transport that tunnels chip traffic over a serial (UART) port.

    Every request is sent as one text line and answered by one text line:

    * data exchange: the payload as upper-case hex digits followed by
      ``x`` and a newline; the reply line is hex that is decoded back to
      bytes,
    * chip select: ``CS=0`` or ``CS=1`` followed by a newline; one reply
      line is read and discarded.

    NOTE(review): the device on the other end is presumably a UART-to-SPI
    bridge -- confirm against the bridge firmware.

    :param port: Serial port name (e.g. /dev/ttyACM0)
    :param baudrate: Baud rate for UART communication
    """

    def __init__(self, port: str, baudrate: int = 115200):
        """Initialize UART transport.

        On CPython the port is opened through ``serial.Serial``. On
        MicroPython (Linux only) the device node is switched to raw mode
        and opened as an unbuffered binary file; ``baudrate`` is not used
        on that path.

        :param port: UART port name (e.g. /dev/ttyACM0)
        :param baudrate: Baud rate for UART communication
        :raises RuntimeError: If the Python implementation, or the platform
            when running MicroPython, is not supported
        """
        implementation = sys.implementation.name

        if implementation == "micropython":
            if sys.platform != "linux":
                raise RuntimeError("Unsupported platform for Micropython: {}".format(sys.platform))

            import os
            import termios

            # For MicroPython on Linux the serial device is used as a plain
            # file. Switch it to raw mode through a temporary descriptor and
            # make sure that descriptor is released even if setraw() fails.
            fd = os.open(port, os.O_RDWR | os.O_NOCTTY)
            try:
                termios.setraw(fd)
            finally:
                os.close(fd)

            self._port = open(port, "r+b", buffering=0)  # Reopen serial device
            self._flush = False  # MicroPython file objects do not support flush
        elif implementation == "cpython":
            from serial import Serial

            self._port = Serial(port, baudrate)
            self._flush = True
        else:
            raise RuntimeError("Unsupported Python implementation: {}".format(implementation))

    def _exchange_line(self, request: bytes) -> bytes:
        """Write one request line, flush if supported, and return the raw reply line."""
        self._port.write(request)
        if self._flush:
            self._port.flush()
        return self._port.readline()

    def _transfer(self, tx_data: bytes) -> bytes:
        """Send data and return the bytes received in reply.

        The payload is written as upper-case hex followed by ``x`` and a
        newline; the reply line is stripped and decoded from hex.

        :param tx_data: Data to transmit

        :returns: Received data
        """
        request = (tx_data.hex().upper() + "x\n").encode()
        reply = self._exchange_line(request)
        return bytes.fromhex(reply.decode().strip())

    def _read(self, length: int) -> bytes:
        """Read data by clocking out dummy zero bytes.

        :param length: Number of bytes to read

        :returns: Read data
        """
        # Send read command with length of dummy bytes ("00" per byte).
        reply = self._exchange_line(b"00" * length + b"x\n")
        return bytes.fromhex(reply.decode().strip())

    def _set_cs(self, state: bool) -> None:
        """Send a chip-select command and consume the reply line.

        :param state: True sends ``CS=1``, False sends ``CS=0``
        """
        command = "CS={}\n".format("1" if state else "0").encode()
        # The reply line (expected to be an "OK" acknowledgement) is read
        # so it does not get mistaken for the next data reply; its content
        # is not checked.
        self._exchange_line(command)

    def _cs_low(self) -> None:
        """Drive chip select low (sends ``CS=0``)."""
        self._set_cs(False)

    def _cs_high(self) -> None:
        """Drive chip select high (sends ``CS=1``)."""
        self._set_cs(True)

    def _close(self) -> None:
        """Close the underlying serial port."""
        self._port.close()