from migen import *


class PeakDetector(Module):
    """
    Module to detect when peak to peak voltage is high enough to consider incoming data to be a
    valid ping. Configuration is provided by setting the configuration attributes. Do not change
    these settings while detector is running.

    Attributes
    ----------
    data: (input)
        Data signal to use for detection
    data_valid: (input)
        Strobed signal that indicates value on `data` is valid to be read
    enable: (input)
        Enables running peak detection. De-asserting this will clear all state variables
    triggered: (output)
        Signal that indicates peak has been triggered. Only cleared once enable is de-asserted
        again

    Configuration Attributes
    ------------------------
    thresh_value:
        Minimum peak to peak value considered triggered
    thresh_time:
        Number of consecutive samples above threshold required to consider triggered
    decay_value:
        Decay value to subtract from peak values to prevent false triggers
    decay_period:
        Number of samples between each application of decay
    """

    def __init__(self, data_width: int) -> None:
        """Build the detector logic for samples that are `data_width` bits wide."""
        # Create all state signals (underscored in self to be accessible in tests)
        self._min_val = Signal(data_width)
        self._max_val = Signal(data_width)
        self._diff = Signal(data_width)
        self._triggered_time = Signal(32)
        self._decay_counter = Signal(32)

        # Control signals
        self.data = Signal(data_width)
        self.data_valid = Signal()
        self.enable = Signal()
        self.triggered = Signal()

        # Configuration Parameters
        self.thresh_value = Signal(data_width)
        self.thresh_time = Signal(32)
        self.decay_value = Signal(data_width)
        self.decay_period = Signal(32)

        # Reset halfway. ADCs are 0-2V, and everything should be centered at 1V, so this is
        # approximating the initial value. Integer floor division keeps this a plain int.
        midpoint = 2**data_width // 2

        # While disabled, hold every piece of state at its reset value
        self.sync += If(~self.enable,
            self._min_val.eq(midpoint),
            self._max_val.eq(midpoint),
            self.triggered.eq(0),
            self._decay_counter.eq(0),
            self._triggered_time.eq(0),
        )

        # Constantly updating self._diff to simplify some statements. It is derived from the
        # registered peaks, so the comparisons below see the spread from before the current
        # sample is folded in.
        self.comb += self._diff.eq(self._max_val - self._min_val)

        self.sync += If(self.enable & self.data_valid,
            # Decay should run irrespective of if we have triggered,
            # and before everything else so it can be overwritten
            self._decay_counter.eq(self._decay_counter + 1),

            # Decay threshold has been reached, apply decay to peaks
            If(self._decay_counter >= self.decay_period,
                self._decay_counter.eq(0),

                # Only apply decay if the values would not overlap, and we use the decay
                If((self._diff >= (self.decay_value << 1)) & (self.decay_value > 0),
                    self._max_val.eq(self._max_val - self.decay_value),
                    self._min_val.eq(self._min_val + self.decay_value))),

            # Update maximum value
            If(self.data > self._max_val,
                self._max_val.eq(self.data)),

            # Update minimum value
            If(self.data < self._min_val,
                self._min_val.eq(self.data)),

            If(self._diff > self.thresh_value,
                # We have met the threshold for triggering, start counting
                self._triggered_time.eq(self._triggered_time + 1),

                # We have triggered, so we can set the output. After this point,
                # nothing we do matters until enable is de-asserted and we reset
                # triggered.
                If(self._triggered_time + 1 >= self.thresh_time,
                    self.triggered.eq(1)))
            .Else(
                # We have not met the threshold, reset timer
                self._triggered_time.eq(0),
            ),
        )


from typing import Generator, Tuple

import numpy as np
import matplotlib.pyplot as plt


def create_waveform(*, dc_bias: int = 0, scale: float = 1) -> Tuple[np.ndarray, np.ndarray]:
    """
    Create a simple 40kHz sine wave in integer values that can be used by peak detector

    The wave is sampled at 10 MHz for 2 ms and mapped onto unsigned integers as
    `sin * scale * 512 + 512 + dc_bias`, truncated to `uint16`.

    Parameters
    ----------
    dc_bias:
        Constant offset added to every sample. With a non-zero bias the samples can fall outside
        the 10 bit range (e.g. a bias of 200 at full scale peaks above 1023).
        NOTE(review): confirm how the consumer handles such over-range samples.
    scale:
        Amplitude as a fraction of full range; must be <= 1.0

    Returns
    -------
    Tuple of (sample times in seconds as floats, sample values as `uint16`)
    """
    assert scale <= 1.0, "Scale factor must be some ratio of full range"

    # Constants
    f_s = 10e6  # Sample rate (Hz)
    f = 40e3  # Signal Frequency (Hz)
    t = 0.002  # Sample period (s)
    n = int(f_s * t)  # Number of samples, derived from `t` so the two cannot drift apart

    # Create time from 0ms to 2ms
    x = np.linspace(0, t, n)

    # Create signal!
    y = np.sin(x * 2*np.pi*f)

    # Scale according to user inputs
    y = y * scale

    # Convert to positive integer at provided bias. `astype` performs the same unchecked
    # float -> uint16 cast as an "unsafe" copy; numpy cannot know the values are meant to stay
    # within 10 bits (which only holds while dc_bias is 0).
    signal = (y * 512 + 512 + dc_bias).astype(np.uint16)

    #plt.plot(x, signal)
    #plt.show()

    return x, signal


