new-sonar/pysonar/command_packets.py

115 lines
2.8 KiB
Python

from crc import Calculator, Crc8
from typing import Optional, Tuple
from dataclasses import dataclass
from enum import IntEnum
from struct import pack, unpack
# Two-byte preamble written at the front of every serialized command and
# expected at the front of every incoming packet.
PKT_START_SEQUENCE = [
    0xDE,
    0xAD,
]
class Settings(IntEnum):
    """Numeric identifiers of the settings a packet can address.

    The value travels in the low seven bits of a packet's command byte; bit 7
    of that byte is reserved as a flag (write on commands, error on
    responses).
    """

    TriggerThreshold = 0
    TriggerPeriod = 1
    DecayValue = 2
    DecayPeriod = 3
    Gain = 4
    CenterFreq = 5
    SamplingEnabled = 6
    TriggerRunLen = 7
@dataclass
class CommandPacket:
    """A read or write command addressing a single setting.

    Attributes:
        is_write: When true, bit 7 of the serialized command byte is set.
        setting: The setting this command addresses.
        value: Payload, serialized as an unsigned 32-bit little-endian integer.
    """

    is_write: bool
    setting: Settings
    value: int

    def serialize(self) -> bytearray:
        """Return the wire representation of this command.

        Layout: the start sequence, one command byte (setting number in the
        low seven bits, write flag in bit 7), the value as four little-endian
        bytes, and a trailing CRC-8 (MAXIM_DOW) over all preceding bytes.

        Raises:
            ValueError: If ``setting`` is not a valid ``Settings`` value.
            struct.error: If ``value`` does not fit in an unsigned 32-bit
                integer.
        """
        # Reject unknown settings up front: an arbitrary number could collide
        # with the write flag in bit 7 or not fit in the command byte at all.
        command = int(Settings(self.setting))
        if self.is_write:
            command |= 1 << 7

        buf = bytearray(PKT_START_SEQUENCE)
        buf.append(command)
        buf.extend(pack("<I", self.value))

        # The CRC covers everything written so far, start sequence included.
        buf.append(Calculator(Crc8.MAXIM_DOW).checksum(buf))
        return buf
@dataclass
class ResponsePacket:
    """A packet produced by deserializing received bytes.

    Attributes:
        is_error: Set when bit 7 of the received command byte was set.
        setting: Setting identified by the low seven bits of the command byte.
        value: Payload decoded from the four bytes after the command byte.
    """

    is_error: bool
    setting: Settings
    value: int
class PacketParser:
    """Incremental parser that assembles ``ResponsePacket`` objects from bytes.

    A packet is eight bytes long: the two start-sequence bytes, one command
    byte, four little-endian value bytes, and a CRC-8 (MAXIM_DOW) computed
    over the seven bytes before it.
    """

    def __init__(self):
        self.crc = Calculator(Crc8.MAXIM_DOW)
        self.reset()

    def parse_byte(self, b: int) -> Optional[ResponsePacket]:
        """Feed one byte to the parser.

        Returns:
            The decoded packet when ``b`` is the CRC byte of a valid packet;
            ``None`` while a packet is still being assembled, when the byte is
            discarded during resynchronisation, or when the CRC check fails.

        Raises:
            ValueError: If a CRC-valid packet carries a setting number that is
                not a ``Settings`` member. The parser state is reset before
                the exception propagates.
        """
        byte_index = len(self.packet_data)
        if byte_index == 0:
            if b != PKT_START_SEQUENCE[0]:
                self.reset()
                return None
        elif byte_index == 1:
            if b != PKT_START_SEQUENCE[1]:
                self.reset()
                # The mismatching byte may itself begin a packet (for example
                # DE DE AD ...), so keep it as the new start candidate
                # instead of throwing it away.
                if b == PKT_START_SEQUENCE[0]:
                    self.packet_data.append(b)
                return None
        elif byte_index == 7:
            # Final CRC byte: verify, build the packet, and always start over
            # afterwards, even if building the packet raises.
            try:
                if self.crc.verify(self.packet_data, b):
                    return self.build_response_packet()
                return None
            finally:
                self.reset()

        # Any other position: accumulate the byte.
        self.packet_data.append(b)
        return None

    def parse_bytearray(
        self, bytes: bytearray
    ) -> Tuple[bytearray, Optional[ResponsePacket]]:
        """Feed bytes to the parser until the first complete packet.

        Returns:
            A ``(remaining, packet)`` tuple. When a packet is found,
            ``remaining`` holds the bytes that follow the packet's CRC byte and
            have not been processed yet. When no packet is found the whole
            input has been consumed and ``(bytearray(), None)`` is returned.
        """
        # NOTE: the parameter name shadows the ``bytes`` builtin; it is kept
        # so that callers passing it by keyword continue to work.
        for offset, byte in enumerate(bytes):
            packet = self.parse_byte(byte)
            if packet is not None:
                # The byte at ``offset`` was the CRC byte and is already
                # consumed, so the unprocessed tail starts one past it.
                return (bytes[offset + 1:], packet)
        # No packets found, entire buffer has been read.
        return (bytearray(), None)

    def build_response_packet(self) -> ResponsePacket:
        """Build a response packet out of the bytes received so far.

        Raises:
            Exception: If fewer than seven bytes have been accumulated.
            ValueError: If the setting number is not a ``Settings`` member.
        """
        if len(self.packet_data) < 7:
            raise Exception("Invalid amount of packet data received!")

        command = self.packet_data[2]
        is_error = (command & 0x80) != 0
        setting = Settings(command & 0x7F)
        # struct.unpack always returns a tuple; take its single element so
        # that ``value`` is an int as ResponsePacket declares.
        (value,) = unpack("<I", self.packet_data[3:7])
        return ResponsePacket(is_error, setting, value)

    def reset(self):
        """Discard any partially received packet."""
        self.packet_data = bytearray()