136 lines
4.3 KiB
Python
136 lines
4.3 KiB
Python
import socket
|
|
from argparse import ArgumentParser
|
|
|
|
from prompt_toolkit import PromptSession
|
|
from prompt_toolkit.completion import NestedCompleter
|
|
from prompt_toolkit.validation import Validator, ValidationError
|
|
|
|
from .command_packets import CommandPacket, Settings, PacketParser
|
|
|
|
|
|
settings = {
|
|
"trigger_threshold": Settings.TriggerThreshold,
|
|
"trigger_period": Settings.TriggerPeriod,
|
|
"decay_value": Settings.DecayValue,
|
|
"decay_period": Settings.DecayPeriod,
|
|
"gain": Settings.Gain,
|
|
"center_frequency": Settings.CenterFreq,
|
|
"sampling_enabled": Settings.SamplingEnabled,
|
|
"trigger_run_len": Settings.TriggerRunLen,
|
|
}
|
|
|
|
|
|
commands = {
|
|
"set": set(settings.keys()),
|
|
"get": set(settings.keys()),
|
|
"quit": None,
|
|
"help": None,
|
|
"?": None,
|
|
}
|
|
|
|
|
|
class CommandValidator(Validator):
|
|
def validate(self, document):
|
|
command = document.text.strip().split()
|
|
if len(command) == 0:
|
|
return
|
|
|
|
if command[0] not in commands.keys():
|
|
raise ValidationError(message="Unrecognized command")
|
|
|
|
if command[0] not in ["set", "get"]:
|
|
if len(command) > 1:
|
|
raise ValidationError(message="Too many arguments")
|
|
else:
|
|
if command[0] == "set":
|
|
num_args = 3
|
|
else:
|
|
num_args = 2
|
|
|
|
if len(command) < num_args:
|
|
raise ValidationError(message="Not enough arguments")
|
|
|
|
if len(command) > num_args:
|
|
raise ValidationError(message="Too many arguments")
|
|
|
|
if command[1] not in settings:
|
|
raise ValidationError(message="Unrecognized setting")
|
|
|
|
if num_args > 2:
|
|
try:
|
|
int(command[2])
|
|
except ValueError:
|
|
raise ValidationError(message="Setting value not an integer")
|
|
|
|
|
|
def print_help():
|
|
print("==== Sonar Configuration CLI ====")
|
|
print("Commands:")
|
|
print("\tset <setting> <value> - Sets the integer value of a setting")
|
|
print("\tget <setting> - gets the integer value of a setting")
|
|
print("\tquit - exit this prompt")
|
|
print("\thelp, ? - this help")
|
|
print()
|
|
print("Available Settings:")
|
|
for setting in settings.keys():
|
|
print(f"\t{setting}")
|
|
print()
|
|
|
|
|
|
def send_command(sock, cmd: CommandPacket):
|
|
"""Sends a command, and pretty-prints the response"""
|
|
sock.send(cmd.serialize())
|
|
|
|
data = sock.recv(16)
|
|
parser = PacketParser()
|
|
data, packet = parser.parse_bytearray(data)
|
|
if packet is not None:
|
|
print(packet)
|
|
else:
|
|
print("No response received! Must be a bug...")
|
|
|
|
|
|
def main():
|
|
args = ArgumentParser(prog="sonar_config", description="Configuration utility for ARVP sonar")
|
|
args.add_argument("IP", type=str, help="IP of sonar system")
|
|
args.add_argument("--port", "-p", type=int, help="Port of configuration socket", default=2000)
|
|
args = args.parse_args()
|
|
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
sock.connect((args.IP, args.port))
|
|
|
|
# Start prompt
|
|
completer = NestedCompleter.from_nested_dict(commands)
|
|
validator = CommandValidator()
|
|
session = PromptSession("> ", completer=completer, validator=validator, validate_while_typing=True)
|
|
while True:
|
|
try:
|
|
command = session.prompt().strip().split()
|
|
# We assume the command is a valid command at this point,
|
|
# as long as the validator is doing it's job. Don't
|
|
# try and validate anything here!
|
|
|
|
if command[0] in ["help", "?"]:
|
|
print_help()
|
|
elif command[0] == "get":
|
|
send_command(sock, CommandPacket(False, settings[command[1]], 0))
|
|
elif command[0] == "set":
|
|
send_command(sock, CommandPacket(True, settings[command[1]], int(command[2])))
|
|
elif command[0] == "quit":
|
|
break
|
|
|
|
except KeyboardInterrupt:
|
|
# Ignore the current prompt, move on
|
|
pass
|
|
|
|
except EOFError:
|
|
# Ctrl-D exits
|
|
break
|
|
|
|
print("Disconnecting from socket...", end="")
|
|
# TODO this doesn't really close the socket the way I want it to...
|
|
# unsure if it's a FW issue, python issue, or weird docker interaction
|
|
sock.shutdown(socket.SHUT_RDWR)
|
|
sock.close()
|
|
print(" Goodbye!")
|