diff --git a/gateware/litex_main.py b/gateware/litex_main.py index 0e3c80f..52b13e9 100755 --- a/gateware/litex_main.py +++ b/gateware/litex_main.py @@ -10,7 +10,6 @@ from migen import * from litex.build.io import DDROutput -#from litex_boards.platforms import colorlight_i5 from platforms import sonar as colorlight_i5 from litex.build.lattice.trellis import trellis_args, trellis_argdict @@ -31,18 +30,20 @@ from liteeth.phy.ecp5rgmii import LiteEthPHYRGMII from sampler import Sampler from litex.soc.integration.soc import SoCRegion +from test import run_test, TestResult + # CRG ---------------------------------------------------------------------------------------------- class _CRG(Module): def __init__(self, platform, sys_clk_freq, use_internal_osc=False, with_usb_pll=False, with_video_pll=False, sdram_rate="1:1"): self.rst = Signal() - self.clock_domains.cd_sys = ClockDomain() + self.clock_domains.cd_sys = ClockDomain("sys") self.clock_domains.cd_sample_clock = ClockDomain("sample_clock") if sdram_rate == "1:2": self.clock_domains.cd_sys2x = ClockDomain() self.clock_domains.cd_sys2x_ps = ClockDomain() else: - self.clock_domains.cd_sys_ps = ClockDomain() + self.clock_domains.cd_sys_ps = ClockDomain("sys_ps") # # # @@ -145,8 +146,8 @@ class BaseSoC(SoCCore): self.submodules.sampler = Sampler(platform.request("adc"), self.crg.cd_sample_clock.clk) sampler_region = SoCRegion(origin=None, size=0x1000, cached=False) #self.add_wb_slave(0x9000_0000, self.sampler.bus, 0x1000) - # TODO better way to do this? - self.bus.add_slave(name="sampler", slave=self.sampler.bus, region=sampler_region) + ## TODO better way to do this? + #self.bus.add_slave(name="sampler", slave=self.sampler.bus, region=sampler_region) # Build -------------------------------------------------------------------------------------------- @@ -166,11 +167,31 @@ def main(): viopts = target_group.add_mutually_exclusive_group() viopts.add_argument("--with-video-terminal", action="store_true", help="Enable Video Terminal (HDMI).") viopts.add_argument("--with-video-framebuffer", action="store_true", help="Enable Video Framebuffer (HDMI).") + testopts = parser.add_argument_group(title="Testing Options") + testopts.add_argument("--test", action="store_true", help="Run tests, won't do anything else") builder_args(parser) soc_core_args(parser) trellis_args(parser) args = parser.parse_args() + # Run tests first + if args.test: + from sampler import circular_buffer + from sampler import controller + + results = [] + results.append(run_test("CircularBuffer", circular_buffer.testbench)) + results.append(run_test("SamplerController", controller.test_bus_access)) + + passed = sum((1 for result in results if result.result == TestResult.PASS)) + failed = sum((1 for result in results if result.result == TestResult.FAIL)) + skipped = sum((1 for result in results if result.result == TestResult.SKIP)) + + print(f"{passed}/{passed + failed} passed ({skipped} skipped)") + + # TODO maybe don't do this? + return + # Build firmware import subprocess as sp sp.run(["./build_and_strip.sh"], cwd="../firmware").check_returncode() diff --git a/gateware/platforms/__init__.py b/gateware/platforms/__init__.py index 30c3f9a..e69de29 100644 --- a/gateware/platforms/__init__.py +++ b/gateware/platforms/__init__.py @@ -1 +0,0 @@ -from .colorlight_i9 import * diff --git a/gateware/platforms/colorlight_i9.py b/gateware/platforms/colorlight_i9.py deleted file mode 100644 index 5b15405..0000000 --- a/gateware/platforms/colorlight_i9.py +++ /dev/null @@ -1,136 +0,0 @@ -import os -import subprocess - -from amaranth.build import * -from amaranth.vendor.lattice_ecp5 import * -from amaranth_boards.resources import * - - -__all__ = ["Colorlight_i9_Platform"] - - -class Colorlight_i9_Platform(LatticeECP5Platform): - device = "LFE5U-45F" - package = "BG381" - speed = "6" - default_clk = "clk25" - - resources = [ - Resource("clk25", 0, Pins("P3", dir="i"), Clock(25e6), Attrs(IO_TYPE="LVCMOS33")), - - *LEDResources(pins="L2", invert = True, - attrs=Attrs(IO_TYPE="LVCMOS33", DRIVE="4")), - - #*ButtonResources(pins="M13", invert = True, - # attrs=Attrs(IO_TYPE="LVCMOS33", PULLMODE="UP")), - - UARTResource(0, - tx="E17", - rx="D18", - attrs=Attrs(IO_TYPE="LVCMOS33") - ), - UARTResource(1, - tx="P16", - rx="L5", - attrs=Attrs(IO_TYPE="LVCMOS33") - ), - UARTResource(2, - tx="J18", - rx="J16", - attrs=Attrs(IO_TYPE="LVCMOS33") - ), - - # SPIFlash (W25Q32JV) 1x/2x/4x speed - Resource("spi_flash", 0, - Subsignal("cs", PinsN("R2", dir="o")), - # Subsignal("clk", Pins("", dir="i")), # driven through USRMCLK - Subsignal("cipo", Pins("V2", dir="i")), # Chip: DI/IO0 - Subsignal("copi", Pins("W2", dir="o")), # DO/IO1 - Attrs(IO_TYPE="LVCMOS33") - ), - - # 2x ESMT M12L16161A-5T 1M x 16bit 200MHz SDRAMs (organized as 1M x 32bit) - # 2x WinBond W9816G6JH-6 1M x 16bit 166MHz SDRAMs (organized as 1M x 32bit) are lso reported - SDRAMResource(0, - clk="B9", we_n="A10", cas_n="A9", ras_n="B10", - ba="B11 C8", a="B13 C14 A16 A17 B16 B15 A14 A13 A12 A11 B12", - dq="D15 E14 E13 D12 E12 D11 C10 B17 B8 A8 C7 A7 A6 B6 A5 B5 " - "D5 C5 D6 C6 E7 D7 E8 D8 E9 D9 E11 C11 C12 D13 D14 C15", - attrs=Attrs(PULLMODE="NONE", DRIVE="4", SLEWRATE="FAST", IO_TYPE="LVCMOS33") - ), - - # Broadcom B50612D Gigabit Ethernet Transceiver - Resource("eth_rgmii", 0, - Subsignal("rst", Pins("P4", dir="o")), - Subsignal("mdc", Pins("N5", dir="o")), - Subsignal("mdio", Pins("P5", dir="io")), - Subsignal("tx_clk", Pins("U19", dir="i")), - Subsignal("tx_ctl", Pins("P19", dir="o")), - Subsignal("tx_data", Pins("U20 T19 T20 R20", dir="o")), - Subsignal("rx_clk", Pins("L19", dir="i")), - Subsignal("rx_ctl", Pins("M20", dir="i")), - Subsignal("rx_data", Pins("P20 N19 N20 M19", dir="i")), - Attrs(IO_TYPE="LVCMOS33") - ), - - # Broadcom B50612D Gigabit Ethernet Transceiver - Resource("eth_rgmii", 1, - Subsignal("rst", Pins("P4", dir="o")), - Subsignal("mdc", Pins("N5", dir="o")), - Subsignal("mdio", Pins("P5", dir="io")), - Subsignal("tx_clk", Pins("G1", dir="o")), - Subsignal("tx_ctl", Pins("K1", dir="o")), - Subsignal("tx_data", Pins("G2 H1 J1 J3", dir="o")), - Subsignal("rx_clk", Pins("H2", dir="i")), - Subsignal("rx_ctl", Pins("P2", dir="i")), - Subsignal("rx_data", Pins("K2 L1 N1 P1", dir="i")), - Attrs(IO_TYPE="LVCMOS33") - ), - - Resource("jtag", 0, - Subsignal("trst", Pins("J17", dir="i")), - Subsignal("tck", Pins("G18", dir="i")), - Subsignal("tms", Pins("H16", dir="i")), - Subsignal("tdo", Pins("H17", dir="o")), - Subsignal("tdi", Pins("H18", dir="i")), - Attrs(IO_TYPE="LVCMOS33") - ), - - Resource("i2c", 0, - Subsignal("sda", Pins("D16", dir="io")), - Subsignal("scl", Pins("F5", dir="io")), # Hacky stuff for now, amlib needs it to be io for some reason - Attrs(IO_TYPE="LVCMOS33") - ), - ] - connectors = [] - # Connector("j", 1, "F3 F1 G3 - G2 H3 H5 F15 L2 K1 J5 K2 B16 J14 F12 -"), - # Connector("j", 2, "J4 K3 G1 - K4 C2 E3 F15 L2 K1 J5 K2 B16 J14 F12 -"), - # Connector("j", 3, "H4 K5 P1 - R1 L5 F2 F15 L2 K1 J5 K2 B16 J14 F12 -"), - # Connector("j", 4, "P4 R2 M8 - M9 T6 R6 F15 L2 K1 J5 K2 B16 J14 F12 -"), - # Connector("j", 5, "M11 N11 P12 - K15 N12 L16 F15 L2 K1 J5 K2 B16 J14 F12 -"), - # Connector("j", 6, "K16 J15 J16 - J12 H15 G16 F15 L2 K1 J5 K2 B16 J14 F12 -"), - # Connector("j", 7, "H13 J13 H12 - G14 H14 G15 F15 L2 K1 J5 K2 B16 J14 F12 -"), - # Connector("j", 8, "A15 F16 A14 - E13 B14 A13 F15 L2 K1 J5 K2 B16 J14 F12 -"), - # Connector("j", 19, " - M13 - - P11"), - #] - - @property - def required_tools(self): - return super().required_tools + [ - "ecpdap" - ] - - def toolchain_prepare(self, fragment, name, **kwargs): - overrides = dict(ecppack_opts="--compress") - overrides.update(kwargs) - return super().toolchain_prepare(fragment, name, **overrides) - - def toolchain_program(self, products, name): - tool = os.environ.get("ECPDAP", "ecpdap") - with products.extract("{}.bit".format(name)) as bitstream_filename: - subprocess.check_call([tool, "program", bitstream_filename, "--freq", "10M"]) - - -if __name__ == "__main__": - from .test.blinky import * - Colorlight_i9_Platform().build(Blinky(), do_program=True) diff --git a/gateware/sampler.py b/gateware/sampler.py deleted file mode 100644 index a27cca5..0000000 --- a/gateware/sampler.py +++ /dev/null @@ -1,661 +0,0 @@ -from migen import * - -from litex.soc.interconnect.wishbone import * - -from math import log2, ceil -from typing import List - -""" -Random implementation notes: - -- Circular buffers can keep overwriting. We only need a setting to say how many samples to save after - trigger occurs. -- Data valid from samplers to FIFOs can simply be gated via the enable signal. Everything can just run - all the time to keep things simple -- can we correct clock skew on the sample clock via Lattice primitives? I think it's possible. I doubt it - matters. Would need significant calibration effort to even have it be accurate. -- Trigger system should wait a couple clocks after trigger acquired to disable FIFOs, just in case the - CDC sync happens a bit late for some ADC channels - -Configurable parameters: -- trigger_run_len: number of samples to acquire after triggered sample (can technically be arbitrarily -large, circular buffer handles data loss, should be larger than trigger_thresh_time to make sure buffers -don't get weird) -- trigger_thresh_value: minimum peak to peak value to consider triggered -- trigger_thresh_time: minimum num samples that peak must be above threshold to count as a trigger - (trigger sample number is the first sample above the threshold value) (must be >= 1) -- trigger_decay_value: decay value to subtract from peak values to potentially reduce false triggers -- trigger_decay_period: number of samples per decay application - - -Implementation of trigger (psuedocode), happens every sample update: - -if triggered: - if num_samples + 1 >= trigger_run: - disable_trigger() - return - - num_samples += 1 - return - -if sample > max: - max = sample -elif sample < min: - min = sample - -if (max - min) > trigger_thresh_value: - if triggered_for + 1 >= trigger_thresh_time: - triggered = True - num_samples = 0 - return - - triggered_for += 1 - decay_wait = 0 -else: - triggered_for = 0 - decay_wait += 1 - - if trigger_decay_period == 0 or decay_wait == trigger_thresh_time: - decay_wait = 0 - - if (max - trigger_decay_value) > (min + trigger_decay_value): - max -= trigger_decay_value - min += trigger_decay_value -""" - -class CircularBuffer(Module): - """ - Circular buffer implementation that allows users to read the entire data. - - Assumptions: - - Reading values while writes are ocurring does not need to have well-defined behaviour - - Implementation is largely based on Migen SyncFIFO, just tweaked to operate how I want - """ - - def __init__(self, width: int, depth: int, with_wb = True) -> None: - storage = Memory(width=width, depth=depth) - self.specials += storage - - ptr_width = ceil(log2(depth)) - - # External Signals - self.len = Signal(ptr_width) # Amount of valid data in the buffer - self.clear = Signal() # Strobe to clear memory - self.rd_addr = Signal(ptr_width) - self.rd_data = Signal(width) - - self.wr_data = Signal(width) - self.wr_ready = Signal() # Output, signals buffer is ready to be written to - self.wr_valid = Signal() # Input, high when data is present to be written - - wr_ptr = Signal(ptr_width) - rd_ptr = Signal(ptr_width) - empty = Signal(reset=1) # Extra signal to distinguish between full and empty condition - - # Hook write input signals to memory - wr_port = storage.get_port(write_capable=True) - # Always ready to write data into memory, so hook these signals straight in - self.comb += [ - wr_port.adr.eq(wr_ptr), - wr_port.dat_w.eq(self.wr_data), - wr_port.we.eq(self.wr_valid), - self.wr_ready.eq(1), # We are always ready to write data in - ] - - # Advance write (and potentially read) - self.sync += [ - If(self.wr_valid, - # We aren't empty anymore, and we won't be until we are cleared - empty.eq(0), - - # Advance write pointer - If(wr_ptr < (depth - 1), - wr_ptr.eq(wr_ptr + 1)) - .Else(wr_ptr.eq(0)), - - # Advance read pointer if we are full (e.g. overwrite old data) - If(~empty & (wr_ptr == rd_ptr), - If(rd_ptr < (depth - 1), - rd_ptr.eq(rd_ptr + 1)) - .Else(rd_ptr.eq(0)) - ) - ) - ] - - # TODO should I actually set async_read? - rd_port = storage.get_port(async_read=True) - # Set read addr so 0 starts at rd_ptr and wraps around, and connect read data up - self.comb += [ - If(self.rd_addr + rd_ptr < depth, - rd_port.adr.eq(self.rd_addr + rd_ptr)) - .Else( - rd_port.adr.eq(self.rd_addr - (depth - rd_ptr)) - ), - self.rd_data.eq(rd_port.dat_r), - ] - - # Export the length present - self.comb += [ - If(empty, self.len.eq(0)) - .Else( - If(wr_ptr > rd_ptr, - self.len.eq(wr_ptr - rd_ptr)) - .Elif(wr_ptr != rd_ptr, - self.len.eq(depth - (rd_ptr - wr_ptr))) - .Else( - self.len.eq(depth) - ) - ), - ] - - # "Clear" out memory if clear is strobed - # NOTE really clear should be hooked into reset, but I'm not clear on how to do that. - # Technically there's some glitches that can happen here if we write data while clear - # is asserted, but that shouldn't happen and it's fine if it does tbh. - self.sync += If(self.clear, - wr_ptr.eq(0), rd_ptr.eq(0), empty.eq(1)) - - # Add wishbone bus to access data - if with_wb: - self.bus = Interface(data_width=32, adr_width=ceil(log2(depth))) - - self.comb += self.rd_addr.eq(self.bus.adr) - self.sync += [ - self.bus.ack.eq(0), - self.bus.dat_r.eq(0), - If(~self.bus.we & self.bus.cyc & self.bus.stb, - self.bus.ack.eq(1), self.bus.dat_r.eq(self.rd_data)), - ] - - -from migen.genlib.cdc import PulseSynchronizer - - -class Sampler(Module): - def __init__(self, adc_pins: Record, sampler_clock: Signal): - # TODO remove bus - self.bus = Interface(data_width=32, adr_width=11) - - # self.clock_domains.foo = ClockDomain() is how to add a new clock domain, accessible at self.foo - # Connect sampler clock domain - self.clock_domains.sample_clock = ClockDomain("sample_clock") - self.comb += self.sample_clock.clk.eq(sampler_clock) - - # Hook up ADC REFCLK to sample_clock - self.comb += adc_pins.refclk.eq(sampler_clock) - - # We can synchronize to the sampler clock, whenever it goes high we can - # strobe a single valid signal - synchronizer = PulseSynchronizer("sample_clock", "sys") - self.submodules += synchronizer - - self.valid = Signal() - self.data = Signal(10) - - self.comb += [ - synchronizer.i.eq(self.sample_clock.clk), - self.valid.eq(synchronizer.o), - self.data.eq(adc_pins.data), - ] - - # Set config pins to constant values - self.comb += adc_pins.oen_b.eq(0) # Data pins enable - self.comb += adc_pins.standby.eq(0) # Sampling standby - self.comb += adc_pins.dfs.eq(0) # DFS (raw or two's complement) - # The only remaining pin, OTR, is an out of range status indicator - - # Read directly from the data pins into the wishbone bus for now, just for bringup - self.sync += If(self.valid, self.bus.dat_r.eq(adc_pins.data)) - self.sync += self.bus.ack.eq(0) - self.sync += If(self.bus.cyc & self.bus.stb, self.bus.ack.eq(1)) - - -class PeakDetector(Module): - """ - Module to detect when peak to peak voltage is high enough to consider incoming - data to be a valid ping. Configuration is provided by setting the configuration - attributes. Do not change these settings while detector is running. - - Attributes - ---------- - data: (input) - Data signal to use for detection - - data_valid: (input) - Strobed signal that indicates value on `data` is valid to be read - - enable: (input) - Enables running peak detection. De-asserting this will clear all state variables - - triggered: (output) - Signal that indicates peak has been triggered. Only cleared once enable is de-asserted again - - Configuration Attributes - ------------------------ - thresh_value: - Minimum peak to peak value considered triggered - - thresh_time: - Number of consecutive samples above threshold required to consider triggered - - decay_value: - Decay value to subtract from peak values to prevent false triggers - - decay_period: - Number of samples between each application of decay - """ - - def __init__(self, data_width: int): - # Create all state signals - min_val = Signal(data_width) - max_val = Signal(data_width) - diff = Signal(data_width) - triggered_time = Signal(32) - decay_counter = Signal(32) - - # Control signals - self.data = Signal(data_width) - self.data_valid = Signal() - self.enable = Signal() - self.triggered = Signal() - - # Configuration Parameters - self.thresh_value = Signal(data_width) - self.thresh_time = Signal(32) - self.decay_value = Signal(data_width) - self.decay_period = Signal(32) - - self.sync += If(~self.enable, - # Reset halfway. ADCs are 0-2V, and everything should be centered at 1V, so this is approximating the initial value - min_val.eq(int(2**data_width /2)), - max_val.eq(int(2**data_width /2)), - self.triggered.eq(0), - decay_counter.eq(0), - triggered_time.eq(0), - ) - - # Constantly updating diff to simplify some statements - self.comb += diff.eq(max_val - min_val) - - self.sync += If(self.enable & self.data_valid, - # Update maximum value - If(self.data > max_val, max_val.eq(self.data)), - # Update minimum value - If(self.data < min_val, min_val.eq(self.data)), - If(diff > self.thresh_value, - # We have met the threshold for triggering, start counting - triggered_time.eq(triggered_time + 1), - decay_counter.eq(0), - - # We have triggered, so we can set the output. After this point, - # nothing we do matters until enable is de-asserted and we reset - # triggered. - If(triggered_time + 1 >= self.thresh_time, self.triggered.eq(1))) - .Else( - # We have not met the threshold, reset timer and handle decay - triggered_time.eq(0), - decay_counter.eq(decay_counter + 1), - - # Decay threshold has been reached, apply decay to peaks - If(decay_counter >= self.decay_period, - decay_counter.eq(0), - - # Only apply decay if the values would not overlap - If(diff >= (self.decay_value << 1), - max_val.eq(max_val - self.decay_value), - min_val.eq(min_val + self.decay_value))) - ) - ) - - -class SamplerController(Module): - """ - Sampler control - - Attributes - ---------- - bus: - Slave wishbone bus to be connected to a higher-level bus. Has an address width set according to - the provided buffer length. - - buffers: - List of FIFO buffer objects used to store sample data. - - samplers: - List of sampler objects provided by user. - - Registers - -------- - 0x00: Control Register (RW) - Bit 0 - Begin capture. Resets all FIFOs and starts the peak detector - - 0x01: Status Register (RO) - Bit 0 - Capture complete. Set by peak detection block and cleared by software or when - - 0x02: trigger_run_len (RW) - Number of samples to acquire after triggering sample. - - 0x03: thresh_value (RW) - Minimum peak to peak value considered triggered - - 0x04: thresh_time (RW) - Number of consecutive samples above threshold required to consider triggered - - 0x05: decay_value (RW) - Decay value to subtract from peak values to prevent false triggers - - 0x06: decay_period (RW) - Number of samples between each application of decay - - 0x1xx: BUFFER_LEN_X (RO) - Lenght of data in buffer, up to the number of samplers provided. - - """ - def __init__(self, samplers: List[Sampler], buffer_len): - - self.samplers = samplers - num_channels = len(samplers) - - # Enables reading in samples - sample_enable = Signal() - # Pull in only one CDC sync signal - sample_ready = self.samplers[0].valid - - # Generate buffers for each sampler - self.buffers = [CircularBuffer(9, buffer_len) for _ in range(num_channels)] - - # Connect each buffer to each sampler - for buffer, sampler in zip(self.buffers, self.samplers): - self.comb += [ - # Connect only top 9 bits to memory - buffer.wr_data.eq(sampler.data[1:]), - # Writes enter FIFO only when enabled and every clock cycle - buffer.wr_valid.eq(sample_enable & sample_ready), - ] - - - # Each sampler gets some chunk of memory at least large enough to fit - # all of it's data, so use that as a consistent offset - sample_mem_addr_width = ceil(log2(buffer_len)) - # 1 control block + number of channels used = control bits - control_block_addr_width = ceil(log2(num_channels + 1)) - - # Bus address width - addr_width = control_block_addr_width + sample_mem_addr_width - - # "Master" bus - self.bus = Interface(data_width=32, addr_width=addr_width) - - # Wishbone bus used for mapping control registers - self.control_regs_bus = Interface(data_width=32, addr_width=sample_mem_addr_width) - - slaves = [] - slaves.append((lambda adr: adr[sample_mem_addr_width:] == 0, self.control_regs_bus)) - - for i, buffer in enumerate(self.buffers): - # Connect subordinate buses of buffers to decoder - slaves.append((lambda adr: adr[sample_mem_addr_width:] == i + 1, buffer.bus)) - - adr = (i + 1) << sample_mem_addr_width - print(f"Sampler {i} available at 0x{adr:08x}") - - self.decoder = Decoder(self.bus, slaves) - # TODO how to submodule - self.submodules.decoder = self.decoder - - self.peak_detector = PeakDetector(10) - self.comb += [ - # Simply enable whenever we start capturing - self.peak_detector.enable.eq(sample_enable), - # Connect to the first ADC - self.peak_detector.data.eq(self.samplers[0].data), - # Use the same criteria as the fifo buffer - self.peak_detector.data_valid.eq(sample_enable & sample_ready), - ] - - #### Control register logic - - # Storage - control_register = Signal(32) - status_register = Signal(32) - trigger_run_len = Signal(32) - - def rw_register(storage: Signal, *, read: bool = True, write: bool = True): - if read: - read = self.control_regs_bus.dat_r.eq(storage) - else: - read = self.control_regs_bus.ack.eq(0) - - if write: - write = storage.eq(self.control_regs_bus.dat_w) - else: - write = self.control_regs_bus.ack.eq(0) - - return If(self.control_regs_bus.we, write).Else(read) - - # Handle explicit config registers - cases = { - 0: rw_register(control_register), - 1: rw_register(status_register, write=False), - 2: rw_register(trigger_run_len), - 3: rw_register(self.peak_detector.thresh_value), - 4: rw_register(self.peak_detector.thresh_time), - 5: rw_register(self.peak_detector.decay_value), - 6: rw_register(self.peak_detector.decay_period), - - "default": rw_register(None, read=False, write=False) - } - - # Handle length values for each sample buffer - for i, buffer in enumerate(self.buffers): - cases.update({0x100 + i: rw_register(buffer.len, write=False)}) - - # Connect up control registers bus - self.sync += [ - self.control_regs_bus.ack.eq(0), - If(self.control_regs_bus.cyc & self.control_regs_bus.stb, - self.control_regs_bus.ack.eq(1), - Case(self.control_regs_bus.adr, cases)), - ] - - # Handle the control logic - post_trigger_count = Signal(32) - self.sync += [ - # Reset state whenever sampling is disabled - If(~sample_enable, post_trigger_count.eq(0)), - - # Reset triggering status if we have started sampling - # (peak_detector.triggered resets if sample_enable is de-asserted, so - # this is a reliable reset mechanism) - If(sample_enable & ~self.peak_detector.triggered, - status_register[0].eq(0)), - - # Keep sampling past the trigger for the configured number of samples - If(self.peak_detector.triggered & sample_enable & sample_ready, - post_trigger_count.eq(post_trigger_count + 1), - - # We have sampled enough, update status and stop sampling - If(post_trigger_count + 1 >= trigger_run_len, - status_register[0].eq(1), - control_register[0].eq(0))), - ] - - # Update register storage - self.comb += [ - sample_enable.eq(control_register[0]), - ] - - -def fifo_testbench(): - dut = CircularBuffer(9, 24) - def test_fn(): - assert (yield dut.len) == 0 - assert (yield dut.wr_ready) == 1 - - # Clock some data in, check len - data = [0xDE, 0xAD, 0xBE, 0xEF] - for b in data: - (yield dut.wr_data.eq(b)) - (yield dut.wr_valid.eq(1)) - yield - - # Stop clocking data in - (yield dut.wr_valid.eq(0)) - # Tick again because setting a value waits until the next clock... - yield - - fifo_len = (yield dut.len) - assert fifo_len == 4, f"len should be 4, is {fifo_len}" - - # Reset - (yield dut.clear.eq(1)) - yield - (yield dut.clear.eq(0)) - yield - - # Len should be cleared - assert (yield dut.len) == 0 - - # Clock more data in than capacity, check that we can read out - # the expected data - data = [r for r in range(32)] # Yes yes I could use a generator but I want to slice it later - for b in data: - (yield dut.wr_data.eq(b)) - (yield dut.wr_valid.eq(1)) - yield - - # One more clock - (yield dut.wr_valid.eq(0)) - yield - - data_len = (yield dut.len) - assert data_len == 24, f"len should be 24, is {data_len}" - out_data = [] - for i in range(24): - (yield dut.rd_addr.eq(i)) - yield - - out_data.append((yield dut.rd_data)) - - assert out_data[i] == data[i + 8], f"Data mismatch at index {i}, should be {data[i+8]}, is {out_data[i]}" - - # At this point, everything seems to be good, so I'm leaving more exhaustive testing - - run_simulation(dut, test_fn()) - - -def write_wishbone(bus, address, value): - # Set up bus - (yield bus.adr.eq(address)) - (yield bus.dat_w.eq(value)) - (yield bus.stb.eq(1)) - (yield bus.cyc.eq(1)) - (yield bus.we.eq(1)) - yield - - cycles = 0 - while True: - cycles += 1 - assert cycles < 5, "Write fail" - - - if (yield bus.ack) == 1: - # We received a response, clear out bus status and exit - (yield bus.stb.eq(0)) - (yield bus.cyc.eq(0)) - yield - - break - else: - # Tick until we receive an ACK - yield - - -def read_wishbone(bus, address,): - """Sets up a read transaction. Due to limitations of the simulation method, you have to read - from dat_r, and also tick immediately after calling""" - # Set up bus - (yield bus.adr.eq(address)) - (yield bus.stb.eq(1)) - (yield bus.cyc.eq(1)) - (yield bus.we.eq(0)) - yield - - cycles = 0 - while True: - cycles += 1 - assert cycles < 5, "Write fail" - if (yield bus.ack) == 1: - # We received a response, clear out bus status and exit - (yield bus.stb.eq(0)) - (yield bus.cyc.eq(0)) - - break - else: - # Tick until we receive an ACK - yield - -class MockSampler(Module): - """ - Attributes - ---------- - All Sampler attributes by default, plus the following: - - index: - Index of data to use from provided data - """ - def __init__(self, data: List[int]): - memory = Memory(width=10, depth=len(data), init=data) - - self.index = Signal(ceil(log2(len(data)))) - self.data = Signal(10) - self.valid = Signal() - - read_port = memory.get_port(async_read=True) - self.comb += [ - read_port.adr.eq(self.index), - self.data.eq(read_port.dat_r), - ] - -class TestSoC(Module): - def __init__(self, data): - sampler = MockSampler(data) - self.submodules.sampler = sampler - # TODO multiple mock samplers to test that functionality - self.controller = SamplerController([MockSampler(data)], 1024) - self.submodules.controller = self.controller - self.bus = self.controller.bus - - -def controller_test_bus_access(): - dut = TestSoC([2, 3, 4, 5]) - def test_fn(): - yield from write_wishbone(dut.bus, 2, 0xDEADBEEF) - yield from read_wishbone(dut.bus, 2) - assert (yield dut.bus.dat_r) == 0xDEADBEEF, "Read failed!" - - # TODO test writing to RO register fails - - run_simulation(dut, test_fn(), vcd_name="test_bus_access.vcd") - -# TODO test a couple variations on waveforms: -# Just a clean waveform, should pass normally -# Clean waveform w/ some decay -# Some waveform that decay could make not trigger (i.e. a big spike) -# Clean waveform under threshold -# Test that decay operates normally and settles back down to center value - -if __name__ == "__main__": - import argparse - - args = argparse.ArgumentParser() - args.add_argument("--fifo", action="store_true", help="Run FIFO tests") - args.add_argument("--controller", action="store_true", help="Run sampler tests") - args = args.parse_args() - - if args.fifo: - fifo_testbench() - - if args.controller: - controller_test_bus_access() diff --git a/gateware/sampler/__init__.py b/gateware/sampler/__init__.py new file mode 100644 index 0000000..e778d38 --- /dev/null +++ b/gateware/sampler/__init__.py @@ -0,0 +1,61 @@ +from .sampler import Sampler +from .controller import SamplerController + + +""" +Random implementation notes: + +- Circular buffers can keep overwriting. We only need a setting to say how many samples to save after + trigger occurs. +- Data valid from samplers to FIFOs can simply be gated via the enable signal. Everything can just run + all the time to keep things simple +- can we correct clock skew on the sample clock via Lattice primitives? I think it's possible. I doubt it + matters. Would need significant calibration effort to even have it be accurate. +- Trigger system should wait a couple clocks after trigger acquired to disable FIFOs, just in case the + CDC sync happens a bit late for some ADC channels + +Configurable parameters: +- trigger_run_len: number of samples to acquire after triggered sample (can technically be arbitrarily +large, circular buffer handles data loss, should be larger than trigger_thresh_time to make sure buffers +don't get weird) +- trigger_thresh_value: minimum peak to peak value to consider triggered +- trigger_thresh_time: minimum num samples that peak must be above threshold to count as a trigger + (trigger sample number is the first sample above the threshold value) (must be >= 1) +- trigger_decay_value: decay value to subtract from peak values to potentially reduce false triggers +- trigger_decay_period: number of samples per decay application + + +Implementation of trigger (psuedocode), happens every sample update: + +if triggered: + if num_samples + 1 >= trigger_run: + disable_trigger() + return + + num_samples += 1 + return + +if sample > max: + max = sample +elif sample < min: + min = sample + +if (max - min) > trigger_thresh_value: + if triggered_for + 1 >= trigger_thresh_time: + triggered = True + num_samples = 0 + return + + triggered_for += 1 + decay_wait = 0 +else: + triggered_for = 0 + decay_wait += 1 + + if trigger_decay_period == 0 or decay_wait == trigger_thresh_time: + decay_wait = 0 + + if (max - trigger_decay_value) > (min + trigger_decay_value): + max -= trigger_decay_value + min += trigger_decay_value +""" diff --git a/gateware/sampler/circular_buffer.py b/gateware/sampler/circular_buffer.py new file mode 100644 index 0000000..0012697 --- /dev/null +++ b/gateware/sampler/circular_buffer.py @@ -0,0 +1,169 @@ +from migen import * + +from litex.soc.interconnect.wishbone import * + +from math import log2, ceil + +class CircularBuffer(Module): + """ + Circular buffer implementation that allows users to read the entire data. + + Assumptions: + - Reading values while writes are ocurring does not need to have well-defined behaviour + + Implementation is largely based on Migen SyncFIFO, just tweaked to operate how I want + """ + + def __init__(self, width: int, depth: int, with_wb = True) -> None: + storage = Memory(width=width, depth=depth) + self.specials += storage + + ptr_width = ceil(log2(depth)) + + # External Signals + self.len = Signal(ptr_width) # Amount of valid data in the buffer + self.clear = Signal() # Strobe to clear memory + self.rd_addr = Signal(ptr_width) + self.rd_data = Signal(width) + + self.wr_data = Signal(width) + self.wr_ready = Signal() # Output, signals buffer is ready to be written to + self.wr_valid = Signal() # Input, high when data is present to be written + + wr_ptr = Signal(ptr_width) + rd_ptr = Signal(ptr_width) + empty = Signal(reset=1) # Extra signal to distinguish between full and empty condition + + # Hook write input signals to memory + wr_port = storage.get_port(write_capable=True) + # Always ready to write data into memory, so hook these signals straight in + self.comb += [ + wr_port.adr.eq(wr_ptr), + wr_port.dat_w.eq(self.wr_data), + wr_port.we.eq(self.wr_valid), + self.wr_ready.eq(1), # We are always ready to write data in + ] + + # Advance write (and potentially read) + self.sync += [ + If(self.wr_valid, + # We aren't empty anymore, and we won't be until we are cleared + empty.eq(0), + + # Advance write pointer + If(wr_ptr < (depth - 1), + wr_ptr.eq(wr_ptr + 1)) + .Else(wr_ptr.eq(0)), + + # Advance read pointer if we are full (e.g. overwrite old data) + If(~empty & (wr_ptr == rd_ptr), + If(rd_ptr < (depth - 1), + rd_ptr.eq(rd_ptr + 1)) + .Else(rd_ptr.eq(0)) + ) + ) + ] + + # TODO should I actually set async_read? + rd_port = storage.get_port(async_read=True) + # Set read addr so 0 starts at rd_ptr and wraps around, and connect read data up + self.comb += [ + If(self.rd_addr + rd_ptr < depth, + rd_port.adr.eq(self.rd_addr + rd_ptr)) + .Else( + rd_port.adr.eq(self.rd_addr - (depth - rd_ptr)) + ), + self.rd_data.eq(rd_port.dat_r), + ] + + # Export the length present + self.comb += [ + If(empty, self.len.eq(0)) + .Else( + If(wr_ptr > rd_ptr, + self.len.eq(wr_ptr - rd_ptr)) + .Elif(wr_ptr != rd_ptr, + self.len.eq(depth - (rd_ptr - wr_ptr))) + .Else( + self.len.eq(depth) + ) + ), + ] + + # "Clear" out memory if clear is strobed + # NOTE really clear should be hooked into reset, but I'm not clear on how to do that. + # Technically there's some glitches that can happen here if we write data while clear + # is asserted, but that shouldn't happen and it's fine if it does tbh. + self.sync += If(self.clear, + wr_ptr.eq(0), rd_ptr.eq(0), empty.eq(1)) + + # Add wishbone bus to access data + if with_wb: + self.bus = Interface(data_width=32, adr_width=ceil(log2(depth))) + + self.comb += self.rd_addr.eq(self.bus.adr) + self.sync += [ + self.bus.ack.eq(0), + self.bus.dat_r.eq(0), + If(~self.bus.we & self.bus.cyc & self.bus.stb, + self.bus.ack.eq(1), self.bus.dat_r.eq(self.rd_data)), + ] + + +def testbench(): + dut = CircularBuffer(9, 24) + def test_fn(): + assert (yield dut.len) == 0 + assert (yield dut.wr_ready) == 1 + + # Clock some data in, check len + data = [0xDE, 0xAD, 0xBE, 0xEF] + for b in data: + (yield dut.wr_data.eq(b)) + (yield dut.wr_valid.eq(1)) + yield + + # Stop clocking data in + (yield dut.wr_valid.eq(0)) + # Tick again because setting a value waits until the next clock... + yield + + fifo_len = (yield dut.len) + assert fifo_len == 4, f"len should be 4, is {fifo_len}" + + # Reset + (yield dut.clear.eq(1)) + yield + (yield dut.clear.eq(0)) + yield + + # Len should be cleared + assert (yield dut.len) == 0 + + # Clock more data in than capacity, check that we can read out + # the expected data + data = [r for r in range(32)] # Yes yes I could use a generator but I want to slice it later + for b in data: + (yield dut.wr_data.eq(b)) + (yield dut.wr_valid.eq(1)) + yield + + # One more clock + (yield dut.wr_valid.eq(0)) + yield + + data_len = (yield dut.len) + assert data_len == 24, f"len should be 24, is {data_len}" + out_data = [] + for i in range(24): + (yield dut.rd_addr.eq(i)) + yield + + out_data.append((yield dut.rd_data)) + + assert out_data[i] == data[i + 8], f"Data mismatch at index {i}, should be {data[i+8]}, is {out_data[i]}" + + # At this point, everything seems to be good, so I'm leaving more exhaustive testing + + run_simulation(dut, test_fn()) + diff --git a/gateware/sampler/controller.py b/gateware/sampler/controller.py new file mode 100644 index 0000000..1144c4d --- /dev/null +++ b/gateware/sampler/controller.py @@ -0,0 +1,281 @@ +from migen import * + +from litex.soc.interconnect.wishbone import * + +from math import ceil, log2 +from typing import List + +from .sampler import Sampler +from .circular_buffer import CircularBuffer +from .peak_detector import PeakDetector + +class SamplerController(Module): + """ + Sampler control + + Attributes + ---------- + bus: + Slave wishbone bus to be connected to a higher-level bus. Has an address width set according to + the provided buffer length. + + buffers: + List of FIFO buffer objects used to store sample data. + + samplers: + List of sampler objects provided by user. + + Registers + -------- + 0x00: Control Register (RW) + Bit 0 - Begin capture. Resets all FIFOs and starts the peak detector + + 0x01: Status Register (RO) + Bit 0 - Capture complete. Set by peak detection block and cleared by software or when + + 0x02: trigger_run_len (RW) + Number of samples to acquire after triggering sample. + + 0x03: thresh_value (RW) + Minimum peak to peak value considered triggered + + 0x04: thresh_time (RW) + Number of consecutive samples above threshold required to consider triggered + + 0x05: decay_value (RW) + Decay value to subtract from peak values to prevent false triggers + + 0x06: decay_period (RW) + Number of samples between each application of decay + + 0x1xx: BUFFER_LEN_X (RO) + Lenght of data in buffer, up to the number of samplers provided. + + """ + def __init__(self, samplers: List[Sampler], buffer_len): + + self.samplers = samplers + num_channels = len(samplers) + + # Enables reading in samples + sample_enable = Signal() + # Pull in only one CDC sync signal + sample_ready = self.samplers[0].valid + + # Generate buffers for each sampler + self.buffers = [CircularBuffer(9, buffer_len) for _ in range(num_channels)] + + # Connect each buffer to each sampler + for buffer, sampler in zip(self.buffers, self.samplers): + self.comb += [ + # Connect only top 9 bits to memory + buffer.wr_data.eq(sampler.data[1:]), + # Writes enter FIFO only when enabled and every clock cycle + buffer.wr_valid.eq(sample_enable & sample_ready), + ] + + + # Each sampler gets some chunk of memory at least large enough to fit + # all of it's data, so use that as a consistent offset + sample_mem_addr_width = ceil(log2(buffer_len)) + # 1 control block + number of channels used = control bits + control_block_addr_width = ceil(log2(num_channels + 1)) + + # Bus address width + addr_width = control_block_addr_width + sample_mem_addr_width + + # "Master" bus + self.bus = Interface(data_width=32, addr_width=addr_width) + + # Wishbone bus used for mapping control registers + self.control_regs_bus = Interface(data_width=32, addr_width=sample_mem_addr_width) + + slaves = [] + slaves.append((lambda adr: adr[sample_mem_addr_width:] == 0, self.control_regs_bus)) + + for i, buffer in enumerate(self.buffers): + # Connect subordinate buses of buffers to decoder + slaves.append((lambda adr: adr[sample_mem_addr_width:] == i + 1, buffer.bus)) + + adr = (i + 1) << sample_mem_addr_width + print(f"Sampler {i} available at 0x{adr:08x}") + + self.decoder = Decoder(self.bus, slaves) + # TODO how to submodule + self.submodules.decoder = self.decoder + + self.peak_detector = PeakDetector(10) + self.comb += [ + # Simply enable whenever we start capturing + self.peak_detector.enable.eq(sample_enable), + # Connect to the first ADC + self.peak_detector.data.eq(self.samplers[0].data), + # Use the same criteria as the fifo buffer + self.peak_detector.data_valid.eq(sample_enable & sample_ready), + ] + + #### Control register logic + + # Storage + control_register = Signal(32) + status_register = Signal(32) + trigger_run_len = Signal(32) + + def rw_register(storage: Signal, *, read: bool = True, write: bool = True): + if read: + read = self.control_regs_bus.dat_r.eq(storage) + else: + read = self.control_regs_bus.ack.eq(0) + + if write: + write = storage.eq(self.control_regs_bus.dat_w) + else: + write = self.control_regs_bus.ack.eq(0) + + return If(self.control_regs_bus.we, write).Else(read) + + # Handle explicit config registers + cases = { + 0: rw_register(control_register), + 1: rw_register(status_register, write=False), + 2: rw_register(trigger_run_len), + 3: rw_register(self.peak_detector.thresh_value), + 4: rw_register(self.peak_detector.thresh_time), + 5: rw_register(self.peak_detector.decay_value), + 6: rw_register(self.peak_detector.decay_period), + + "default": rw_register(None, read=False, write=False) + } + + # Handle length values for each sample buffer + for i, buffer in enumerate(self.buffers): + cases.update({0x100 + i: rw_register(buffer.len, write=False)}) + + # Connect up control registers bus + self.sync += [ + self.control_regs_bus.ack.eq(0), + If(self.control_regs_bus.cyc & self.control_regs_bus.stb, + self.control_regs_bus.ack.eq(1), + Case(self.control_regs_bus.adr, cases)), + ] + + # Handle the control logic + post_trigger_count = Signal(32) + self.sync += [ + # Reset state whenever sampling is disabled + If(~sample_enable, post_trigger_count.eq(0)), + + # Reset triggering status if we have started sampling + # (peak_detector.triggered resets if sample_enable is de-asserted, so + # this is a reliable reset mechanism) + If(sample_enable & ~self.peak_detector.triggered, + status_register[0].eq(0)), + + # Keep sampling past the trigger for the configured number of samples + If(self.peak_detector.triggered & sample_enable & sample_ready, + post_trigger_count.eq(post_trigger_count + 1), + + # We have sampled enough, update status and stop sampling + If(post_trigger_count + 1 >= trigger_run_len, + status_register[0].eq(1), + control_register[0].eq(0))), + ] + + # Update register storage + self.comb += [ + sample_enable.eq(control_register[0]), + ] + +def write_wishbone(bus, address, value): + # Set up bus + (yield bus.adr.eq(address)) + (yield bus.dat_w.eq(value)) + (yield bus.stb.eq(1)) + (yield bus.cyc.eq(1)) + (yield bus.we.eq(1)) + yield + + cycles = 0 + while True: + cycles += 1 + assert cycles < 5, "Write fail" + + + if (yield bus.ack) == 1: + # We received a response, clear out bus status and exit + (yield bus.stb.eq(0)) + (yield bus.cyc.eq(0)) + yield + + break + else: + # Tick until we receive an ACK + yield + + +def read_wishbone(bus, address,): + """Sets up a read transaction. Due to limitations of the simulation method, you have to read + from dat_r, and also tick immediately after calling""" + # Set up bus + (yield bus.adr.eq(address)) + (yield bus.stb.eq(1)) + (yield bus.cyc.eq(1)) + (yield bus.we.eq(0)) + yield + + cycles = 0 + while True: + cycles += 1 + assert cycles < 5, "Write fail" + if (yield bus.ack) == 1: + # We received a response, clear out bus status and exit + (yield bus.stb.eq(0)) + (yield bus.cyc.eq(0)) + + break + else: + # Tick until we receive an ACK + yield + +class MockSampler(Module): + """ + Attributes + ---------- + All Sampler attributes by default, plus the following: + + index: + Index of data to use from provided data + """ + def __init__(self, data: List[int]): + memory = Memory(width=10, depth=len(data), init=data) + + self.index = Signal(ceil(log2(len(data)))) + self.data = Signal(10) + self.valid = Signal() + + read_port = memory.get_port(async_read=True) + self.comb += [ + read_port.adr.eq(self.index), + self.data.eq(read_port.dat_r), + ] + +class TestSoC(Module): + def __init__(self, data): + sampler = MockSampler(data) + self.submodules.sampler = sampler + # TODO multiple mock samplers to test that functionality + self.controller = SamplerController([MockSampler(data)], 1024) + self.submodules.controller = self.controller + self.bus = self.controller.bus + + +def test_bus_access(): + dut = TestSoC([2, 3, 4, 5]) + def test_fn(): + yield from write_wishbone(dut.bus, 2, 0xDEADBEEF) + yield from read_wishbone(dut.bus, 2) + assert (yield dut.bus.dat_r) == 0xDEADBEEF, "Read failed!" + + # TODO test writing to RO register fails + + run_simulation(dut, test_fn(), vcd_name="test_bus_access.vcd") diff --git a/gateware/sampler/peak_detector.py b/gateware/sampler/peak_detector.py new file mode 100644 index 0000000..bc42b33 --- /dev/null +++ b/gateware/sampler/peak_detector.py @@ -0,0 +1,98 @@ +from migen import * + +class PeakDetector(Module): + """ + Module to detect when peak to peak voltage is high enough to consider incoming + data to be a valid ping. Configuration is provided by setting the configuration + attributes. Do not change these settings while detector is running. + + Attributes + ---------- + data: (input) + Data signal to use for detection + + data_valid: (input) + Strobed signal that indicates value on `data` is valid to be read + + enable: (input) + Enables running peak detection. De-asserting this will clear all state variables + + triggered: (output) + Signal that indicates peak has been triggered. Only cleared once enable is de-asserted again + + Configuration Attributes + ------------------------ + thresh_value: + Minimum peak to peak value considered triggered + + thresh_time: + Number of consecutive samples above threshold required to consider triggered + + decay_value: + Decay value to subtract from peak values to prevent false triggers + + decay_period: + Number of samples between each application of decay + """ + + def __init__(self, data_width: int): + # Create all state signals + min_val = Signal(data_width) + max_val = Signal(data_width) + diff = Signal(data_width) + triggered_time = Signal(32) + decay_counter = Signal(32) + + # Control signals + self.data = Signal(data_width) + self.data_valid = Signal() + self.enable = Signal() + self.triggered = Signal() + + # Configuration Parameters + self.thresh_value = Signal(data_width) + self.thresh_time = Signal(32) + self.decay_value = Signal(data_width) + self.decay_period = Signal(32) + + self.sync += If(~self.enable, + # Reset halfway. ADCs are 0-2V, and everything should be centered at 1V, so this is approximating the initial value + min_val.eq(int(2**data_width /2)), + max_val.eq(int(2**data_width /2)), + self.triggered.eq(0), + decay_counter.eq(0), + triggered_time.eq(0), + ) + + # Constantly updating diff to simplify some statements + self.comb += diff.eq(max_val - min_val) + + self.sync += If(self.enable & self.data_valid, + # Update maximum value + If(self.data > max_val, max_val.eq(self.data)), + # Update minimum value + If(self.data < min_val, min_val.eq(self.data)), + If(diff > self.thresh_value, + # We have met the threshold for triggering, start counting + triggered_time.eq(triggered_time + 1), + decay_counter.eq(0), + + # We have triggered, so we can set the output. After this point, + # nothing we do matters until enable is de-asserted and we reset + # triggered. + If(triggered_time + 1 >= self.thresh_time, self.triggered.eq(1))) + .Else( + # We have not met the threshold, reset timer and handle decay + triggered_time.eq(0), + decay_counter.eq(decay_counter + 1), + + # Decay threshold has been reached, apply decay to peaks + If(decay_counter >= self.decay_period, + decay_counter.eq(0), + + # Only apply decay if the values would not overlap + If(diff >= (self.decay_value << 1), + max_val.eq(max_val - self.decay_value), + min_val.eq(min_val + self.decay_value))) + ) + ) diff --git a/gateware/sampler/sampler.py b/gateware/sampler/sampler.py new file mode 100644 index 0000000..ff16d26 --- /dev/null +++ b/gateware/sampler/sampler.py @@ -0,0 +1,33 @@ +from migen import * +from migen.genlib.cdc import PulseSynchronizer + + +class Sampler(Module): + def __init__(self, adc_pins: Record, sampler_clock: Signal): + # self.clock_domains.foo = ClockDomain() is how to add a new clock domain, accessible at self.foo + # Connect sampler clock domain + self.clock_domains.sample_clock = ClockDomain("sample_clock") + self.comb += self.sample_clock.clk.eq(sampler_clock) + + # Hook up ADC REFCLK to sample_clock + self.comb += adc_pins.refclk.eq(sampler_clock) + + # We can synchronize to the sampler clock, whenever it goes high we can + # strobe a single valid signal + synchronizer = PulseSynchronizer("sample_clock", "sys") + self.submodules += synchronizer + + self.valid = Signal() + self.data = Signal(10) + + self.comb += [ + synchronizer.i.eq(self.sample_clock.clk), + self.valid.eq(synchronizer.o), + self.data.eq(adc_pins.data), + ] + + # Set config pins to constant values + self.comb += adc_pins.oen_b.eq(0) # Data pins enable + self.comb += adc_pins.standby.eq(0) # Sampling standby + self.comb += adc_pins.dfs.eq(0) # DFS (raw or two's complement) + # The only remaining pin, OTR, is an out of range status indicator diff --git a/gateware/test.py b/gateware/test.py new file mode 100644 index 0000000..35c4538 --- /dev/null +++ b/gateware/test.py @@ -0,0 +1,47 @@ +""" +Helper functions for a basic test suite +""" + +from typing import Callable +from enum import StrEnum +from dataclasses import dataclass +from traceback import print_exc + + +class TestResult(StrEnum): + PASS = "PASS" + FAIL = "FAIL" + SKIP = "SKIP" + + +@dataclass +class TestInfo: + suite_name: str + test_name: str + result: TestResult + + def __str__(self): + # TODO colour? + return f"[{self.suite_name}.{self.test_name}] {self.result}" + + + +def run_test(suite_name: str, test_fn: Callable, do_skip = False) -> TestResult: + test_name = test_fn.__name__ + + print(f"[{suite_name}.{test_name}] Running...") + + if do_skip: + res = TestInfo(suite_name, test_name, TestResult.SKIP) + else: + try: + test_fn() + res = TestInfo(suite_name, test_name, TestResult.PASS) + except AssertionError: + res = TestInfo(suite_name, test_name, TestResult.FAIL) + print_exc() + + + + print(res) + return res diff --git a/gateware/test_i2c.py b/gateware/test_i2c.py deleted file mode 100644 index 2013faa..0000000 --- a/gateware/test_i2c.py +++ /dev/null @@ -1,149 +0,0 @@ -from amaranth import * -from i2c import * -from amlib.io.i2c import * -from amaranth.lib.io import pin_layout -from tests import BaseTestClass, provide_testcase_name - - -__all__ = ["i2c_layout", "I2CBusSimulator", "TestHarness", "TestCSROperation"] - - -class TestHarness(Elaboratable): - def __init__(self): - self.i2c = I2CBusSimulator() - - self.uut = I2C(10_000_000, 100_000, self.i2c.create_interface()) - self.i2c_target = I2CTarget(self.i2c.create_interface()) - - self.start_latch = Signal() - self.clear_start = Signal() - - - def elaborate(self, platform): - assert platform is None - - m = Module() - - m.submodules.i2c = self.i2c - m.submodules.uut = self.uut - m.submodules.i2c_target = self.i2c_target - - m.d.comb += self.i2c_target.address.eq(0xAA >> 1) - # Always ACK - m.d.comb += self.i2c_target.ack_o.eq(1) - - with m.If(self.i2c_target.start): - m.d.sync += self.start_latch.eq(self.i2c_target.start) - - with m.If(self.clear_start): - m.d.sync += self.start_latch.eq(0) - - return m - - -class TestCSROperation(BaseTestClass): - def setUp(self): - self.harness = TestHarness() - - - # NOTE So ideally there are more test cases... but the initiator itself is well tested, - # and we only really need it to work for a limited set of use cases, so exhaustive testing - # isn't a huge deal. As well, we can cover all valid uses of the signals with one test. - @provide_testcase_name - def test_operation(self, test_name): - def test(): - #send start (and set ACK) - yield from self._write_csr(self.harness.uut.bus, 0, 1 + (1 << 4) + (1 << 5)) - - yield from self._wait_for_signal(self.harness.uut._initiator.busy, require_edge=True) - - # Set data - yield from self._write_csr(self.harness.uut.bus, 2, 0xAA) - # Write data - yield from self._write_csr(self.harness.uut.bus, 0, 1 << 2) - - yield from self._wait_for_signal(self.harness.uut._initiator.busy) - - # First byte has been written - did_start = yield self.harness.start_latch - self.assertTrue(did_start) - - did_ack = yield self.harness.uut._initiator.ack_o - self.assertTrue(did_ack) - - # Write data again - yield from self._write_csr(self.harness.uut.bus, 0, 1 << 2) - - yield from self._wait_for_signal(self.harness.uut._initiator.busy) - - # Repeated start - yield from self._write_csr(self.harness.uut.bus, 0, 1) - yield from self._wait_for_signal(self.harness.uut._initiator.busy) - - # Write read thing - yield from self._write_csr(self.harness.uut.bus, 2, 0xAB) # Set R/W bit for a read - yield from self._write_csr(self.harness.uut.bus, 0, 1 << 2) - yield from self._wait_for_signal(self.harness.uut._initiator.busy) - - # Read - yield from self._write_csr(self.harness.uut.bus, 0, 1 << 3) - yield from self._wait_for_signal(self.harness.uut._initiator.busy) - - # Stop - yield from self._write_csr(self.harness.uut.bus, 0, 1 << 1) - yield from self._wait_for_signal(self.harness.uut._initiator.busy) - - # I just feel weird seeing it cut out *right* at the end - for i in range(500): - yield Tick() - - self._run_test(test, test_name) - - -i2c_layout = [ - ("sda", pin_layout(1, "io")), - ("scl", pin_layout(1, "io")), -] - - -class I2CBusSimulator(Elaboratable): - def __init__(self): - self.interfaces = [] - self.sda = Signal() - self.scl = Signal() - - - def elaborate(self, target): - assert target is None, "This bus simulator should never be used in real hardware!" - - n = len(self.interfaces) - - m = Module() - - m.d.comb += self.sda.eq(1) - m.d.comb += self.scl.eq(1) - - # TODO maybe output a bus contention signal? - # First interfaces get priority over interfaces added after - for i in reversed(range(n)): - # Emulate bus drivers - with m.If(self.interfaces[i].sda.oe): - m.d.comb += self.sda.eq(self.interfaces[i].sda.o) - with m.If(self.interfaces[i].scl.oe): - m.d.comb += self.scl.eq(self.interfaces[i].scl.o) - pass - - # Connect inputs to bus value - m.d.comb += [ - self.interfaces[i].sda.i.eq(self.sda), - self.interfaces[i].scl.i.eq(self.scl), - ] - - return m - - - def create_interface(self) -> Record: - new_interface = Record(i2c_layout) - self.interfaces.append(new_interface) - return new_interface - diff --git a/gateware/test_uart.py b/gateware/test_uart.py deleted file mode 100644 index 59bed32..0000000 --- a/gateware/test_uart.py +++ /dev/null @@ -1,57 +0,0 @@ -from amaranth import * -from amaranth.sim import * -from amlib.io.serial import * -from uart import * -from tests import BaseTestClass, provide_testcase_name - - -__all__ = ["TestHarness", "TestUART"] - - -class TestHarness(Elaboratable): - def __init__(self): - self.uut = UART(10e6, fifo_depth=16) - self.uart = AsyncSerial(divisor=int(10e6 // 115200), divisor_bits=16, data_bits=8, parity="none") - - - def elaborate(self, platform): - assert platform is None - - m = Module() - m.submodules.uut = self.uut - m.submodules.uart = self.uart - - # Connect UART lines - m.d.comb += [ - self.uut.rx.eq(self.uart.tx.o), - self.uart.rx.i.eq(self.uut.tx), - ] - - # Connect the data lines so we are always pulling data out... for now - m.d.comb += [ - self.uart.rx.ack.eq(1), - ] - - return m - - -class TestUART(BaseTestClass): - def setUp(self): - self.harness = TestHarness() - - - @provide_testcase_name - def test_operation(self, test_name): - def test(): - for i in range(20): - yield from self._write_csr(self.harness.uut.bus, 2, i) - - for _ in range(2000): - yield Tick() - - yield from self._write_csr(self.harness.uut.bus, 0, 1000) - - for _ in range(20000): - yield Tick() - - self._run_test(test, test_name) diff --git a/gateware/tests.py b/gateware/tests.py deleted file mode 100644 index 7ef0bfc..0000000 --- a/gateware/tests.py +++ /dev/null @@ -1,73 +0,0 @@ -""" -Set of utilities to build a simple test suite. -""" -from amaranth import * -from amaranth.sim import * - -from typing import Generator -import unittest -import os - -from contextlib import nullcontext - -class BaseTestClass(unittest.TestCase): - """ - Base test class that provides a run_test helper function to do all the nice things. - """ - - def _run_test(self, test: Generator, name: str): - try: - sim = Simulator(self.harness) - except NameError: - raise NotImplementedError(f"Must define a self.harness module for TestCase {self.__class__.__name__}!") - - sim.add_clock(100e-9) - sim.add_sync_process(test) - sim.reset() - - # Pretty hacky way to pass this info in but does it look like I care? - if os.environ.get("TEST_SAVE_VCD"): - ctx = sim.write_vcd(f"vcd_out/{name}.vcd") - else: - ctx = nullcontext() - - with ctx: - sim.run() - - del sim - - - ######### Random Utilities ######## - def _write_csr(self, bus, index, data): - yield bus.addr.eq(index) - yield bus.w_stb.eq(1) - yield bus.w_data.eq(data) - yield Tick() - yield bus.w_stb.eq(0) - yield Tick() - - - def _wait_for_signal(self, signal, polarity=False, require_edge=True, timeout=1000): - ready_for_edge = not require_edge # If we don't require edge, we can just ignore - - while True: - timeout -= 1 - if timeout == 0: - self.fail(f"_wait_for_signal({signal}, {polarity}, {require_edge}, {timeout}, timed out!") - - read = yield signal - if read == polarity: - if ready_for_edge: - break - else: - ready_for_edge = True - - yield Tick() - - -def provide_testcase_name(fn): - """Decorator that provides a function with access to its own class and name.""" - def wrapper(self): - fn(self, f"{self.__class__.__name__}.{fn.__name__}") - - return wrapper diff --git a/gateware/timer.py b/gateware/timer.py deleted file mode 100644 index 337f85f..0000000 --- a/gateware/timer.py +++ /dev/null @@ -1,37 +0,0 @@ -from amaranth import * -from amaranth_soc.wishbone import * -from amaranth_soc.memory import * -from math import ceil, log2 - -class TimerPeripheral(Elaboratable, Interface): - def __init__(self, clock_freq: int, wanted_freq: int): - Interface.__init__(self, addr_width=1, data_width=32, granularity=8) - memory_map = MemoryMap(addr_width=3, data_width=8) - self.memory_map = memory_map - - self.ratio = ceil(clock_freq / wanted_freq) - - def elaborate(self, platform): - m = Module() - - counter = Signal(ceil(log2(self.ratio))) - value = Signal(32) - - # Up count - m.d.sync += counter.eq(counter + 1) - - # Divider value reached, increment - with m.If(counter >= self.ratio): - m.d.sync += [ - value.eq(value + 1), - counter.eq(0), - ] - - m.d.sync += self.ack.eq(0) - with m.If(self.cyc & self.stb): - m.d.sync += [ - self.ack.eq(1), - self.dat_r.eq(value), - ] - - return m \ No newline at end of file diff --git a/gateware/uart.py b/gateware/uart.py deleted file mode 100644 index 3f1aebe..0000000 --- a/gateware/uart.py +++ /dev/null @@ -1,146 +0,0 @@ -from amaranth import * -from amaranth.lib.fifo import SyncFIFO -from amaranth_soc.csr import * -from amlib.io.serial import * - -from math import ceil, log2 - - -class UART(Elaboratable): - """ - CSR-enabled UART TX/RX peripheral. - - Parameters - ---------- - :param clk_freq: - System clock frequency, used for default divisor calculation. - :param default_baud: - Default baud rate to set divisor for. - :param fifo_depth: - Depth (in bytes) of RX and TX FIFOs. - :param pins: - Optional parameter to supply platform pins into module. - - Attributes - ---------- - :attr bus: - CSR bus to provide access to control registers. - :attr tx: - TX signal. Only created if pins=None, connected to AsyncSerial.tx.o - :attr rx: - RX signal. Only created if pins=None, connected to AsyncSerial.rx.o - """ - def __init__(self, clk_freq, default_baud=115200, fifo_depth=128, pins=None): - self.fifo_depth = fifo_depth - self._pins = pins - - # Clock divisor register - # - # Sets input/output baudrate to system clock / divisor. Resets to value - # that provides 115200 baud rate. Writes to this register clear FIFOs. - self.DIVISOR = Element(16, Element.Access.RW, name="UART_DIVISOR") - - # Status register. - # - # Fields: - # [0]: txfifo_full - # [1]: txfifo_empty - # [2]: rxfifo_full - # [3]: rxfifo_empty - self.SR = Element(4, Element.Access.R, name="UART_SR") - - # Data register. - # - # Writes push data into TX FIFO, and are discarded if full, reads pull - # data from RX FIFO, and are invalid if it is empty. Incoming bytes are discarded - # if the RX FIFO is full. - self.DR = Element(8, Element.Access.RW, name="UART_DR") - - # Set up CSR bus - addr_width = ceil(log2(64)) - data_width = 8 - self._csr_mux = Multiplexer(addr_width=addr_width, data_width=data_width) - div_start, _stop = self._csr_mux.add(self.DIVISOR) - sr_start, _stop = self._csr_mux.add(self.SR) - dr_start, _stop = self._csr_mux.add(self.DR) - print(f"UART added. DIVISOR 0x{div_start:x}, SR 0x{sr_start:x}, DR 0x{dr_start:x}") - self.bus = self._csr_mux.bus - - # Actual business logic - self._serial = AsyncSerial(divisor=int(clk_freq // default_baud), divisor_bits=16, data_bits=8, parity="none", pins=pins) - self._tx_fifo = SyncFIFO(width=8, depth=self.fifo_depth) - self._rx_fifo = SyncFIFO(width=8, depth=self.fifo_depth) - - # Optional RX/TX signals - if self._pins is None: - self.tx = Signal() - self.rx = Signal() - - - def elaborate(self, platform): - m = Module() - - # Seperate clock domain to allow for resetting FIFOs separately - m.domains += ClockDomain("fifo", local=True) - m.d.comb += ClockSignal("fifo").eq(ClockSignal("sync")) - m.d.comb += ResetSignal("fifo").eq(self.DIVISOR.w_stb) # Reset on a write to DIVISOR as well - - fifo_domain = DomainRenamer("fifo") - self._tx_fifo = fifo_domain(self._tx_fifo) - self._rx_fifo = fifo_domain(self._rx_fifo) - - m.submodules.serial = self._serial - m.submodules.tx_fifo = self._tx_fifo - m.submodules.rx_fifo = self._rx_fifo - m.submodules.csr_mux = self._csr_mux - - # Hook up divisor to register. - # TODO do some validation and write a known good value if a dumb value was provided - m.d.comb += self.DIVISOR.r_data.eq(self._serial.divisor) - with m.If(self.DIVISOR.w_stb): - m.d.sync += self._serial.divisor.eq(self.DIVISOR.w_data) - - # SR Hookups - m.d.comb += [ - self.SR.r_data[0].eq(self._tx_fifo.level == self.fifo_depth), # txfifo_full - self.SR.r_data[1].eq(self._tx_fifo.level == 0), # txfifo_empty - self.SR.r_data[2].eq(self._rx_fifo.level == self.fifo_depth), # rxfifo_full - self.SR.r_data[3].eq(self._rx_fifo.level == 0), # rxfifo_empty - ] - - # DR hookups - m.d.comb += [ - # Plumb read data in, and connect CSR read strobe to FIFO r_en. - # We can ignore r_rdy because we specify empty reads are invalid. - self.DR.r_data.eq(self._rx_fifo.r_data), - self._rx_fifo.r_en.eq(self.DR.r_stb), - - # Plumb write data from CSR to FIFO, connect write strobe to FIFO w_en. - # We can ignore w_rdy, because we specify writes to a full FIFO are dropped. - self._tx_fifo.w_data.eq(self.DR.w_data), - self._tx_fifo.w_en.eq(self.DR.w_stb), - ] - - # Hook serial devices into FIFOs - rx_err = Signal() - m.d.comb += [ - # RX - rx_err.eq(self._serial.rx.err.overflow & self._serial.rx.err.frame & self._serial.rx.err.parity), - self._rx_fifo.w_data.eq(self._serial.rx.data), - self._rx_fifo.w_en.eq(self._serial.rx.rdy & ~rx_err), # Only pull data into FIFO if no RX error - self._serial.rx.ack.eq(self._rx_fifo.w_rdy | rx_err), # Pull data out if there is an error anyways - - # TX - self._serial.tx.data.eq(self._tx_fifo.r_data), - self._serial.tx.ack.eq(self._tx_fifo.r_rdy), - self._tx_fifo.r_en.eq(self._serial.tx.rdy), - ] - - # Optionally connect out RX/TX signals, if pins are not provided (likely in sim) - if self._pins is None: - m.d.comb += [ - self._serial.rx.i.eq(self.rx), - self.tx.eq(self._serial.tx.o), - ] - - return m