Start python library

This commit is contained in:
David Lenfesty 2023-06-03 11:42:56 -06:00
parent 0034d2e9e7
commit 0aef4f295d
4 changed files with 166 additions and 0 deletions

View File

@ -5,6 +5,39 @@ improved passive sonar data acquisition system. The goal of this system is
simply to be able to capture pinger data, filtered by the preprocessor board,
via ethernet, at significantly faster speed than the previous system.
## Pysonar
This provides a library to interface with the FPGA, both pulling data and configuring it. To install, simply `cd`
into this repo and:
```shell
pip install ./
```
A CLI/console to test configuration is provided as `sonar_config`. Usage can be found with
```shell
sonar_config --help
```
*Note: This is a pure python package. In an ideal world, I would have implemented all protocol
implementation in rust, and exported via PyO3, to keep potential changes in sync, but it's not
worth the implementation effort and potential installation difficulty here.*
## Hacky hacks to patch into migen
migen.fhdl.simplify.py (line 81): add the attributeerror exception
```
for port in mem.ports:
try:
sync = f.sync[port.clock.cd]
except KeyError:
sync = f.sync[port.clock.cd] = []
except AttributeError:
sync = f.sync["sys"]
```
## Repo Layout
```

23
pyproject.toml Normal file
View File

@ -0,0 +1,23 @@
# Built from this documentation: https://setuptools.pypa.io/en/latest/userguide/pyproject_config.html
[project]
name = "pysonar"
description = "Library to communicate with ARVP's sonar FPGA"
version = "0.1.0"
authors = [
{ name = "David Lenfesty", email = "lenfesty@ualberta.ca" }
]
requires-python = ">=3.9"
dependencies = ["crc>=4.2"]
[project.scripts]
# Export CLI to configure FPGA
sonar_config = "pysonar:main"
[build-system]
requires = ["setuptools", "setuptools-scm"]
build-backend = "setuptools.build_meta"
[tool.setuptools]
# Just include the python directory
packages = ["pysonar"]

2
pysonar/__init__.py Normal file
View File

@ -0,0 +1,2 @@
def main():
print("Hello world!")

108
pysonar/command_packets.py Normal file
View File

@ -0,0 +1,108 @@
from crc import Calculator, Crc8
from typing import Optional, Tuple
from dataclasses import dataclass
from enum import IntEnum
from struct import pack
PKT_START_SEQUENCE = [0xDE, 0xAD]
class Settings(IntEnum):
TriggerThreshold = 0
TriggerPeriod = 1
DecayValue = 2
DecayPeriod = 3
Gain = 4
CenterFreq = 5
SamplingEnabled = 6
@dataclass
class CommandPacket:
is_write: bool
setting: Settings
value: int
def serialize(self) -> bytearray:
buf = bytearray()
# TODO raise exception if setting is invalid here
buf.extend(PKT_START_SEQUENCE)
command = int(self.setting)
if self.is_write:
command |= 1 << 7
buf.append(command)
buf.extend(pack("<I", self.value))
# Append CRC
crc = Calculator(Crc8.MAXIM_DOW).checksum(buf)
buf.append(crc)
return buf
@dataclass
class ResponsePacket:
# Deserialization
is_error: bool
setting: Settings
value: int
class PacketParser:
def __init__(self):
self.crc = Calculator(Crc8.MAXIM_DOW)
self.reset()
def parse_byte(self, b: int) -> Optional[ResponsePacket]:
byte_index = len(self.packet_data)
if byte_index == 0:
if b != PKT_START_SEQUENCE[0]:
self.reset()
return
elif byte_index == 1:
if b != PKT_START_SEQUENCE[1]:
self.reset()
return
elif byte_index == 7:
# Final CRC byte, check and generate data
packet = None
if self.crc.verify(self.packet_data, b):
packet = self.build_response_packet()
self.reset()
return packet
# Pull packet data in otherwise
self.packet_data.append(b)
def parse_bytearray(self, bytes: bytearray) -> Tuple[bytearray, Optional[ResponsePacket]]:
"""
Parse a bytearray for a packet. Returns a bytearray of bytes that haven't been processed.
"""
for offset, byte in enumerate(bytes):
packet = self.parse_byte(byte)
if packet is not None:
# Return packet and consumed bytearray
return (bytes[offset:], packet)
# No packets found, entire buffer has been read
return (bytearray(), None)
def build_response_packet(self) -> ResponsePacket:
"""Builds a response packet out of the data we have received"""
if len(self.packet_data) < 7:
raise Exception("Invalid amount of packet data received!")
def reset(self):
self.packet_data = bytearray()