from amaranth import * from i2c import * from amaranth.lib.io import pin_layout from tests import BaseTestClass, provide_testcase_name __all__ = ["i2c_layout", "I2CBusSimulator", "TestHarness", "TestCSROperation"] class TestHarness(Elaboratable): def __init__(self): self.i2c = I2CBusSimulator() self.uut = I2C(10_000_000, 100_000, self.i2c.create_interface()) self.i2c_target = I2CTarget(self.i2c.create_interface()) self.start_latch = Signal() self.clear_start = Signal() def elaborate(self, platform): assert platform is None m = Module() m.submodules.i2c = self.i2c m.submodules.uut = self.uut m.submodules.i2c_target = self.i2c_target m.d.comb += self.i2c_target.address.eq(0xAA >> 1) with m.If(self.i2c_target.start): m.d.sync += self.start_latch.eq(self.i2c_target.start) with m.If(self.clear_start): m.d.sync += self.start_latch.eq(0) return m class TestCSROperation(BaseTestClass): def setUp(self): self.harness = TestHarness() def _write_csr(self, bus, index, data): yield bus.addr.eq(index) yield bus.w_stb.eq(1) yield bus.w_data.eq(data) yield Tick() yield bus.w_stb.eq(0) yield Tick() def _wait_for_signal(self, signal, polarity=False, require_edge=True, timeout=1000): ready_for_edge = not require_edge # If we don't require edge, we can just ignore while True: timeout -= 1 if timeout == 0: self.fail(f"_wait_for_signal({signal}, {polarity}, {require_edge}, {timeout}, timed out!") read = yield signal if read == polarity: if ready_for_edge: break else: ready_for_edge = True yield Tick() # NOTE So ideally there are more test cases... but the initiator itself is well tested, # and we only really need it to work for a limited set of use cases, so exhaustive testing # isn't a huge deal. As well, we can cover all valid uses of the signals with one test. @provide_testcase_name def test_operation(self, test_name): def test(): #send start yield from self._write_csr(self.harness.uut.bus, 0, 1) yield from self._wait_for_signal(self.harness.uut._initiator.busy, require_edge=True) # Set data yield from self._write_csr(self.harness.uut.bus, 2, 0xAA) # Write data yield from self._write_csr(self.harness.uut.bus, 0, 1 << 2) yield from self._wait_for_signal(self.harness.uut._initiator.busy) # First byte has been written did_start = yield self.harness.start_latch self.assertTrue(did_start) self._run_test(test, test_name) i2c_layout = [ ("sda", pin_layout(1, "io")), ("scl", pin_layout(1, "io")), ] class I2CBusSimulator(Elaboratable): def __init__(self): self.interfaces = [] self.sda = Signal() self.scl = Signal() def elaborate(self, target): assert target is None, "This bus simulator should never be used in real hardware!" n = len(self.interfaces) m = Module() m.d.comb += self.sda.eq(1) m.d.comb += self.scl.eq(1) # TODO maybe output a bus contention signal? # First interfaces get priority over interfaces added after for i in reversed(range(n)): # Emulate bus drivers with m.If(self.interfaces[i].sda.oe): m.d.comb += self.sda.eq(self.interfaces[i].sda.o) with m.If(self.interfaces[i].scl.oe): m.d.comb += self.scl.eq(self.interfaces[i].scl.o) pass # Connect inputs to bus value m.d.comb += [ self.interfaces[i].sda.i.eq(self.sda), self.interfaces[i].scl.i.eq(self.scl), ] return m def create_interface(self) -> Record: new_interface = Record(i2c_layout) self.interfaces.append(new_interface) return new_interface