Skip to content
Snippets Groups Projects
Commit 3a636512 authored by Jon Cefalu's avatar Jon Cefalu
Browse files

Mission Execution Engine implementation in python, which takes AVCL files as...

Mission Execution Engine implementation in python, which takes AVCL files as input, and parses and runs them.
parent 000ef1bc
No related branches found
No related tags found
1 merge request!2Mission Execution Engine implementation in python, which takes AVCL files as...
......@@ -673,10 +673,15 @@ POSSIBILITY OF SUCH DAMAGE.
<exec executable="python" dir="." vmlauncher="false">
<arg value="--version"/>
</exec>
<!-- Note that Ant will not work with interactive command-line invocation of missions -->
<!-- TODO Jon will add single-mission and all-mission execution options -->
<exec executable="python" dir="." vmlauncher="false">
<arg value="missions/python/MissionExecutionEngine.py"/>
<arg value="--help"/>
</exec>
<exec executable="python" dir="." vmlauncher="false">
<arg value="missions/python/MissionExecutionEngine.py"/>
<arg value="--exhaustively-test-all-standard-library-missions"/>
<arg value="--exhaustive-testing-max-runs-per-mission"/>
<arg value="2000"/>
</exec>
<!-- verbose results are OK, they go into the build log for ongoing comparison/confirmation -->
</target>
......
from __future__ import annotations # this is necessary to avoid this issue: https://stackoverflow.com/questions/33837918/type-hints-solve-circular-dependency
# Please use the latest python version (which as of this writing is 3.9.0) to make sure you have all of the security patches!
import sys; assert sys.version_info >= (3, 0, 0) # checks python version (major, minor, patch)
import os
import argparse
from copy import deepcopy
from xml.etree import ElementTree as ET
import typing
from typing import Set, List, Dict, Optional
# import typeguard # TODO add the required method @decorators to activate typeguard
# Warning about unsafe XML files:
# https://docs.python.org/3/library/xml.html#xml-vulnerabilities
# TODO consider using defusedxml instead of xml.etree: https://pypi.org/project/defusedxml/
# ---------------------------------------------------------------------------------------- #
# Begin Universal Mission Execution Engine #
# ---------------------------------------------------------------------------------------- #
# The idea with this Command class is that currently it only contains a human readable order for a human to execute manually,
# but in the future it could contain an instruction to a robot instead.
# This can be achieved by replacing the internal details of the Command class without having to affect the code of the MissionExecutionEngine.
class Command:
human_readable_order: str
def __init__(self, human_readable_order: str) -> None:
self.human_readable_order = human_readable_order
class Outcome:
name: str # "s", "f", or "x"
index: int # 0 , 1 , or 2 respectively.
def __init__(self, name: str, index: int) -> None:
self.name = name
self.index = index
#
OUTCOME_SUCCESS: Outcome = Outcome("s", 0)
OUTCOME_FAILURE: Outcome = Outcome("f", 1)
OUTCOME_EXCEPTION: Outcome = Outcome("x", 2)
# An allowed transition between one MissionPhase and a valid subsequent phase.
class PhaseTransition:
description: str
next_phase: Optional[MissionPhase] # next_phase is of type Optional[MissionPhase] because it can be null. It's null if the current phase is a terminal phase.
def __init__(self, description: str, next_phase: Optional[MissionPhase]) -> None:
self.description = description
self.next_phase = next_phase
class SuccessorList(List[PhaseTransition]):
NULL_PLACEHOLDER: PhaseTransition = PhaseTransition("NULL", None)
def __init__(self):
self.append(self.NULL_PLACEHOLDER)
self.append(self.NULL_PLACEHOLDER)
self.append(self.NULL_PLACEHOLDER)
assert len(self) == 3
def validate(self):
# We need the SuccessorList to have length of exactly 3, corresponding (in order) to OUTCOME_SUCCESS, OUTCOME_FAILURE, and OUTCOME_EXCEPTION.
assert len(self) == 3
assert self.NULL_PLACEHOLDER not in self # Make sure the NULL_PLACEHOLDER is not still an element of the list.
is_valid_nonterminal_phase = (self[0].next_phase is not None)\
and (self[1].next_phase is not None)\
and (self[2].next_phase is not None)
is_valid_terminal_phase = (self[0].next_phase is None)\
and (self[1].next_phase is None)\
and (self[2].next_phase is None)
assert is_valid_nonterminal_phase or is_valid_terminal_phase
class MissionPhase:
command: Command
succesor_list: SuccessorList
# We need to use a custom "initialize" method instead of the actual constructor ("__init__")
# because we are passing in subsequent MissionPhases as parameters,
# which means that if we passed the parameters in the constructor,
# the user would have to specify the mission in reverse, starting with the terminal_phases and
# working backward to the first_phase, in order to have object handles to pass into constructors as needed.
# Using our own "initialize" method instead allows the user to construct the MissionPhase objects in forward order,
# and then initialize all of them in forward order.
def initialize(self, command: Command, upon_success: Optional[MissionPhase],
upon_failure: Optional[MissionPhase],
upon_exception: Optional[MissionPhase]) -> None:
self.command = command
success_phase_transition: PhaseTransition = PhaseTransition("Success.", upon_success)
failure_phase_transition: PhaseTransition = PhaseTransition("Failed.", upon_failure)
exception_phase_transition: PhaseTransition = PhaseTransition("Exception.", upon_exception)
self.succesor_list = SuccessorList()
self.succesor_list[OUTCOME_SUCCESS.index] = success_phase_transition
self.succesor_list[OUTCOME_FAILURE.index] = failure_phase_transition
self.succesor_list[OUTCOME_EXCEPTION.index] = exception_phase_transition
self.succesor_list.validate()
# TODO this is a new data structure; document it.
class MissionOrders:
all_phases: Set[MissionPhase]
first_phase: MissionPhase
terminal_phases: Set[MissionPhase]
# If validation fails, this method should throw an exception.
def validate(self) -> None:
# Validation Step 1:
# Validate that all terminal phases have no outgoing transitions,
# and that all non-terminal phases have three outgoing transitions.
p: MissionPhase
for p in self.all_phases:
if p in self.terminal_phases:
assert p.succesor_list[0].next_phase is None
assert p.succesor_list[1].next_phase is None
assert p.succesor_list[2].next_phase is None
else:
# non-terminal phase
assert p.succesor_list[0].next_phase is not None
assert p.succesor_list[1].next_phase is not None
assert p.succesor_list[2].next_phase is not None
# Validation Step 2:
# TODO additional forms of validation, such as:
# - Ensuring that there are no cycles. (optional)
# - Ensuring that there are no unreachable phases. (optional but HIGHLY recommended, because an unreachable phase is likely a mistake.)
# TODO document this class.
class RBM:
strategic_level: MissionExecutionEngine
tactical_level: TacticalEngine
execution_level: ExecutionEngine
strat_tac_link_busy: bool
tac_exec_link_busy: bool
def __init__(self, strategic_level: MissionExecutionEngine, tactical_level: TacticalEngine, execution_level: ExecutionEngine) -> None:
self.strategic_level = strategic_level
self.tactical_level = tactical_level
self.execution_level = execution_level
self.strat_tac_link_busy = False
self.tac_exec_link_busy = False
def ExecuteMission(self) -> None:
self.strategic_level.execute_mission()
# Returns whether the command was successfully sent.
# After calling this function to issue a Command,
# the Strategic Level of the RBM must next call StratLvl_WaitForOutcomeOfCommand.
def StratLvl_SendCommandTo_TacLvl(self, command: Command, phase: MissionPhase) -> bool:
assert self.strat_tac_link_busy == False
self.strat_tac_link_busy = True
command_transmitted_successfully: bool = self.tactical_level.carry_out_command(command, phase)
if not command_transmitted_successfully:
self.strat_tac_link_busy = False
return command_transmitted_successfully
# Blocks until the Command issued with StratLvl_SendCommandTo_TacLvl has finished running.
# Then, returns the Outcome.
def StratLvl_WaitForOutcomeOfCommand(self) -> Outcome:
assert self.strat_tac_link_busy == True
self.strat_tac_link_busy = False
outcome: Outcome = self.tactical_level.wait_for_outcome_of_command()
return outcome
""" Link between tactical_level and execution_level not implemented. """
# end class RBM
# This represents the Strategic Level of the RBM.
class MissionExecutionEngine:
rbm: RBM
mission_orders: MissionOrders
previous_execution_phase: Optional[MissionPhase] # Optional because it is initially None when the mission first starts, and there is no previous phase yet.
current_execution_phase: MissionPhase
current_phase_outcome: Optional[Outcome]
successor_list_index: int
def __init__(self, mission_orders: MissionOrders) -> None:
self.mission_orders = mission_orders
self.previous_execution_phase = None
self.current_execution_phase = mission_orders.first_phase
self.current_phase_outcome = None
self.successor_list_index = -1
def connect_to_RBM(self, rbm: RBM) -> None:
self.rbm = rbm
def execute_phase(self, is_terminal: bool) -> None:
if is_terminal: assert self.current_execution_phase in self.mission_orders.terminal_phases
else: assert self.current_execution_phase not in self.mission_orders.terminal_phases
command_acknowledged: bool = \
self.rbm.StratLvl_SendCommandTo_TacLvl(self.current_execution_phase.command,
self.current_execution_phase)
outcome: Outcome
if command_acknowledged:
outcome = self.rbm.StratLvl_WaitForOutcomeOfCommand()
else:
outcome = OUTCOME_EXCEPTION
self.current_phase_outcome = outcome
if not is_terminal:
self.set_successor_list_index(outcome)
self.set_next_phase()
def set_successor_list_index(self, external_agent_response: Outcome) -> None:
self.successor_list_index = external_agent_response.index
def set_next_phase(self) -> None:
succesor_list: SuccessorList = self.current_execution_phase.succesor_list
phase_transition: PhaseTransition = succesor_list[self.successor_list_index]
self.previous_execution_phase = self.current_execution_phase
assert phase_transition.next_phase is not None
self.current_execution_phase = phase_transition.next_phase
self.current_phase_outcome = None
self.successor_list_index = -1
def execute_mission(self) -> None:
self.mission_orders.validate()
while self.current_execution_phase not in self.mission_orders.terminal_phases:
self.execute_phase(is_terminal=False)
self.execute_phase(is_terminal=True)
# end class MissionExecutionEngine
class TacticalEngine: # Abstract base class.
def connect_to_RBM(self, rbm: RBM) -> None:
raise NotImplementedError()
def carry_out_command(self, command: Command, phase: MissionPhase) -> bool: # Return true if the Command was successfully received & understood.
raise NotImplementedError()
def wait_for_outcome_of_command(self) -> Outcome:
raise NotImplementedError()
class HumanInTheLoopTacticalEngine(TacticalEngine):
rbm: RBM
def connect_to_RBM(self, rbm: RBM) -> None:
self.rbm = rbm
def carry_out_command(self, command: Command, phase: MissionPhase) -> bool: # Return true if the Command was successfully received & understood.
print(command.human_readable_order)
return True # command received & acknowledged
def wait_for_outcome_of_command(self) -> Outcome:
while True:
user_input: str = input("Did goal succeed (s), fail (f), or abort (x)? ")
possible_outcome: Outcome
for possible_outcome in [OUTCOME_SUCCESS, OUTCOME_FAILURE, OUTCOME_EXCEPTION]:
if user_input == possible_outcome.name:
return possible_outcome
# Uses depth-first search to exhaustively test the mission.
class ExhaustiveTestingMockTacticalEngine(TacticalEngine):
rbm: RBM
path_currently_being_explored: str
fully_explored_paths: Dict[str, bool]
did_explore_all_paths_in_this_mission: bool
phases_leading_to_potential_cycle: List[MissionPhase] # For detecting cycles (a.k.a. infinite loops)
this_path_is_cycle: bool
all_paths_count: int
infinite_loop_paths_count: int
max_runs_limit: Optional[int]
printing_function: function
def __init__(self, printing_function: function, max_runs_limit: Optional[int]):
self.path_currently_being_explored = ""
self.fully_explored_paths = {}
self.did_explore_all_paths_in_this_mission = False
self.phases_leading_to_potential_cycle = []
self.this_path_is_cycle = False
self.all_paths_count = 0
self.infinite_loop_paths_count = 0
self.max_runs_limit = max_runs_limit
self.printing_function = printing_function
def connect_to_RBM(self, rbm: RBM) -> None:
self.rbm = rbm
def carry_out_command(self, command: Command, phase: MissionPhase) -> bool: # Return true if the Command was successfully received & understood.
if self.this_path_is_cycle:
pass # Do not append phase to phases_leading_to_potential_cycle because we want to leave phases_leading_to_potential_cycle ending with the phase that closes the cycle.
else:
if phase in self.phases_leading_to_potential_cycle:
self.this_path_is_cycle = True
self.phases_leading_to_potential_cycle.append(phase)
# self.printing_function(command.human_readable_order)
return True # command received & acknowledged
def wait_for_outcome_of_command(self) -> Outcome:
outcome: Outcome
chose_a_new_outcome: bool = False
if self.this_path_is_cycle:
outcome = OUTCOME_EXCEPTION # Keep returning OUTCOME_EXCEPTION to get out of the infinite loop and escape the mission. If even the damn exception handler is an infinite loop we are well and truly screwed and the program will hang, but God help us if someone wrote a mission that bad.
chose_a_new_outcome = True
else:
possible_outcome: Outcome
for possible_outcome in [OUTCOME_SUCCESS, OUTCOME_FAILURE, OUTCOME_EXCEPTION]:
theoretical_path: str = self.path_currently_being_explored + possible_outcome.name
if self._did_fully_explore_path(theoretical_path):
continue # try the next possible_outcome
else:
outcome = possible_outcome
chose_a_new_outcome = True
break
assert chose_a_new_outcome
self.path_currently_being_explored += outcome.name
return outcome
# Special methods used for the depth-first-search based exhaustive testing:
def did_complete_all_runs(self):
return self.did_explore_all_paths_in_this_mission
def should_abort(self):
return self.max_runs_limit is not None \
and self.all_paths_count >= self.max_runs_limit
def on_abort(self):
# This should be called when the program is going to stop running this mission any more times, but did_complete_all_runs is False.
self.printing_function("Aborted early, after " + str(self.all_paths_count) + " paths tested, of which " + str(self.infinite_loop_paths_count) + " contain infinite loops.")
def mission_terminated(self):
path_just_explored: List[Outcome] = self.path_currently_being_explored
if self.this_path_is_cycle:
# Truncate path_just_explored so it ends right at the point where the cycle wrapped around.
path_just_explored = path_just_explored[:len(self.phases_leading_to_potential_cycle) - 1]
self.fully_explored_paths[path_just_explored] = True
self.all_paths_count += 1
if self.this_path_is_cycle:
self.infinite_loop_paths_count += 1
self.printing_function("INFINITE LOOP DETECTED: " + "".join([letter + " " for letter in path_just_explored]) + "...")
# Backtrack & mark any fully-explored levels of the paths tree as having been explored.
up: str = path_just_explored
while len(up) > 0:
# assert len(OUTCOME_SUCCESS.name) == 1 and len(OUTCOME_FAILURE.name) == 1 and len(OUTCOME_EXCEPTION.name) == 1
up = up[:-1]
if self._did_fully_explore_path(up + OUTCOME_SUCCESS.name) \
and self._did_fully_explore_path(up + OUTCOME_FAILURE.name) \
and self._did_fully_explore_path(up + OUTCOME_EXCEPTION.name):
#
self.fully_explored_paths[up] = True
# Try to mitigate the issue of running out of memory and crashing as the fully_explored_paths dictionary grows:
del self.fully_explored_paths[up + OUTCOME_SUCCESS.name]
del self.fully_explored_paths[up + OUTCOME_FAILURE.name]
del self.fully_explored_paths[up + OUTCOME_EXCEPTION.name]
else:
break
# end while
if len(up) == 0:
self.did_explore_all_paths_in_this_mission = True
if self.infinite_loop_paths_count > 0:
self.printing_function(str(self.all_paths_count) + " paths tested, of which " + str(self.infinite_loop_paths_count) + " contain infinite loops.")
else:
self.printing_function(str(self.all_paths_count) + " paths tested; no infinite loops.")
self.path_currently_being_explored = ""
self.phases_leading_to_potential_cycle = []
self.this_path_is_cycle = False
# Private
def _did_fully_explore_path(self, path: str):
if path in self.fully_explored_paths and self.fully_explored_paths[path] == True:
return True
else:
return False
class ExecutionEngine: # Abstract base class.
rbm: RBM
def connect_to_RBM(self, rbm: RBM) -> None:
self.rbm = rbm
# NullExecutionEngine is a placeholder to use for now since we have not implemented any ExecutionEngines.
# A real ExecutionEngine would be e.g. a drone flight controller.
# For more information about this layer of the RBM stack, read this paper: https://calhoun.nps.edu/handle/10945/44438
class NullExecutionEngine(ExecutionEngine):
rbm: RBM
def connect_to_RBM(self, rbm: RBM) -> None:
self.rbm = rbm
# ---------------------------------------------------------------------------------------- #
# End Universal Mission Execution Engine #
# ---------------------------------------------------------------------------------------- #
# ---------------------------------------------------------------------------------------- #
# Begin AVCL(XML) Mission Specification File Loading and Parsing #
# ---------------------------------------------------------------------------------------- #
RoutableFilename = typing.NewType("RoutableFilename", str) # A path to a file we can locate. (A relative path or absolute path)
FileBasename = typing.NewType("FileBasename", str) # Unlike RoutableFilename, FileBasename doesn't tell you how to get to the file (doesn't tell you what folder it's in). The file could live anywhere on any disk or server.
# USER SHOULD PROVIDE THE AVCL_MISSIONS_FOLDER PATH BASED ON THEIR SYSTEM'S FILE LAYOUT:
PYTHON_SCRIPT_DIR: RoutableFilename = RoutableFilename(os.path.dirname(os.path.realpath(__file__)))
AVCL_MISSIONS_FOLDER: RoutableFilename = RoutableFilename( \
os.path.join(PYTHON_SCRIPT_DIR, "..", "avcl"))
# ^ This is the path to the folder where the AVCL mission definition files are located.
# Speed optimization
printing_buffer: str = ""
printing_buffer_line_ct = 0
PRINTING_BUFFER_FLUSH_AFTER_LINES: int = 100
def buffered_print(s: str):
global printing_buffer, printing_buffer_line_ct
printing_buffer += (s + "\n")
printing_buffer_line_ct += 1
if printing_buffer_line_ct >= PRINTING_BUFFER_FLUSH_AFTER_LINES:
flush_buffered_print()
def flush_buffered_print():
global printing_buffer
print(printing_buffer, end="")
printing_buffer = ""
printing_buffer_line_ct = 0
# Returns a string of the path to the selected AVCL(XML) file.
def ask_user_for_mission_file() -> RoutableFilename:
print("Looking in this folder for AVCL files:")
print(AVCL_MISSIONS_FOLDER)
print("If you want to look in a different folder, please change the variable AVCL_MISSIONS_FOLDER in MissionExecutionEngine.py \n")
available_missions: List[FileBasename] = \
[FileBasename(x) for x in os.listdir(AVCL_MISSIONS_FOLDER) if os.path.splitext(x)[1].lower() == ".xml"]
n_missions: int = len(available_missions)
print("FYI: Instead of using this menu, you can pass the mission filename as a command line argument.")
print("\nAVAILABLE MISSIONS:")
for i in range(0, n_missions):
print("[" + str(i + 1) + "] " + available_missions[i])
selected_mission: Optional[FileBasename] = None
while selected_mission is None:
print("\nSelect a mission.\nInput the number or the filename: ")
user_input: str = input()
try:
int_selection: int = int(user_input)
if int_selection in range(1, n_missions + 1):
selected_mission = available_missions[int_selection - 1]
break
except ValueError: pass # user_input is not an int. That's fine, keep going; we'll see if it's a filename instead.
possible_mission_name: FileBasename
for possible_mission_name in available_missions:
if user_input.lower() == possible_mission_name.lower() \
or user_input.lower() + ".xml" == possible_mission_name.lower():
selected_mission = possible_mission_name
break
print("\nInvalid selection.")
print()
assert selected_mission in available_missions
selected_mission_path: RoutableFilename = RoutableFilename(os.path.join(AVCL_MISSIONS_FOLDER, selected_mission))
return selected_mission_path
class FailedToStartRBM(Exception):
pass # This is a catch-all exception which means that the RBM cannot start for any reason.
class InvalidAVCL(FailedToStartRBM):
pass # This exception is thrown if the parser is told to parse an invalid AVCL file which is not compliant with the file format spec or which represents an incorrectly defined mission.
# Raises InvalidAVCL if requested node is missing.
def traverse_AVCL_XML_tree(parent_element: ET.Element, *node_names: str) -> ET.Element:
node: Optional[ET.Element] = parent_element
for name in node_names:
assert node is not None # This is to make the type checker happy about us calling .find
node = node.find(name)
if node is None: raise InvalidAVCL("Element " + name + " is missing.")
assert node is not None # This is to make the type checker happy about us returning a non-optional ET.Element.
return node
# If the file can successfully be parsed, returns a MissionOrders object.
# If not, prints debug information to stdout and throws an InvalidAVCL exception.
def load_mission_orders_from_avcl_file(avcl_mission_filename: RoutableFilename, validate: bool) -> MissionOrders:
# Keywords from the file format spec: https://savage.nps.edu/Savage/AuvWorkbench/AVCL/AVCL.html
NODE_BODY: str = "body"
NODE_MISSION_PREPARATION: str = "MissionPreparation"
NODE_AGENDA_MISSIONS: str = "AgendaMission"
NODE_GOAL_LIST: str = "GoalList"
ATTRIB_NAME: str = "id"
ATTRIB_NEXT_ON_SUCCESS: str = "nextOnSuccess"
ATTRIB_NEXT_ON_FAILURE: str = "nextOnFailure"
ATTRIB_NEXT_ON_EXCEPTION: str = "nextOnException"
ATTRIB_ORDERS_TITLE: str = "title"
ATTRIB_ORDERS_DESCRIPTION: str = "description"
ATTRIB_MISSION_SEGMENT: str = "phase" # The use of the word "phase" here in the AVCL file format spec is a misnomer. This attribute is not unique to a particular mission phase, but rather is shared between a group of similar phases (what we are calling a "mission segment").
# Parse the file.
try:
xml_tree: ET.ElementTree = ET.parse(avcl_mission_filename)
xml_root: ET.Element = xml_tree.getroot()
xml_goal_list: ET.Element = traverse_AVCL_XML_tree(\
xml_root, NODE_BODY, NODE_MISSION_PREPARATION, NODE_AGENDA_MISSIONS, NODE_GOAL_LIST)
goal_list: List[ET.Element] = list(xml_goal_list) # This syntax gets xml_goal_list's child Elements
phases_by_name: Dict[str, MissionPhase] = {}
# Outputs from parsing
all_phases: Set[MissionPhase] = set()
terminal_phases: Set[MissionPhase] = set()
first_phase: MissionPhase
# Create blank new phases
xml_phase: ET.Element
for xml_phase in goal_list:
p1: MissionPhase = MissionPhase()
p1_name: str = xml_phase.attrib[ATTRIB_NAME]
if p1_name in phases_by_name:
exception1_description: str = "Error: Mission file is invalid; it contains multiple phases with the same name: \n"
exception1_description += p1_name + "\n\n"
print(exception1_description, end="")
# NOT TESTED
raise InvalidAVCL(exception1_description)
else:
phases_by_name[ p1_name ] = p1
# Configure the phases and wire up the directed acyclic graph of PhaseTransitions
first_phase = phases_by_name[ goal_list[0].attrib[ATTRIB_NAME] ]
#
for xml_phase in goal_list:
p2_attribs: Dict[str, str] = xml_phase.attrib
p2_name: str = xml_phase.attrib[ATTRIB_NAME]
p2: MissionPhase = phases_by_name[p2_name]
# Attempt to figure out what type of phase this phase is intended to be, even if the user misconfigured the phase
PHASE_TYPE_UNKNOWN = 0
PHASE_TYPE_NONTERMINAL = 1
PHASE_TYPE_TERMINAL = 2
phase_type: int = PHASE_TYPE_UNKNOWN
next_phase_upon_success: Optional[MissionPhase] = None
next_phase_upon_failure: Optional[MissionPhase] = None
next_phase_upon_exception: Optional[MissionPhase] = None
#
if ATTRIB_NEXT_ON_SUCCESS in p2_attribs:
phase_type = PHASE_TYPE_NONTERMINAL
next_phase_upon_success = phases_by_name[ p2_attribs[ATTRIB_NEXT_ON_SUCCESS] ]
#
if ATTRIB_NEXT_ON_FAILURE in p2_attribs:
next_phase_upon_failure = phases_by_name[ p2_attribs[ATTRIB_NEXT_ON_FAILURE] ]
else:
if validate:
print("\n||===========================================================================================||")
print("!!! SEVERE WARNING !!!")
print("Phase:")
print(" " + p2_name)
print("IS MISSING A FAILURE HANDLER!")
print("The AVCL specification says that if a failure occurs in this phase,")
print("it will be SILENTLY treated as a success, which can be EXTREMELY DANGEROUS!!!!!!!!")
print("E.g., if a phase called \"Turn off circuit breakers\" fails but is silently treated as success,")
print("and the next phase is \"Tell lineman to repair frayed power wire\",")
print("the lineman could get killed.")
print("Please consider fixing your AVCL mission definition file to have explicit failure handlers!")
print("^^^ SEVERE WARNING ^^^")
print("Are you sure you want to proceed?")
yn = None
while yn not in ["y", "n"]:
yn = input("(y/n) ").lower()
print("||===========================================================================================||\n")
if yn != "y":
print("\n\n\nThank you.\nPlease open your AVCL file \"" + os.path.basename(avcl_mission_filename) + "\" in a mission editor (e.g. \"AUV Workbench\" from the Naval Postgraduate School) or a text editor.")
print("Please define the attribute:")
print(" " + ATTRIB_NEXT_ON_FAILURE)
print("for each one of your mission phases.\n\nThank you.\n\n")
raise InvalidAVCL("Phase \"" + p2_name + "\" does not have a failure handler defined, and user elects not to proceed. Please repair the AVCL mission definition! For safe behavior, all non-terminal phases should have a failure handler.")
next_phase_upon_failure = next_phase_upon_success
#
if ATTRIB_NEXT_ON_EXCEPTION in p2_attribs:
next_phase_upon_exception = phases_by_name[ p2_attribs[ATTRIB_NEXT_ON_EXCEPTION] ]
else:
if ATTRIB_NEXT_ON_FAILURE in p2_attribs:
# Exception handler is not defined by Failure handler is defined,
# so we will use the Failure handler to handle Exceptions as well.
# This is fine and is safe, so we will not give a warning about this.
next_phase_upon_exception = next_phase_upon_failure
else:
# Both Failure and Exception handlers are undefined.
# We already gave the user a SEVERE WARNING about the lack of a failure handler.
# Here we will give a smaller warning about the exception handler being missing,
# since the missing failure handler is the really bad issue.
if validate:
print("\nWarning:")
print("Phase \"" + p2_name + "\" does not have an exception handler defined,")
print("and because it is also missing a failure hander (see severe warning above),")
print("then the AVCL specification says that if an exception occurs in this phase,")
print("it will be SILENTLY treated as a success.")
print("Please consider fixing your AVCL mission definition file to have explicit failure and exception handlers!")
print("If you are only going to do one or the other,")
print("the MOST IMPORTANT thing is to at least define FAILURE handlers for every phase. Thank you.\n")
next_phase_upon_exception = next_phase_upon_success
#
else:
# ATTRIB_NEXT_ON_SUCCESS not in p_attribs
phase_type = PHASE_TYPE_TERMINAL
if ATTRIB_NEXT_ON_FAILURE in p2_attribs \
or ATTRIB_NEXT_ON_EXCEPTION in p2_attribs:
exception2_description: str = "Error: Mission file is invalid.\n"
exception2_description += "This phase: " + p2_name + "\n"
exception2_description +="is a terminal phase (does not have a success handler defined),\n"
exception2_description += "but has failure and/or exception handlers defined, which a terminal phase is not allowed to have.\n\n"
print(exception2_description, end="")
raise InvalidAVCL(exception2_description)
# Make sure we have properly defined the phase transitions:
assert phase_type != PHASE_TYPE_UNKNOWN
if phase_type == PHASE_TYPE_NONTERMINAL:
assert next_phase_upon_success is not None
assert next_phase_upon_failure is not None
assert next_phase_upon_exception is not None
else: # PHASE_TYPE_TERMINAL
assert next_phase_upon_success is None
assert next_phase_upon_failure is None
assert next_phase_upon_exception is None
# Wire up the phase object.
all_phases.add(p2)
if phase_type == PHASE_TYPE_TERMINAL: terminal_phases.add(p2)
human_readable_order = "\n"
if ATTRIB_MISSION_SEGMENT in p2_attribs: # Optional
human_readable_order += "MISSION SEGMENT: " + p2_attribs[ATTRIB_MISSION_SEGMENT] + "\n"
human_readable_order += "ORDERS: \n"
human_readable_order += " " + p2_attribs[ATTRIB_ORDERS_TITLE] + "\n"
human_readable_order += " " + p2_attribs[ATTRIB_ORDERS_DESCRIPTION]
p2.initialize( command= Command(human_readable_order= human_readable_order),
upon_success= next_phase_upon_success,
upon_failure= next_phase_upon_failure,
upon_exception= next_phase_upon_exception )
# end for xml_phase in goal_list
# Done parsing.
# Construct and return MissionOrders object.
mission_orders = MissionOrders()
mission_orders.all_phases = all_phases
mission_orders.first_phase = first_phase
mission_orders.terminal_phases = terminal_phases
return mission_orders
except (ET.ParseError, AttributeError) as e:
exception3_description: str = "Error: Mission file is not a valid AVCL (Autonomous Vehicle Control Language) file. \n"
exception3_description += "See AVCL file format spec here: \n"
exception3_description += "https://savage.nps.edu/Savage/AuvWorkbench/AVCL/AVCL.html \n"
exception3_description += "Internal details of exception inside xml.etree library: \n"
exception3_description += str(e) + "\n\n"
print(exception3_description, end="")
raise InvalidAVCL(exception3_description)
# Creates a new RBM stack with the provided avcl_mission_filename as the mission.
# If any issue prevents the creation or running of the RBM stack,
# FailedToStartRBM will be raised.
def run_RBM_with_mission_orders(mission_orders: MissionOrders,
# The strategic_level_engine is constructed by run_RBM_with_avcl_mission_file, so we do not allow the caller to provide it.
optional_tactical_level_engine_instance: Optional[TacticalEngine],
optional_execution_level_engine_instance: Optional[ExecutionEngine],
tactical_level_engine_class: type,
execution_level_engine_class: type) -> None:
strategic_level_engine: MissionExecutionEngine = MissionExecutionEngine(mission_orders)
tactical_level_engine: TacticalEngine
if optional_tactical_level_engine_instance is not None:
assert type(optional_tactical_level_engine_instance) == tactical_level_engine_class
tactical_level_engine = optional_tactical_level_engine_instance
else:
tactical_level_engine = tactical_level_engine_class()
execution_level_engine: ExecutionEngine
if optional_execution_level_engine_instance is not None:
assert type(optional_execution_level_engine_instance) == execution_level_engine_class
execution_level_engine = optional_execution_level_engine_instance
else:
execution_level_engine = execution_level_engine_class()
## CONSTRUCT RBM.
rational_behavior_model: RBM
rational_behavior_model = RBM(strategic_level_engine, tactical_level_engine, execution_level_engine)
strategic_level_engine .connect_to_RBM(rational_behavior_model)
tactical_level_engine .connect_to_RBM(rational_behavior_model)
execution_level_engine .connect_to_RBM(rational_behavior_model)
## RUN RBM.
rational_behavior_model.ExecuteMission()
return
# Can raise InvalidAVCL exception which is a subclass of FailedToStartRBM exception.
def exhaustively_test_mission(avcl_mission_filename: RoutableFilename, validate: bool, max_runs_limit: Optional[int]) -> bool:
print("\nExhaustively testing mission: " + os.path.abspath(avcl_mission_filename))
mission_orders: MissionOrders
try:
mission_orders = load_mission_orders_from_avcl_file(avcl_mission_filename, validate)
except InvalidAVCL as e:
raise e
depth_first_search_exhaustive_tester: ExhaustiveTestingMockTacticalEngine \
= ExhaustiveTestingMockTacticalEngine(printing_function=buffered_print, \
max_runs_limit=max_runs_limit)
null_execution_engine: NullExecutionEngine = NullExecutionEngine() # Keeping & reusing one of these to avoid the performance hit from re-instantiating the class over and over during the exhaustive test.
try:
while (not depth_first_search_exhaustive_tester.did_complete_all_runs()) \
and (not depth_first_search_exhaustive_tester.should_abort()):
run_RBM_with_mission_orders(mission_orders =mission_orders,
optional_tactical_level_engine_instance =depth_first_search_exhaustive_tester,
optional_execution_level_engine_instance =null_execution_engine,
tactical_level_engine_class =ExhaustiveTestingMockTacticalEngine,
execution_level_engine_class =NullExecutionEngine)
# Done running mission.
depth_first_search_exhaustive_tester.mission_terminated()
# end while
if not depth_first_search_exhaustive_tester.did_complete_all_runs():
depth_first_search_exhaustive_tester.on_abort()
#
except KeyboardInterrupt:
# If the user is doing --exhaustively-test-all-standard-library-missions, it's convenient to be able to KeyboardInterrupt just one mission that is taking forever to test, without quitting the whole program, so that the program can go on to test the other missions.
buffered_print("KeyboardInterrupt")
depth_first_search_exhaustive_tester.on_abort()
# Done with all runs.
flush_buffered_print()
successfully_completed_exhaustive_test = depth_first_search_exhaustive_tester.did_complete_all_runs()
return successfully_completed_exhaustive_test
def main():
parser: argparse.ArgumentParser = argparse.ArgumentParser( allow_abbrev=False, description="Mission Execution Engine for the Rational Behavior Model system for ethical control of autonomy. See project website at: https://savage.nps.edu/EthicalControl/")
group1: argparse._MutuallyExclusiveGroup = parser.add_mutually_exclusive_group()
parser .add_argument("--validate", action="store_true", help="Warn the user if the mission is dangerously under-specified.")
parser .add_argument("--exhaustive-testing-max-runs-per-mission", type=int, help="Sets the exhaustive tester to abort testing a mission after testing a certain number of runs through the mission. This can be useful if your mission is large and would take days, years, or decades to exhaustively test.")
parser .add_argument("--exhaustively-test", action="store_true", help="Exhaustively test the selected mission file, using depth-first search to exhaustively run through all possible mission flows.")
group1 .add_argument("--exhaustively-test-all-standard-library-missions", action="store_true", help="Exhaustively test every mission in the standard mission library folder " + AVCL_MISSIONS_FOLDER + ". If you use this option you should not specify a specific mission filename.")
group1 .add_argument("mission_file_path", nargs="?") # If no default value is specified, the default is None. The default type is essentially Optional[str], because either a str or None will be returned.
args: argparse.Namespace = parser.parse_args()
validate: bool = ( args.validate == True )
exhaustive_test: bool = ( args.exhaustively_test == True ) or ( args.exhaustively_test_all_standard_library_missions == True )
loop_over_all_standard_lib_missions: bool = ( args.exhaustively_test_all_standard_library_missions == True )
exhaustive_testing_max_runs_per_mission: Optional[int] = args.exhaustive_testing_max_runs_per_mission
avcl_mission_filename: RoutableFilename
print()
print("For help, run:\npython MissionExecutionEngine.py --help\n")
if loop_over_all_standard_lib_missions:
assert exhaustive_test == True
all_mission_file_basenames: List[FileBasename] = \
[FileBasename(x) for x in os.listdir(AVCL_MISSIONS_FOLDER) if os.path.splitext(x)[1].lower() == ".xml"]
all_mission_file_paths: List[RoutableFilename] = \
[RoutableFilename(os.path.join(AVCL_MISSIONS_FOLDER, x)) for x in all_mission_file_basenames]
mission_num: int = 1
n_exhaustive_tests_aborted: int = 0
for avcl_mission_filename in all_mission_file_paths:
try:
completed_exhaustive_test: bool = \
exhaustively_test_mission(avcl_mission_filename, validate, exhaustive_testing_max_runs_per_mission)
if not completed_exhaustive_test:
n_exhaustive_tests_aborted += 1
except FailedToStartRBM as e:
# At this point, the error will have already been printed, so we do not need to print it again.
n_exhaustive_tests_aborted += 1
assert mission_num >= 1
sys.exit(mission_num) # Using the mission number of the failed mission as the return code, so if e.g. the program closes with return code of 5, you know that mission 5 is the one that had the issue.
mission_num += 1
# end for avcl_mission_filename
if n_exhaustive_tests_aborted == 0:
print("\nExhaustive testing of all standard library missions completed successfully.")
else:
print("\nExhaustively tested " + str(len(all_mission_file_paths) - n_exhaustive_tests_aborted) + " of the standard library missions successfully,")
print("but " + str(n_exhaustive_tests_aborted) + " standard library missions were aborted and were not tested exhaustively.")
else: # loop_over_all_standard_lib_missions == False
if args.mission_file_path is not None: avcl_mission_filename = args.mission_file_path
else: avcl_mission_filename = ask_user_for_mission_file()
try:
if exhaustive_test:
exhaustively_test_mission(avcl_mission_filename, validate, exhaustive_testing_max_runs_per_mission)
else:
print("\nSelected mission: " + os.path.abspath(avcl_mission_filename) + "\n")
mission_orders: MissionOrders = load_mission_orders_from_avcl_file(avcl_mission_filename, validate)
run_RBM_with_mission_orders(mission_orders =mission_orders,
optional_tactical_level_engine_instance =None,
optional_execution_level_engine_instance =None,
tactical_level_engine_class =HumanInTheLoopTacticalEngine,
execution_level_engine_class =NullExecutionEngine)
# Done running mission.
except FailedToStartRBM as e:
# At this point, the error will have already been printed, so we do not need to print it again.
sys.exit(1)
# end if/else loop_over_all_standard_lib_missions
print("\n\n")
sys.exit(0)
# end function main.
if __name__ == "__main__": # <-- this line is a standard python idiom to see if we are running as a user-invoked shell script (rather than having been loaded as a submodule of some larger program)
main()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment