import socket from argparse import ArgumentParser from prompt_toolkit import PromptSession from prompt_toolkit.completion import NestedCompleter from prompt_toolkit.validation import Validator, ValidationError from .command_packets import CommandPacket, Settings, PacketParser settings = { "trigger_threshold": Settings.TriggerThreshold, "trigger_period": Settings.TriggerPeriod, "decay_value": Settings.DecayValue, "decay_period": Settings.DecayPeriod, "gain": Settings.Gain, "center_frequency": Settings.CenterFreq, "sampling_enabled": Settings.SamplingEnabled, "trigger_run_len": Settings.TriggerRunLen, } commands = { "set": set(settings.keys()), "get": set(settings.keys()), "quit": None, "help": None, "?": None, } class CommandValidator(Validator): def validate(self, document): command = document.text.strip().split() if len(command) == 0: return if command[0] not in commands.keys(): raise ValidationError(message="Unrecognized command") if command[0] not in ["set", "get"]: if len(command) > 1: raise ValidationError(message="Too many arguments") else: if command[0] == "set": num_args = 3 else: num_args = 2 if len(command) < num_args: raise ValidationError(message="Not enough arguments") if len(command) > num_args: raise ValidationError(message="Too many arguments") if command[1] not in settings: raise ValidationError(message="Unrecognized setting") if num_args > 2: try: int(command[2]) except ValueError: raise ValidationError(message="Setting value not an integer") def print_help(): print("==== Sonar Configuration CLI ====") print("Commands:") print("\tset - Sets the integer value of a setting") print("\tget - gets the integer value of a setting") print("\tquit - exit this prompt") print("\thelp, ? - this help") print() print("Available Settings:") for setting in settings.keys(): print(f"\t{setting}") print() def send_command(sock, cmd: CommandPacket): """Sends a command, and pretty-prints the response""" sock.send(cmd.serialize()) data = sock.recv(16) parser = PacketParser() data, packet = parser.parse_bytearray(data) if packet is not None: print(packet) else: print("No response received! Must be a bug...") def main(): args = ArgumentParser(prog="sonar_config", description="Configuration utility for ARVP sonar") args.add_argument("IP", type=str, help="IP of sonar system") args.add_argument("--port", "-p", type=int, help="Port of configuration socket", default=2000) args = args.parse_args() sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((args.IP, args.port)) # Start prompt completer = NestedCompleter.from_nested_dict(commands) validator = CommandValidator() session = PromptSession("> ", completer=completer, validator=validator, validate_while_typing=True) while True: try: command = session.prompt().strip().split() # We assume the command is a valid command at this point, # as long as the validator is doing it's job. Don't # try and validate anything here! if command[0] in ["help", "?"]: print_help() elif command[0] == "get": send_command(sock, CommandPacket(False, settings[command[1]], 0)) elif command[0] == "set": send_command(sock, CommandPacket(True, settings[command[1]], int(command[2]))) elif command[0] == "quit": break except KeyboardInterrupt: # Ignore the current prompt, move on pass except EOFError: # Ctrl-D exits break print("Disconnecting from socket...", end="") # TODO this doesn't really close the socket the way I want it to... # unsure if it's a FW issue, python issue, or weird docker interaction sock.shutdown(socket.SHUT_RDWR) sock.close() print(" Goodbye!")