From 490b92b1fd298c7d2a57a884ec92b1308290ccc2 Mon Sep 17 00:00:00 2001 From: David Lenfesty Date: Sun, 29 Jan 2023 16:54:29 -0700 Subject: [PATCH] gateware: first work with I2C peripheral Basic details seem to be working, and I'm on my way with testing. Next steps: - Build out more local testing utilities - Build out unittest infrastructure - Flesh out more tests to verify operation --- gateware/i2c.py | 250 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 250 insertions(+) create mode 100644 gateware/i2c.py diff --git a/gateware/i2c.py b/gateware/i2c.py new file mode 100644 index 0000000..f3cd369 --- /dev/null +++ b/gateware/i2c.py @@ -0,0 +1,250 @@ +from amaranth import * +from amaranth.lib.coding import PriorityEncoder +from amaranth_soc.csr import * +from amaranth_soc.memory import * +from amlib.io.i2c import I2CInitiator, I2CTarget + +from amaranth.sim import * + +from math import floor, log2, ceil + + +class I2C(Elaboratable): + """ + CSR-based I2C master peripheral. + + Based on the amlib I2CInitiator, simply provides a simple Wishbone register interface. + I could add more logic and create a proper bus master that handles the main state machine + by itself, but we don't need performance when running I2C so the time spent on that in + gateware is not worth it. + + Parameters + ---------- + :param system_freq: + System clock frequency, i.e. frequency present on Wishbone bus. + :param bus_freq: + Requested I2C frequency. TODO describe how we create this frequency + :param clk_stretch: + Passed through to amlib device, whether or not to monitor for slaves clock stretching. + + Attributes + ---------- + :attr bus: + CSR bus to access peripheral registers on. + """ + def __init__(self, system_freq, bus_freq, pads, clk_stretch: bool = True): + # Control register, writes to this are strobed into the command interface presented by the + # amlib implementation + # + # Fields: + # [0]: start - Generate a start or repeated start condition on the bus. Ignored if busy is high + # [1]: stop - Generate a stop condition on the bus. Ignored if busy is high + # [2]: write - write a byte out on the bus + # [3]: read - read a byte from the bus + # [4]: read_ack - ACK value that gets written out during a read operation + # [5]: read_ack_en - Hacky solution to determine if we want to save read_ack + self.CR = Element(6, Element.Access.W, name="CR") + + # Status register + # + # Fields: + # [0]: busy - bus is busy operating + # [1]: ack - an ACK has been received from a bus slave + # [2]: read_ack - a convenience read field to see value of CR->read_ack + self.SR = Element(3, Element.Access.R, name="SR") + + # Data write register + # + # Latches in data to be written when write signal is applied. + self.DWR = Element(8, Element.Access.W, name="DWR") + + # Data read register + # + # Only presents valid data after 'read' has started, and once 'busy' is no longer asserted. + self.DRR = Element(8, Element.Access.R, name="DRR") + + # Set up CSR bus + addr_width = ceil(log2(64)) # Support up to 64 registers just because + data_width = 32 # 32 bit bus + self._csr_mux = Multiplexer(addr_width=addr_width, data_width=data_width) + # TODO export the addresses of these somehow + self._csr_mux.add(self.CR) + self._csr_mux.add(self.SR) + self._csr_mux.add(self.DWR) + self._csr_mux.add(self.DRR) + self.bus = self._csr_mux.bus + + # Set up I2C initiator submodule + period_cyc = floor(system_freq / bus_freq) + self._initiator = I2CInitiator(pads, period_cyc, clk_stretch) + + def elaborate(self, _platform): + m = Module() + + m.submodules.initiator = self._initiator + m.submodules.csr_mux = self._csr_mux + + # Strobe control signals in + with m.If(self.CR.w_stb): + m.d.sync += [ + self._initiator.start.eq(self.CR.w_data[0]), + self._initiator.stop.eq(self.CR.w_data[1]), + self._initiator.write.eq(self.CR.w_data[2]), + self._initiator.read.eq(self.CR.w_data[3]), + ] + + # We don't *always* want to change read_ack + with m.If(self.CR.w_data[5]): + m.d.sync += self._initiator.ack_i.eq(self.CR.w_data[4]) + + with m.Else(): + m.d.sync += [ + self._initiator.start.eq(0), + self._initiator.stop.eq(0), + self._initiator.write.eq(0), + self._initiator.read.eq(0), + # We don't strobe ack_i, this is a persistant setting + ] + + # Status signals may as well just be a continuous assignment + self.SR.r_data[0].eq(self._initiator.busy) + self.SR.r_data[1].eq(self._initiator.ack_o) + self.SR.r_data[2].eq(self._initiator.ack_i) + + # Strobe in this byte to send out + with m.If(self.DWR.w_stb): + m.d.sync += self._initiator.data_i.eq(self.DWR.w_data) + + # Read data only becomes valid when not busy (i.e. the read has completed) + with m.If(~self._initiator.busy): + m.d.sync += self.DRR.r_data.eq(self._initiator.data_o) + + return m + +from amaranth.lib.io import pin_layout +i2c_layout = [ + ("sda", pin_layout(1, "io")), + ("scl", pin_layout(1, "io")), +] + +class I2CBusSimulator(Elaboratable): + def __init__(self): + self.interfaces = [] + self.sda = Signal() + self.scl = Signal() + + def elaborate(self, target): + assert target is None, "This bus simulator should never be used in real hardware!" + + n = len(self.interfaces) + + m = Module() + + m.d.comb += self.sda.eq(1) + m.d.comb += self.scl.eq(1) + + # TODO maybe output a bus contention signal? + # First interfaces get priority over interfaces added after + for i in reversed(range(n)): + # Emulate bus drivers + with m.If(self.interfaces[i].sda.oe): + m.d.comb += self.sda.eq(self.interfaces[i].sda.o) + with m.If(self.interfaces[i].scl.oe): + m.d.comb += self.scl.eq(self.interfaces[i].scl.o) + pass + + # Connect inputs to bus value + m.d.comb += [ + self.interfaces[i].sda.i.eq(self.sda), + self.interfaces[i].scl.i.eq(self.scl), + ] + + return m + + def create_interface(self) -> Record: + new_interface = Record(i2c_layout) + self.interfaces.append(new_interface) + return new_interface + + +class TestHarness(Elaboratable): + def __init__(self): + self.i2c = I2CBusSimulator() + + self.uut = I2C(10_000_000, 100_000, self.i2c.create_interface()) + self.i2c_target = I2CTarget(self.i2c.create_interface()) + + + def elaborate(self, platform): + assert platform is None + + m = Module() + + m.submodules.i2c = self.i2c + m.submodules.uut = self.uut + m.submodules.i2c_target = self.i2c_target + + m.d.comb += self.i2c_target.address.eq(0xAA >> 1) + + return m + +# TODO switch to unittest or something +if __name__ == "__main__": + def write_csr(bus, index, data): + yield bus.addr.eq(index) + yield bus.w_stb.eq(1) + yield bus.w_data.eq(data) + yield Tick() + yield bus.w_stb.eq(0) + yield Tick() + + def read_csr(bus, index): + yield bus.r_strb.eq(1) + ret = yield bus.r_data + yield Tick() + yield bus.r_stb.eq(0) + yield Tick() + + harness = TestHarness() + sim = Simulator(harness) + + def test_proc(): + #send start + yield from write_csr(harness.uut.bus, 0, 1) + + # TODO shouldn't need to do this, if I did a proper CSR read maybe? + yield Tick() + yield Tick() + + # TODO I want something like read_csr, unsure how to implement + # Wait for + while True: + busy = yield harness.uut._initiator.busy + if not busy: + break + else: + yield Tick() + + # Set data + yield from write_csr(harness.uut.bus, 2, 0xAA) + # Write data + yield from write_csr(harness.uut.bus, 0, 1 << 2) + + did_start = False + for i in range(1000): + start_cond = yield harness.i2c_target.start + yield Tick() + + if start_cond: + did_start = True + break + + #assert did_start, "Idnaoidnwaioudnwaoiun" + print(did_start) + + + sim.add_clock(100e-9) + sim.add_sync_process(test_proc) + with sim.write_vcd('test.vcd'): + sim.reset() + sim.run()