Skip to content
Snippets Groups Projects
Commit d0a12b68 authored by Brutzman, Don's avatar Brutzman, Don
Browse files
parents 0ab3bc83 d4c2a6a3
No related branches found
No related tags found
No related merge requests found
File added
......@@ -27,6 +27,8 @@ class Command:
human_readable_order: str
def __init__(self, human_readable_order: str) -> None:
self.human_readable_order = human_readable_order
def human_readable_order_indented(self, line_prefix: str = "") -> str:
return "\n".join([ line_prefix + line for line in self.human_readable_order.splitlines() ])
class Outcome:
......@@ -74,6 +76,7 @@ class SuccessorList(List[PhaseTransition]):
class MissionPhase:
name: str
command: Command
succesor_list: SuccessorList
......@@ -84,9 +87,11 @@ class MissionPhase:
# working backward to the first_phase, in order to have object handles to pass into constructors as needed.
# Using our own "initialize" method instead allows the user to construct the MissionPhase objects in forward order,
# and then initialize all of them in forward order.
def initialize(self, command: Command, upon_success: Optional[MissionPhase],
upon_failure: Optional[MissionPhase],
upon_exception: Optional[MissionPhase]) -> None:
def initialize(self, name: str, command: Command,
upon_success: Optional[MissionPhase],
upon_failure: Optional[MissionPhase],
upon_exception: Optional[MissionPhase]) -> None:
self.name = name
self.command = command
success_phase_transition: PhaseTransition = PhaseTransition("Success.", upon_success)
failure_phase_transition: PhaseTransition = PhaseTransition("Failed.", upon_failure)
......@@ -239,16 +244,21 @@ class TacticalEngine: # Abstract base class.
class HumanInTheLoopTacticalEngine(TacticalEngine):
rbm: RBM
current_execution_phase: MissionPhase
def connect_to_RBM(self, rbm: RBM) -> None:
self.rbm = rbm
def carry_out_command(self, command: Command, phase: MissionPhase) -> bool: # Return true if the Command was successfully received & understood.
self.current_execution_phase = phase
print("\n-----------------------------------------------------------------------\n")
print(command.human_readable_order)
print("")
return True # command received & acknowledged
def wait_for_outcome_of_command(self) -> Outcome:
while True:
user_input: str = input("Did goal succeed (s), fail (f), or abort (x)? ")
user_input: str = input("Did " + self.current_execution_phase.name + " succeed (s), fail (f), or abort (x)? ")
possible_outcome: Outcome
for possible_outcome in [OUTCOME_SUCCESS, OUTCOME_FAILURE, OUTCOME_EXCEPTION]:
if user_input == possible_outcome.name:
......@@ -263,6 +273,9 @@ class ExhaustiveTestingMockTacticalEngine(TacticalEngine):
did_explore_all_paths_in_this_mission: bool
phases_leading_to_potential_cycle: List[MissionPhase] # For detecting cycles (a.k.a. infinite loops)
this_path_is_cycle: bool
phase_just_before_cycle_gets_closed: Optional[MissionPhase]
phase_closing_cycle: Optional[MissionPhase]
cycle_closing_point_step_number: Optional[int]
all_paths_count: int
infinite_loop_paths_count: int
max_runs_limit: Optional[int]
......@@ -274,6 +287,9 @@ class ExhaustiveTestingMockTacticalEngine(TacticalEngine):
self.did_explore_all_paths_in_this_mission = False
self.phases_leading_to_potential_cycle = []
self.this_path_is_cycle = False
self.phase_just_before_cycle_gets_closed = None
self.phase_closing_cycle = None
self.cycle_closing_point_step_number = None
self.all_paths_count = 0
self.infinite_loop_paths_count = 0
self.max_runs_limit = max_runs_limit
......@@ -288,6 +304,13 @@ class ExhaustiveTestingMockTacticalEngine(TacticalEngine):
else:
if phase in self.phases_leading_to_potential_cycle:
self.this_path_is_cycle = True
self.phase_closing_cycle = phase
self.phase_just_before_cycle_gets_closed = self.phases_leading_to_potential_cycle[-1] # We have not yet appended phase to phases_leading_to_potential_cycle (that happens in the next line below), so currently the phase right below the cycle gets closed is the last element of phases_leading_to_potential_cycle
i: int
for i in range(0, len(self.phases_leading_to_potential_cycle)):
if self.phases_leading_to_potential_cycle[i] == phase:
self.cycle_closing_point_step_number = i
break
self.phases_leading_to_potential_cycle.append(phase)
# self.printing_function(command.human_readable_order)
......@@ -328,7 +351,7 @@ class ExhaustiveTestingMockTacticalEngine(TacticalEngine):
def on_abort(self):
# This should be called when the program is going to stop running this mission any more times, but did_complete_all_runs is False.
self.printing_function("Aborted early, after " + str(self.all_paths_count) + " paths tested, of which " + str(self.infinite_loop_paths_count) + " contain infinite loops.")
self.printing_function("Aborted early, after " + str(self.all_paths_count) + " paths tested, of which " + str(self.infinite_loop_paths_count) + " contain infinite loops.")
def mission_terminated(self):
path_just_explored: List[Outcome] = self.path_currently_being_explored
......@@ -340,7 +363,20 @@ class ExhaustiveTestingMockTacticalEngine(TacticalEngine):
self.all_paths_count += 1
if self.this_path_is_cycle:
self.infinite_loop_paths_count += 1
self.printing_function("INFINITE LOOP DETECTED: " + "".join([letter + " " for letter in path_just_explored]) + "...")
last_outcome_letter: str = path_just_explored[-1] # the last outcome which resulted in closing the loop. A lower-case "s", "f", or "x".
inf_loop_description: str = ""
i: int = 0
for i in range(0, len(path_just_explored)):
letter: str = path_just_explored[i]
if i >= self.cycle_closing_point_step_number: letter = letter.upper() # Make the looping part of the sequence be rendered in upper case, so it's easy to visualize.
if i > 0: inf_loop_description += " "
inf_loop_description += letter
self.printing_function("\nINFINITE LOOP DETECTED, CAUSED BY OUTCOME SEQUENCE: " + inf_loop_description + "\n" + \
self.phase_just_before_cycle_gets_closed.command.human_readable_order_indented(" => ") + \
'\nUPON OUTCOME ' + last_outcome_letter + ', LOOPS BACK TO ↓↓\n' +
self.phase_closing_cycle.command.human_readable_order_indented(" <= ") + \
"\nWHICH (counting starting with initial phase = step # 1) was previously visited at step # " + str(1 + self.cycle_closing_point_step_number))
# Backtrack & mark any fully-explored levels of the paths tree as having been explored.
up: str = path_just_explored
......@@ -362,13 +398,18 @@ class ExhaustiveTestingMockTacticalEngine(TacticalEngine):
if len(up) == 0:
self.did_explore_all_paths_in_this_mission = True
if self.infinite_loop_paths_count > 0:
self.printing_function(str(self.all_paths_count) + " paths tested, of which " + str(self.infinite_loop_paths_count) + " contain infinite loops.")
self.printing_function("\n" + str(self.all_paths_count) + " paths tested, of which " + str(self.infinite_loop_paths_count) + " contain infinite loops.")
else:
self.printing_function(str(self.all_paths_count) + " paths tested; no infinite loops.")
# Erase mission state, in preparation for the next time (if any) that the mission is run.
self.path_currently_being_explored = ""
self.phases_leading_to_potential_cycle = []
self.this_path_is_cycle = False
self.phase_just_before_cycle_gets_closed = None
self.phase_closing_cycle = None
self.cycle_closing_point_step_number = None
# done.
# Private
def _did_fully_explore_path(self, path: str):
......@@ -409,8 +450,8 @@ FileBasename = typing.NewType("FileBasename", str) # Unlike RoutableFile
# USER SHOULD PROVIDE THE AVCL_MISSIONS_FOLDER PATH BASED ON THEIR SYSTEM'S FILE LAYOUT:
PYTHON_SCRIPT_DIR: RoutableFilename = RoutableFilename(os.path.dirname(os.path.realpath(__file__)))
AVCL_MISSIONS_FOLDER: RoutableFilename = RoutableFilename( \
os.path.join(PYTHON_SCRIPT_DIR, "..", "avcl"))
AVCL_MISSIONS_FOLDER: RoutableFilename = RoutableFilename(os.path.abspath(\
os.path.join(PYTHON_SCRIPT_DIR, "..", "avcl")))
# ^ This is the path to the folder where the AVCL mission definition files are located.
......@@ -465,7 +506,8 @@ def ask_user_for_mission_file() -> RoutableFilename:
or user_input.lower() + ".xml" == possible_mission_name.lower():
selected_mission = possible_mission_name
break
print("\nInvalid selection.")
if selected_mission is None:
print("\nInvalid selection.")
print()
assert selected_mission in available_missions
......@@ -636,14 +678,15 @@ def load_mission_orders_from_avcl_file(avcl_mission_filename: RoutableFilename,
all_phases.add(p2)
if phase_type == PHASE_TYPE_TERMINAL: terminal_phases.add(p2)
human_readable_order = "\n"
human_readable_order = "GOAL ID: " + p2_name + "\n"
if ATTRIB_MISSION_SEGMENT in p2_attribs: # Optional
human_readable_order += "MISSION SEGMENT: " + p2_attribs[ATTRIB_MISSION_SEGMENT] + "\n"
human_readable_order += "ORDERS: \n"
human_readable_order += " " + p2_attribs[ATTRIB_ORDERS_TITLE] + "\n"
human_readable_order += " " + p2_attribs[ATTRIB_ORDERS_DESCRIPTION]
p2.initialize( command= Command(human_readable_order= human_readable_order),
p2.initialize( name= p2_name,
command= Command(human_readable_order= human_readable_order),
upon_success= next_phase_upon_success,
upon_failure= next_phase_upon_failure,
upon_exception= next_phase_upon_exception )
......@@ -777,29 +820,34 @@ def main():
all_mission_file_paths: List[RoutableFilename] = \
[RoutableFilename(os.path.join(AVCL_MISSIONS_FOLDER, x)) for x in all_mission_file_basenames]
missions_exhaustively_tested: List[RoutableFilename] = []
missions_not_exhaustively_tested: List[RoutableFilename] = []
mission_num: int = 1
n_exhaustive_tests_aborted: int = 0
for avcl_mission_filename in all_mission_file_paths:
try:
completed_exhaustive_test: bool = \
exhaustively_test_mission(avcl_mission_filename, validate, exhaustive_testing_max_runs_per_mission)
if not completed_exhaustive_test:
n_exhaustive_tests_aborted += 1
completed_exhaustive_test: bool = exhaustively_test_mission( avcl_mission_filename, validate, exhaustive_testing_max_runs_per_mission )
if completed_exhaustive_test: missions_exhaustively_tested.append(avcl_mission_filename)
else: missions_not_exhaustively_tested.append(avcl_mission_filename)
except FailedToStartRBM as e:
# At this point, the error will have already been printed, so we do not need to print it again.
n_exhaustive_tests_aborted += 1
# At this point, the error "e" will have already been printed, so we do not need to print it again.
assert mission_num >= 1
sys.exit(mission_num) # Using the mission number of the failed mission as the return code, so if e.g. the program closes with return code of 5, you know that mission 5 is the one that had the issue.
mission_num += 1
# end for avcl_mission_filename
if n_exhaustive_tests_aborted == 0:
print("\nExhaustive testing of all standard library missions completed successfully.")
n_exhaustive_tests_aborted: int = len(missions_not_exhaustively_tested)
print("\n\n")
if len(missions_not_exhaustively_tested) == 0:
print("============================ SUCCEEDED IN EXHAUSTIVELY TESTING ALL " + str(len(missions_exhaustively_tested)) + " OF THE STANDARD LIBRARY MISSIONS: =======================")
for avcl_mission_filename in missions_exhaustively_tested: print(str(avcl_mission_filename))
else:
print("\nExhaustively tested " + str(len(all_mission_file_paths) - n_exhaustive_tests_aborted) + " of the standard library missions successfully,")
print("but " + str(n_exhaustive_tests_aborted) + " standard library missions were aborted and were not tested exhaustively.")
print("============================== SUCCEEDED IN EXHAUSTIVELY TESTING " + str(len(missions_exhaustively_tested)) + " OF THE STANDARD LIBRARY MISSIONS: =========================")
for avcl_mission_filename in missions_exhaustively_tested: print(str(avcl_mission_filename))
print("\n================ FAILED TO EXHAUSTIVELY TEST " + str(len(missions_not_exhaustively_tested)) + " OF THE STANDARD LIBRARY MISSIONS DUE TO TIMEOUT OR USER ABORT: ================")
for avcl_mission_filename in missions_not_exhaustively_tested: print(str(avcl_mission_filename))
else: # loop_over_all_standard_lib_missions == False
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment