Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions .basedpyright/baseline.json
Original file line number Diff line number Diff line change
Expand Up @@ -19223,14 +19223,6 @@
"lineCount": 1
}
},
{
"code": "reportOptionalMemberAccess",
"range": {
"startColumn": 35,
"endColumn": 53,
"lineCount": 1
}
},
{
"code": "reportArgumentType",
"range": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@

FlightIntentName = str

MAX_TEST_RUN_DURATION = timedelta(minutes=45)
"""The longest a test run might take (to estimate flight intent timestamps prior to scenario execution)"""
MAX_SCENARIO_EXEC_DURATION = timedelta(minutes=45)
"""The longest a scenario run might take (to estimate flight intent timestamps prior to scenario execution)"""

MAX_TEST_RUN_DURATION = timedelta(hours=6)
"""The longest a test run might take (to estimate flight intent timestamps prior to test run)"""


@dataclass
Expand All @@ -43,32 +46,69 @@ class ExpectedFlightIntent:
valid_uspace_flight_auth: bool | None = None


def validate_flight_intent_templates(
def estimate_scenario_execution_max_extents(
scenario_time_context: TestTimeContext,
templates: dict[FlightIntentID, FlightInfoTemplate],
expected_intents: list[ExpectedFlightIntent],
) -> Volume4D:
"""
Returns: the bounding extents of the flight intent templates
"""
extents = Volume4DCollection([])

scenario_start = TestTimeContext(
{
TimeDuringTest.StartOfTestRun: scenario_time_context[
TimeDuringTest.StartOfTestRun
],
TimeDuringTest.StartOfScenario: scenario_time_context[
TimeDuringTest.StartOfScenario
],
TimeDuringTest.TimeOfEvaluation: scenario_time_context[
TimeDuringTest.StartOfScenario
],
}
)
flight_intents = {k: v.resolve(scenario_start) for k, v in templates.items()}
for flight_intent in flight_intents.values():
extents.extend(flight_intent.basic_information.area)

scenario_estimated_end = TestTimeContext(
{
TimeDuringTest.StartOfTestRun: scenario_time_context[
TimeDuringTest.StartOfTestRun
],
TimeDuringTest.StartOfScenario: scenario_time_context[
TimeDuringTest.StartOfScenario
],
TimeDuringTest.TimeOfEvaluation: Time(
scenario_time_context[TimeDuringTest.StartOfScenario].datetime
+ MAX_SCENARIO_EXEC_DURATION
),
}
)
flight_intents = {
k: v.resolve(scenario_estimated_end) for k, v in templates.items()
}
for flight_intent in flight_intents.values():
extents.extend(flight_intent.basic_information.area)

return extents.bounding_volume


def validate_flight_intent_templates(
templates: dict[FlightIntentID, FlightInfoTemplate],
expected_intents: list[ExpectedFlightIntent],
):
"""Validate that all intents templates meet the criteria from `expected_intents` over the estimated maximum duration of the test run."""

now = Time(arrow.utcnow().datetime)
context = TestTimeContext.all_times_are(now)
flight_intents = {k: v.resolve(context) for k, v in templates.items()}
for flight_intent in flight_intents.values():
extents.extend(flight_intent.basic_information.area)
validate_flight_intents(flight_intents, expected_intents, now)

later = Time(now.datetime + MAX_TEST_RUN_DURATION)
context = TestTimeContext.all_times_are(later)
context[TimeDuringTest.StartOfTestRun] = now
flight_intents = {k: v.resolve(context) for k, v in templates.items()}
for flight_intent in flight_intents.values():
extents.extend(flight_intent.basic_information.area)
validate_flight_intents(flight_intents, expected_intents, later)

return extents.bounding_volume


def validate_flight_intents(
intents: dict[FlightIntentID, FlightInfo],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
from monitoring.uss_qualifier.resources.astm.f3548.v21 import DSSInstanceResource
from monitoring.uss_qualifier.resources.astm.f3548.v21.dss import DSSInstance
from monitoring.uss_qualifier.resources.flight_planning import FlightIntentsResource
from monitoring.uss_qualifier.resources.flight_planning.flight_intent import (
FlightIntentID,
)
from monitoring.uss_qualifier.resources.flight_planning.flight_intent_validation import (
ExpectedFlightIntent,
estimate_scenario_execution_max_extents,
validate_flight_intent_templates,
)
from monitoring.uss_qualifier.resources.flight_planning.flight_planners import (
Expand Down Expand Up @@ -69,6 +73,7 @@ class ConflictEqualPriorityNotPermitted(TestScenario):
tested_uss: FlightPlannerClient
control_uss: FlightPlannerClient
dss: DSSInstance
flight_intents_templates: dict[FlightIntentID, FlightInfoTemplate]

def __init__(
self,
Expand Down Expand Up @@ -159,10 +164,12 @@ def __init__(
), # Note: this intent expected to produce Nonconforming state, but this is hard to verify without telemetry. UAS state is not actually off-nominal.
]

templates = flight_intents.get_flight_intents()
self.flight_intents_templates = (
flight_intents.get_flight_intents() if flight_intents else {}
)
try:
self._intents_extent = validate_flight_intent_templates(
templates, expected_flight_intents
validate_flight_intent_templates(
self.flight_intents_templates, expected_flight_intents
)
except ValueError as e:
raise ValueError(
Expand All @@ -171,7 +178,9 @@ def __init__(

for efi in expected_flight_intents:
setattr(
self, efi.intent_id.replace("equal_prio_", ""), templates[efi.intent_id]
self,
efi.intent_id.replace("equal_prio_", ""),
self.flight_intents_templates[efi.intent_id],
)

def resolve_flight(self, flight_template: FlightInfoTemplate) -> FlightInfo:
Expand All @@ -191,11 +200,14 @@ def run(self, context: ExecutionContext):

self.begin_test_case("Prerequisites check")
self.begin_test_step("Verify area is clear")
estimated_max_extents = estimate_scenario_execution_max_extents(
self.time_context, self.flight_intents_templates
)
validate_clear_area(
self,
self.dss,
[self._intents_extent],
ignore_self=True,
[estimated_max_extents],
ignore_self=False,
)
self.end_test_step()
self.end_test_case()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@
PlanningActivityResult,
)
from monitoring.monitorlib.fetch import QueryError
from monitoring.monitorlib.geotemporal import Volume4D, Volume4DCollection
from monitoring.monitorlib.testing import make_fake_url
from monitoring.uss_qualifier.resources.astm.f3548.v21 import DSSInstanceResource
from monitoring.uss_qualifier.resources.astm.f3548.v21.dss import DSSInstance
from monitoring.uss_qualifier.resources.flight_planning import FlightIntentsResource
from monitoring.uss_qualifier.resources.flight_planning.flight_intent import (
FlightIntentID,
)
from monitoring.uss_qualifier.resources.flight_planning.flight_intent_validation import (
ExpectedFlightIntent,
estimate_scenario_execution_max_extents,
validate_flight_intent_templates,
)
from monitoring.uss_qualifier.resources.flight_planning.flight_planners import (
Expand All @@ -49,10 +54,13 @@ class DownUSS(TestScenario):
flight1_planned: FlightInfoTemplate

uss_qualifier_sub: str
scenario_execution_max_extents: Volume4D | None = None
"""Actual bounding extent of the flight intents created by the USS qualifier acting as a virtual USS during the scenario execution."""

tested_uss: FlightPlannerClient
dss_resource: DSSInstanceResource
dss: DSSInstance
flight_intents_templates: dict[FlightIntentID, FlightInfoTemplate]

def __init__(
self,
Expand All @@ -65,18 +73,18 @@ def __init__(
self.tested_uss = tested_uss.client
self.dss = dss.get_instance(self._dss_req_scopes)

templates = flight_intents.get_flight_intents()
self.flight_intents_templates = flight_intents.get_flight_intents()
try:
self._intents_extent = validate_flight_intent_templates(
templates, self._expected_flight_intents
validate_flight_intent_templates(
self.flight_intents_templates, self._expected_flight_intents
)
except ValueError as e:
raise ValueError(
f"`{self.me()}` TestScenario requirements for flight_intents not met: {e}"
)

for efi in self._expected_flight_intents:
setattr(self, efi.intent_id, templates[efi.intent_id])
setattr(self, efi.intent_id, self.flight_intents_templates[efi.intent_id])

@property
def _dss_req_scopes(self) -> dict[str, str]:
Expand All @@ -97,7 +105,15 @@ def _expected_flight_intents(self) -> list[ExpectedFlightIntent]:
]

def resolve_flight(self, flight_template: FlightInfoTemplate) -> FlightInfo:
return flight_template.resolve(self.time_context.evaluate_now())
flight = flight_template.resolve(self.time_context.evaluate_now())

extents = Volume4DCollection([])
if self.scenario_execution_max_extents is not None:
extents.append(self.scenario_execution_max_extents)
extents.extend(flight.basic_information.area)
self.scenario_execution_max_extents = extents.bounding_volume

return flight

def run(self, context: ExecutionContext):
self.begin_test_scenario(context)
Expand All @@ -120,11 +136,15 @@ def run(self, context: ExecutionContext):
self.end_test_scenario()

def _setup(self):
estimated_max_extents = estimate_scenario_execution_max_extents(
self.time_context, self.flight_intents_templates
)

self.begin_test_step("Resolve USS ID of virtual USS")
with self.check("Successful dummy query", [self.dss.participant_id]) as check:
try:
_, dummy_query = self.dss.find_op_intent(
self._intents_extent.to_f3548v21()
estimated_max_extents.to_f3548v21()
)
self.record_query(dummy_query)
except QueryError as e:
Expand Down Expand Up @@ -154,8 +174,8 @@ def _setup(self):
validate_clear_area(
self,
self.dss,
[self._intents_extent],
ignore_self=True,
[estimated_max_extents],
ignore_self=False,
)
self.end_test_step()

Expand Down Expand Up @@ -279,16 +299,19 @@ def _clear_op_intents(self):
with self.check(
"Successful operational intents cleanup", [self.dss.participant_id]
) as check:
if self.scenario_execution_max_extents is None:
return

try:
oi_refs, find_query = self.dss.find_op_intent(
self._intents_extent.to_f3548v21()
self.scenario_execution_max_extents.to_f3548v21()
)
self.record_query(find_query)
except QueryError as e:
self.record_queries(e.queries)
find_query = e.queries[0]
check.record_failed(
summary=f"Failed to query operational intent references from DSS in {self._intents_extent} for cleanup",
summary=f"Failed to query operational intent references from DSS in {self.scenario_execution_max_extents} for cleanup",
details=f"DSS responded code {find_query.status_code}; {e}",
query_timestamps=[find_query.request.timestamp],
)
Expand Down
Loading