new-sonar/gateware/sampler/peak_detector.py

175 lines
5.7 KiB
Python

from migen import *
class PeakDetector(Module):
    """
    Peak-to-peak ping detector.

    Tracks the running minimum and maximum of the incoming samples and asserts
    `triggered` once their difference has exceeded `thresh_value` for enough
    consecutive valid samples. Configuration is supplied through the
    configuration attributes below; do not change them while the detector is
    running.

    Attributes
    ----------
    data: (input)
        Sample value examined by the detector
    data_valid: (input)
        Strobe marking the value on `data` as valid; samples are only processed
        while this and `enable` are both high
    enable: (input)
        Runs the detector while high. While low, min/max are reset to the
        midpoint of the data range and the counters and `triggered` are cleared
    triggered: (output)
        Set once the trigger condition has been met. Only cleared by
        de-asserting `enable`

    Configuration Attributes
    ------------------------
    thresh_value:
        Peak to peak value (max - min) that must be exceeded to count a sample
        towards triggering
    thresh_time:
        Number of consecutive valid samples above threshold required before
        `triggered` is set
    decay_value:
        Amount subtracted from the maximum and added to the minimum on each
        decay step, to prevent false triggers. A value of 0 disables decay
    decay_period:
        Number of samples between each application of decay
    """

    def __init__(self, data_width: int):
        # Internal state
        min_val = Signal(data_width)
        max_val = Signal(data_width)
        diff = Signal(data_width)
        triggered_time = Signal(32)
        decay_counter = Signal(32)

        # Control interface
        self.data = Signal(data_width)
        self.data_valid = Signal()
        self.enable = Signal()
        self.triggered = Signal()

        # Configuration interface
        self.thresh_value = Signal(data_width)
        self.thresh_time = Signal(32)
        self.decay_value = Signal(data_width)
        self.decay_period = Signal(32)

        # ADCs are 0-2V and everything should be centered at 1V, so the middle
        # of the range approximates the resting input level.
        midscale = int(2**data_width / 2)

        # Applied on every clock while the detector is disabled
        reset_state = [
            min_val.eq(midscale),
            max_val.eq(midscale),
            self.triggered.eq(0),
            decay_counter.eq(0),
            triggered_time.eq(0),
        ]

        # Widen the tracked window whenever a sample falls outside of it
        track_extremes = [
            If(self.data > max_val, max_val.eq(self.data)),
            If(self.data < min_val, min_val.eq(self.data)),
        ]

        # Peak to peak is above threshold: count the sample and hold off decay.
        # Once triggered is set nothing else matters until enable is
        # de-asserted and the state is reset.
        above_threshold = [
            triggered_time.eq(triggered_time + 1),
            decay_counter.eq(0),
            If(triggered_time + 1 >= self.thresh_time, self.triggered.eq(1)),
        ]

        # Pull both peaks towards each other, but only when decay is in use and
        # the two values would not cross over.
        # NOTE(review): this assigns min_val/max_val after track_extremes in the
        # same clocked block, so it presumably takes precedence over a peak
        # update from the same sample - confirm that is intended.
        shrink_window = If(
            (diff >= (self.decay_value << 1)) & (self.decay_value > 0),
            max_val.eq(max_val - self.decay_value),
            min_val.eq(min_val + self.decay_value),
        )

        # Peak to peak is not above threshold: restart the consecutive-sample
        # count and advance the decay timer.
        below_threshold = [
            triggered_time.eq(0),
            decay_counter.eq(decay_counter + 1),
            If(decay_counter >= self.decay_period,
                decay_counter.eq(0),
                shrink_window,
            ),
        ]

        self.sync += If(~self.enable, *reset_state)

        # diff is kept up to date combinatorially to simplify the comparisons
        self.comb += diff.eq(max_val - min_val)

        self.sync += If(self.enable & self.data_valid,
            *track_extremes,
            If(diff > self.thresh_value, *above_threshold).Else(*below_threshold),
        )
from typing import Tuple
import numpy as np
import matplotlib.pyplot as plt
def create_waveform(dc_bias: int = 0, scale: float = 1) -> Tuple[np.ndarray, np.ndarray]:
    """
    Create a simple 40kHz sine wave in integer values that can be used by peak detector

    Parameters
    ----------
    dc_bias:
        Offset, in ADC counts, added on top of the 512 count midpoint
    scale:
        Fraction of the full amplitude (512 counts) to generate. Must be <= 1.0

    Returns
    -------
    (x, signal):
        `x` is the time of each sample in seconds (2 ms captured at 10 MHz) and
        `signal` holds the matching samples as uint16, saturated to the 10 bit
        range 0-1023
    """
    assert scale <= 1.0, "Scale factor must be some ratio of full range"

    # Constants
    f_s = 10e6  # Sample rate (Hz)
    f = 40e3  # Signal Frequency (Hz)
    t = 0.002  # Capture length (s)
    n = int(round(f_s * t))  # Number of samples

    # Sample times covering [0, t). The endpoint is excluded so that consecutive
    # samples are exactly 1/f_s apart.
    x = np.linspace(0, t, n, endpoint=False)

    # Create signal, scaled according to user inputs
    y = np.sin(x * 2 * np.pi * f) * scale

    # Convert to ADC counts around the requested bias. Saturate to the 10 bit
    # range first so an out of range value can neither exceed 1023 nor wrap
    # around when converted to an unsigned integer.
    counts = np.clip(y * 512 + 512 + dc_bias, 0, 1023)
    signal = counts.astype(np.uint16)

    return x, signal
def set_settings(dut: PeakDetector, thresh_value: int, thresh_time: int, decay_value: int, decay_period: int) -> None:
    """
    Write all four peak detector configuration values in one go.

    This is a generator meant to be driven from a simulation process with
    `yield from`: it yields one assignment per configuration signal of `dut`
    and then a bare `yield`.
    """
    assignments = (
        (dut.thresh_value, thresh_value),
        (dut.thresh_time, thresh_time),
        (dut.decay_value, decay_value),
        (dut.decay_period, decay_period),
    )
    for target, value in assignments:
        yield target.eq(value)
    # Advance one clock so the values written above are loaded
    yield
def test_simple_waveform():
    """
    Feed a full scale waveform into a 10 bit PeakDetector and check it triggers.

    The detector is configured with a threshold of 800 counts held for 10
    samples and decay disabled. The simulation writes `peak_detector.vcd`.
    """
    (_, signal) = create_waveform()
    dut = PeakDetector(10)

    def test_fn():
        # First set settings to be simple, we want to trigger pretty much immediately
        yield from set_settings(dut, 800, 10, 0, 0)
        # Enable device
        (yield dut.enable.eq(1))
        yield
        # Load data in until we trigger
        for val in signal:
            if (yield dut.triggered) == 1:
                # Test passed!
                return
            # Load in data, set valid
            (yield dut.data.eq(int(val)))
            (yield dut.data_valid.eq(1))
            yield  # Tick
        # The loop only checks before loading each sample, so look once more
        # in case the trigger became visible after the final tick.
        if (yield dut.triggered) == 1:
            return
        # Raised explicitly rather than via `assert False` so the failure is
        # still reported when Python runs with assertions disabled (-O).
        raise AssertionError("No trigger, test has failed...")

    run_simulation(dut, test_fn(), vcd_name="peak_detector.vcd")