125 lines
4.7 KiB
Python
125 lines
4.7 KiB
Python
from amaranth import *
|
|
from amaranth_soc.csr import *
|
|
from amlib.io.i2c import I2CInitiator
|
|
|
|
from amaranth.sim import *
|
|
|
|
from math import floor, log2, ceil
|
|
|
|
|
|
class I2C(Elaboratable):
    """
    CSR-based I2C master peripheral.

    Based on the amlib I2CInitiator, this simply provides a small CSR register interface
    (presumably bridged onto the Wishbone bus elsewhere in the design) through which software
    issues start/stop/write/read commands and polls for completion.
    I could add more logic and create a proper bus master that handles the main state machine
    by itself, but we don't need performance when running I2C so the time spent on that in
    gateware is not worth it.

    Parameters
    ----------
    :param system_freq:
        System clock frequency, i.e. the frequency of the clock this peripheral runs on.
        Must be positive and expressed in the same unit as ``bus_freq``.
    :param bus_freq:
        Requested I2C frequency. The initiator is configured with a period of
        ``floor(system_freq / bus_freq)`` system clock cycles. Because the period is rounded
        down, the real bus frequency may end up slightly above the requested one (assuming the
        initiator treats ``period_cyc`` as the full bus clock period).
        Must be positive and no greater than ``system_freq``.
    :param pads:
        I2C pads object, passed through unchanged to the amlib I2CInitiator.
    :param clk_stretch:
        Passed through to amlib device, whether or not to monitor for slaves clock stretching.

    Attributes
    ----------
    :attr bus:
        CSR bus to access peripheral registers on.
    :attr CR:
        Write-only control register element (6 bits), see field list in ``__init__``.
    :attr SR:
        Read-only status register element (3 bits), see field list in ``__init__``.
    :attr DWR:
        Write-only data register element (8 bits) holding the byte to transmit.
    :attr DRR:
        Read-only data register element (8 bits) holding the last byte received.

    Raises
    ------
    ValueError
        If either frequency is not positive, or if ``bus_freq`` exceeds ``system_freq`` so that
        the computed bus period would be zero system clock cycles.
    """

    def __init__(self, system_freq, bus_freq, pads, clk_stretch: bool = True):
        # Validate up front so a bad configuration fails with a clear message instead of a bare
        # ZeroDivisionError or a zero-length period being handed to the initiator.
        if system_freq <= 0 or bus_freq <= 0:
            raise ValueError(
                f"system_freq and bus_freq must be positive, got {system_freq!r} and {bus_freq!r}"
            )
        period_cyc = floor(system_freq / bus_freq)
        if period_cyc < 1:
            raise ValueError(
                f"bus_freq ({bus_freq!r}) must not exceed system_freq ({system_freq!r})"
            )

        # Control register, writes to this are strobed into the command interface presented by the
        # amlib implementation
        #
        # Fields:
        # [0]: start       - Generate a start or repeated start condition on the bus. Ignored if busy is high
        # [1]: stop        - Generate a stop condition on the bus. Ignored if busy is high
        # [2]: write       - write a byte out on the bus
        # [3]: read        - read a byte from the bus
        # [4]: read_ack    - ACK value that gets written out during a read operation
        # [5]: read_ack_en - read_ack is only latched when this bit is set in the same write, so
        #                    that other commands can be issued without disturbing the stored value
        self.CR = Element(6, Element.Access.W, name="I2C_CR")

        # Status register
        #
        # Fields:
        # [0]: busy     - bus is busy operating
        # [1]: ack      - an ACK has been received from a bus slave
        # [2]: read_ack - a convenience read field to see value of CR->read_ack
        self.SR = Element(3, Element.Access.R, name="I2C_SR")

        # Data write register
        #
        # Latches in data to be written when write signal is applied.
        self.DWR = Element(8, Element.Access.W, name="I2C_DWR")

        # Data read register
        #
        # Only presents valid data after 'read' has started, and once 'busy' is no longer asserted.
        self.DRR = Element(8, Element.Access.R, name="I2C_DRR")

        # Set up CSR bus
        addr_width = ceil(log2(64))  # Support up to 64 registers just because
        data_width = 8  # 8 bit wide CSR bus; every register above fits in a single access
        self._csr_mux = Multiplexer(addr_width=addr_width, data_width=data_width)
        # TODO export these addresses into some config file
        cr_start, _stop = self._csr_mux.add(self.CR)
        sr_start, _stop = self._csr_mux.add(self.SR)
        dwr_start, _stop = self._csr_mux.add(self.DWR)
        drr_start, _stop = self._csr_mux.add(self.DRR)
        print(f"I2C added. CR 0x{cr_start:x}, SR 0x{sr_start:x}, DWR 0x{dwr_start:x}, DRR 0x{drr_start:x}")
        self.bus = self._csr_mux.bus

        # Set up I2C initiator submodule
        self._initiator = I2CInitiator(pads, period_cyc, clk_stretch)

    def elaborate(self, _platform):
        """
        Build the gateware: wire the CSR registers to the I2C initiator.

        :param _platform:
            Target platform; unused by this peripheral.
        :return:
            The elaborated ``Module`` containing the initiator and CSR multiplexer submodules.
        """
        m = Module()

        m.submodules.initiator = self._initiator
        m.submodules.csr_mux = self._csr_mux

        # Strobe control signals in: a write to CR registers the requested command bits, and
        # on every cycle without a CR write they are cleared again.
        with m.If(self.CR.w_stb):
            m.d.sync += [
                self._initiator.start.eq(self.CR.w_data[0]),
                self._initiator.stop.eq(self.CR.w_data[1]),
                self._initiator.write.eq(self.CR.w_data[2]),
                self._initiator.read.eq(self.CR.w_data[3]),
            ]

            # We don't *always* want to change read_ack, only when read_ack_en is set
            with m.If(self.CR.w_data[5]):
                m.d.sync += self._initiator.ack_i.eq(self.CR.w_data[4])

        with m.Else():
            m.d.sync += [
                self._initiator.start.eq(0),
                self._initiator.stop.eq(0),
                self._initiator.write.eq(0),
                self._initiator.read.eq(0),
                # We don't strobe ack_i, this is a persistent setting
            ]

        # Status signals may as well just be a continuous assignment
        m.d.comb += [
            self.SR.r_data[0].eq(self._initiator.busy),
            self.SR.r_data[1].eq(self._initiator.ack_o),
            self.SR.r_data[2].eq(self._initiator.ack_i),
        ]

        # Strobe in this byte to send out
        with m.If(self.DWR.w_stb):
            m.d.sync += self._initiator.data_i.eq(self.DWR.w_data)

        # Read data only becomes valid when not busy (i.e. the read has completed), so DRR is
        # only updated while the initiator is idle and holds its last value while it is busy.
        with m.If(~self._initiator.busy):
            m.d.sync += self.DRR.r_data.eq(self._initiator.data_o)

        return m
|
|
|