From 0aef4f295d2b25fa5a459d8c8b6ac4515803e4f7 Mon Sep 17 00:00:00 2001 From: David Lenfesty Date: Sat, 3 Jun 2023 11:42:56 -0600 Subject: [PATCH] Start python library --- README.md | 33 ++++++++++++ pyproject.toml | 23 ++++++++ pysonar/__init__.py | 2 + pysonar/command_packets.py | 108 +++++++++++++++++++++++++++++++++++++ 4 files changed, 166 insertions(+) create mode 100644 pyproject.toml create mode 100644 pysonar/__init__.py create mode 100644 pysonar/command_packets.py diff --git a/README.md b/README.md index a308498..6e1135a 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,39 @@ improved passive sonar data acquisition system. The goal of this system is simply to be able to capture pinger data, filtered by the preprocessor board, via ethernet, at significantly faster speed than the previous system. +## Pysonar + +This provides a library to interface with the FPGA, both pulling data and configuring it. To install, simply `cd` +into this repo and: + +```shell +pip install ./ +``` + +A CLI/console to test configuration is provided as `sonar_config`. Usage can be found with + +```shell +sonar_config --help +``` + +*Note: This is a pure python package. In an ideal world, I would have implemented all protocol +implementation in rust, and exported via PyO3, to keep potential changes in sync, but it's not +worth the implementation effort and potential installation difficulty here.* + +## Hacky hacks to patch into migen + +migen.fhdl.simplify.py (line 81): add the attributeerror exception + +``` + for port in mem.ports: + try: + sync = f.sync[port.clock.cd] + except KeyError: + sync = f.sync[port.clock.cd] = [] + except AttributeError: + sync = f.sync["sys"] +``` + ## Repo Layout ``` diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..73e2fff --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,23 @@ +# Built from this documentation: https://setuptools.pypa.io/en/latest/userguide/pyproject_config.html + +[project] +name = "pysonar" +description = "Library to communicate with ARVP's sonar FPGA" +version = "0.1.0" +authors = [ + { name = "David Lenfesty", email = "lenfesty@ualberta.ca" } +] +requires-python = ">=3.9" +dependencies = ["crc>=4.2"] + +[project.scripts] +# Export CLI to configure FPGA +sonar_config = "pysonar:main" + +[build-system] +requires = ["setuptools", "setuptools-scm"] +build-backend = "setuptools.build_meta" + +[tool.setuptools] +# Just include the python directory +packages = ["pysonar"] diff --git a/pysonar/__init__.py b/pysonar/__init__.py new file mode 100644 index 0000000..1568639 --- /dev/null +++ b/pysonar/__init__.py @@ -0,0 +1,2 @@ +def main(): + print("Hello world!") \ No newline at end of file diff --git a/pysonar/command_packets.py b/pysonar/command_packets.py new file mode 100644 index 0000000..efe98a1 --- /dev/null +++ b/pysonar/command_packets.py @@ -0,0 +1,108 @@ +from crc import Calculator, Crc8 +from typing import Optional, Tuple +from dataclasses import dataclass +from enum import IntEnum +from struct import pack + + +PKT_START_SEQUENCE = [0xDE, 0xAD] + + +class Settings(IntEnum): + TriggerThreshold = 0 + TriggerPeriod = 1 + DecayValue = 2 + DecayPeriod = 3 + Gain = 4 + CenterFreq = 5 + SamplingEnabled = 6 + + +@dataclass +class CommandPacket: + is_write: bool + setting: Settings + value: int + + def serialize(self) -> bytearray: + buf = bytearray() + + # TODO raise exception if setting is invalid here + buf.extend(PKT_START_SEQUENCE) + command = int(self.setting) + if self.is_write: + command |= 1 << 7 + buf.append(command) + buf.extend(pack(" Optional[ResponsePacket]: + byte_index = len(self.packet_data) + + if byte_index == 0: + if b != PKT_START_SEQUENCE[0]: + self.reset() + return + + elif byte_index == 1: + if b != PKT_START_SEQUENCE[1]: + self.reset() + return + + elif byte_index == 7: + # Final CRC byte, check and generate data + packet = None + if self.crc.verify(self.packet_data, b): + packet = self.build_response_packet() + + self.reset() + return packet + + # Pull packet data in otherwise + self.packet_data.append(b) + + + def parse_bytearray(self, bytes: bytearray) -> Tuple[bytearray, Optional[ResponsePacket]]: + """ + Parse a bytearray for a packet. Returns a bytearray of bytes that haven't been processed. + """ + for offset, byte in enumerate(bytes): + packet = self.parse_byte(byte) + + if packet is not None: + # Return packet and consumed bytearray + return (bytes[offset:], packet) + + # No packets found, entire buffer has been read + return (bytearray(), None) + + + def build_response_packet(self) -> ResponsePacket: + """Builds a response packet out of the data we have received""" + if len(self.packet_data) < 7: + raise Exception("Invalid amount of packet data received!") + + + def reset(self): + self.packet_data = bytearray()