from amaranth import *
from amaranth.lib.fifo import SyncFIFO
from amaranth_soc.csr import *
from amlib.io.serial import *

from math import ceil, log2


class UART(Elaboratable):
    """
    CSR-enabled UART TX/RX peripheral.

    Bytes written to the data register are queued in a TX FIFO and drained by
    the serial transmitter; bytes received without error are queued in an RX
    FIFO and read back through the same data register.

    Parameters
    ----------
    :param clk_freq: System clock frequency, used for default divisor calculation.
    :param default_baud: Default baud rate to set divisor for.
    :param fifo_depth: Depth (in bytes) of RX and TX FIFOs.
    :param pins: Optional parameter to supply platform pins into module.

    Attributes
    ----------
    :attr bus: CSR bus to provide access to control registers.
    :attr tx: TX signal. Only created if pins=None, driven from AsyncSerial.tx.o
    :attr rx: RX signal. Only created if pins=None, drives AsyncSerial.rx.i
    """

    def __init__(self, clk_freq, default_baud=115200, fifo_depth=128, pins=None):
        self.fifo_depth = fifo_depth
        self._pins = pins

        # Clock divisor register
        #
        # Sets input/output baudrate to system clock / divisor. Resets to the
        # value that provides `default_baud`. Writes to this register clear FIFOs.
        self.DIVISOR = Element(16, Element.Access.RW, name="UART_DIVISOR")

        # Status register.
        #
        # Fields:
        # [0]: txfifo_full
        # [1]: txfifo_empty
        # [2]: rxfifo_full
        # [3]: rxfifo_empty
        self.SR = Element(4, Element.Access.R, name="UART_SR")

        # Data register.
        #
        # Writes push data into TX FIFO, and are discarded if full, reads pull
        # data from RX FIFO, and are invalid if it is empty. Incoming bytes are
        # discarded if the RX FIFO is full.
        self.DR = Element(8, Element.Access.RW, name="UART_DR")

        # Set up CSR bus
        addr_width = ceil(log2(64))
        data_width = 8
        self._csr_mux = Multiplexer(addr_width=addr_width, data_width=data_width)
        div_start, _stop = self._csr_mux.add(self.DIVISOR)
        sr_start, _stop = self._csr_mux.add(self.SR)
        dr_start, _stop = self._csr_mux.add(self.DR)
        print(f"UART added. DIVISOR 0x{div_start:x}, SR 0x{sr_start:x}, DR 0x{dr_start:x}")
        self.bus = self._csr_mux.bus

        # Actual business logic
        self._serial = AsyncSerial(
            divisor=int(clk_freq // default_baud),
            divisor_bits=16,
            data_bits=8,
            parity="none",
            pins=pins,
        )
        self._tx_fifo = SyncFIFO(width=8, depth=self.fifo_depth)
        self._rx_fifo = SyncFIFO(width=8, depth=self.fifo_depth)

        # Optional RX/TX signals
        if self._pins is None:
            self.tx = Signal()
            self.rx = Signal()

    def elaborate(self, platform):
        """Build the UART: CSR mux, serial core, and the two FIFOs between them."""
        m = Module()

        serial = self._serial
        tx_fifo = self._tx_fifo
        rx_fifo = self._rx_fifo

        # Separate clock domain to allow for resetting FIFOs separately.
        # It shares the "sync" clock; its reset is the system reset OR'd with a
        # write strobe on DIVISOR, so changing the baud rate flushes both FIFOs
        # while a system reset still clears them.
        m.domains += ClockDomain("fifo", local=True)
        m.d.comb += [
            ClockSignal("fifo").eq(ClockSignal("sync")),
            ResetSignal("fifo").eq(
                ResetSignal("sync", allow_reset_less=True) | self.DIVISOR.w_stb
            ),
        ]

        # Wrap the FIFOs locally rather than overwriting the instance
        # attributes, so that elaborating twice does not stack renamers.
        fifo_domain = DomainRenamer("fifo")
        m.submodules.serial = serial
        m.submodules.tx_fifo = fifo_domain(tx_fifo)
        m.submodules.rx_fifo = fifo_domain(rx_fifo)
        m.submodules.csr_mux = self._csr_mux

        # Hook up divisor to register.
        # TODO do some validation and write a known good value if a dumb value was provided
        m.d.comb += self.DIVISOR.r_data.eq(serial.divisor)
        with m.If(self.DIVISOR.w_stb):
            m.d.sync += serial.divisor.eq(self.DIVISOR.w_data)

        # SR hookups
        m.d.comb += [
            self.SR.r_data[0].eq(tx_fifo.level == self.fifo_depth),  # txfifo_full
            self.SR.r_data[1].eq(tx_fifo.level == 0),                # txfifo_empty
            self.SR.r_data[2].eq(rx_fifo.level == self.fifo_depth),  # rxfifo_full
            self.SR.r_data[3].eq(rx_fifo.level == 0),                # rxfifo_empty
        ]

        # DR hookups
        m.d.comb += [
            # Plumb read data in, and connect CSR read strobe to FIFO r_en.
            # We can ignore r_rdy because we specify empty reads are invalid.
            self.DR.r_data.eq(rx_fifo.r_data),
            rx_fifo.r_en.eq(self.DR.r_stb),
            # Plumb write data from CSR to FIFO, connect write strobe to FIFO w_en.
            # We can ignore w_rdy, because we specify writes to a full FIFO are dropped.
            tx_fifo.w_data.eq(self.DR.w_data),
            tx_fifo.w_en.eq(self.DR.w_stb),
        ]

        # Hook serial devices into FIFOs
        rx_err = Signal()
        m.d.comb += [
            # RX: any single error flag marks the received byte as bad.
            rx_err.eq(
                serial.rx.err.overflow
                | serial.rx.err.frame
                | serial.rx.err.parity
            ),
            rx_fifo.w_data.eq(serial.rx.data),
            # Only pull data into FIFO if no RX error
            rx_fifo.w_en.eq(serial.rx.rdy & ~rx_err),
            # Pull data out if there is an error anyways
            serial.rx.ack.eq(rx_fifo.w_rdy | rx_err),
            # TX
            serial.tx.data.eq(tx_fifo.r_data),
            serial.tx.ack.eq(tx_fifo.r_rdy),
            tx_fifo.r_en.eq(serial.tx.rdy),
        ]

        # Optionally connect out RX/TX signals, if pins are not provided (likely in sim)
        if self._pins is None:
            m.d.comb += [
                serial.rx.i.eq(self.rx),
                self.tx.eq(serial.tx.o),
            ]

        return m