Source code for missiontools.condition.condition

"""
Conditions
==========
Boolean time-domain predicates with built-in caching.

A :class:`AbstractCondition` is a callable with one method,
:meth:`~AbstractCondition.at`, that returns a boolean array indicating
whether the condition holds at each requested time.  Conditions capture
any required external state (spacecraft, ground stations, ...) at
construction time, so the public API depends only on time.

Hierarchy
---------
:class:`AbstractCondition` (ABC)
├── :class:`SpaceGroundAccessCondition`
├── :class:`SunlightCondition`
├── :class:`SubSatelliteRegionCondition`
├── :class:`VisibilityCondition`
├── :class:`AndCondition`
├── :class:`OrCondition`
├── :class:`NotCondition`
└── :class:`XorCondition`

Boolean composition operators (``&``, ``|``, ``^``, ``~``) are available
on every :class:`AbstractCondition` instance, so that::

    condition1 & (condition2 | condition3)

is equivalent to::

    AndCondition(condition1, OrCondition(condition2, condition3))
"""

from __future__ import annotations

import hashlib
from abc import ABC, abstractmethod
from collections import OrderedDict

import numpy as np
import numpy.typing as npt
import shapely

from ..cache import cached_propagate_analytical
from ..orbit.access import earth_access, space_to_space_access
from ..orbit.frames import eci_to_ecef, ecef_to_eci, geodetic_to_ecef, ecef_to_geodetic
from ..orbit.shadow import in_sunlight