def set_settings(dut: PeakDetector, thresh_value: int, thresh_time: int, decay_value: int,
                 decay_period: int) -> Generator:
    """
    Set peak detector settings simply

    This is a simulation generator: drive it with `yield from` inside a testbench. It writes the
    four configuration signals of `dut` and then waits one clock so the values are loaded.
    """
    (yield dut.thresh_value.eq(thresh_value))
    (yield dut.thresh_time.eq(thresh_time))
    (yield dut.decay_value.eq(decay_value))
    (yield dut.decay_period.eq(decay_period))

    # Load in values with a new clock
    yield


# NOTE: These tests aren't really exhaustive. They get good coverage and generally outline results,
# but do require some amount of manual validation and checking of waveforms if things change
# majorly. At least they are a small set of things that I don't have to re-create later.
def _enable(dut):
    """Simulation step: assert `enable` on the detector and tick once so it takes effect."""
    (yield dut.enable.eq(1))
    yield  # Tick


def _feed_until_triggered(dut, signal):
    """
    Simulation step: load `signal` one sample per clock until the detector triggers.

    Returns as soon as `triggered` reads 1 (checked before each sample is loaded) and raises
    AssertionError if every sample has been loaded without seeing a trigger.
    """
    for val in signal:
        if (yield dut.triggered) == 1:
            # Test passed!
            return

        # Load in data, set valid
        (yield dut.data.eq(int(val)))
        (yield dut.data_valid.eq(1))
        yield  # Tick

    # Raised explicitly (rather than `assert False`) so it still fails under `python -O`
    raise AssertionError("No trigger, test has failed...")


def _feed_without_trigger(dut, signal, message):
    """
    Simulation step: load `signal` one sample per clock, failing with `message` if the detector
    reads as triggered before any sample is loaded.
    """
    for val in signal:
        assert (yield dut.triggered) == 0, message

        # Load in data, set valid
        (yield dut.data.eq(int(val)))
        (yield dut.data_valid.eq(1))
        yield  # Tick


def test_simple_waveform():
    """Test an ideal waveform with simple settings guaranteed to trigger"""
    (_, signal) = create_waveform()
    dut = PeakDetector(10)

    def test_fn():
        # First set settings to be simple, we want to trigger pretty much immediately
        yield from set_settings(dut, 800, 10, 0, 0)
        yield from _enable(dut)
        yield from _feed_until_triggered(dut, signal)

    run_simulation(dut, test_fn())


def test_scrunched_simple_waveform():
    """Test a smaller waveform with smaller peaks"""
    (_, signal) = create_waveform(scale=0.4)
    dut = PeakDetector(10)

    def test_fn():
        yield from set_settings(dut, 200, 10, 0, 0)
        yield from _enable(dut)
        yield from _feed_until_triggered(dut, signal)

    run_simulation(dut, test_fn())


def test_decay_simple_waveform():
    """Test that simple case of decay works """
    (_, signal) = create_waveform()
    dut = PeakDetector(10)

    def test_fn():
        yield from set_settings(dut, 800, 10, 5, 10)
        yield from _enable(dut)
        yield from _feed_until_triggered(dut, signal)

    run_simulation(dut, test_fn())


def test_decay_simple_waveform_too_much():
    """Test that we can overuse decay and discard valid waveforms"""
    (_, signal) = create_waveform()
    dut = PeakDetector(10)

    def test_fn():
        yield from set_settings(dut, 800, 10, 40, 0)
        yield from _enable(dut)
        yield from _feed_without_trigger(dut, signal, "Must not trigger!")

    run_simulation(dut, test_fn())


def test_decay_compensates_bias():
    """Test that decay pulls the minimum up towards a constant, biased input without triggering"""
    signal = [800] * int(20e3)
    dut = PeakDetector(10)

    def test_fn():
        yield from set_settings(dut, 800 - 512, 20, 1, 0)
        yield from _enable(dut)

        for i, val in enumerate(signal):
            assert (yield dut.triggered) == 0, "Must not trigger!"
            assert i < 500, "Decay must not take too long to work!"

            # min val won't necessarily match max_val because of the crossover, but as long
            # as it's close enough, we're good
            if (yield dut._max_val) == 800 and (yield dut._min_val) >= 798:
                # Test pass
                return

            # Load data in
            (yield dut.data.eq(val))
            (yield dut.data_valid.eq(1))
            yield  # Tick

    run_simulation(dut, test_fn())


def test_biased_simple_waveform():
    """Test a biased waveform. This does also need to test decay, else we get invalid results

    TODO this test is slightly broken. I think it's passing on the initial bias, not later stuff.
    It should get fixed, but I think it mostly covers area already covered in other tests so
    I'm fine with this for now.
    """
    (_, signal) = create_waveform(dc_bias=200)
    dut = PeakDetector(10)

    def test_fn():
        yield from set_settings(dut, 200, 20, 20, 1)
        yield from _enable(dut)
        yield from _feed_until_triggered(dut, signal)

    run_simulation(dut, test_fn())


def test_noise_spike():
    """Test that appropriate filtering and decay can filter out a spike in noise"""
    # The spike uses 1023, the largest value a 10 bit detector can represent; 1024 would not
    # fit in the data signal.
    signal = [512, 512, 512, 1023, 1023] + [512] * 1000
    dut = PeakDetector(10)

    def test_fn():
        yield from set_settings(dut, 300, 20, 20, 0)
        yield from _enable(dut)
        # Reaching the end of the signal without a trigger is the success condition
        yield from _feed_without_trigger(dut, signal, "Can't trigger!")

    run_simulation(dut, test_fn())