From 0e3328aac1143ab1ca057cdb742c31217c69dbb9 Mon Sep 17 00:00:00 2001 From: David Lenfesty Date: Sat, 27 May 2023 12:08:34 -0600 Subject: [PATCH] gw: add some end to end unit tests for sampler controller --- gateware/litex_main.py | 2 + gateware/sampler/controller.py | 149 +++++++++++++++++++++++++++++++-- 2 files changed, 145 insertions(+), 6 deletions(-) diff --git a/gateware/litex_main.py b/gateware/litex_main.py index eb03cdd..ae1568e 100755 --- a/gateware/litex_main.py +++ b/gateware/litex_main.py @@ -183,6 +183,8 @@ def main(): results = [] results.append(run_test("CircularBuffer", circular_buffer.testbench)) results.append(run_test("SamplerController", controller.test_bus_access)) + results.append(run_test("SamplerController", controller.test_simple_waveform)) + results.append(run_test("SamplerController", controller.test_simple_waveform_capture_offset)) results.append(run_test("PeakDetector", peak_detector.test_simple_waveform)) results.append(run_test("PeakDetector", peak_detector.test_scrunched_simple_waveform)) results.append(run_test("PeakDetector", peak_detector.test_decay_simple_waveform)) diff --git a/gateware/sampler/controller.py b/gateware/sampler/controller.py index 5878399..070a897 100644 --- a/gateware/sampler/controller.py +++ b/gateware/sampler/controller.py @@ -234,6 +234,8 @@ def read_wishbone(bus, address,): (yield bus.stb.eq(0)) (yield bus.cyc.eq(0)) + yield # Tick + break else: # Tick until we receive an ACK @@ -249,24 +251,23 @@ class MockSampler(Module): Index of data to use from provided data """ def __init__(self, data: List[int]): - memory = Memory(width=10, depth=len(data), init=data) + self.specials.memory = Memory(width=10, depth=len(data), init=data) self.index = Signal(ceil(log2(len(data)))) self.data = Signal(10) self.valid = Signal() - read_port = memory.get_port(async_read=True) + read_port = self.memory.get_port(async_read=True) self.comb += [ read_port.adr.eq(self.index), self.data.eq(read_port.dat_r), ] class TestSoC(Module): - def __init__(self, data): - sampler = MockSampler(data) - self.submodules.sampler = sampler + def __init__(self, data: List[int], *, buffer_len: int = 1024, num_samplers: int = 1): # TODO multiple mock samplers to test that functionality - self.controller = SamplerController([MockSampler(data)], 1024) + self.samplers = [MockSampler(data) for _ in range(num_samplers)] + self.controller = SamplerController(self.samplers, buffer_len) self.submodules.controller = self.controller self.bus = self.controller.bus @@ -281,3 +282,139 @@ def test_bus_access(): # TODO test writing to RO register fails run_simulation(dut, test_fn(), vcd_name="test_bus_access.vcd") + + +def test_simple_waveform(): + """End-to-end test of a simple waveform""" + from .peak_detector import create_waveform + _, data = create_waveform() + data = [int(d) for d in data] + dut = TestSoC(data, buffer_len=32) + + def test_fn(): + # Set settings + yield from write_wishbone(dut.bus, 2, 0) # trigger_run_len = 0 + yield from write_wishbone(dut.bus, 3, 800) # thresh_value = 800 + yield from write_wishbone(dut.bus, 4, 10) # thresh_time = 10 + yield from write_wishbone(dut.bus, 5, 1) # decay_value = 1 + yield from write_wishbone(dut.bus, 5, 0) # decay_period = 0 + + # Start controller + yield from write_wishbone(dut.bus, 0, 1) + + triggered_yet = False + triggered_num = 0 + for i in range(1000): + (yield dut.samplers[0].index.eq(i)) + (yield dut.samplers[0].valid.eq(1)) + yield + + (yield dut.samplers[0].valid.eq(0)) + yield + + # Total of 6 clocks per sample clock + yield + yield + yield + yield + + if not triggered_yet and (yield dut.controller.peak_detector.triggered) == 1: + # Triggered, now we need to run some number of cycles + triggered_yet = True + + if triggered_yet: + triggered_num += 1 + if triggered_num > 32: + # We should now have collected all our samples + yield from read_wishbone(dut.bus, 1) + assert (yield dut.bus.dat_r) == 1, "Trigger did not propogate to WB!" + + # Check that length is correct + yield from read_wishbone(dut.bus, 0x100) + len = (yield dut.bus.dat_r) + assert len == 32, f"Len ({len}) not correct!" + + # Read data in + data = [] + for i in range(32): + yield from read_wishbone(dut.bus, 0x800 + i) + sample = (yield dut.bus.dat_r) + data.append(sample) + + # Test pass + return + + assert False, "We should have triggered" + + run_simulation(dut, test_fn()) + + +def test_simple_waveform_capture_offset(): + """Test a simple waveform captured at an offset""" + from .peak_detector import create_waveform + _, data = create_waveform() + data = [int(d) for d in data] + dut = TestSoC(data, buffer_len=32) + + def test_fn(): + # Set settings + yield from write_wishbone(dut.bus, 2, 16) # trigger_run_len = 16 + yield from write_wishbone(dut.bus, 3, 800) # thresh_value = 800 + yield from write_wishbone(dut.bus, 4, 10) # thresh_time = 10 + yield from write_wishbone(dut.bus, 5, 1) # decay_value = 1 + yield from write_wishbone(dut.bus, 5, 0) # decay_period = 0 + + # Start controller + yield from write_wishbone(dut.bus, 0, 1) + + triggered_yet = False + triggered_num = 0 + for i in range(1000): + (yield dut.samplers[0].index.eq(i)) + (yield dut.samplers[0].valid.eq(1)) + yield + + (yield dut.samplers[0].valid.eq(0)) + yield + + # Total of 6 clocks per sample clock + yield + yield + yield + yield + + if not triggered_yet and (yield dut.controller.peak_detector.triggered) == 1: + # Triggered, now we need to run some number of cycles + triggered_yet = True + + if triggered_yet: + triggered_num += 1 + if triggered_num > 16: + # We should now have collected all our samples + yield from read_wishbone(dut.bus, 1) + assert (yield dut.bus.dat_r) == 1, "Trigger did not propogate to WB!" + + # Check that length is correct + yield from read_wishbone(dut.bus, 0x100) + len = (yield dut.bus.dat_r) + assert len == 32, f"Len ({len}) not correct!" + + # Read data in + data = [] + for i in range(32): + yield from read_wishbone(dut.bus, 0x800 + i) + sample = (yield dut.bus.dat_r) + data.append(sample) + + + # Manually validated from test above to be offset into the + # data + assert data[0] == 138 + assert data[1] == 132 + + # Test pass + return + + assert False, "We should have triggered" + + run_simulation(dut, test_fn())