diff --git a/pyproject.toml b/pyproject.toml index 73e2fff..053681a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,7 @@ authors = [ { name = "David Lenfesty", email = "lenfesty@ualberta.ca" } ] requires-python = ">=3.9" -dependencies = ["crc>=4.2"] +dependencies = ["crc>=4.2", "prompt-toolkit>=3.0"] [project.scripts] # Export CLI to configure FPGA diff --git a/pysonar/__init__.py b/pysonar/__init__.py index d0b9020..8bba107 100644 --- a/pysonar/__init__.py +++ b/pysonar/__init__.py @@ -1,17 +1,134 @@ -IP = "192.168.88.69" -PORT = 2000 - import socket +from argparse import ArgumentParser + +from prompt_toolkit import PromptSession +from prompt_toolkit.completion import NestedCompleter +from prompt_toolkit.validation import Validator, ValidationError + from .command_packets import CommandPacket, Settings, PacketParser + +settings = { + "trigger_threshold": Settings.TriggerThreshold, + "trigger_period": Settings.TriggerPeriod, + "decay_value": Settings.DecayValue, + "decay_period": Settings.DecayPeriod, + "gain": Settings.Gain, + "center_frequency": Settings.CenterFreq, + "sampling_enabled": Settings.SamplingEnabled, +} + + +commands = { + "set": set(settings.keys()), + "get": set(settings.keys()), + "quit": None, + "help": None, + "?": None, +} + + +class CommandValidator(Validator): + def validate(self, document): + command = document.text.strip().split() + if len(command) == 0: + return + + if command[0] not in commands.keys(): + raise ValidationError(message="Unrecognized command") + + if command[0] not in ["set", "get"]: + if len(command) > 1: + raise ValidationError(message="Too many arguments") + else: + if command[0] == "set": + num_args = 3 + else: + num_args = 2 + + if len(command) < num_args: + raise ValidationError(message="Not enough arguments") + + if len(command) > num_args: + raise ValidationError(message="Too many arguments") + + if command[1] not in settings: + raise ValidationError(message="Unrecognized setting") + + if num_args > 2: + try: + int(command[2]) + except ValueError: + raise ValidationError(message="Setting value not an integer") + + +def print_help(): + print("==== Sonar Configuration CLI ====") + print("Commands:") + print("\tset - Sets the integer value of a setting") + print("\tget - gets the integer value of a setting") + print("\tquit - exit this prompt") + print("\thelp, ? - this help") + print() + print("Available Settings:") + for setting in settings.keys(): + print(f"\t{setting}") + print() + + +def send_command(sock, cmd: CommandPacket): + """Sends a command, and pretty-prints the response""" + sock.send(cmd.serialize()) + + data = sock.recv(16) + parser = PacketParser() + data, packet = parser.parse_bytearray(data) + if packet is not None: + print(packet) + else: + print("No response received! Must be a bug...") + + def main(): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: - sock.connect((IP, PORT)) - sock.send(CommandPacket(False, Settings.Gain, 125).serialize()) + args = ArgumentParser(prog="sonar_config", description="Configuration utility for ARVP sonar") + args.add_argument("IP", type=str, help="IP of sonar system") + args.add_argument("--port", "-p", type=int, help="Port of configuration socket", default=2000) + args = args.parse_args() - data = sock.recv(1024) + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((args.IP, args.port)) - parser = PacketParser() - data, packet = parser.parse_bytearray(data) - if packet is not None: - print(packet) + # Start prompt + completer = NestedCompleter.from_nested_dict(commands) + validator = CommandValidator() + session = PromptSession("> ", completer=completer, validator=validator, validate_while_typing=True) + while True: + try: + command = session.prompt().strip().split() + # We assume the command is a valid command at this point, + # as long as the validator is doing it's job. Don't + # try and validate anything here! + + if command[0] in ["help", "?"]: + print_help() + elif command[0] == "get": + send_command(sock, CommandPacket(False, settings[command[1]], 0)) + elif command[0] == "set": + send_command(sock, CommandPacket(True, settings[command[1]], int(command[2]))) + elif command[0] == "quit": + break + + except KeyboardInterrupt: + # Ignore the current prompt, move on + pass + + except EOFError: + # Ctrl-D exits + break + + print("Disconnecting from socket...", end="") + # TODO this doesn't really close the socket the way I want it to... + # unsure if it's a FW issue, python issue, or weird docker interaction + sock.shutdown(socket.SHUT_RDWR) + sock.close() + print(" Goodbye!") diff --git a/pysonar/command_packets.py b/pysonar/command_packets.py index efe98a1..d5db2f5 100644 --- a/pysonar/command_packets.py +++ b/pysonar/command_packets.py @@ -2,7 +2,7 @@ from crc import Calculator, Crc8 from typing import Optional, Tuple from dataclasses import dataclass from enum import IntEnum -from struct import pack +from struct import pack, unpack PKT_START_SEQUENCE = [0xDE, 0xAD] @@ -103,6 +103,11 @@ class PacketParser: if len(self.packet_data) < 7: raise Exception("Invalid amount of packet data received!") + is_error = self.packet_data[2] & 0x80 != 0 + setting = Settings(self.packet_data[2] & 0x7F) + value = unpack("