[docs] class AbstractCondition(ABC): """Base class for boolean time-domain conditions. Subclasses implement :meth:`_compute` and :meth:`__repr__`. The base class handles input coercion, scalar/array shape tracking, and a small per-instance count-based LRU cache keyed on the SHA-256 digest of the requested time array. Parameters ---------- cache_size : int, optional Maximum number of distinct time arrays whose results are cached. Default 16. Set to 0 to disable caching. Notes ----- Subclasses can bypass caching entirely by overriding :meth:`at` rather than :meth:`_compute`. Boolean operators ``&``, ``|``, ``^``, ``~`` return new composite conditions: * ``a & b`` → :class:`AndCondition` * ``a | b`` → :class:`OrCondition` * ``a ^ b`` → :class:`XorCondition` * ``~a`` → :class:`NotCondition` """ def __init__(self, cache_size: int = 16) -> None: if cache_size < 0: raise ValueError(f"cache_size must be non-negative, got {cache_size}") self._cache_size = cache_size self._cache: OrderedDict[bytes, npt.NDArray[np.bool_]] = OrderedDict() @abstractmethod def _compute(self, t: npt.NDArray[np.datetime64]) -> npt.NDArray[np.bool_]: """Evaluate the condition at the given times. Parameters ---------- t : ndarray of datetime64[us], shape (N,) Times at which to evaluate the condition. Returns ------- ndarray of bool, shape (N,) """ @abstractmethod def __repr__(self) -> str: ...
[docs] def at(self, t: npt.ArrayLike) -> npt.NDArray[np.bool_]: """Evaluate the condition at one or more times. Parameters ---------- t : array_like of datetime64, shape (N,) or scalar Time(s) at which to evaluate. Returns ------- ndarray of bool, shape (N,) or scalar bool True where the condition holds. """ t_in = np.asarray(t, dtype="datetime64[us]") scalar = t_in.ndim == 0 t_arr = np.atleast_1d(t_in) if self._cache_size > 0: key = hashlib.sha256(t_arr.tobytes()).digest() cached = self._cache.get(key) if cached is not None: self._cache.move_to_end(key) return bool(cached[0]) if scalar else cached result = np.asarray(self._compute(t_arr), dtype=bool) self._cache[key] = result self._cache.move_to_end(key) while len(self._cache) > self._cache_size: self._cache.popitem(last=False) else: result = np.asarray(self._compute(t_arr), dtype=bool) return bool(result[0]) if scalar else result
def __and__(self, other: object) -> AndCondition: if not isinstance(other, AbstractCondition): return NotImplemented return AndCondition(self, other) def __or__(self, other: object) -> OrCondition: if not isinstance(other, AbstractCondition): return NotImplemented return OrCondition(self, other) def __xor__(self, other: object) -> XorCondition: if not isinstance(other, AbstractCondition): return NotImplemented return XorCondition(self, other) def __invert__(self) -> NotCondition: return NotCondition(self)
[docs] def intervals( self, t_start: np.datetime64, t_end: np.datetime64, *, max_step: np.timedelta64 = np.timedelta64(10, "s"), tolerance: np.timedelta64 = np.timedelta64(1, "s"), ) -> list[tuple[np.datetime64, np.datetime64]]: """Return edge-refined intervals where the condition is True. Scans ``[t_start, t_end]`` at ``max_step`` resolution, detects rising/falling edges, then bisects each edge to ``tolerance`` precision. Parameters ---------- t_start : np.datetime64 Start of the time window. t_end : np.datetime64 End of the time window (inclusive). max_step : np.timedelta64, optional Scan step for the initial coarse pass (default 10 s). tolerance : np.timedelta64, optional Bisection refinement tolerance (default 1 s). Returns ------- list[tuple[np.datetime64, np.datetime64]] Sorted, non-overlapping ``[(t0, t1), ...]`` intervals where the condition is True. Empty when the condition is never True. """ t_start = np.asarray(t_start, dtype="datetime64[us]") t_end = np.asarray(t_end, dtype="datetime64[us]") max_step = np.asarray(max_step, dtype="timedelta64[us]") tolerance = np.asarray(tolerance, dtype="timedelta64[us]") total_us = int((t_end - t_start) / np.timedelta64(1, "us")) step_us = int(max_step / np.timedelta64(1, "us")) tol_us = int(tolerance / np.timedelta64(1, "us")) if total_us <= 0 or step_us <= 0: return [] offs = np.arange(0, total_us + 1, step_us, dtype=np.int64) if offs[-1] != total_us: offs = np.append(offs, np.int64(total_us)) t_grid = t_start + offs.astype("timedelta64[us]") flags = self.at(t_grid) if flags.all(): return [(t_start, t_end)] if not flags.any(): return [] padded = np.concatenate([[False], flags, [False]]) rises = np.where(np.diff(padded.astype(np.int8)) == 1)[0] falls = np.where(np.diff(padded.astype(np.int8)) == -1)[0] def _bisect( lo: np.datetime64, hi: np.datetime64, target: bool ) -> np.datetime64: lo_us = int((lo - t_start) / np.timedelta64(1, "us")) hi_us = int((hi - t_start) / np.timedelta64(1, "us")) while (hi_us - lo_us) > tol_us: mid_us = (lo_us + hi_us) // 2 mid_t = t_start + np.timedelta64(mid_us, "us") if bool(self.at(mid_t)) == target: hi_us = mid_us else: lo_us = mid_us return t_start + np.timedelta64(hi_us, "us") result = [] for ri, fi in zip(rises, falls): t0 = t_grid[ri] if ri == 0 else _bisect(t_grid[ri - 1], t_grid[ri], True) t1 = ( t_grid[fi - 1] if fi == len(t_grid) else _bisect(t_grid[fi - 1], t_grid[fi], False) ) result.append((t0, t1)) return result
[docs] class SpaceGroundAccessCondition(AbstractCondition): """True when a spacecraft is visible from a ground station. Visibility is the standard above-horizon test: the elevation angle from the geodetic up-direction at the ground station to the spacecraft must meet or exceed ``el_min_deg``. Earth blockage is implicit for ``el_min_deg >= 0``. Parameters ---------- spacecraft : Spacecraft The spacecraft whose visibility is being tested. ground_station : GroundStation The observing ground station. el_min_deg : float, optional Minimum elevation angle (degrees). Default 5.0. Raises ------ TypeError If ``spacecraft`` is not a :class:`~missiontools.Spacecraft` or ``ground_station`` is not a :class:`~missiontools.GroundStation`. Examples -------- :: from missiontools import Spacecraft, GroundStation from missiontools.condition import SpaceGroundAccessCondition sc = Spacecraft(...) gs = GroundStation(lat=51.5, lon=-0.1) cond = SpaceGroundAccessCondition(sc, gs, el_min_deg=5.0) cond.at(np.datetime64('2025-01-01', 'us')) # -> bool """ def __init__(self, spacecraft, ground_station, el_min_deg: float = 5.0) -> None: from ..spacecraft import Spacecraft from ..ground_station import GroundStation if not isinstance(spacecraft, Spacecraft): raise TypeError( f"spacecraft must be a Spacecraft instance, " f"got {type(spacecraft).__name__!r}" ) if not isinstance(ground_station, GroundStation): raise TypeError( f"ground_station must be a GroundStation instance, " f"got {type(ground_station).__name__!r}" ) if not np.isfinite(el_min_deg): raise ValueError(f"el_min_deg must be finite, got {el_min_deg}") super().__init__() self._sc = spacecraft self._gs = ground_station self._el_min_deg = float(el_min_deg) self._el_min_rad = np.radians(self._el_min_deg) def __repr__(self) -> str: return ( f"SpaceGroundAccessCondition(" f"spacecraft={self._sc!r}, ground_station={self._gs!r}, " f"el_min_deg={self._el_min_deg})" ) def _compute(self, t: npt.NDArray[np.datetime64]) -> npt.NDArray[np.bool_]: r, _ = cached_propagate_analytical( t, **self._sc.keplerian_params, propagator_type=self._sc.propagator_type, ) return earth_access( r, lat=np.radians(self._gs.lat), lon=np.radians(self._gs.lon), alt=self._gs.alt, el_min=self._el_min_rad, frame="eci", t=t, )
[docs] class SunlightCondition(AbstractCondition): """True when the object is in sunlight. Uses a cylindrical shadow model centred on the central body. For a spacecraft the position is propagated analytically; for a ground station the fixed ECEF position is converted to ECI. Parameters ---------- obj : Spacecraft | GroundStation The object whose sunlight status is being tested. Raises ------ TypeError If *obj* is not a :class:`~missiontools.Spacecraft` or :class:`~missiontools.GroundStation`. """ def __init__(self, obj) -> None: from ..spacecraft import Spacecraft from ..ground_station import GroundStation if isinstance(obj, Spacecraft): self._is_sc = True elif isinstance(obj, GroundStation): self._is_sc = False else: raise TypeError( f"obj must be a Spacecraft or GroundStation instance, " f"got {type(obj).__name__!r}" ) super().__init__() self._obj = obj if not self._is_sc: self._gs_ecef = geodetic_to_ecef( np.radians(obj.lat), np.radians(obj.lon), obj.alt, ) def __repr__(self) -> str: return f"SunlightCondition(obj={self._obj!r})" def _compute(self, t: npt.NDArray[np.datetime64]) -> npt.NDArray[np.bool_]: if self._is_sc: r, _ = cached_propagate_analytical( t, **self._obj.keplerian_params, propagator_type=self._obj.propagator_type, ) return in_sunlight(r, t, body_radius=self._obj.central_body_radius) else: r_eci = ecef_to_eci( np.broadcast_to(self._gs_ecef, (len(t), 3)), t, ) return in_sunlight(r_eci, t)
[docs] class SubSatelliteRegionCondition(AbstractCondition): """True when the spacecraft's sub-satellite point falls inside an AoI. Parameters ---------- spacecraft : Spacecraft The spacecraft whose sub-satellite point is tested. aoi : AoI Area of interest **with a geometry** (``aoi.geometry is not None``). Raises ------ TypeError If *spacecraft* is not a :class:`~missiontools.Spacecraft`. ValueError If *aoi* does not have a geometry defined. """ def __init__(self, spacecraft, aoi) -> None: from ..spacecraft import Spacecraft from ..aoi import AoI if not isinstance(spacecraft, Spacecraft): raise TypeError( f"spacecraft must be a Spacecraft instance, " f"got {type(spacecraft).__name__!r}" ) if not isinstance(aoi, AoI): raise TypeError(f"aoi must be an AoI instance, got {type(aoi).__name__!r}") if aoi.geometry is None: raise ValueError( "aoi must have a geometry defined; construct with " "AoI.from_region, AoI.from_shapefile, or AoI.from_geography" ) super().__init__() self._sc = spacecraft self._aoi = aoi def __repr__(self) -> str: return ( f"SubSatelliteRegionCondition(spacecraft={self._sc!r}, aoi={self._aoi!r})" ) def _compute(self, t: npt.NDArray[np.datetime64]) -> npt.NDArray[np.bool_]: r, _ = cached_propagate_analytical( t, **self._sc.keplerian_params, propagator_type=self._sc.propagator_type, ) r_ecef = eci_to_ecef(r, t) lat_rad, lon_rad, _ = ecef_to_geodetic(r_ecef) lat_deg = np.degrees(lat_rad) lon_deg = np.degrees(lon_rad) inside = shapely.contains_xy(self._aoi.geometry, lon_deg, lat_deg) crosses_am = (lon_deg.max() - lon_deg.min()) > 180 if crosses_am: inside |= shapely.contains_xy(self._aoi.geometry, lon_deg + 360.0, lat_deg) inside |= shapely.contains_xy(self._aoi.geometry, lon_deg - 360.0, lat_deg) return inside
[docs] class VisibilityCondition(AbstractCondition): """True when two objects have unobstructed line-of-sight. Earth blockage is modelled as a sphere with radius equal to the spacecraft's ``central_body_radius`` (or the default mean Earth radius when neither object is a spacecraft). Parameters ---------- obj1, obj2 : Spacecraft | GroundStation The two objects whose mutual visibility is tested. At least one should be a :class:`~missiontools.Spacecraft` for meaningful results (ground-station-to-ground-station visibility is almost always blocked by Earth). Raises ------ TypeError If either argument is not a :class:`~missiontools.Spacecraft` or :class:`~missiontools.GroundStation`. """ def __init__(self, obj1, obj2) -> None: from ..spacecraft import Spacecraft from ..ground_station import GroundStation from ..orbit.constants import EARTH_MEAN_RADIUS for name, val in (("obj1", obj1), ("obj2", obj2)): if not isinstance(val, (Spacecraft, GroundStation)): raise TypeError( f"{name} must be a Spacecraft or GroundStation instance, " f"got {type(val).__name__!r}" ) super().__init__() self._obj1 = obj1 self._obj2 = obj2 self._is_sc1 = isinstance(obj1, Spacecraft) self._is_sc2 = isinstance(obj2, Spacecraft) if not self._is_sc1: self._gs1_ecef = geodetic_to_ecef( np.radians(obj1.lat), np.radians(obj1.lon), obj1.alt, ) if not self._is_sc2: self._gs2_ecef = geodetic_to_ecef( np.radians(obj2.lat), np.radians(obj2.lon), obj2.alt, ) self._body_radius = EARTH_MEAN_RADIUS for obj in (obj1, obj2): if isinstance(obj, Spacecraft): self._body_radius = obj.central_body_radius break def __repr__(self) -> str: return f"VisibilityCondition(obj1={self._obj1!r}, obj2={self._obj2!r})" def _compute(self, t: npt.NDArray[np.datetime64]) -> npt.NDArray[np.bool_]: if self._is_sc1: r1, _ = cached_propagate_analytical( t, **self._obj1.keplerian_params, propagator_type=self._obj1.propagator_type, ) else: r1 = ecef_to_eci( np.broadcast_to(self._gs1_ecef, (len(t), 3)), t, ) if self._is_sc2: r2, _ = cached_propagate_analytical( t, **self._obj2.keplerian_params, propagator_type=self._obj2.propagator_type, ) else: r2 = ecef_to_eci( np.broadcast_to(self._gs2_ecef, (len(t), 3)), t, ) return space_to_space_access(r1, r2, body_radius=self._body_radius)
[docs] class AndCondition(AbstractCondition): """True when both child conditions are true (logical AND). Parameters ---------- condition1, condition2 : AbstractCondition Child conditions. Raises ------ TypeError If either argument is not an :class:`AbstractCondition`. """ def __init__( self, condition1: AbstractCondition, condition2: AbstractCondition ) -> None: for name, val in (("condition1", condition1), ("condition2", condition2)): if not isinstance(val, AbstractCondition): raise TypeError( f"{name} must be an AbstractCondition instance, " f"got {type(val).__name__!r}" ) super().__init__(cache_size=0) self._c1 = condition1 self._c2 = condition2 def __repr__(self) -> str: return f"AndCondition({self._c1!r}, {self._c2!r})" def _compute(self, t: npt.NDArray[np.datetime64]) -> npt.NDArray[np.bool_]: return self._c1.at(t) & self._c2.at(t)
[docs] class OrCondition(AbstractCondition): """True when either child condition is true (logical OR). Parameters ---------- condition1, condition2 : AbstractCondition Child conditions. Raises ------ TypeError If either argument is not an :class:`AbstractCondition`. """ def __init__( self, condition1: AbstractCondition, condition2: AbstractCondition ) -> None: for name, val in (("condition1", condition1), ("condition2", condition2)): if not isinstance(val, AbstractCondition): raise TypeError( f"{name} must be an AbstractCondition instance, " f"got {type(val).__name__!r}" ) super().__init__(cache_size=0) self._c1 = condition1 self._c2 = condition2 def __repr__(self) -> str: return f"OrCondition({self._c1!r}, {self._c2!r})" def _compute(self, t: npt.NDArray[np.datetime64]) -> npt.NDArray[np.bool_]: return self._c1.at(t) | self._c2.at(t)
[docs] class NotCondition(AbstractCondition): """True when the child condition is false (logical NOT). Parameters ---------- condition : AbstractCondition Child condition to invert. Raises ------ TypeError If *condition* is not an :class:`AbstractCondition`. """ def __init__(self, condition: AbstractCondition) -> None: if not isinstance(condition, AbstractCondition): raise TypeError( f"condition must be an AbstractCondition instance, " f"got {type(condition).__name__!r}" ) super().__init__(cache_size=0) self._c = condition def __repr__(self) -> str: return f"NotCondition({self._c!r})" def _compute(self, t: npt.NDArray[np.datetime64]) -> npt.NDArray[np.bool_]: return ~self._c.at(t)
[docs] class XorCondition(AbstractCondition): """True when exactly one child condition is true (logical XOR). Parameters ---------- condition1, condition2 : AbstractCondition Child conditions. Raises ------ TypeError If either argument is not an :class:`AbstractCondition`. """ def __init__( self, condition1: AbstractCondition, condition2: AbstractCondition ) -> None: for name, val in (("condition1", condition1), ("condition2", condition2)): if not isinstance(val, AbstractCondition): raise TypeError( f"{name} must be an AbstractCondition instance, " f"got {type(val).__name__!r}" ) super().__init__(cache_size=0) self._c1 = condition1 self._c2 = condition2 def __repr__(self) -> str: return f"XorCondition({self._c1!r}, {self._c2!r})" def _compute(self, t: npt.NDArray[np.datetime64]) -> npt.NDArray[np.bool_]: return self._c1.at(t) ^ self._c2.at(t)