Compare commits
3 Commits
490b92b1fd
...
f0683e684c
Author | SHA1 | Date | |
---|---|---|---|
f0683e684c | |||
9997db0ac8 | |||
ad3be1f4c7 |
3
gateware/.gitignore
vendored
3
gateware/.gitignore
vendored
@ -4,4 +4,5 @@ __pycache__
|
|||||||
|
|
||||||
# Sim artifacts
|
# Sim artifacts
|
||||||
*.vcd
|
*.vcd
|
||||||
*.gtkw
|
*.gtkw
|
||||||
|
vcd_out/
|
127
gateware/i2c.py
127
gateware/i2c.py
@ -121,130 +121,3 @@ class I2C(Elaboratable):
|
|||||||
|
|
||||||
return m
|
return m
|
||||||
|
|
||||||
from amaranth.lib.io import pin_layout
|
|
||||||
i2c_layout = [
|
|
||||||
("sda", pin_layout(1, "io")),
|
|
||||||
("scl", pin_layout(1, "io")),
|
|
||||||
]
|
|
||||||
|
|
||||||
class I2CBusSimulator(Elaboratable):
|
|
||||||
def __init__(self):
|
|
||||||
self.interfaces = []
|
|
||||||
self.sda = Signal()
|
|
||||||
self.scl = Signal()
|
|
||||||
|
|
||||||
def elaborate(self, target):
|
|
||||||
assert target is None, "This bus simulator should never be used in real hardware!"
|
|
||||||
|
|
||||||
n = len(self.interfaces)
|
|
||||||
|
|
||||||
m = Module()
|
|
||||||
|
|
||||||
m.d.comb += self.sda.eq(1)
|
|
||||||
m.d.comb += self.scl.eq(1)
|
|
||||||
|
|
||||||
# TODO maybe output a bus contention signal?
|
|
||||||
# First interfaces get priority over interfaces added after
|
|
||||||
for i in reversed(range(n)):
|
|
||||||
# Emulate bus drivers
|
|
||||||
with m.If(self.interfaces[i].sda.oe):
|
|
||||||
m.d.comb += self.sda.eq(self.interfaces[i].sda.o)
|
|
||||||
with m.If(self.interfaces[i].scl.oe):
|
|
||||||
m.d.comb += self.scl.eq(self.interfaces[i].scl.o)
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Connect inputs to bus value
|
|
||||||
m.d.comb += [
|
|
||||||
self.interfaces[i].sda.i.eq(self.sda),
|
|
||||||
self.interfaces[i].scl.i.eq(self.scl),
|
|
||||||
]
|
|
||||||
|
|
||||||
return m
|
|
||||||
|
|
||||||
def create_interface(self) -> Record:
|
|
||||||
new_interface = Record(i2c_layout)
|
|
||||||
self.interfaces.append(new_interface)
|
|
||||||
return new_interface
|
|
||||||
|
|
||||||
|
|
||||||
class TestHarness(Elaboratable):
|
|
||||||
def __init__(self):
|
|
||||||
self.i2c = I2CBusSimulator()
|
|
||||||
|
|
||||||
self.uut = I2C(10_000_000, 100_000, self.i2c.create_interface())
|
|
||||||
self.i2c_target = I2CTarget(self.i2c.create_interface())
|
|
||||||
|
|
||||||
|
|
||||||
def elaborate(self, platform):
|
|
||||||
assert platform is None
|
|
||||||
|
|
||||||
m = Module()
|
|
||||||
|
|
||||||
m.submodules.i2c = self.i2c
|
|
||||||
m.submodules.uut = self.uut
|
|
||||||
m.submodules.i2c_target = self.i2c_target
|
|
||||||
|
|
||||||
m.d.comb += self.i2c_target.address.eq(0xAA >> 1)
|
|
||||||
|
|
||||||
return m
|
|
||||||
|
|
||||||
# TODO switch to unittest or something
|
|
||||||
if __name__ == "__main__":
|
|
||||||
def write_csr(bus, index, data):
|
|
||||||
yield bus.addr.eq(index)
|
|
||||||
yield bus.w_stb.eq(1)
|
|
||||||
yield bus.w_data.eq(data)
|
|
||||||
yield Tick()
|
|
||||||
yield bus.w_stb.eq(0)
|
|
||||||
yield Tick()
|
|
||||||
|
|
||||||
def read_csr(bus, index):
|
|
||||||
yield bus.r_strb.eq(1)
|
|
||||||
ret = yield bus.r_data
|
|
||||||
yield Tick()
|
|
||||||
yield bus.r_stb.eq(0)
|
|
||||||
yield Tick()
|
|
||||||
|
|
||||||
harness = TestHarness()
|
|
||||||
sim = Simulator(harness)
|
|
||||||
|
|
||||||
def test_proc():
|
|
||||||
#send start
|
|
||||||
yield from write_csr(harness.uut.bus, 0, 1)
|
|
||||||
|
|
||||||
# TODO shouldn't need to do this, if I did a proper CSR read maybe?
|
|
||||||
yield Tick()
|
|
||||||
yield Tick()
|
|
||||||
|
|
||||||
# TODO I want something like read_csr, unsure how to implement
|
|
||||||
# Wait for
|
|
||||||
while True:
|
|
||||||
busy = yield harness.uut._initiator.busy
|
|
||||||
if not busy:
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
yield Tick()
|
|
||||||
|
|
||||||
# Set data
|
|
||||||
yield from write_csr(harness.uut.bus, 2, 0xAA)
|
|
||||||
# Write data
|
|
||||||
yield from write_csr(harness.uut.bus, 0, 1 << 2)
|
|
||||||
|
|
||||||
did_start = False
|
|
||||||
for i in range(1000):
|
|
||||||
start_cond = yield harness.i2c_target.start
|
|
||||||
yield Tick()
|
|
||||||
|
|
||||||
if start_cond:
|
|
||||||
did_start = True
|
|
||||||
break
|
|
||||||
|
|
||||||
#assert did_start, "Idnaoidnwaioudnwaoiun"
|
|
||||||
print(did_start)
|
|
||||||
|
|
||||||
|
|
||||||
sim.add_clock(100e-9)
|
|
||||||
sim.add_sync_process(test_proc)
|
|
||||||
with sim.write_vcd('test.vcd'):
|
|
||||||
sim.reset()
|
|
||||||
sim.run()
|
|
||||||
|
@ -9,10 +9,16 @@ from minerva.core import Minerva
|
|||||||
|
|
||||||
from typing import List
|
from typing import List
|
||||||
from argparse import ArgumentParser
|
from argparse import ArgumentParser
|
||||||
|
from pathlib import Path
|
||||||
|
import unittest
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
|
||||||
from memory import *
|
from memory import *
|
||||||
from led import *
|
from led import *
|
||||||
|
|
||||||
|
import i2c
|
||||||
|
import test_i2c
|
||||||
|
|
||||||
# To change clock domain of a module:
|
# To change clock domain of a module:
|
||||||
# new_thing = DomainRenamer("new_clock")(MyElaboratable())
|
# new_thing = DomainRenamer("new_clock")(MyElaboratable())
|
||||||
@ -145,5 +151,14 @@ if __name__ == "__main__":
|
|||||||
colorlight_i9.Colorlight_i9_Platform().build(SoC(), debug_verilog=args.gen_debug_verilog)
|
colorlight_i9.Colorlight_i9_Platform().build(SoC(), debug_verilog=args.gen_debug_verilog)
|
||||||
|
|
||||||
if args.test:
|
if args.test:
|
||||||
# TODO pass save_vcd arg through
|
if args.save_vcd:
|
||||||
run_sim()
|
if not Path("vcd_out").exists():
|
||||||
|
os.mkdir("vcd_out")
|
||||||
|
|
||||||
|
os.environ["TEST_SAVE_VCD"] = "YES"
|
||||||
|
|
||||||
|
# Super hacky... why am I doing this
|
||||||
|
test_modules = [mod for mod in sys.modules if mod.startswith("test_")]
|
||||||
|
for mod in test_modules:
|
||||||
|
unittest.main(module=mod, argv=[sys.argv[0]])
|
||||||
|
|
||||||
|
144
gateware/test_i2c.py
Normal file
144
gateware/test_i2c.py
Normal file
@ -0,0 +1,144 @@
|
|||||||
|
from amaranth import *
|
||||||
|
from i2c import *
|
||||||
|
from amaranth.lib.io import pin_layout
|
||||||
|
from tests import BaseTestClass, provide_testcase_name
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = ["i2c_layout", "I2CBusSimulator", "TestHarness", "TestCSROperation"]
|
||||||
|
|
||||||
|
|
||||||
|
class TestHarness(Elaboratable):
|
||||||
|
def __init__(self):
|
||||||
|
self.i2c = I2CBusSimulator()
|
||||||
|
|
||||||
|
self.uut = I2C(10_000_000, 100_000, self.i2c.create_interface())
|
||||||
|
self.i2c_target = I2CTarget(self.i2c.create_interface())
|
||||||
|
|
||||||
|
self.start_latch = Signal()
|
||||||
|
self.clear_start = Signal()
|
||||||
|
|
||||||
|
|
||||||
|
def elaborate(self, platform):
|
||||||
|
assert platform is None
|
||||||
|
|
||||||
|
m = Module()
|
||||||
|
|
||||||
|
m.submodules.i2c = self.i2c
|
||||||
|
m.submodules.uut = self.uut
|
||||||
|
m.submodules.i2c_target = self.i2c_target
|
||||||
|
|
||||||
|
m.d.comb += self.i2c_target.address.eq(0xAA >> 1)
|
||||||
|
|
||||||
|
with m.If(self.i2c_target.start):
|
||||||
|
m.d.sync += self.start_latch.eq(self.i2c_target.start)
|
||||||
|
|
||||||
|
with m.If(self.clear_start):
|
||||||
|
m.d.sync += self.start_latch.eq(0)
|
||||||
|
|
||||||
|
return m
|
||||||
|
|
||||||
|
|
||||||
|
class TestCSROperation(BaseTestClass):
|
||||||
|
def setUp(self):
|
||||||
|
self.harness = TestHarness()
|
||||||
|
|
||||||
|
|
||||||
|
def _write_csr(self, bus, index, data):
|
||||||
|
yield bus.addr.eq(index)
|
||||||
|
yield bus.w_stb.eq(1)
|
||||||
|
yield bus.w_data.eq(data)
|
||||||
|
yield Tick()
|
||||||
|
yield bus.w_stb.eq(0)
|
||||||
|
yield Tick()
|
||||||
|
|
||||||
|
|
||||||
|
def _wait_for_signal(self, signal, polarity=False, require_edge=True, timeout=1000):
|
||||||
|
ready_for_edge = not require_edge # If we don't require edge, we can just ignore
|
||||||
|
|
||||||
|
while True:
|
||||||
|
timeout -= 1
|
||||||
|
if timeout == 0:
|
||||||
|
self.fail(f"_wait_for_signal({signal}, {polarity}, {require_edge}, {timeout}, timed out!")
|
||||||
|
|
||||||
|
read = yield signal
|
||||||
|
if read == polarity:
|
||||||
|
if ready_for_edge:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
ready_for_edge = True
|
||||||
|
|
||||||
|
yield Tick()
|
||||||
|
|
||||||
|
|
||||||
|
# NOTE So ideally there are more test cases... but the initiator itself is well tested,
|
||||||
|
# and we only really need it to work for a limited set of use cases, so exhaustive testing
|
||||||
|
# isn't a huge deal. As well, we can cover all valid uses of the signals with one test.
|
||||||
|
@provide_testcase_name
|
||||||
|
def test_operation(self, test_name):
|
||||||
|
def test():
|
||||||
|
#send start
|
||||||
|
yield from self._write_csr(self.harness.uut.bus, 0, 1)
|
||||||
|
|
||||||
|
yield from self._wait_for_signal(self.harness.uut._initiator.busy, require_edge=True)
|
||||||
|
|
||||||
|
# Set data
|
||||||
|
yield from self._write_csr(self.harness.uut.bus, 2, 0xAA)
|
||||||
|
# Write data
|
||||||
|
yield from self._write_csr(self.harness.uut.bus, 0, 1 << 2)
|
||||||
|
|
||||||
|
yield from self._wait_for_signal(self.harness.uut._initiator.busy)
|
||||||
|
|
||||||
|
# First byte has been written
|
||||||
|
did_start = yield self.harness.start_latch
|
||||||
|
self.assertTrue(did_start)
|
||||||
|
|
||||||
|
self._run_test(test, test_name)
|
||||||
|
|
||||||
|
|
||||||
|
i2c_layout = [
|
||||||
|
("sda", pin_layout(1, "io")),
|
||||||
|
("scl", pin_layout(1, "io")),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class I2CBusSimulator(Elaboratable):
|
||||||
|
def __init__(self):
|
||||||
|
self.interfaces = []
|
||||||
|
self.sda = Signal()
|
||||||
|
self.scl = Signal()
|
||||||
|
|
||||||
|
|
||||||
|
def elaborate(self, target):
|
||||||
|
assert target is None, "This bus simulator should never be used in real hardware!"
|
||||||
|
|
||||||
|
n = len(self.interfaces)
|
||||||
|
|
||||||
|
m = Module()
|
||||||
|
|
||||||
|
m.d.comb += self.sda.eq(1)
|
||||||
|
m.d.comb += self.scl.eq(1)
|
||||||
|
|
||||||
|
# TODO maybe output a bus contention signal?
|
||||||
|
# First interfaces get priority over interfaces added after
|
||||||
|
for i in reversed(range(n)):
|
||||||
|
# Emulate bus drivers
|
||||||
|
with m.If(self.interfaces[i].sda.oe):
|
||||||
|
m.d.comb += self.sda.eq(self.interfaces[i].sda.o)
|
||||||
|
with m.If(self.interfaces[i].scl.oe):
|
||||||
|
m.d.comb += self.scl.eq(self.interfaces[i].scl.o)
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Connect inputs to bus value
|
||||||
|
m.d.comb += [
|
||||||
|
self.interfaces[i].sda.i.eq(self.sda),
|
||||||
|
self.interfaces[i].scl.i.eq(self.scl),
|
||||||
|
]
|
||||||
|
|
||||||
|
return m
|
||||||
|
|
||||||
|
|
||||||
|
def create_interface(self) -> Record:
|
||||||
|
new_interface = Record(i2c_layout)
|
||||||
|
self.interfaces.append(new_interface)
|
||||||
|
return new_interface
|
||||||
|
|
45
gateware/tests.py
Normal file
45
gateware/tests.py
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
"""
|
||||||
|
Set of utilities to build a simple test suite.
|
||||||
|
"""
|
||||||
|
from amaranth import *
|
||||||
|
from amaranth.sim import *
|
||||||
|
|
||||||
|
from typing import Generator
|
||||||
|
import unittest
|
||||||
|
import os
|
||||||
|
|
||||||
|
from contextlib import nullcontext
|
||||||
|
|
||||||
|
class BaseTestClass(unittest.TestCase):
|
||||||
|
"""
|
||||||
|
Base test class that provides a run_test helper function to do all the nice things.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def _run_test(self, test: Generator, name: str):
|
||||||
|
try:
|
||||||
|
sim = Simulator(self.harness)
|
||||||
|
except NameError:
|
||||||
|
raise NotImplementedError(f"Must define a self.harness module for TestCase {self.__class__.__name__}!")
|
||||||
|
|
||||||
|
sim.add_clock(100e-9)
|
||||||
|
sim.add_sync_process(test)
|
||||||
|
sim.reset()
|
||||||
|
|
||||||
|
# Pretty hacky way to pass this info in but does it look like I care?
|
||||||
|
if os.environ.get("TEST_SAVE_VCD"):
|
||||||
|
ctx = sim.write_vcd(f"vcd_out/{name}.vcd")
|
||||||
|
else:
|
||||||
|
ctx = nullcontext()
|
||||||
|
|
||||||
|
with ctx:
|
||||||
|
sim.run()
|
||||||
|
|
||||||
|
del sim
|
||||||
|
|
||||||
|
|
||||||
|
def provide_testcase_name(fn):
|
||||||
|
"""Decorator that provides a function with access to its own class and name."""
|
||||||
|
def wrapper(self):
|
||||||
|
fn(self, f"{self.__class__.__name__}.{fn.__name__}")
|
||||||
|
|
||||||
|
return wrapper
|
Loading…
Reference in New Issue
Block a user