gw: start writing circular buffer

This commit is contained in:
David Lenfesty 2023-04-23 19:39:07 -06:00
parent 05fed9e28e
commit dc762bca8d

View File

@ -1,7 +1,113 @@
from migen import *
from litex.soc.interconnect.wishbone import *
from litex.soc.integration.soc import SoCRegion
from math import log2, ceil
"""
Random implementation notes:
- Circular buffers can keep overwriting. We only need a setting to say how many samples to save after
trigger occurs.
- Data valid from samplers to FIFOs can simply be gated via the enable signal. Everything can just run
all the time to keep things simple
- can we correct clock skew on the sample clock via Lattice primitives? I think it's possible. I doubt it
matters. Would need significant calibration effort to even have it be accurate.
"""
class CircularBuffer(Module):
"""
Circular buffer implementation that allows users to read the entire data.
Assumptions:
- Reading values while writes are ocurring does not need to have well-defined behaviour
Implementation is largely based on Migen SyncFIFO, just tweaked to operate how I want
"""
def __init__(self, width: int, depth: int) -> None:
storage = Memory(width=width, depth=depth)
self.specials += storage
ptr_width = ceil(log2(depth))
# External Signals
self.len = Signal(ptr_width) # Amount of valid data in the buffer
self.clear = Signal() # Strobe to clear memory
self.rd_addr = Signal(ptr_width)
self.rd_data = Signal(width)
self.wr_data = Signal(width)
self.wr_ready = Signal() # Output, signals buffer is ready to be written to
self.wr_valid = Signal() # Input, high when data is present to be written
wr_ptr = Signal(ptr_width)
rd_ptr = Signal(ptr_width)
empty = Signal(reset=1) # Extra signal to distinguish between full and empty condition
# Hook write input signals to memory
wr_port = storage.get_port(write_capable=True)
# Always ready to write data into memory, so hook these signals straight in
self.comb += [
wr_port.adr.eq(wr_ptr),
wr_port.dat_w.eq(self.wr_data),
wr_port.we.eq(self.wr_valid),
self.wr_ready.eq(1), # We are always ready to write data in
]
# Advance write (and potentially read)
self.sync += [
If(self.wr_valid,
# We aren't empty anymore, and we won't be until we are cleared
empty.eq(0),
# Advance write pointer
If(wr_ptr < depth,
wr_ptr.eq(wr_ptr + 1))
.Else(wr_ptr.eq(0)),
# Advance read pointer if we are full (e.g. overwrite old data)
If(~empty & wr_ptr == rd_ptr,
If(rd_ptr < depth,
rd_ptr.eq(rd_ptr + 1))
.Else(rd_ptr.eq(0))
)
)
]
# TODO should I actually set async_read?
rd_port = storage.get_port(async_read=True)
# Set read addr so 0 starts at rd_ptr and wraps around, and connect read data up
self.comb += [
If(self.rd_addr + rd_ptr < depth,
rd_port.adr.eq(self.rd_addr + rd_ptr))
.Else(
rd_port.adr.eq(self.rd_addr - (depth - rd_ptr))
),
self.rd_data.eq(rd_port.dat_r),
]
# Export the length present
self.comb += [
If(empty, self.len.eq(0))
.Else(
If(wr_ptr >= rd_ptr,
self.len.eq(wr_ptr - rd_ptr))
.Elif(wr_ptr != rd_ptr,
self.len.eq(depth - (rd_ptr - wr_ptr)))
.Else(
self.len.eq(depth)
)
),
]
# "Clear" out memory if clear is strobed
# NOTE really clear should be hooked into reset, but I'm not clear on how to do that.
# Technically there's some glitches that can happen here if we write data while clear
# is asserted, but that shouldn't happen and it's fine if it does tbh.
self.sync += If(self.clear,
wr_ptr.eq(0), rd_ptr.eq(0), empty.eq(0))
class Sampler(Module):
def __init__(self, adc_pins):
@ -26,3 +132,63 @@ class Sampler(Module):
self.sync += self.bus.ack.eq(0)
self.sync += If(self.bus.cyc & self.bus.stb, self.bus.ack.eq(1))
def fifo_testbench():
dut = CircularBuffer(9, 24)
def test_fn():
assert (yield dut.len) == 0
assert (yield dut.wr_ready) == 1
# Clock some data in, check len
data = [0xDE, 0xAD, 0xBE, 0xEF]
for b in data:
(yield dut.wr_data.eq(b))
(yield dut.wr_valid.eq(1))
yield
fifo_len = (yield dut.len)
assert fifo_len == 4, f"len should be 4, is {fifo_len}"
# Stop clocking data in
(yield dut.wr_valid.eq(0))
# Reset
(yield dut.clear.eq(1))
yield
(yield dut.cleart.eq(0))
# Len should be cleared
assert (yield dut.len) == 0
# Clock more data in than capacity, check that we can read out
# the expected data
data = [r for r in range(32)] # Yes yes I could use a generator but I want to slice it later
for b in data:
(yield dut.wr_data.eq(b))
(yield dut.wr_valid.eq(1))
yield
data_len = (yield dut.len)
assert data_len == 24
out_data = []
for i in range(24):
(yield dut.rd_addr.eq(i))
out_data.append((yield dut.rd_data))
assert out_data[i] == data[i + 8], f"Data mismatch at index {i}"
# At this point, seems to be working
run_simulation(dut, test_fn())
if __name__ == "__main__":
import argparse
args = argparse.ArgumentParser()
args.add_argument("--fifo", action="store_true", help="Run FIFO tests")
args = args.parse_args()
if args.fifo:
fifo_testbench()