from amaranth import * from amaranth.lib.coding import PriorityEncoder from amaranth_soc.csr import * from amaranth_soc.memory import * from amlib.io.i2c import I2CInitiator, I2CTarget from amaranth.sim import * from math import floor, log2, ceil class I2C(Elaboratable): """ CSR-based I2C master peripheral. Based on the amlib I2CInitiator, simply provides a simple Wishbone register interface. I could add more logic and create a proper bus master that handles the main state machine by itself, but we don't need performance when running I2C so the time spent on that in gateware is not worth it. Parameters ---------- :param system_freq: System clock frequency, i.e. frequency present on Wishbone bus. :param bus_freq: Requested I2C frequency. TODO describe how we create this frequency :param clk_stretch: Passed through to amlib device, whether or not to monitor for slaves clock stretching. Attributes ---------- :attr bus: CSR bus to access peripheral registers on. """ def __init__(self, system_freq, bus_freq, pads, clk_stretch: bool = True): # Control register, writes to this are strobed into the command interface presented by the # amlib implementation # # Fields: # [0]: start - Generate a start or repeated start condition on the bus. Ignored if busy is high # [1]: stop - Generate a stop condition on the bus. Ignored if busy is high # [2]: write - write a byte out on the bus # [3]: read - read a byte from the bus # [4]: read_ack - ACK value that gets written out during a read operation # [5]: read_ack_en - Hacky solution to determine if we want to save read_ack self.CR = Element(6, Element.Access.W, name="CR") # Status register # # Fields: # [0]: busy - bus is busy operating # [1]: ack - an ACK has been received from a bus slave # [2]: read_ack - a convenience read field to see value of CR->read_ack self.SR = Element(3, Element.Access.R, name="SR") # Data write register # # Latches in data to be written when write signal is applied. self.DWR = Element(8, Element.Access.W, name="DWR") # Data read register # # Only presents valid data after 'read' has started, and once 'busy' is no longer asserted. self.DRR = Element(8, Element.Access.R, name="DRR") # Set up CSR bus addr_width = ceil(log2(64)) # Support up to 64 registers just because data_width = 32 # 32 bit bus self._csr_mux = Multiplexer(addr_width=addr_width, data_width=data_width) # TODO export the addresses of these somehow self._csr_mux.add(self.CR) self._csr_mux.add(self.SR) self._csr_mux.add(self.DWR) self._csr_mux.add(self.DRR) self.bus = self._csr_mux.bus # Set up I2C initiator submodule period_cyc = floor(system_freq / bus_freq) self._initiator = I2CInitiator(pads, period_cyc, clk_stretch) def elaborate(self, _platform): m = Module() m.submodules.initiator = self._initiator m.submodules.csr_mux = self._csr_mux # Strobe control signals in with m.If(self.CR.w_stb): m.d.sync += [ self._initiator.start.eq(self.CR.w_data[0]), self._initiator.stop.eq(self.CR.w_data[1]), self._initiator.write.eq(self.CR.w_data[2]), self._initiator.read.eq(self.CR.w_data[3]), ] # We don't *always* want to change read_ack with m.If(self.CR.w_data[5]): m.d.sync += self._initiator.ack_i.eq(self.CR.w_data[4]) with m.Else(): m.d.sync += [ self._initiator.start.eq(0), self._initiator.stop.eq(0), self._initiator.write.eq(0), self._initiator.read.eq(0), # We don't strobe ack_i, this is a persistant setting ] # Status signals may as well just be a continuous assignment self.SR.r_data[0].eq(self._initiator.busy) self.SR.r_data[1].eq(self._initiator.ack_o) self.SR.r_data[2].eq(self._initiator.ack_i) # Strobe in this byte to send out with m.If(self.DWR.w_stb): m.d.sync += self._initiator.data_i.eq(self.DWR.w_data) # Read data only becomes valid when not busy (i.e. the read has completed) with m.If(~self._initiator.busy): m.d.sync += self.DRR.r_data.eq(self._initiator.data_o) return m from amaranth.lib.io import pin_layout i2c_layout = [ ("sda", pin_layout(1, "io")), ("scl", pin_layout(1, "io")), ] class I2CBusSimulator(Elaboratable): def __init__(self): self.interfaces = [] self.sda = Signal() self.scl = Signal() def elaborate(self, target): assert target is None, "This bus simulator should never be used in real hardware!" n = len(self.interfaces) m = Module() m.d.comb += self.sda.eq(1) m.d.comb += self.scl.eq(1) # TODO maybe output a bus contention signal? # First interfaces get priority over interfaces added after for i in reversed(range(n)): # Emulate bus drivers with m.If(self.interfaces[i].sda.oe): m.d.comb += self.sda.eq(self.interfaces[i].sda.o) with m.If(self.interfaces[i].scl.oe): m.d.comb += self.scl.eq(self.interfaces[i].scl.o) pass # Connect inputs to bus value m.d.comb += [ self.interfaces[i].sda.i.eq(self.sda), self.interfaces[i].scl.i.eq(self.scl), ] return m def create_interface(self) -> Record: new_interface = Record(i2c_layout) self.interfaces.append(new_interface) return new_interface class TestHarness(Elaboratable): def __init__(self): self.i2c = I2CBusSimulator() self.uut = I2C(10_000_000, 100_000, self.i2c.create_interface()) self.i2c_target = I2CTarget(self.i2c.create_interface()) def elaborate(self, platform): assert platform is None m = Module() m.submodules.i2c = self.i2c m.submodules.uut = self.uut m.submodules.i2c_target = self.i2c_target m.d.comb += self.i2c_target.address.eq(0xAA >> 1) return m import unittest import os import sys import inspect from contextlib import nullcontext from typing import Generator from tests import BaseTestClass, provide_testcase_name class TestCSROperation(BaseTestClass): def setUp(self): self.harness = TestHarness() @provide_testcase_name def test_send_byte(self, test_name): def test(): yield Tick() self._run_test(test, test_name) # TODO switch to unittest or something if __name__ == "__main__": unittest.main() def write_csr(bus, index, data): yield bus.addr.eq(index) yield bus.w_stb.eq(1) yield bus.w_data.eq(data) yield Tick() yield bus.w_stb.eq(0) yield Tick() def read_csr(bus, index): yield bus.r_strb.eq(1) ret = yield bus.r_data yield Tick() yield bus.r_stb.eq(0) yield Tick() harness = TestHarness() sim = Simulator(harness) def test_proc(): #send start yield from write_csr(harness.uut.bus, 0, 1) # TODO shouldn't need to do this, if I did a proper CSR read maybe? yield Tick() yield Tick() # TODO I want something like read_csr, unsure how to implement # Wait for while True: busy = yield harness.uut._initiator.busy if not busy: break else: yield Tick() # Set data yield from write_csr(harness.uut.bus, 2, 0xAA) # Write data yield from write_csr(harness.uut.bus, 0, 1 << 2) did_start = False for i in range(1000): start_cond = yield harness.i2c_target.start yield Tick() if start_cond: did_start = True break #assert did_start, "Idnaoidnwaioudnwaoiun" print(did_start) sim.add_clock(100e-9) sim.add_sync_process(test_proc) with sim.write_vcd('test.vcd'): sim.reset() sim.run()