from migen import * from litex.soc.interconnect.wishbone import * from math import log2, ceil class CircularBuffer(Module): """ Circular buffer implementation that allows users to read the entire data. Assumptions: - Reading values while writes are ocurring does not need to have well-defined behaviour Implementation is largely based on Migen SyncFIFO, just tweaked to operate how I want """ def __init__(self, width: int, depth: int, with_wb = True) -> None: storage = Memory(width=width, depth=depth) self.specials += storage ptr_width = ceil(log2(depth)) # External Signals self.len = Signal(ptr_width) # Amount of valid data in the buffer self.clear = Signal() # Strobe to clear memory self.rd_addr = Signal(ptr_width) self.rd_data = Signal(width) self.wr_data = Signal(width) self.wr_ready = Signal() # Output, signals buffer is ready to be written to self.wr_valid = Signal() # Input, high when data is present to be written wr_ptr = Signal(ptr_width) rd_ptr = Signal(ptr_width) empty = Signal(reset=1) # Extra signal to distinguish between full and empty condition # Hook write input signals to memory wr_port = storage.get_port(write_capable=True) # Always ready to write data into memory, so hook these signals straight in self.comb += [ wr_port.adr.eq(wr_ptr), wr_port.dat_w.eq(self.wr_data), wr_port.we.eq(self.wr_valid), self.wr_ready.eq(1), # We are always ready to write data in ] # Advance write (and potentially read) self.sync += [ If(self.wr_valid, # We aren't empty anymore, and we won't be until we are cleared empty.eq(0), # Advance write pointer If(wr_ptr < (depth - 1), wr_ptr.eq(wr_ptr + 1)) .Else(wr_ptr.eq(0)), # Advance read pointer if we are full (e.g. overwrite old data) If(~empty & (wr_ptr == rd_ptr), If(rd_ptr < (depth - 1), rd_ptr.eq(rd_ptr + 1)) .Else(rd_ptr.eq(0)) ) ) ] # TODO should I actually set async_read? rd_port = storage.get_port(async_read=True) # Set read addr so 0 starts at rd_ptr and wraps around, and connect read data up self.comb += [ If(self.rd_addr + rd_ptr < depth, rd_port.adr.eq(self.rd_addr + rd_ptr)) .Else( rd_port.adr.eq(self.rd_addr - (depth - rd_ptr)) ), self.rd_data.eq(rd_port.dat_r), ] # Export the length present self.comb += [ If(empty, self.len.eq(0)) .Else( If(wr_ptr > rd_ptr, self.len.eq(wr_ptr - rd_ptr)) .Elif(wr_ptr != rd_ptr, self.len.eq(depth - (rd_ptr - wr_ptr))) .Else( self.len.eq(depth) ) ), ] # "Clear" out memory if clear is strobed # NOTE really clear should be hooked into reset, but I'm not clear on how to do that. # Technically there's some glitches that can happen here if we write data while clear # is asserted, but that shouldn't happen and it's fine if it does tbh. self.sync += If(self.clear, wr_ptr.eq(0), rd_ptr.eq(0), empty.eq(1)) # Add wishbone bus to access data if with_wb: self.bus = Interface(data_width=32, adr_width=ceil(log2(depth))) self.comb += self.rd_addr.eq(self.bus.adr) self.sync += [ self.bus.ack.eq(0), self.bus.dat_r.eq(0), If(~self.bus.we & self.bus.cyc & self.bus.stb, self.bus.ack.eq(1), self.bus.dat_r.eq(self.rd_data)), ] def testbench(): dut = CircularBuffer(9, 24) def test_fn(): assert (yield dut.len) == 0 assert (yield dut.wr_ready) == 1 # Clock some data in, check len data = [0xDE, 0xAD, 0xBE, 0xEF] for b in data: (yield dut.wr_data.eq(b)) (yield dut.wr_valid.eq(1)) yield # Stop clocking data in (yield dut.wr_valid.eq(0)) # Tick again because setting a value waits until the next clock... yield fifo_len = (yield dut.len) assert fifo_len == 4, f"len should be 4, is {fifo_len}" # Reset (yield dut.clear.eq(1)) yield (yield dut.clear.eq(0)) yield # Len should be cleared assert (yield dut.len) == 0 # Clock more data in than capacity, check that we can read out # the expected data data = [r for r in range(32)] # Yes yes I could use a generator but I want to slice it later for b in data: (yield dut.wr_data.eq(b)) (yield dut.wr_valid.eq(1)) yield # One more clock (yield dut.wr_valid.eq(0)) yield data_len = (yield dut.len) assert data_len == 24, f"len should be 24, is {data_len}" out_data = [] for i in range(24): (yield dut.rd_addr.eq(i)) yield out_data.append((yield dut.rd_data)) assert out_data[i] == data[i + 8], f"Data mismatch at index {i}, should be {data[i+8]}, is {out_data[i]}" # At this point, everything seems to be good, so I'm leaving more exhaustive testing run_simulation(dut, test_fn())