# Listing metadata: 353 lines, 11 KiB, Python
from migen import *
|
|
|
|
class PeakDetector(Module):
    """
    Detect when the peak-to-peak swing of the incoming data is large enough for
    the data to be considered a valid ping. The detector is configured through
    the configuration attributes below; they must not be changed while the
    detector is running.

    Attributes
    ----------
    data: (input)
        Data signal to use for detection

    data_valid: (input)
        Strobed signal that indicates the value on `data` is valid to be read

    enable: (input)
        Enables running peak detection. De-asserting this clears all state variables

    triggered: (output)
        Indicates the detector has triggered. Only cleared once enable is de-asserted again

    Configuration Attributes
    ------------------------
    thresh_value:
        Minimum peak to peak value considered triggered

    thresh_time:
        Number of consecutive samples above threshold required to consider triggered

    decay_value:
        Decay value to subtract from peak values to prevent false triggers

    decay_period:
        Number of samples between each application of decay
    """

    def __init__(self, data_width: int):
        # Internal state. The leading underscore marks these as private, but
        # they stay on self so the tests can inspect them.
        self._min_val = Signal(data_width)
        self._max_val = Signal(data_width)
        self._diff = Signal(data_width)
        self._triggered_time = Signal(32)
        self._decay_counter = Signal(32)

        # Control interface
        self.data = Signal(data_width)
        self.data_valid = Signal()
        self.enable = Signal()
        self.triggered = Signal()

        # Configuration interface
        self.thresh_value = Signal(data_width)
        self.thresh_time = Signal(32)
        self.decay_value = Signal(data_width)
        self.decay_period = Signal(32)

        # Both peaks restart at mid-scale. ADCs are 0-2V and everything should
        # be centered at 1V, so mid-scale approximates the initial value.
        midpoint = int(2**data_width / 2)

        # While disabled, hold every state variable at its reset value.
        self.sync += If(
            ~self.enable,
            self._min_val.eq(midpoint),
            self._max_val.eq(midpoint),
            self.triggered.eq(0),
            self._decay_counter.eq(0),
            self._triggered_time.eq(0),
        )

        # The peak-to-peak difference is kept continuously up to date so the
        # clocked logic below can simply refer to it.
        self.comb += self._diff.eq(self._max_val - self._min_val)

        sample_ready = self.enable & self.data_valid
        decay_due = self._decay_counter >= self.decay_period
        # Decay is applied only when it is in use (non-zero) and when moving
        # both peaks inwards would not make them overlap.
        can_decay = (self._diff >= (self.decay_value << 1)) & (self.decay_value > 0)
        next_triggered_time = self._triggered_time + 1

        # Pull both peaks towards each other by one decay step.
        pull_peaks_inwards = If(
            can_decay,
            self._max_val.eq(self._max_val - self.decay_value),
            self._min_val.eq(self._min_val + self.decay_value),
        )

        # Once the decay period has elapsed, restart the counter and decay.
        decay_step = If(
            decay_due,
            self._decay_counter.eq(0),
            pull_peaks_inwards,
        )

        # Peak tracking. These statements come after the decay so that a new
        # extreme overrides the decayed value.
        track_max = If(self.data > self._max_val, self._max_val.eq(self.data))
        track_min = If(self.data < self._min_val, self._min_val.eq(self.data))

        # Count samples above the threshold. Once the count reaches
        # thresh_time the output is set, and nothing below matters any more
        # until enable is de-asserted and triggered is reset. A sample at or
        # below the threshold restarts the count.
        count_or_restart = If(
            self._diff > self.thresh_value,
            self._triggered_time.eq(next_triggered_time),
            If(next_triggered_time >= self.thresh_time, self.triggered.eq(1)),
        ).Else(
            self._triggered_time.eq(0),
        )

        self.sync += If(
            sample_ready,
            # Decay runs irrespective of whether we have triggered, and before
            # everything else so that it can be overwritten.
            self._decay_counter.eq(self._decay_counter + 1),
            decay_step,
            track_max,
            track_min,
            count_or_restart,
        )
