gw: add some end to end unit tests for sampler controller

This commit is contained in:
David Lenfesty 2023-05-27 12:08:34 -06:00
parent e624d82742
commit 0e3328aac1
2 changed files with 145 additions and 6 deletions

View File

@ -183,6 +183,8 @@ def main():
results = [] results = []
results.append(run_test("CircularBuffer", circular_buffer.testbench)) results.append(run_test("CircularBuffer", circular_buffer.testbench))
results.append(run_test("SamplerController", controller.test_bus_access)) results.append(run_test("SamplerController", controller.test_bus_access))
results.append(run_test("SamplerController", controller.test_simple_waveform))
results.append(run_test("SamplerController", controller.test_simple_waveform_capture_offset))
results.append(run_test("PeakDetector", peak_detector.test_simple_waveform)) results.append(run_test("PeakDetector", peak_detector.test_simple_waveform))
results.append(run_test("PeakDetector", peak_detector.test_scrunched_simple_waveform)) results.append(run_test("PeakDetector", peak_detector.test_scrunched_simple_waveform))
results.append(run_test("PeakDetector", peak_detector.test_decay_simple_waveform)) results.append(run_test("PeakDetector", peak_detector.test_decay_simple_waveform))

View File

@ -234,6 +234,8 @@ def read_wishbone(bus, address,):
(yield bus.stb.eq(0)) (yield bus.stb.eq(0))
(yield bus.cyc.eq(0)) (yield bus.cyc.eq(0))
yield # Tick
break break
else: else:
# Tick until we receive an ACK # Tick until we receive an ACK
@ -249,24 +251,23 @@ class MockSampler(Module):
Index of data to use from provided data Index of data to use from provided data
""" """
def __init__(self, data: List[int]): def __init__(self, data: List[int]):
memory = Memory(width=10, depth=len(data), init=data) self.specials.memory = Memory(width=10, depth=len(data), init=data)
self.index = Signal(ceil(log2(len(data)))) self.index = Signal(ceil(log2(len(data))))
self.data = Signal(10) self.data = Signal(10)
self.valid = Signal() self.valid = Signal()
read_port = memory.get_port(async_read=True) read_port = self.memory.get_port(async_read=True)
self.comb += [ self.comb += [
read_port.adr.eq(self.index), read_port.adr.eq(self.index),
self.data.eq(read_port.dat_r), self.data.eq(read_port.dat_r),
] ]
class TestSoC(Module): class TestSoC(Module):
def __init__(self, data): def __init__(self, data: List[int], *, buffer_len: int = 1024, num_samplers: int = 1):
sampler = MockSampler(data)
self.submodules.sampler = sampler
# TODO multiple mock samplers to test that functionality # TODO multiple mock samplers to test that functionality
self.controller = SamplerController([MockSampler(data)], 1024) self.samplers = [MockSampler(data) for _ in range(num_samplers)]
self.controller = SamplerController(self.samplers, buffer_len)
self.submodules.controller = self.controller self.submodules.controller = self.controller
self.bus = self.controller.bus self.bus = self.controller.bus
@ -281,3 +282,139 @@ def test_bus_access():
# TODO test writing to RO register fails # TODO test writing to RO register fails
run_simulation(dut, test_fn(), vcd_name="test_bus_access.vcd") run_simulation(dut, test_fn(), vcd_name="test_bus_access.vcd")
def test_simple_waveform():
"""End-to-end test of a simple waveform"""
from .peak_detector import create_waveform
_, data = create_waveform()
data = [int(d) for d in data]
dut = TestSoC(data, buffer_len=32)
def test_fn():
# Set settings
yield from write_wishbone(dut.bus, 2, 0) # trigger_run_len = 0
yield from write_wishbone(dut.bus, 3, 800) # thresh_value = 800
yield from write_wishbone(dut.bus, 4, 10) # thresh_time = 10
yield from write_wishbone(dut.bus, 5, 1) # decay_value = 1
yield from write_wishbone(dut.bus, 5, 0) # decay_period = 0
# Start controller
yield from write_wishbone(dut.bus, 0, 1)
triggered_yet = False
triggered_num = 0
for i in range(1000):
(yield dut.samplers[0].index.eq(i))
(yield dut.samplers[0].valid.eq(1))
yield
(yield dut.samplers[0].valid.eq(0))
yield
# Total of 6 clocks per sample clock
yield
yield
yield
yield
if not triggered_yet and (yield dut.controller.peak_detector.triggered) == 1:
# Triggered, now we need to run some number of cycles
triggered_yet = True
if triggered_yet:
triggered_num += 1
if triggered_num > 32:
# We should now have collected all our samples
yield from read_wishbone(dut.bus, 1)
assert (yield dut.bus.dat_r) == 1, "Trigger did not propogate to WB!"
# Check that length is correct
yield from read_wishbone(dut.bus, 0x100)
len = (yield dut.bus.dat_r)
assert len == 32, f"Len ({len}) not correct!"
# Read data in
data = []
for i in range(32):
yield from read_wishbone(dut.bus, 0x800 + i)
sample = (yield dut.bus.dat_r)
data.append(sample)
# Test pass
return
assert False, "We should have triggered"
run_simulation(dut, test_fn())
def test_simple_waveform_capture_offset():
"""Test a simple waveform captured at an offset"""
from .peak_detector import create_waveform
_, data = create_waveform()
data = [int(d) for d in data]
dut = TestSoC(data, buffer_len=32)
def test_fn():
# Set settings
yield from write_wishbone(dut.bus, 2, 16) # trigger_run_len = 16
yield from write_wishbone(dut.bus, 3, 800) # thresh_value = 800
yield from write_wishbone(dut.bus, 4, 10) # thresh_time = 10
yield from write_wishbone(dut.bus, 5, 1) # decay_value = 1
yield from write_wishbone(dut.bus, 5, 0) # decay_period = 0
# Start controller
yield from write_wishbone(dut.bus, 0, 1)
triggered_yet = False
triggered_num = 0
for i in range(1000):
(yield dut.samplers[0].index.eq(i))
(yield dut.samplers[0].valid.eq(1))
yield
(yield dut.samplers[0].valid.eq(0))
yield
# Total of 6 clocks per sample clock
yield
yield
yield
yield
if not triggered_yet and (yield dut.controller.peak_detector.triggered) == 1:
# Triggered, now we need to run some number of cycles
triggered_yet = True
if triggered_yet:
triggered_num += 1
if triggered_num > 16:
# We should now have collected all our samples
yield from read_wishbone(dut.bus, 1)
assert (yield dut.bus.dat_r) == 1, "Trigger did not propogate to WB!"
# Check that length is correct
yield from read_wishbone(dut.bus, 0x100)
len = (yield dut.bus.dat_r)
assert len == 32, f"Len ({len}) not correct!"
# Read data in
data = []
for i in range(32):
yield from read_wishbone(dut.bus, 0x800 + i)
sample = (yield dut.bus.dat_r)
data.append(sample)
# Manually validated from test above to be offset into the
# data
assert data[0] == 138
assert data[1] == 132
# Test pass
return
assert False, "We should have triggered"
run_simulation(dut, test_fn())