from migen import * from litex.soc.interconnect.wishbone import * from math import ceil, log2 from typing import List from .sampler import Sampler from .circular_buffer import CircularBuffer from .peak_detector import PeakDetector class SamplerController(Module): """ Sampler control Attributes ---------- bus: Slave wishbone bus to be connected to a higher-level bus. Has an address width set according to the provided buffer length. buffers: List of FIFO buffer objects used to store sample data. samplers: List of sampler objects provided by user. Registers -------- 0x00: Control Register (WO) Bit 0 - Start capture Bit 1 - Stop capture. Does nothing if capture is not ongoing Bit 2 - Clear sample buffers 0x01: Status Register (RO) Bit 0 - Capture complete. Set by peak detection block and cleared when capture is began Bit 1 - Sampling running 0x02: trigger_run_len (RW) Number of samples to acquire after triggering sample. 0x03: thresh_value (RW) Minimum peak to peak value considered triggered 0x04: thresh_time (RW) Number of consecutive samples above threshold required to consider triggered 0x05: decay_value (RW) Decay value to subtract from peak values to prevent false triggers 0x06: decay_period (RW) Number of samples between each application of decay 0x1xx: BUFFER_LEN_X (RO) Lenght of data in buffer, up to the number of samplers provided. """ def __init__(self, samplers: List[Sampler], buffer_len): self.samplers = samplers num_channels = len(samplers) # Enables reading in samples sample_enable = Signal() # Pull in only one CDC sync signal sample_ready = self.samplers[0].valid # Generate buffers for each sampler self.buffers = [CircularBuffer(9, buffer_len) for _ in range(num_channels)] # Connect each buffer to each sampler for buffer, sampler in zip(self.buffers, self.samplers): self.submodules += buffer self.submodules += sampler self.comb += [ # Connect only top 9 bits to memory buffer.wr_data.eq(sampler.data[1:]), # Writes enter FIFO only when enabled and every clock cycle buffer.wr_valid.eq(sample_enable & sample_ready), ] # Each sampler gets some chunk of memory at least large enough to fit # all of it's data, so use that as a consistent offset. Use a minimum # address of 0x800 to avoid conflicts with control registers sample_mem_addr_width = max(ceil(log2(buffer_len)), ceil(log2(0x800))) # 1 control block + number of channels used = control bits control_block_addr_width = ceil(log2(num_channels + 1)) # Bus address width addr_width = (num_channels + 1) * sample_mem_addr_width # "Master" bus self.bus = Interface(data_width=32, adr_width=addr_width) # Wishbone bus used for mapping control registers self.control_regs_bus = Interface(data_width=32, adr_width=sample_mem_addr_width) slaves = [] slaves.append((lambda adr: adr[sample_mem_addr_width:] == 0, self.control_regs_bus)) for i, buffer in enumerate(self.buffers): # Connect subordinate buses of buffers to decoder slaves.append((lambda adr: adr[sample_mem_addr_width:] == i + 1, buffer.bus)) adr = (i + 1) << sample_mem_addr_width print(f"Sampler {i} available at 0x{adr:08x}") self.submodules.decoder = Decoder(self.bus, slaves) self.submodules.peak_detector = PeakDetector(10) self.comb += [ # Simply enable whenever we start capturing self.peak_detector.enable.eq(sample_enable), # Connect to the first ADC self.peak_detector.data.eq(self.samplers[0].data), # Use the same criteria as the fifo buffer self.peak_detector.data_valid.eq(sample_enable & sample_ready), ] #### Control register logic # Storage control_register = Signal(32) status_register = Signal(32) trigger_run_len = Signal(32) def rw_register(storage: Signal, *, read: bool = True, write: bool = True): if read: read = self.control_regs_bus.dat_r.eq(storage) else: read = self.control_regs_bus.ack.eq(0) if write: write = storage.eq(self.control_regs_bus.dat_w) else: write = self.control_regs_bus.ack.eq(0) return If(self.control_regs_bus.we, write).Else(read) # Handle explicit config registers cases = { 0: rw_register(control_register, read=False), 1: rw_register(status_register, write=False), 2: rw_register(trigger_run_len), 3: rw_register(self.peak_detector.thresh_value), 4: rw_register(self.peak_detector.thresh_time), 5: rw_register(self.peak_detector.decay_value), 6: rw_register(self.peak_detector.decay_period), "default": rw_register(None, read=False, write=False) } # Handle length values for each sample buffer for i, buffer in enumerate(self.buffers): cases.update({(0x100 >> 2) + i: rw_register(buffer.len, write=False)}) # Connect up control registers bus self.sync += [ self.control_regs_bus.ack.eq(0), # Hold control register low to use as strobe functionality control_register.eq(0), If(self.control_regs_bus.cyc & self.control_regs_bus.stb, self.control_regs_bus.ack.eq(1), Case(self.control_regs_bus.adr, cases)), ] # Handle the control logic post_trigger_count = Signal(32) self.sync += [ # Reset state whenever sampling is disabled If(~sample_enable, post_trigger_count.eq(0)), # Reset triggering status if we have started sampling # (peak_detector.triggered resets if sample_enable is de-asserted, so # this is a reliable reset mechanism) If(sample_enable & ~self.peak_detector.triggered, status_register[0].eq(0)), # Keep sampling past the trigger for the configured number of samples If(self.peak_detector.triggered & sample_enable & sample_ready, post_trigger_count.eq(post_trigger_count + 1), # We have sampled enough, update status and stop sampling If(post_trigger_count + 1 >= trigger_run_len, status_register[0].eq(1), sample_enable.eq(0))), ] # Update register storage self.sync += [ status_register[1].eq(sample_enable), If(control_register[0], sample_enable.eq(1)), If(control_register[1], sample_enable.eq(0)), ] for buffer in self.buffers: self.sync += [ buffer.clear.eq(0), If(control_register[2], buffer.clear.eq(1)), ] def write_wishbone(bus, address, value): # Set up bus (yield bus.adr.eq(address)) (yield bus.dat_w.eq(value)) (yield bus.stb.eq(1)) (yield bus.cyc.eq(1)) (yield bus.we.eq(1)) yield cycles = 0 while True: cycles += 1 assert cycles < 5, "Write fail" if (yield bus.ack) == 1: # We received a response, clear out bus status and exit (yield bus.stb.eq(0)) (yield bus.cyc.eq(0)) yield break else: # Tick until we receive an ACK yield def read_wishbone(bus, address,): """Sets up a read transaction. Due to limitations of the simulation method, you have to read from dat_r, and also tick immediately after calling""" # Set up bus (yield bus.adr.eq(address)) (yield bus.stb.eq(1)) (yield bus.cyc.eq(1)) (yield bus.we.eq(0)) yield cycles = 0 while True: cycles += 1 assert cycles < 5, "Write fail" if (yield bus.ack) == 1: # We received a response, clear out bus status and exit (yield bus.stb.eq(0)) (yield bus.cyc.eq(0)) yield # Tick break else: # Tick until we receive an ACK yield class MockSampler(Module): """ Attributes ---------- All Sampler attributes by default, plus the following: index: Index of data to use from provided data """ def __init__(self, data: List[int]): self.specials.memory = Memory(width=10, depth=len(data), init=data) self.index = Signal(ceil(log2(len(data)))) self.data = Signal(10) self.valid = Signal() read_port = self.memory.get_port(async_read=True) self.comb += [ read_port.adr.eq(self.index), self.data.eq(read_port.dat_r), ] class TestSoC(Module): def __init__(self, data: List[int], *, buffer_len: int = 1024, num_samplers: int = 1): # TODO multiple mock samplers to test that functionality self.samplers = [MockSampler(data) for _ in range(num_samplers)] self.controller = SamplerController(self.samplers, buffer_len) self.submodules.controller = self.controller self.bus = self.controller.bus def test_bus_access(): dut = TestSoC([2, 3, 4, 5]) def test_fn(): yield from write_wishbone(dut.bus, 2, 0xDEADBEEF) yield from read_wishbone(dut.bus, 2) assert (yield dut.bus.dat_r) == 0xDEADBEEF, "Read failed!" # TODO test writing to RO register fails run_simulation(dut, test_fn(), vcd_name="test_bus_access.vcd") def test_simple_waveform(): """End-to-end test of a simple waveform""" from .peak_detector import create_waveform _, data = create_waveform() data = [int(d) for d in data] dut = TestSoC(data, buffer_len=32) def test_fn(): # Set settings yield from write_wishbone(dut.bus, 2, 0) # trigger_run_len = 0 yield from write_wishbone(dut.bus, 3, 800) # thresh_value = 800 yield from write_wishbone(dut.bus, 4, 10) # thresh_time = 10 yield from write_wishbone(dut.bus, 5, 1) # decay_value = 1 yield from write_wishbone(dut.bus, 5, 0) # decay_period = 0 # Start controller yield from write_wishbone(dut.bus, 0, 1) triggered_yet = False triggered_num = 0 for i in range(1000): (yield dut.samplers[0].index.eq(i)) (yield dut.samplers[0].valid.eq(1)) yield (yield dut.samplers[0].valid.eq(0)) yield # Total of 6 clocks per sample clock yield yield yield yield if not triggered_yet and (yield dut.controller.peak_detector.triggered) == 1: # Triggered, now we need to run some number of cycles triggered_yet = True if triggered_yet: triggered_num += 1 if triggered_num > 32: # We should now have collected all our samples yield from read_wishbone(dut.bus, 1) assert (yield dut.bus.dat_r) == 1, "Trigger did not propogate to WB!" # Check that length is correct yield from read_wishbone(dut.bus, 0x100) len = (yield dut.bus.dat_r) assert len == 32, f"Len ({len}) not correct!" # Read data in data = [] for i in range(32): yield from read_wishbone(dut.bus, 0x800 + i) sample = (yield dut.bus.dat_r) data.append(sample) # Manually validated, this is what we should read on a correct # run assert data[15] == 138 assert data[16] == 132 # Test pass return assert False, "We should have triggered" run_simulation(dut, test_fn()) def test_simple_waveform_capture_offset(): """Test a simple waveform captured at an offset""" from .peak_detector import create_waveform _, data = create_waveform() data = [int(d) for d in data] dut = TestSoC(data, buffer_len=32) def test_fn(): # Set settings yield from write_wishbone(dut.bus, 2, 16) # trigger_run_len = 16 yield from write_wishbone(dut.bus, 3, 800) # thresh_value = 800 yield from write_wishbone(dut.bus, 4, 10) # thresh_time = 10 yield from write_wishbone(dut.bus, 5, 1) # decay_value = 1 yield from write_wishbone(dut.bus, 5, 0) # decay_period = 0 # Start controller yield from write_wishbone(dut.bus, 0, 1) triggered_yet = False triggered_num = 0 for i in range(1000): (yield dut.samplers[0].index.eq(i)) (yield dut.samplers[0].valid.eq(1)) yield (yield dut.samplers[0].valid.eq(0)) yield # Total of 6 clocks per sample clock yield yield yield yield if not triggered_yet and (yield dut.controller.peak_detector.triggered) == 1: # Triggered, now we need to run some number of cycles triggered_yet = True if triggered_yet: triggered_num += 1 if triggered_num > 16: # We should now have collected all our samples yield from read_wishbone(dut.bus, 1) assert (yield dut.bus.dat_r) == 1, "Trigger did not propogate to WB!" # Check that length is correct yield from read_wishbone(dut.bus, 0x100) len = (yield dut.bus.dat_r) assert len == 32, f"Len ({len}) not correct!" # Read data in data = [] for i in range(32): yield from read_wishbone(dut.bus, 0x800 + i) sample = (yield dut.bus.dat_r) data.append(sample) # Manually validated from test above to be offset into the # data assert data[0] == 138 assert data[1] == 132 # Test pass return assert False, "We should have triggered" run_simulation(dut, test_fn()) def test_multiple_reads(): """ Testing multiple triggers/captures in succession to ensure typical (i.e. repeated) operation works correctly. """ # Enable, trigger works correctly # Enable, tick a bit of data in, should not trigger, and trigger should have reset immediately # Enable again, and tick in lots of data, should trigger again now """Test a simple waveform captured at an offset""" from .peak_detector import create_waveform _, data = create_waveform() data = [int(d) for d in data] dut = TestSoC(data, buffer_len=32) def test_fn(): # Set settings yield from write_wishbone(dut.bus, 2, 0) # trigger_run_len = 0 yield from write_wishbone(dut.bus, 3, 800) # thresh_value = 800 yield from write_wishbone(dut.bus, 4, 10) # thresh_time = 10 yield from write_wishbone(dut.bus, 5, 1) # decay_value = 1 yield from write_wishbone(dut.bus, 5, 0) # decay_period = 0 # Start controller yield from write_wishbone(dut.bus, 0, 1) triggered_yet = False triggered_num = 0 for i in range(1000): (yield dut.samplers[0].index.eq(i)) (yield dut.samplers[0].valid.eq(1)) yield (yield dut.samplers[0].valid.eq(0)) yield # Total of 6 clocks per sample clock yield yield yield yield if not triggered_yet and (yield dut.controller.peak_detector.triggered) == 1: # Triggered, now we need to run some number of cycles triggered_yet = True if triggered_yet: triggered_num += 1 if triggered_num > 16: # We should now have collected all our samples yield from read_wishbone(dut.bus, 1) assert (yield dut.bus.dat_r) == 1, "Trigger did not propogate to WB!" # Check that length is correct yield from read_wishbone(dut.bus, 0x100) len = (yield dut.bus.dat_r) assert len == 32, f"Len ({len}) not correct!" # Read data in data = [] for i in range(32): yield from read_wishbone(dut.bus, 0x800 + i) sample = (yield dut.bus.dat_r) data.append(sample) # Manually validated from test above to be offset into the # data assert data[15] == 138 assert data[16] == 132 break assert triggered_yet, "We should have triggered" # Clear out sampler and re-enable yield from write_wishbone(dut.bus, 0, 0b101) yield assert (yield dut.controller.peak_detector.triggered) == 0, "Trigger should have been cleared" assert (yield dut.controller.buffers[0].len) == 0, "Buffers should have been cleared" # Tick a few clocks through, and we shouldn't have triggered for i in range(10): (yield dut.samplers[0].index.eq(i)) (yield dut.samplers[0].valid.eq(1)) yield (yield dut.samplers[0].valid.eq(0)) yield # Total of 6 clocks per sample clock yield yield yield yield assert (yield dut.controller.peak_detector.triggered) == 0, "We didn't push enough data through to trigger" # Disable sampler, run lots of data through, we should not trigger yield from write_wishbone(dut.bus, 0, 0b010) for i in range(1000): (yield dut.samplers[0].index.eq(i)) (yield dut.samplers[0].valid.eq(1)) yield (yield dut.samplers[0].valid.eq(0)) yield # Total of 6 clocks per sample clock yield yield yield yield # Enable sampler and run again, we should get another trigger yield from write_wishbone(dut.bus, 2, 16) # trigger_run_len = 16 yield from write_wishbone(dut.bus, 0, 1) triggered_yet = False triggered_num = 0 for i in range(1000): (yield dut.samplers[0].index.eq(i)) (yield dut.samplers[0].valid.eq(1)) yield (yield dut.samplers[0].valid.eq(0)) yield # Total of 6 clocks per sample clock yield yield yield yield if not triggered_yet and (yield dut.controller.peak_detector.triggered) == 1: # Triggered, now we need to run some number of cycles triggered_yet = True if triggered_yet: triggered_num += 1 if triggered_num > 16: # We should now have collected all our samples yield from read_wishbone(dut.bus, 1) assert (yield dut.bus.dat_r) == 1, "Trigger did not propogate to WB!" # Check that length is correct yield from read_wishbone(dut.bus, 0x100) len = (yield dut.bus.dat_r) assert len == 32, f"Len ({len}) not correct!" # Read data in data = [] for i in range(32): yield from read_wishbone(dut.bus, 0x800 + i) sample = (yield dut.bus.dat_r) data.append(sample) # Manually validated from test above to be offset into the # data assert data[0] == 138 assert data[1] == 132 # Test pass return assert triggered_yet, "We should have triggered" run_simulation(dut, test_fn(), vcd_name="controller.vcd")