Just need to hook this into the FIFO I made before, and write all the peak detection, triggering, and trigger enable logic, + hook everything into a single wishbone address space.
222 lines
7.5 KiB
Python
222 lines
7.5 KiB
Python
from migen import *
|
|
|
|
from litex.soc.interconnect.wishbone import *
|
|
|
|
from math import log2, ceil
|
|
|
|
"""
|
|
Random implementation notes:
|
|
|
|
- Circular buffers can keep overwriting. We only need a setting to say how many samples to save after
|
|
trigger occurs.
|
|
- Data valid from samplers to FIFOs can simply be gated via the enable signal. Everything can just run
|
|
all the time to keep things simple
|
|
- can we correct clock skew on the sample clock via Lattice primitives? I think it's possible. I doubt it
|
|
matters. Would need significant calibration effort to even have it be accurate.
|
|
"""
|
|
|
|
class CircularBuffer(Module):
|
|
"""
|
|
Circular buffer implementation that allows users to read the entire data.
|
|
|
|
Assumptions:
|
|
- Reading values while writes are ocurring does not need to have well-defined behaviour
|
|
|
|
Implementation is largely based on Migen SyncFIFO, just tweaked to operate how I want
|
|
"""
|
|
|
|
def __init__(self, width: int, depth: int) -> None:
|
|
storage = Memory(width=width, depth=depth)
|
|
self.specials += storage
|
|
|
|
ptr_width = ceil(log2(depth))
|
|
|
|
# External Signals
|
|
self.len = Signal(ptr_width) # Amount of valid data in the buffer
|
|
self.clear = Signal() # Strobe to clear memory
|
|
self.rd_addr = Signal(ptr_width)
|
|
self.rd_data = Signal(width)
|
|
|
|
self.wr_data = Signal(width)
|
|
self.wr_ready = Signal() # Output, signals buffer is ready to be written to
|
|
self.wr_valid = Signal() # Input, high when data is present to be written
|
|
|
|
wr_ptr = Signal(ptr_width)
|
|
rd_ptr = Signal(ptr_width)
|
|
empty = Signal(reset=1) # Extra signal to distinguish between full and empty condition
|
|
|
|
# Hook write input signals to memory
|
|
wr_port = storage.get_port(write_capable=True)
|
|
# Always ready to write data into memory, so hook these signals straight in
|
|
self.comb += [
|
|
wr_port.adr.eq(wr_ptr),
|
|
wr_port.dat_w.eq(self.wr_data),
|
|
wr_port.we.eq(self.wr_valid),
|
|
self.wr_ready.eq(1), # We are always ready to write data in
|
|
]
|
|
|
|
# Advance write (and potentially read)
|
|
self.sync += [
|
|
If(self.wr_valid,
|
|
# We aren't empty anymore, and we won't be until we are cleared
|
|
empty.eq(0),
|
|
|
|
# Advance write pointer
|
|
If(wr_ptr < (depth - 1),
|
|
wr_ptr.eq(wr_ptr + 1))
|
|
.Else(wr_ptr.eq(0)),
|
|
|
|
# Advance read pointer if we are full (e.g. overwrite old data)
|
|
If(~empty & (wr_ptr == rd_ptr),
|
|
If(rd_ptr < (depth - 1),
|
|
rd_ptr.eq(rd_ptr + 1))
|
|
.Else(rd_ptr.eq(0))
|
|
)
|
|
)
|
|
]
|
|
|
|
# TODO should I actually set async_read?
|
|
rd_port = storage.get_port(async_read=True)
|
|
# Set read addr so 0 starts at rd_ptr and wraps around, and connect read data up
|
|
self.comb += [
|
|
If(self.rd_addr + rd_ptr < depth,
|
|
rd_port.adr.eq(self.rd_addr + rd_ptr))
|
|
.Else(
|
|
rd_port.adr.eq(self.rd_addr - (depth - rd_ptr))
|
|
),
|
|
self.rd_data.eq(rd_port.dat_r),
|
|
]
|
|
|
|
# Export the length present
|
|
self.comb += [
|
|
If(empty, self.len.eq(0))
|
|
.Else(
|
|
If(wr_ptr > rd_ptr,
|
|
self.len.eq(wr_ptr - rd_ptr))
|
|
.Elif(wr_ptr != rd_ptr,
|
|
self.len.eq(depth - (rd_ptr - wr_ptr)))
|
|
.Else(
|
|
self.len.eq(depth)
|
|
)
|
|
),
|
|
]
|
|
|
|
# "Clear" out memory if clear is strobed
|
|
# NOTE really clear should be hooked into reset, but I'm not clear on how to do that.
|
|
# Technically there's some glitches that can happen here if we write data while clear
|
|
# is asserted, but that shouldn't happen and it's fine if it does tbh.
|
|
self.sync += If(self.clear,
|
|
wr_ptr.eq(0), rd_ptr.eq(0), empty.eq(1))
|
|
|
|
|
|
from migen.genlib.cdc import PulseSynchronizer
|
|
|
|
|
|
class Sampler(Module):
|
|
def __init__(self, adc_pins: Record, sampler_clock: Signal):
|
|
# TODO correct addr width
|
|
self.bus = Interface(data_width=32, adr_width=11)
|
|
|
|
# self.clock_domains.foo = ClockDomain() is how to add a new clock domain, accessible at self.foo
|
|
# Connect sampler clock domain
|
|
self.clock_domains.sample_clock = ClockDomain("sample_clock")
|
|
self.comb += self.sample_clock.clk.eq(sampler_clock)
|
|
|
|
# Hook up ADC REFCLK to sample_clock
|
|
self.comb += adc_pins.refclk.eq(sampler_clock)
|
|
|
|
# We can synchronize to the sampler clock, whenever it goes high we can
|
|
# strobe a single valid signal
|
|
synchronizer = PulseSynchronizer("sample_clock", "sys")
|
|
self.submodules += synchronizer
|
|
|
|
self.valid = Signal()
|
|
self.data = Signal(10)
|
|
|
|
self.comb += [
|
|
synchronizer.i.eq(self.sample_clock.clk),
|
|
self.valid.eq(synchronizer.o),
|
|
self.data.eq(adc_pins.data),
|
|
]
|
|
|
|
# Set config pins to constant values
|
|
self.comb += adc_pins.oen_b.eq(0) # Data pins enable
|
|
self.comb += adc_pins.standby.eq(0) # Sampling standby
|
|
self.comb += adc_pins.dfs.eq(0) # DFS (raw or two's complement)
|
|
# The only remaining pin, OTR, is an out of range status indicator
|
|
|
|
# Read directly from the data pins into the wishbone bus for now, just for bringup
|
|
self.sync += If(self.valid, self.bus.dat_r.eq(adc_pins.data))
|
|
self.sync += self.bus.ack.eq(0)
|
|
self.sync += If(self.bus.cyc & self.bus.stb, self.bus.ack.eq(1))
|
|
|
|
|
|
def fifo_testbench():
|
|
dut = CircularBuffer(9, 24)
|
|
def test_fn():
|
|
assert (yield dut.len) == 0
|
|
assert (yield dut.wr_ready) == 1
|
|
|
|
# Clock some data in, check len
|
|
data = [0xDE, 0xAD, 0xBE, 0xEF]
|
|
for b in data:
|
|
(yield dut.wr_data.eq(b))
|
|
(yield dut.wr_valid.eq(1))
|
|
yield
|
|
|
|
# Stop clocking data in
|
|
(yield dut.wr_valid.eq(0))
|
|
# Tick again because setting a value waits until the next clock...
|
|
yield
|
|
|
|
fifo_len = (yield dut.len)
|
|
assert fifo_len == 4, f"len should be 4, is {fifo_len}"
|
|
|
|
# Reset
|
|
(yield dut.clear.eq(1))
|
|
yield
|
|
(yield dut.clear.eq(0))
|
|
yield
|
|
|
|
# Len should be cleared
|
|
assert (yield dut.len) == 0
|
|
|
|
# Clock more data in than capacity, check that we can read out
|
|
# the expected data
|
|
data = [r for r in range(32)] # Yes yes I could use a generator but I want to slice it later
|
|
for b in data:
|
|
(yield dut.wr_data.eq(b))
|
|
(yield dut.wr_valid.eq(1))
|
|
yield
|
|
|
|
# One more clock
|
|
(yield dut.wr_valid.eq(0))
|
|
yield
|
|
|
|
data_len = (yield dut.len)
|
|
assert data_len == 24, f"len should be 24, is {data_len}"
|
|
out_data = []
|
|
for i in range(24):
|
|
(yield dut.rd_addr.eq(i))
|
|
yield
|
|
|
|
out_data.append((yield dut.rd_data))
|
|
|
|
assert out_data[i] == data[i + 8], f"Data mismatch at index {i}, should be {data[i+8]}, is {out_data[i]}"
|
|
|
|
# At this point, everything seems to be good, so I'm leaving more exhaustive testing
|
|
|
|
run_simulation(dut, test_fn())
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import argparse
|
|
|
|
args = argparse.ArgumentParser()
|
|
args.add_argument("--fifo", action="store_true", help="Run FIFO tests")
|
|
args = args.parse_args()
|
|
|
|
if args.fifo:
|
|
fifo_testbench()
|