gateware: saving some WIP on UART
This commit is contained in:
parent
71ca5a31be
commit
52012dc4ad
@ -1,8 +1,6 @@
|
||||
from amaranth import *
|
||||
from amaranth.lib.coding import PriorityEncoder
|
||||
from amaranth_soc.csr import *
|
||||
from amaranth_soc.memory import *
|
||||
from amlib.io.i2c import I2CInitiator, I2CTarget
|
||||
from amlib.io.i2c import I2CInitiator
|
||||
|
||||
from amaranth.sim import *
|
||||
|
||||
|
@ -19,6 +19,7 @@ from led import *
|
||||
|
||||
import i2c
|
||||
import test_i2c
|
||||
import uart
|
||||
|
||||
# To change clock domain of a module:
|
||||
# new_thing = DomainRenamer("new_clock")(MyElaboratable())
|
||||
@ -100,6 +101,8 @@ class Core(Elaboratable):
|
||||
# Connect arbiter to decoder
|
||||
m.d.comb += self.arbiter.bus.connect(self.decoder.bus)
|
||||
|
||||
m.submodules.uart = uart.UART(10e6)
|
||||
|
||||
# Counter
|
||||
#m.d.sync += self.count.eq(self.count + 1)
|
||||
#with m.If(self.count >= 50000000):
|
||||
|
17
gateware/test_uart.py
Normal file
17
gateware/test_uart.py
Normal file
@ -0,0 +1,17 @@
|
||||
from amaranth import *
|
||||
from amaranth.sim import *
|
||||
from uart import *
|
||||
from tests import BaseTestClass, provide_testcase_name
|
||||
|
||||
|
||||
class TestHarness(Elaboratable):
|
||||
def __init__(self):
|
||||
self.uut = UART(10e6)
|
||||
|
||||
|
||||
def elaborate(self, platform):
|
||||
assert platform is None
|
||||
|
||||
m = Module()
|
||||
|
||||
return m
|
144
gateware/uart.py
Normal file
144
gateware/uart.py
Normal file
@ -0,0 +1,144 @@
|
||||
from amaranth import *
|
||||
from amaranth.lib.fifo import SyncFIFO
|
||||
from amaranth_soc.csr import *
|
||||
from amlib.io.serial import *
|
||||
|
||||
from math import ceil, log2
|
||||
|
||||
|
||||
class UART(Elaboratable):
|
||||
"""
|
||||
CSR-enabled UART TX/RX peripheral.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
:param clk_freq:
|
||||
System clock frequency, used for default divisor calculation.
|
||||
:param default_baud:
|
||||
Default baud rate to set divisor for.
|
||||
:param fifo_depth:
|
||||
Depth (in bytes) of RX and TX FIFOs.
|
||||
:param pins:
|
||||
Optional parameter to supply platform pins into module.
|
||||
|
||||
Attributes
|
||||
----------
|
||||
:attr bus:
|
||||
CSR bus to provide access to control registers.
|
||||
:attr tx:
|
||||
TX signal. Only created if pins=None, connected to AsyncSerial.tx.o
|
||||
:attr rx:
|
||||
RX signal. Only created if pins=None, connected to AsyncSerial.rx.o
|
||||
"""
|
||||
def __init__(self, clk_freq, default_baud=115200, fifo_depth=128, pins=None):
|
||||
self.fifo_depth = fifo_depth
|
||||
self._pins = pins
|
||||
|
||||
# Clock divisor register
|
||||
#
|
||||
# Sets input/output baudrate to system clock / divisor. Resets to value
|
||||
# that provides 115200 baud rate. Writes to this register clear FIFOs.
|
||||
self.DIVISOR = Element(16, Element.Access.RW, name="DIVISOR")
|
||||
|
||||
# Status register.
|
||||
#
|
||||
# Fields:
|
||||
# [0]: txfifo_full
|
||||
# [1]: txfifo_empty
|
||||
# [2]: rxfifo_full
|
||||
# [3]: rxfifo_empty
|
||||
self.SR = Element(4, Element.Access.R, name="SR")
|
||||
|
||||
# Data register.
|
||||
#
|
||||
# Writes push data into TX FIFO, and are discarded if full, reads pull
|
||||
# data from RX FIFO, and are invalid if it is empty. Incoming bytes are discarded
|
||||
# if the RX FIFO is full.
|
||||
self.DR = Element(8, Element.Access.RW, name="DR")
|
||||
|
||||
# Set up CSR bus
|
||||
addr_width = ceil(log2(64))
|
||||
data_width = 32
|
||||
self._csr_mux = Multiplexer(addr_width=addr_width, data_width=data_width)
|
||||
self._csr_mux.add(self.DIVISOR)
|
||||
self._csr_mux.add(self.SR)
|
||||
self._csr_mux.add(self.DR)
|
||||
self.bus = self._csr_mux.bus
|
||||
|
||||
# Actual business logic
|
||||
self._serial = AsyncSerial(divisor=int(clk_freq // default_baud), divisor_bits=16, data_bits=8, parity="none", pins=pins)
|
||||
self._tx_fifo = SyncFIFO(width=8, depth=self.fifo_depth)
|
||||
self._rx_fifo = SyncFIFO(width=8, depth=self.fifo_depth)
|
||||
|
||||
# Optional RX/TX signals
|
||||
if self._pins is None:
|
||||
self.tx = Signal()
|
||||
self.rx = Signal()
|
||||
|
||||
|
||||
def elaborate(self, platform):
|
||||
m = Module()
|
||||
|
||||
# Seperate clock domain to allow for resetting FIFOs separately
|
||||
m.domains += ClockDomain("fifo", local=True)
|
||||
m.d.comb += ClockSignal("fifo").eq(ClockSignal("sync"))
|
||||
m.d.comb += ResetSignal("fifo").eq(self.DIVISOR.w_stb) # Reset on a write to DIVISOR as well
|
||||
|
||||
fifo_domain = DomainRenamer("fifo")
|
||||
self._tx_fifo = fifo_domain(self._tx_fifo)
|
||||
self._rx_fifo = fifo_domain(self._rx_fifo)
|
||||
|
||||
m.submodules.serial = self._serial
|
||||
m.submodules.tx_fifo = self._tx_fifo
|
||||
m.submodules.rx_fifo = self._rx_fifo
|
||||
m.submodules.csr_mux = self._csr_mux
|
||||
|
||||
# Hook up divisor to register.
|
||||
m.d.comb += self.DIVISOR.r_data.eq(self._serial.divisor)
|
||||
with m.If(self.DIVISOR.w_stb):
|
||||
m.d.sync += self._serial.divisor.eq(self.DIVISOR.w_data)
|
||||
|
||||
# SR Hookups
|
||||
m.d.comb += [
|
||||
self.SR.r_data[0].eq(self._tx_fifo.level < self.fifo_depth), # txfifo_full
|
||||
self.SR.r_data[1].eq(self._tx_fifo.level > 0), # txfifo_empty
|
||||
self.SR.r_data[2].eq(self._rx_fifo.level < self.fifo_depth), # rxfifo_full
|
||||
self.SR.r_data[3].eq(self._rx_fifo.level > 0), # rxfifo_empty
|
||||
]
|
||||
|
||||
# DR hookups
|
||||
m.d.comb += [
|
||||
# Plumb read data in, and connect CSR read strobe to FIFO r_en.
|
||||
# We can ignore r_rdy because we specify empty reads are invalid.
|
||||
self.DR.r_data.eq(self._rx_fifo.r_data),
|
||||
self._rx_fifo.r_en.eq(self.DR.r_stb),
|
||||
|
||||
# Plumb write data from CSR to FIFO, connect write strobe to FIFO w_en.
|
||||
# We can ignore w_rdy, because we specify writes to a full FIFO are dropped.
|
||||
self._tx_fifo.w_data.eq(self.DR.w_data),
|
||||
self._tx_fifo.w_en.eq(self.DR.w_stb),
|
||||
]
|
||||
|
||||
# Hook serial devices into FIFOs
|
||||
rx_err = Signal()
|
||||
m.d.comb += [
|
||||
# RX
|
||||
rx_err.eq(self._serial.rx.err.overflow & self._serial.rx.err.frame & self._serial.rx.err.parity),
|
||||
self._rx_fifo.w_data.eq(self._serial.rx.data),
|
||||
self._rx_fifo.w_en.eq(self._serial.rx.rdy & ~rx_err), # Only pull data into FIFO if no RX error
|
||||
self._serial.rx.ack.eq(self._rx_fifo.w_rdy | rx_err), # Pull data out if there is an error anyways
|
||||
|
||||
# TX
|
||||
self._serial.tx.data.eq(self._tx_fifo.r_data),
|
||||
self._serial.tx.ack.eq(self._tx_fifo.r_rdy),
|
||||
self._tx_fifo.r_en.eq(self._serial.tx.rdy),
|
||||
]
|
||||
|
||||
# Optionally connect out RX/TX signals, if pins are not provided (likely in sim)
|
||||
if self._pins is None:
|
||||
m.d.comb += [
|
||||
self._serial.rx.i.eq(self.rx),
|
||||
self.tx.eq(self._serial.tx.o),
|
||||
]
|
||||
|
||||
return m
|
Loading…
Reference in New Issue
Block a user