new-sonar/gateware/i2c.py
David Lenfesty 490b92b1fd gateware: first work with I2C peripheral
Basic details seem to be working, and I'm on my way with testing.

Next steps:
- Build out more local testing utilities
- Build out unittest infrastructure
- Flesh out more tests to verify operation
2023-01-29 16:54:29 -07:00

251 lines
7.9 KiB
Python

from amaranth import *
from amaranth.lib.coding import PriorityEncoder
from amaranth_soc.csr import *
from amaranth_soc.memory import *
from amlib.io.i2c import I2CInitiator, I2CTarget
from amaranth.sim import *
from math import floor, log2, ceil
class I2C(Elaboratable):
    """
    CSR-based I2C master peripheral.

    Based on the amlib I2CInitiator, this simply wraps its command interface in a small
    CSR register file.

    I could add more logic and create a proper bus master that handles the main state machine
    by itself, but we don't need performance when running I2C so the time spent on that in
    gateware is not worth it.

    Registers are added to the CSR multiplexer in the order CR, SR, DWR, DRR.

    Parameters
    ----------
    :param system_freq:
        System clock frequency, i.e. the frequency of the ``sync`` domain this peripheral
        is clocked from.
    :param bus_freq:
        Requested I2C frequency. The initiator is configured with a bit period of
        ``floor(system_freq / bus_freq)`` system clock cycles.
    :param pads:
        I2C pads, passed unchanged to the amlib I2CInitiator.
        NOTE(review): presumably an object exposing ``sda``/``scl`` tristate pins -- confirm
        against amlib.
    :param clk_stretch:
        Passed through to amlib device, whether or not to monitor for slaves clock stretching.

    Attributes
    ----------
    :attr bus:
        CSR bus to access peripheral registers on.
    """

    def __init__(self, system_freq, bus_freq, pads, clk_stretch: bool = True):
        # Control register, writes to this are strobed into the command interface presented by
        # the amlib implementation
        #
        # Fields:
        # [0]: start - Generate a start or repeated start condition on the bus. Ignored if busy is high
        # [1]: stop - Generate a stop condition on the bus. Ignored if busy is high
        # [2]: write - write a byte out on the bus
        # [3]: read - read a byte from the bus
        # [4]: read_ack - ACK value that gets written out during a read operation
        # [5]: read_ack_en - Hacky solution to determine if we want to save read_ack
        self.CR = Element(6, Element.Access.W, name="CR")

        # Status register
        #
        # Fields:
        # [0]: busy - bus is busy operating
        # [1]: ack - an ACK has been received from a bus slave
        # [2]: read_ack - a convenience read field to see value of CR->read_ack
        self.SR = Element(3, Element.Access.R, name="SR")

        # Data write register
        #
        # Latches in data to be written when write signal is applied.
        self.DWR = Element(8, Element.Access.W, name="DWR")

        # Data read register
        #
        # Only presents valid data after 'read' has started, and once 'busy' is no longer asserted.
        self.DRR = Element(8, Element.Access.R, name="DRR")

        # Set up CSR bus
        addr_width = ceil(log2(64))  # Support up to 64 registers just because
        data_width = 32  # 32 bit bus
        self._csr_mux = Multiplexer(addr_width=addr_width, data_width=data_width)
        # TODO export the addresses of these somehow
        self._csr_mux.add(self.CR)
        self._csr_mux.add(self.SR)
        self._csr_mux.add(self.DWR)
        self._csr_mux.add(self.DRR)
        self.bus = self._csr_mux.bus

        # Set up I2C initiator submodule
        period_cyc = floor(system_freq / bus_freq)
        self._initiator = I2CInitiator(pads, period_cyc, clk_stretch)

    def elaborate(self, _platform):
        """Build the register glue between the CSR elements and the I2C initiator."""
        m = Module()
        m.submodules.initiator = self._initiator
        m.submodules.csr_mux = self._csr_mux

        # Strobe control signals in: a CR write registers the command bits for one cycle,
        # and they are cleared again on every cycle without a CR write.
        with m.If(self.CR.w_stb):
            m.d.sync += [
                self._initiator.start.eq(self.CR.w_data[0]),
                self._initiator.stop.eq(self.CR.w_data[1]),
                self._initiator.write.eq(self.CR.w_data[2]),
                self._initiator.read.eq(self.CR.w_data[3]),
            ]
            # We don't *always* want to change read_ack
            with m.If(self.CR.w_data[5]):
                m.d.sync += self._initiator.ack_i.eq(self.CR.w_data[4])
        with m.Else():
            m.d.sync += [
                self._initiator.start.eq(0),
                self._initiator.stop.eq(0),
                self._initiator.write.eq(0),
                self._initiator.read.eq(0),
                # We don't strobe ack_i, this is a persistent setting
            ]

        # Status signals may as well just be a continuous assignment.
        # These must be added to a domain: a bare `.eq()` expression is silently discarded
        # and would leave SR reading as zero.
        m.d.comb += [
            self.SR.r_data[0].eq(self._initiator.busy),
            self.SR.r_data[1].eq(self._initiator.ack_o),
            self.SR.r_data[2].eq(self._initiator.ack_i),
        ]

        # Strobe in this byte to send out
        with m.If(self.DWR.w_stb):
            m.d.sync += self._initiator.data_i.eq(self.DWR.w_data)

        # Read data only becomes valid when not busy (i.e. the read has completed)
        with m.If(~self._initiator.busy):
            m.d.sync += self.DRR.r_data.eq(self._initiator.data_o)

        return m
