Source code for tropicsquare.transports.uart

"""UART SPI Transport Implementation

"""

import sys
from tropicsquare.transports import L1Transport


[docs] class UartTransport(L1Transport): """L1 transport for UART :param uart: UART interface object (e.g., machine.UART instance) """
[docs] def __init__(self, port: str, baudrate: int = 115200): """Initialize UART transport. :param port: UART port name (e.g. /dev/ttyACM0) :param baudrate: Baud rate for UART communication """ if sys.implementation.name == "micropython": if sys.platform == "linux": import os import termios # For micropython on linux open serial as file fd = os.open(port, os.O_RDWR | os.O_NOCTTY) termios.setraw(fd) # Set RAW os.close(fd) self._port = open(port, "r+b", buffering=0) # Reopen serial device self._flush = False # Micropython file does not supports flush else: raise RuntimeError("Unsupported platform for Micropython: {}".format(sys.platform)) elif sys.implementation.name == "cpython": from serial import Serial self._port = Serial(port, baudrate) self._flush = True else: raise RuntimeError("Unsupported Python implementation: {}".format(sys.implementation.name))
def _transfer(self, tx_data: bytes) -> bytes: """SPI transfer using write_readinto. :param tx_data: Data to transmit :returns: Received data """ # Write data hex_data = tx_data.hex().upper() + "x\n" self._port.write(hex_data.encode()) if self._flush: self._port.flush() # Read data hex_line = self._port.readline().decode().strip() rx_buffer = bytes.fromhex(hex_line) return rx_buffer def _read(self, length: int) -> bytes: """SPI read operation. :param length: Number of bytes to read :returns: Read data """ # Send read command with length of dummy bytes self._port.write(b"00" * length + b"x\n") if self._flush: self._port.flush() # Read data hex_line = self._port.readline().decode().strip() data = bytes.fromhex(hex_line) return data def _set_cs(self, state: bool): self._port.write("CS={}\n".format("1" if state else "0").encode()) if self._flush: self._port.flush() self._port.readline() # read OK def _cs_low(self) -> None: self._set_cs(False) def _cs_high(self) -> None: self._set_cs(True) def _close(self): self._port.close()