diff --git a/gateware/i2c.py b/gateware/i2c.py index 31134bd..f6f5fe9 100644 --- a/gateware/i2c.py +++ b/gateware/i2c.py @@ -1,8 +1,6 @@ from amaranth import * -from amaranth.lib.coding import PriorityEncoder from amaranth_soc.csr import * -from amaranth_soc.memory import * -from amlib.io.i2c import I2CInitiator, I2CTarget +from amlib.io.i2c import I2CInitiator from amaranth.sim import * diff --git a/gateware/main.py b/gateware/main.py index 6ccd5da..1280d5f 100644 --- a/gateware/main.py +++ b/gateware/main.py @@ -19,6 +19,7 @@ from led import * import i2c import test_i2c +import uart # To change clock domain of a module: # new_thing = DomainRenamer("new_clock")(MyElaboratable()) @@ -100,6 +101,8 @@ class Core(Elaboratable): # Connect arbiter to decoder m.d.comb += self.arbiter.bus.connect(self.decoder.bus) + m.submodules.uart = uart.UART(10e6) + # Counter #m.d.sync += self.count.eq(self.count + 1) #with m.If(self.count >= 50000000): diff --git a/gateware/test_uart.py b/gateware/test_uart.py new file mode 100644 index 0000000..49ea6b3 --- /dev/null +++ b/gateware/test_uart.py @@ -0,0 +1,17 @@ +from amaranth import * +from amaranth.sim import * +from uart import * +from tests import BaseTestClass, provide_testcase_name + + +class TestHarness(Elaboratable): + def __init__(self): + self.uut = UART(10e6) + + + def elaborate(self, platform): + assert platform is None + + m = Module() + + return m \ No newline at end of file diff --git a/gateware/uart.py b/gateware/uart.py new file mode 100644 index 0000000..3380eb7 --- /dev/null +++ b/gateware/uart.py @@ -0,0 +1,144 @@ +from amaranth import * +from amaranth.lib.fifo import SyncFIFO +from amaranth_soc.csr import * +from amlib.io.serial import * + +from math import ceil, log2 + + +class UART(Elaboratable): + """ + CSR-enabled UART TX/RX peripheral. + + Parameters + ---------- + :param clk_freq: + System clock frequency, used for default divisor calculation. + :param default_baud: + Default baud rate to set divisor for. + :param fifo_depth: + Depth (in bytes) of RX and TX FIFOs. + :param pins: + Optional parameter to supply platform pins into module. + + Attributes + ---------- + :attr bus: + CSR bus to provide access to control registers. + :attr tx: + TX signal. Only created if pins=None, connected to AsyncSerial.tx.o + :attr rx: + RX signal. Only created if pins=None, connected to AsyncSerial.rx.o + """ + def __init__(self, clk_freq, default_baud=115200, fifo_depth=128, pins=None): + self.fifo_depth = fifo_depth + self._pins = pins + + # Clock divisor register + # + # Sets input/output baudrate to system clock / divisor. Resets to value + # that provides 115200 baud rate. Writes to this register clear FIFOs. + self.DIVISOR = Element(16, Element.Access.RW, name="DIVISOR") + + # Status register. + # + # Fields: + # [0]: txfifo_full + # [1]: txfifo_empty + # [2]: rxfifo_full + # [3]: rxfifo_empty + self.SR = Element(4, Element.Access.R, name="SR") + + # Data register. + # + # Writes push data into TX FIFO, and are discarded if full, reads pull + # data from RX FIFO, and are invalid if it is empty. Incoming bytes are discarded + # if the RX FIFO is full. + self.DR = Element(8, Element.Access.RW, name="DR") + + # Set up CSR bus + addr_width = ceil(log2(64)) + data_width = 32 + self._csr_mux = Multiplexer(addr_width=addr_width, data_width=data_width) + self._csr_mux.add(self.DIVISOR) + self._csr_mux.add(self.SR) + self._csr_mux.add(self.DR) + self.bus = self._csr_mux.bus + + # Actual business logic + self._serial = AsyncSerial(divisor=int(clk_freq // default_baud), divisor_bits=16, data_bits=8, parity="none", pins=pins) + self._tx_fifo = SyncFIFO(width=8, depth=self.fifo_depth) + self._rx_fifo = SyncFIFO(width=8, depth=self.fifo_depth) + + # Optional RX/TX signals + if self._pins is None: + self.tx = Signal() + self.rx = Signal() + + + def elaborate(self, platform): + m = Module() + + # Seperate clock domain to allow for resetting FIFOs separately + m.domains += ClockDomain("fifo", local=True) + m.d.comb += ClockSignal("fifo").eq(ClockSignal("sync")) + m.d.comb += ResetSignal("fifo").eq(self.DIVISOR.w_stb) # Reset on a write to DIVISOR as well + + fifo_domain = DomainRenamer("fifo") + self._tx_fifo = fifo_domain(self._tx_fifo) + self._rx_fifo = fifo_domain(self._rx_fifo) + + m.submodules.serial = self._serial + m.submodules.tx_fifo = self._tx_fifo + m.submodules.rx_fifo = self._rx_fifo + m.submodules.csr_mux = self._csr_mux + + # Hook up divisor to register. + m.d.comb += self.DIVISOR.r_data.eq(self._serial.divisor) + with m.If(self.DIVISOR.w_stb): + m.d.sync += self._serial.divisor.eq(self.DIVISOR.w_data) + + # SR Hookups + m.d.comb += [ + self.SR.r_data[0].eq(self._tx_fifo.level < self.fifo_depth), # txfifo_full + self.SR.r_data[1].eq(self._tx_fifo.level > 0), # txfifo_empty + self.SR.r_data[2].eq(self._rx_fifo.level < self.fifo_depth), # rxfifo_full + self.SR.r_data[3].eq(self._rx_fifo.level > 0), # rxfifo_empty + ] + + # DR hookups + m.d.comb += [ + # Plumb read data in, and connect CSR read strobe to FIFO r_en. + # We can ignore r_rdy because we specify empty reads are invalid. + self.DR.r_data.eq(self._rx_fifo.r_data), + self._rx_fifo.r_en.eq(self.DR.r_stb), + + # Plumb write data from CSR to FIFO, connect write strobe to FIFO w_en. + # We can ignore w_rdy, because we specify writes to a full FIFO are dropped. + self._tx_fifo.w_data.eq(self.DR.w_data), + self._tx_fifo.w_en.eq(self.DR.w_stb), + ] + + # Hook serial devices into FIFOs + rx_err = Signal() + m.d.comb += [ + # RX + rx_err.eq(self._serial.rx.err.overflow & self._serial.rx.err.frame & self._serial.rx.err.parity), + self._rx_fifo.w_data.eq(self._serial.rx.data), + self._rx_fifo.w_en.eq(self._serial.rx.rdy & ~rx_err), # Only pull data into FIFO if no RX error + self._serial.rx.ack.eq(self._rx_fifo.w_rdy | rx_err), # Pull data out if there is an error anyways + + # TX + self._serial.tx.data.eq(self._tx_fifo.r_data), + self._serial.tx.ack.eq(self._tx_fifo.r_rdy), + self._tx_fifo.r_en.eq(self._serial.tx.rdy), + ] + + # Optionally connect out RX/TX signals, if pins are not provided (likely in sim) + if self._pins is None: + m.d.comb += [ + self._serial.rx.i.eq(self.rx), + self.tx.eq(self._serial.tx.o), + ] + + return m