|
|
|
|
|
|
from typing import Tuple
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
|
|
|
|
def create_waveform(*, dc_bias: int = 0, scale: float = 1) -> Tuple[np.ndarray, np.ndarray]:
    """
    Create a simple 40kHz sine wave in integer values that can be used by peak detector

    The wave is sampled at 10 MHz for 2 ms (20000 samples) and centered on the
    10-bit mid-scale value of 512.

    Parameters
    ----------
    dc_bias:
        Offset added to every sample on top of the mid-scale value.

    scale:
        Amplitude as a ratio of full range; must not exceed 1.

    Returns
    -------
    x:
        Sample times in seconds (float array).

    signal:
        Samples as `uint16`, limited to the 10-bit range 0..1023. Values that
        the bias would push outside that range saturate at the limits instead
        of being cast out of range.
    """
    assert scale <= 1.0, "Scale factor must be some ratio of full range"

    # Constants
    f_s = 10e6  # Sample rate (Hz)
    f = 40e3  # Signal Frequency (Hz)
    t = 0.002  # Sample period (s)
    n = int(f_s * t)  # Number of samples

    mid_scale = 512  # Center of the 10-bit range
    full_scale = 1023  # Largest value a 10-bit sample can hold

    # Create time from 0ms to 2ms
    x = np.linspace(0, t, n)

    # Create signal and scale according to user inputs
    y = np.sin(x * 2 * np.pi * f) * scale

    # Convert to positive integers at the provided bias. Clipping first keeps
    # every value inside the 10-bit range, so the truncating cast to uint16
    # can never see a negative or oversized value.
    biased = y * mid_scale + mid_scale + dc_bias
    signal = np.clip(biased, 0, full_scale).astype(np.uint16)

    return x, signal
|
|
|
|
|
|
def set_settings(dut: PeakDetector, thresh_value: int, thresh_time: int, decay_value: int, decay_period: int) -> None:
    """
    Drive the four peak detector configuration signals, then advance one clock.

    This is a simulation generator; use it with `yield from` inside a test
    generator.
    """
    configuration = (
        (dut.thresh_value, thresh_value),
        (dut.thresh_time, thresh_time),
        (dut.decay_value, decay_value),
        (dut.decay_period, decay_period),
    )
    for target, setting in configuration:
        yield target.eq(setting)

    # One clock tick so the new values are loaded
    yield
|
|
|
|
|
|
# NOTE: These tests are not exhaustive. They give good coverage and outline the expected
# results, but some manual validation and waveform inspection is still required if the
# detector changes significantly. At least they are a small set of checks that do not
# have to be re-created later.
|
|
|
|
|
|
def test_simple_waveform():
    """Test an ideal waveform with simple settings guaranteed to trigger"""
    _, samples = create_waveform()
    detector = PeakDetector(10)

    def stimulus():
        # Simple settings: we want to trigger pretty much immediately
        yield from set_settings(detector, 800, 10, 0, 0)

        # Enable the detector and let one clock pass
        yield detector.enable.eq(1)
        yield

        # Feed samples until the detector reports a trigger
        for sample in samples:
            fired = yield detector.triggered
            if fired == 1:
                # Test passed!
                return

            # Present the sample, mark it valid, then tick
            yield detector.data.eq(int(sample))
            yield detector.data_valid.eq(1)
            yield

        assert False, "No trigger, test has failed..."

    run_simulation(detector, stimulus())
|
|
|
|
|
|
def test_scrunched_simple_waveform():
    """Test a smaller waveform with smaller peaks"""
    _, samples = create_waveform(scale=0.4)
    detector = PeakDetector(10)

    def stimulus():
        yield from set_settings(detector, 200, 10, 0, 0)

        # Enable the detector and let one clock pass
        yield detector.enable.eq(1)
        yield

        # Feed samples until the detector reports a trigger
        for sample in samples:
            fired = yield detector.triggered
            if fired == 1:
                # Test passed!
                return

            # Present the sample, mark it valid, then tick
            yield detector.data.eq(int(sample))
            yield detector.data_valid.eq(1)
            yield

        assert False, "No trigger, test has failed..."

    run_simulation(detector, stimulus())
|
|
|
|
|
|
def test_decay_simple_waveform():
    """Test that the simple case of decay works"""
    _, samples = create_waveform()
    detector = PeakDetector(10)

    def stimulus():
        yield from set_settings(detector, 800, 10, 5, 10)

        # Enable the detector and let one clock pass
        yield detector.enable.eq(1)
        yield

        # Feed samples until the detector reports a trigger
        for sample in samples:
            fired = yield detector.triggered
            if fired == 1:
                # Test passed!
                return

            # Present the sample, mark it valid, then tick
            yield detector.data.eq(int(sample))
            yield detector.data_valid.eq(1)
            yield

        assert False, "No trigger, test has failed..."

    run_simulation(detector, stimulus())