from amaranth.lib.io import pin_layout
# Record layout for a pair of I2C lines: SDA and SCL each get a 1-bit "io" pin layout.
i2c_layout = [(line_name, pin_layout(1, "io")) for line_name in ("sda", "scl")]
class I2CBusSimulator(Elaboratable):
    """
    Simulation-only model of a shared I2C bus.

    Each participant obtains its own pin record through `create_interface`. The bus lines
    `sda` and `scl` default to 1 and take the output value of an interface whose output
    enable is asserted; every interface's inputs follow the resulting bus value.
    """

    def __init__(self):
        self.interfaces = []
        self.sda = Signal()
        self.scl = Signal()

    def elaborate(self, target):
        assert target is None, "This bus simulator should never be used in real hardware!"

        m = Module()

        # Undriven bus lines read as 1.
        m.d.comb += [
            self.sda.eq(1),
            self.scl.eq(1),
        ]

        # TODO maybe output a bus contention signal?
        # Later assignments win, so walking the list backwards gives interfaces that were
        # created first priority over those created after them.
        for iface in reversed(self.interfaces):
            # Emulate bus drivers
            with m.If(iface.sda.oe):
                m.d.comb += self.sda.eq(iface.sda.o)
            with m.If(iface.scl.oe):
                m.d.comb += self.scl.eq(iface.scl.o)

            # Connect inputs to bus value
            m.d.comb += [
                iface.sda.i.eq(self.sda),
                iface.scl.i.eq(self.scl),
            ]

        return m

    def create_interface(self) -> Record:
        """Create a new pin record with `i2c_layout`, attach it to the bus, and return it."""
        # The local name is kept as-is: Amaranth infers the Record's name from it.
        new_interface = Record(i2c_layout)
        self.interfaces.append(new_interface)
        return new_interface
class TestHarness(Elaboratable):
    """
    Simulation top level: the I2C peripheral under test and an amlib I2CTarget, both
    attached to one simulated bus.
    """

    def __init__(self):
        self.i2c = I2CBusSimulator()

        # The peripheral's interface is created first, so it is first in the bus's list.
        initiator_pads = self.i2c.create_interface()
        self.uut = I2C(system_freq=10_000_000, bus_freq=100_000, pads=initiator_pads)

        target_pads = self.i2c.create_interface()
        self.i2c_target = I2CTarget(target_pads)

    def elaborate(self, platform):
        assert platform is None

        m = Module()
        m.submodules.i2c = self.i2c
        m.submodules.uut = self.uut
        m.submodules.i2c_target = self.i2c_target

        # The target's address is the byte 0xAA with its lowest bit dropped.
        target_address = 0xAA >> 1
        m.d.comb += self.i2c_target.address.eq(target_address)

        return m
# TODO switch to unittest or something
if __name__ == "__main__":
    def write_csr(bus, index, data):
        """Simulation helper: write `data` to CSR address `index` with a one-cycle strobe."""
        yield bus.addr.eq(index)
        yield bus.w_stb.eq(1)
        yield bus.w_data.eq(data)
        yield Tick()
        yield bus.w_stb.eq(0)
        yield Tick()

    def read_csr(bus, index):
        """
        Simulation helper: read CSR address `index` and return the value.

        Use as ``value = yield from read_csr(bus, index)``.
        """
        yield bus.addr.eq(index)
        yield bus.r_stb.eq(1)
        yield Tick()
        yield bus.r_stb.eq(0)
        yield Tick()
        # NOTE(review): amaranth-soc documents `r_data` as valid on the cycle after `r_stb`
        # is asserted, and a sync process is assumed to sample values from just before the
        # clock edge it woke on -- so the data is read after the second tick. Confirm the
        # timing against the VCD.
        value = yield bus.r_data
        return value

    harness = TestHarness()
    sim = Simulator(harness)

    def test_proc():
        # Send start
        yield from write_csr(harness.uut.bus, 0, 1)

        # TODO shouldn't need to do this, if I did a proper CSR read maybe?
        yield Tick()
        yield Tick()

        # TODO poll SR through read_csr instead of peeking at the initiator directly
        # Wait for the initiator to go idle
        while True:
            busy = yield harness.uut._initiator.busy
            if not busy:
                break
            else:
                yield Tick()

        # Set data
        yield from write_csr(harness.uut.bus, 2, 0xAA)
        # Write data
        yield from write_csr(harness.uut.bus, 0, 1 << 2)

        # NOTE(review): the start condition was requested before this loop begins, so the
        # target's `start` signal may already have pulsed -- confirm in the VCD.
        did_start = False
        for _ in range(1000):
            start_cond = yield harness.i2c_target.start
            yield Tick()
            if start_cond:
                did_start = True
                break

        # TODO turn this into a real assertion once the check is reliable
        print(did_start)

    # 100 ns clock period
    sim.add_clock(100e-9)
    sim.add_sync_process(test_proc)
    with sim.write_vcd('test.vcd'):
        sim.reset()
        sim.run()