From f0683e684c8f25a0da3ff1b078554c6acd61e4a3 Mon Sep 17 00:00:00 2001 From: David Lenfesty Date: Sun, 29 Jan 2023 21:32:56 -0700 Subject: [PATCH] gateware: Properly migrate I2C test into my new framework --- gateware/i2c.py | 156 ------------------------------------------- gateware/test_i2c.py | 60 ++++++++++++++++- gateware/tests.py | 7 +- 3 files changed, 62 insertions(+), 161 deletions(-) diff --git a/gateware/i2c.py b/gateware/i2c.py index d6a6566..31134bd 100644 --- a/gateware/i2c.py +++ b/gateware/i2c.py @@ -121,159 +121,3 @@ class I2C(Elaboratable): return m - -from amaranth.lib.io import pin_layout -i2c_layout = [ - ("sda", pin_layout(1, "io")), - ("scl", pin_layout(1, "io")), -] - - -class I2CBusSimulator(Elaboratable): - def __init__(self): - self.interfaces = [] - self.sda = Signal() - self.scl = Signal() - - def elaborate(self, target): - assert target is None, "This bus simulator should never be used in real hardware!" - - n = len(self.interfaces) - - m = Module() - - m.d.comb += self.sda.eq(1) - m.d.comb += self.scl.eq(1) - - # TODO maybe output a bus contention signal? - # First interfaces get priority over interfaces added after - for i in reversed(range(n)): - # Emulate bus drivers - with m.If(self.interfaces[i].sda.oe): - m.d.comb += self.sda.eq(self.interfaces[i].sda.o) - with m.If(self.interfaces[i].scl.oe): - m.d.comb += self.scl.eq(self.interfaces[i].scl.o) - pass - - # Connect inputs to bus value - m.d.comb += [ - self.interfaces[i].sda.i.eq(self.sda), - self.interfaces[i].scl.i.eq(self.scl), - ] - - return m - - def create_interface(self) -> Record: - new_interface = Record(i2c_layout) - self.interfaces.append(new_interface) - return new_interface - - -class TestHarness(Elaboratable): - def __init__(self): - self.i2c = I2CBusSimulator() - - self.uut = I2C(10_000_000, 100_000, self.i2c.create_interface()) - self.i2c_target = I2CTarget(self.i2c.create_interface()) - - - def elaborate(self, platform): - assert platform is None - - m = Module() - - m.submodules.i2c = self.i2c - m.submodules.uut = self.uut - m.submodules.i2c_target = self.i2c_target - - m.d.comb += self.i2c_target.address.eq(0xAA >> 1) - - return m - -import unittest -import os -import sys - -import inspect - -from contextlib import nullcontext - -from typing import Generator - -from tests import BaseTestClass, provide_testcase_name - -class TestCSROperation(BaseTestClass): - def setUp(self): - self.harness = TestHarness() - - @provide_testcase_name - def test_send_byte(self, test_name): - def test(): - yield Tick() - - self._run_test(test, test_name) - - -# TODO switch to unittest or something -if __name__ == "__main__": - unittest.main() - - - def write_csr(bus, index, data): - yield bus.addr.eq(index) - yield bus.w_stb.eq(1) - yield bus.w_data.eq(data) - yield Tick() - yield bus.w_stb.eq(0) - yield Tick() - - def read_csr(bus, index): - yield bus.r_strb.eq(1) - ret = yield bus.r_data - yield Tick() - yield bus.r_stb.eq(0) - yield Tick() - - harness = TestHarness() - sim = Simulator(harness) - - def test_proc(): - #send start - yield from write_csr(harness.uut.bus, 0, 1) - - # TODO shouldn't need to do this, if I did a proper CSR read maybe? - yield Tick() - yield Tick() - - # TODO I want something like read_csr, unsure how to implement - # Wait for - while True: - busy = yield harness.uut._initiator.busy - if not busy: - break - else: - yield Tick() - - # Set data - yield from write_csr(harness.uut.bus, 2, 0xAA) - # Write data - yield from write_csr(harness.uut.bus, 0, 1 << 2) - - did_start = False - for i in range(1000): - start_cond = yield harness.i2c_target.start - yield Tick() - - if start_cond: - did_start = True - break - - #assert did_start, "Idnaoidnwaioudnwaoiun" - print(did_start) - - - sim.add_clock(100e-9) - sim.add_sync_process(test_proc) - with sim.write_vcd('test.vcd'): - sim.reset() - sim.run() diff --git a/gateware/test_i2c.py b/gateware/test_i2c.py index 18f84b4..956f5fc 100644 --- a/gateware/test_i2c.py +++ b/gateware/test_i2c.py @@ -14,6 +14,9 @@ class TestHarness(Elaboratable): self.uut = I2C(10_000_000, 100_000, self.i2c.create_interface()) self.i2c_target = I2CTarget(self.i2c.create_interface()) + self.start_latch = Signal() + self.clear_start = Signal() + def elaborate(self, platform): assert platform is None @@ -26,6 +29,12 @@ class TestHarness(Elaboratable): m.d.comb += self.i2c_target.address.eq(0xAA >> 1) + with m.If(self.i2c_target.start): + m.d.sync += self.start_latch.eq(self.i2c_target.start) + + with m.If(self.clear_start): + m.d.sync += self.start_latch.eq(0) + return m @@ -34,13 +43,58 @@ class TestCSROperation(BaseTestClass): self.harness = TestHarness() - @provide_testcase_name - def test_send_byte(self, test_name): - def test(): + def _write_csr(self, bus, index, data): + yield bus.addr.eq(index) + yield bus.w_stb.eq(1) + yield bus.w_data.eq(data) + yield Tick() + yield bus.w_stb.eq(0) + yield Tick() + + + def _wait_for_signal(self, signal, polarity=False, require_edge=True, timeout=1000): + ready_for_edge = not require_edge # If we don't require edge, we can just ignore + + while True: + timeout -= 1 + if timeout == 0: + self.fail(f"_wait_for_signal({signal}, {polarity}, {require_edge}, {timeout}, timed out!") + + read = yield signal + if read == polarity: + if ready_for_edge: + break + else: + ready_for_edge = True + yield Tick() + + # NOTE So ideally there are more test cases... but the initiator itself is well tested, + # and we only really need it to work for a limited set of use cases, so exhaustive testing + # isn't a huge deal. As well, we can cover all valid uses of the signals with one test. + @provide_testcase_name + def test_operation(self, test_name): + def test(): + #send start + yield from self._write_csr(self.harness.uut.bus, 0, 1) + + yield from self._wait_for_signal(self.harness.uut._initiator.busy, require_edge=True) + + # Set data + yield from self._write_csr(self.harness.uut.bus, 2, 0xAA) + # Write data + yield from self._write_csr(self.harness.uut.bus, 0, 1 << 2) + + yield from self._wait_for_signal(self.harness.uut._initiator.busy) + + # First byte has been written + did_start = yield self.harness.start_latch + self.assertTrue(did_start) + self._run_test(test, test_name) + i2c_layout = [ ("sda", pin_layout(1, "io")), ("scl", pin_layout(1, "io")), diff --git a/gateware/tests.py b/gateware/tests.py index 96db1ce..c60d4d8 100644 --- a/gateware/tests.py +++ b/gateware/tests.py @@ -22,16 +22,19 @@ class BaseTestClass(unittest.TestCase): raise NotImplementedError(f"Must define a self.harness module for TestCase {self.__class__.__name__}!") sim.add_clock(100e-9) + sim.add_sync_process(test) sim.reset() # Pretty hacky way to pass this info in but does it look like I care? if os.environ.get("TEST_SAVE_VCD"): - ctx = sim.write_vcd(f"vcd_out/{name}.vcd") + ctx = sim.write_vcd(f"vcd_out/{name}.vcd") else: ctx = nullcontext() with ctx: - sim.add_sync_process(test) + sim.run() + + del sim def provide_testcase_name(fn):