|
|
|
|
|
|
def test_decay_simple_waveform_too_much():
    """Test that we can overuse decay and discard valid waveforms"""
    _, samples = create_waveform()
    detector = PeakDetector(10)

    def stimulus():
        yield from set_settings(detector, 800, 10, 40, 0)

        # Enable the detector and let one clock pass
        yield detector.enable.eq(1)
        yield

        # Feed every sample, checking each time that we have not triggered
        for sample in samples:
            fired = yield detector.triggered
            assert fired == 0, "Must not trigger!"

            # Present the sample, mark it valid, then tick
            yield detector.data.eq(int(sample))
            yield detector.data_valid.eq(1)
            yield

    run_simulation(detector, stimulus())
|
|
|
|
|
|
def test_decay_compensates_bias():
    """Test that decay brings the tracked peaks together on a constant, biased input

    The input is held at 800. The test passes once the tracked maximum is 800
    and the tracked minimum is at least 798. This must happen within 500
    samples and without the detector ever triggering.
    """
    level = 800
    samples = [level] * int(20e3)
    detector = PeakDetector(10)

    def stimulus():
        yield from set_settings(detector, level - 512, 20, 1, 0)

        # Enable the detector and let one clock pass
        yield detector.enable.eq(1)
        yield

        for index, sample in enumerate(samples):
            fired = yield detector.triggered
            assert fired == 0, "Must not trigger!"
            assert index < 500, "Decay must not take too long to work!"

            # The minimum won't necessarily match the maximum because of the
            # crossover, but as long as it's close enough, we're good. The
            # minimum is only read once the maximum matches.
            peak = yield detector._max_val
            if peak == 800:
                trough = yield detector._min_val
                if trough >= 798:
                    # Test pass
                    return

            # Present the sample, mark it valid, then tick
            yield detector.data.eq(sample)
            yield detector.data_valid.eq(1)
            yield

    run_simulation(detector, stimulus())
|
|
|
|
|
|
def test_biased_simple_waveform():
    """Test a biased waveform. This also needs to exercise decay, else we get invalid results

    TODO this test is slightly broken. I think it's passing on the initial bias, not later stuff.
    It should get fixed, but I think it mostly covers area already covered in other tests so I'm
    fine with this for now.
    """
    _, samples = create_waveform(dc_bias=200)
    detector = PeakDetector(10)

    def stimulus():
        yield from set_settings(detector, 200, 20, 20, 1)

        # Enable the detector and let one clock pass
        yield detector.enable.eq(1)
        yield

        # Feed samples until the detector reports a trigger
        for sample in samples:
            fired = yield detector.triggered
            if fired == 1:
                # Test passed!
                return

            # Present the sample, mark it valid, then tick
            yield detector.data.eq(int(sample))
            yield detector.data_valid.eq(1)
            yield

        assert False, "No trigger, test has failed..."

    run_simulation(detector, stimulus())
|
|
|
|
|
|
def test_noise_spike():
    """Test that appropriate filtering and decay can filter out a spike in noise

    The input sits at mid-scale (512) apart from a two-sample spike to full
    scale. The detector must never trigger while the whole sequence is fed in.
    """
    # The spike uses 1023, the largest value the 10-bit data signal can hold.
    # 1024 would need an 11th bit and so could not be presented as an upward
    # spike.
    signal = [512, 512, 512, 1023, 1023] + [512] * 1000
    dut = PeakDetector(10)

    def test_fn():
        yield from set_settings(dut, 300, 20, 20, 0)

        # Enable
        (yield dut.enable.eq(1))
        yield  # Tick

        # Load all data in, checking before every sample that we have not triggered
        for val in signal:
            assert (yield dut.triggered) == 0, "Can't trigger!"

            # Load in data, set valid
            (yield dut.data.eq(int(val)))
            (yield dut.data_valid.eq(1))
            yield  # Tick

        # Reaching the end of the data without triggering is a success

    run_simulation(dut, test_fn())
|