"""Serialize command packets and parse response packets from a byte stream.

NOTE(review): this file reached review collapsed onto a single line with two
spans missing (the end of ``CommandPacket.serialize`` through the start of the
``parse_byte`` signature, and the end of ``build_response_packet``).
Everything rebuilt from context rather than copied is flagged with
``NOTE(review)`` and must be confirmed against the original source.
"""

from dataclasses import dataclass
from enum import IntEnum
from struct import pack, unpack
from typing import Optional, Tuple

from crc import Calculator, Crc8

# Two-byte marker that opens every packet.
PKT_START_SEQUENCE = [0xDE, 0xAD]

# NOTE(review): the struct format string was lost from the file. A 4-byte
# value is implied by the parser treating byte index 7 as the CRC (2 start
# bytes + 1 command byte + 4 value bytes); little-endian unsigned is an
# assumption -- confirm against the device protocol.
_VALUE_FORMAT = "<I"

# NOTE(review): the CRC-8 variant is not visible in the surviving text;
# CCITT is a placeholder -- confirm.
_CRC_CONFIGURATION = Crc8.CCITT


class Settings(IntEnum):
    """Setting identifiers carried in the low 7 bits of the command byte."""

    TriggerThreshold = 0
    TriggerPeriod = 1
    DecayValue = 2
    DecayPeriod = 3
    Gain = 4
    CenterFreq = 5
    SamplingEnabled = 6


@dataclass
class CommandPacket:
    """A command addressed to one setting, flagged as either a read or a write."""

    is_write: bool
    setting: Settings
    value: int

    def serialize(self) -> bytearray:
        """Encode this command as start sequence, command byte and value.

        Bit 7 of the command byte is set when ``is_write`` is true; the low
        bits carry the integer value of ``setting``.
        """
        buf = bytearray()
        # TODO raise exception if setting is invalid here
        buf.extend(PKT_START_SEQUENCE)
        command = int(self.setting)
        if self.is_write:
            command |= 1 << 7
        buf.append(command)
        buf.extend(pack(_VALUE_FORMAT, self.value))
        # NOTE(review): everything after the pack() call was lost. Appending
        # a CRC over the preceding bytes mirrors what the parser verifies on
        # responses, but is a reconstruction -- confirm.
        buf.append(Calculator(_CRC_CONFIGURATION).checksum(buf))
        return buf


# NOTE(review): this definition was lost; the fields are inferred from the
# three locals computed in build_response_packet(). Confirm names and order.
@dataclass
class ResponsePacket:
    """A decoded response: error flag, the setting it refers to, and a value."""

    is_error: bool
    setting: Settings
    value: int


# NOTE(review): the class header, __init__ and reset() were lost. The class
# name is a placeholder; the attributes are the ones the surviving methods
# use (self.crc.verify, self.packet_data, self.reset). Confirm all three.
class PacketParser:
    """Incremental parser that assembles response packets byte by byte."""

    def __init__(self) -> None:
        self.crc = Calculator(_CRC_CONFIGURATION)
        self.packet_data = bytearray()

    def reset(self) -> None:
        """Discard any partially received packet."""
        self.packet_data = bytearray()

    # NOTE(review): only the return annotation of this signature survived;
    # the parameter name ``b`` is taken from the body, ``int`` is assumed.
    def parse_byte(self, b: int) -> Optional[ResponsePacket]:
        """Feed one byte to the parser.

        Returns a ResponsePacket when ``b`` is the CRC byte of a frame whose
        checksum verifies, otherwise None.

        Raises whatever build_response_packet() raises (for example
        ValueError for an unknown setting id); the parser state is reset
        before the exception propagates.
        """
        byte_index = len(self.packet_data)

        if byte_index < len(PKT_START_SEQUENCE):
            if b != PKT_START_SEQUENCE[byte_index]:
                self.reset()
                # A repeated first start byte (DE DE AD ...) may itself open
                # the next frame, so keep it instead of dropping it.
                if b == PKT_START_SEQUENCE[0]:
                    self.packet_data.append(b)
                return None
        elif byte_index == 7:
            # Final CRC byte: verify the 7 buffered bytes against it.
            try:
                if self.crc.verify(self.packet_data, b):
                    return self.build_response_packet()
                return None
            finally:
                # Always start the next frame from a clean buffer, even when
                # building the packet raised.
                self.reset()

        # Still inside the frame: buffer the byte.
        self.packet_data.append(b)
        return None

    def parse_bytearray(self, bytes: bytearray) -> Tuple[bytearray, Optional[ResponsePacket]]:
        """Parse a bytearray for a packet.

        Feeds bytes to parse_byte() until one packet completes.

        Returns a tuple ``(remaining, packet)``: ``remaining`` holds the bytes
        that have not been processed yet, and ``packet`` is the decoded
        packet or None. When no packet is found the whole buffer has been
        consumed and ``remaining`` is empty.

        The parameter keeps the name ``bytes`` (shadowing the builtin inside
        this method) so existing keyword callers keep working.
        """
        for offset, byte in enumerate(bytes):
            packet = self.parse_byte(byte)
            if packet is not None:
                # The byte at ``offset`` (the CRC) is already consumed, so
                # the unprocessed tail starts one past it.
                return (bytes[offset + 1:], packet)

        # No packets found, entire buffer has been read
        return (bytearray(), None)

    def build_response_packet(self) -> ResponsePacket:
        """Builds a response packet out of the data we have received.

        Raises Exception if fewer than 7 bytes are buffered, and ValueError
        if the setting id is not a member of Settings.
        """
        if len(self.packet_data) < 7:
            raise Exception("Invalid amount of packet data received!")

        command = self.packet_data[2]
        is_error = command & 0x80 != 0
        setting = Settings(command & 0x7F)
        # NOTE(review): the source is truncated after ``unpack("``. The
        # slice [3:7] follows from the 7-byte frame with the command byte at
        # index 2; the format and the return statement are reconstructions.
        value = unpack(_VALUE_FORMAT, self.packet_data[3:7])[0]
        return ResponsePacket(is_error=is_error, setting=setting, value=value)