Source code for missiontools.orbit.access

import numpy as np
import numpy.typing as npt
from typing import Callable

from .constants import EARTH_MEAN_RADIUS
from .frames import geodetic_to_ecef, eci_to_ecef, geodetic_up
from .propagation import propagate_analytical
from ..cache import cached_propagate_analytical


[docs] def earth_access( vecs: npt.NDArray[np.floating], lat: float | np.floating, lon: float | np.floating, alt: float | np.floating = 0.0, el_min: float | np.floating = 0.0, frame: str = "eci", t: npt.NDArray[np.datetime64] | None = None, ) -> npt.NDArray[np.bool_]: """Determine which positions are visible from a ground station. A position is considered visible when the elevation angle from the ground station to that position is greater than or equal to ``el_min``. .. note:: Earth blockage is not explicitly checked. For a surface station with ``el_min >= 0``, a positive elevation angle implies the line-of-sight clears the Earth. Sub-zero masks or elevated ground stations may require an additional ray–ellipsoid intersection check. Parameters ---------- vecs : npt.NDArray[np.floating] Position vectors, shape ``(N, 3)`` (m). lat : float | np.floating Ground station geodetic latitude (rad). lon : float | np.floating Ground station longitude (rad). alt : float | np.floating, optional Ground station height above the WGS84 ellipsoid (m). Defaults to 0. el_min : float | np.floating, optional Minimum elevation angle (rad). Defaults to 0 (above the horizon). frame : str, optional Reference frame of ``vecs``: ``'eci'`` (default) or ``'ecef'``. t : npt.NDArray[np.datetime64] | None, optional UTC/UT1 observation times as ``datetime64[us]``, shape ``(N,)``. Required when ``frame='eci'``; ignored when ``frame='ecef'``. Returns ------- npt.NDArray[np.bool_] Boolean array of shape ``(N,)``. ``True`` where the position is visible from the ground station above ``el_min``. Raises ------ ValueError If ``frame='eci'`` and ``t`` is ``None``, or if ``frame`` is not ``'eci'`` or ``'ecef'``. """ if frame == "eci": if t is None: raise ValueError("t must be provided when frame='eci'") vecs_ecef = eci_to_ecef(vecs, t) elif frame == "ecef": vecs_ecef = np.atleast_2d(vecs) else: raise ValueError(f"frame must be 'eci' or 'ecef', got '{frame!r}'") # Ground station ECEF position gs_ecef = geodetic_to_ecef(lat, lon, alt) # (3,) # Outward ellipsoid normal at (lat, lon) — the geodetic "up" direction. # Using the geodetic normal (not gs_ecef / |gs_ecef|) gives elevation # angles consistent with a spirit level at the ground station. up = geodetic_up(lat, lon) # Line-of-sight from ground station to each position los = vecs_ecef - gs_ecef # (N, 3) los_unit = los / np.linalg.norm(los, axis=1, keepdims=True) # Elevation = arcsin of the component of los_unit along "up" sin_el = los_unit @ up # (N,) el = np.arcsin(np.clip(sin_el, -1.0, 1.0)) return el >= el_min
# --------------------------------------------------------------------------- # Internal shared interval-finding helper # --------------------------------------------------------------------------- def _find_intervals( access_fn: Callable[[npt.NDArray[np.int64]], npt.NDArray[np.bool_]], t_start: np.datetime64, t_end: np.datetime64, max_step: np.timedelta64, refine_tol: np.timedelta64, batch_size: int, ) -> list[tuple[np.datetime64, np.datetime64]]: """Find contiguous access intervals given a boolean access function. Parameters ---------- access_fn : callable Accepts a 1-D array of µs-offsets from *t_start* and returns a boolean array of the same length indicating access at each offset. t_start, t_end : np.datetime64 Search window. max_step : np.timedelta64 Coarse-scan step size. refine_tol : np.timedelta64 Binary-search convergence tolerance for edge times. batch_size : int Number of scan steps per propagation batch. Returns ------- list[tuple[np.datetime64, np.datetime64]] ``(start, end)`` pairs for each continuous access window. """ t_start = np.asarray(t_start, dtype="datetime64[us]") t_end = np.asarray(t_end, dtype="datetime64[us]") total_us = int((t_end - t_start) / np.timedelta64(1, "us")) step_us = int(max_step / np.timedelta64(1, "us")) tol_us = int(refine_tol / np.timedelta64(1, "us")) if total_us <= 0 or step_us <= 0: return [] # Scan offsets (µs from t_start), always including t_end exactly offsets = np.arange(0, total_us + 1, step_us, dtype=np.int64) if offsets[-1] < total_us: offsets = np.append(offsets, np.int64(total_us)) n_total = len(offsets) def t_at(off: int) -> np.datetime64: return t_start + np.timedelta64(int(off), "us") def _refine_vectorized( transitions: list[tuple[int, int, bool]], ) -> list[int]: """Batch binary search: refine all transitions simultaneously.""" if not transitions: return [] tol = max(tol_us, 1) los = np.array([t[0] for t in transitions], dtype=np.int64) his = np.array([t[1] for t in transitions], dtype=np.int64) rising = np.array([t[2] for t in transitions], dtype=np.bool_) while np.any(his - los > tol): active = his - los > tol mids = los + (his - los) // 2 active_idx = np.where(active)[0] active_mids = mids[active_idx] flags = access_fn(active_mids) match = rising[active_idx] == flags his[active_idx[match]] = active_mids[match] los[active_idx[~match]] = active_mids[~match] return [ int(his[j]) if rising[j] else int(los[j]) for j in range(len(transitions)) ] # --- batched coarse scan --- intervals: list[tuple[np.datetime64, np.datetime64]] = [] interval_start_us: int | None = None prev_flag: bool | None = None prev_offset: int = 0 pending_transitions: list[tuple[int, int, bool]] = [] for batch_start in range(0, n_total, batch_size): batch_end = min(batch_start + batch_size, n_total) batch_offs = offsets[batch_start:batch_end] flags = access_fn(batch_offs) if prev_flag is None: prev_flag = bool(flags[0]) prev_offset = int(batch_offs[0]) if prev_flag: interval_start_us = prev_offset batch_offs = batch_offs[1:] flags = flags[1:] if len(batch_offs) == 0: continue prev_and_flags = np.concatenate([[prev_flag], flags]) change_k = np.where(prev_and_flags[:-1] != prev_and_flags[1:])[0] for k in change_k: lo = prev_offset if k == 0 else int(batch_offs[k - 1]) hi = int(batch_offs[k]) rising = not bool(prev_and_flags[k]) pending_transitions.append((lo, hi, rising)) prev_flag = bool(flags[-1]) prev_offset = int(batch_offs[-1]) # --- vectorized refinement of all transitions at once --- refined = _refine_vectorized(pending_transitions) for idx, (_, _, rising) in enumerate(pending_transitions): if rising: interval_start_us = refined[idx] else: if interval_start_us is not None: intervals.append((t_at(interval_start_us), t_at(refined[idx]))) interval_start_us = None # Close any interval still open at t_end if interval_start_us is not None: intervals.append((t_at(interval_start_us), t_end)) return intervals # --------------------------------------------------------------------------- # Ground access # ---------------------------------------------------------------------------
[docs] def earth_access_intervals( t_start: np.datetime64, t_end: np.datetime64, keplerian_params: dict, lat: float | np.floating, lon: float | np.floating, alt: float | np.floating = 0.0, el_min: float | np.floating = 0.0, propagator_type: str = "twobody", max_step: np.timedelta64 = np.timedelta64(30, "s"), refine_tol: np.timedelta64 = np.timedelta64(1, "s"), batch_size: int = 10_000, ) -> list[tuple[np.datetime64, np.datetime64]]: """Find time intervals when a satellite is visible from a ground station. Performs a coarse scan at ``max_step`` cadence to detect access windows, then refines each rising/falling edge with binary search to within ``refine_tol``. .. warning:: Passes shorter than ``max_step`` may be missed entirely. Set ``max_step`` to at most half the shortest expected pass duration. Parameters ---------- t_start : np.datetime64 Start of the search window (``datetime64[us]``). t_end : np.datetime64 End of the search window (``datetime64[us]``). keplerian_params : dict Orbital elements at epoch. Must contain the keys ``epoch``, ``a``, ``e``, ``i``, ``arg_p``, ``raan``, ``ma``. Optionally ``central_body_mu``, ``central_body_j2``, ``central_body_radius``. lat : float | np.floating Ground station geodetic latitude (rad). lon : float | np.floating Ground station longitude (rad). alt : float | np.floating, optional Ground station height above the WGS84 ellipsoid (m). Defaults to 0. el_min : float | np.floating, optional Minimum elevation angle (rad). Defaults to 0 (horizon). propagator_type : str, optional ``'twobody'`` (default) or ``'j2'``. max_step : np.timedelta64, optional Maximum coarse scan step size. Defaults to 30 s. refine_tol : np.timedelta64, optional Binary-search convergence tolerance for edge times. Defaults to 1 s. batch_size : int, optional Number of scan steps per propagation batch. Limits peak memory usage to roughly ``batch_size × 24`` bytes. Defaults to 10 000. Returns ------- list[tuple[np.datetime64, np.datetime64]] List of ``(start, end)`` pairs in ``datetime64[us]``, one per continuous access window. Empty list if no access occurs. """ t_start = np.asarray(t_start, dtype="datetime64[us]") def _access_batch(off_arr: npt.NDArray[np.int64]) -> npt.NDArray[np.bool_]: t_arr = t_start + off_arr.astype("timedelta64[us]") r, _ = cached_propagate_analytical( t_arr, **keplerian_params, propagator_type=propagator_type ) return earth_access(r, lat, lon, alt, el_min, frame="eci", t=t_arr) return _find_intervals( _access_batch, t_start, t_end, max_step, refine_tol, batch_size )
# --------------------------------------------------------------------------- # Space-to-space access # --------------------------------------------------------------------------- def _los_clear( r1_2d: npt.NDArray[np.floating], r2_2d: npt.NDArray[np.floating], body_radius: float, ) -> npt.NDArray[np.bool_]: """True where the line segment r1→r2 clears the spherical central body. Parameters ---------- r1_2d, r2_2d : npt.NDArray[np.floating], shape ``(N, 3)`` Position arrays with the central body centre at the origin. body_radius : float Radius of the obstructing sphere (m). Returns ------- npt.NDArray[np.bool_], shape ``(N,)`` """ d = r2_2d - r1_2d # (N, 3) d2 = np.einsum("ni,ni->n", d, d) # |d|², (N,) # Guard against coincident points (d2 == 0): treat t* = 0, closest = r1. safe = np.where(d2 > 0, d2, 1.0) t_star = np.clip(-np.einsum("ni,ni->n", r1_2d, d) / safe, 0.0, 1.0) closest = r1_2d + t_star[:, np.newaxis] * d # (N, 3) dist2 = np.einsum("ni,ni->n", closest, closest) # (N,) return dist2 >= body_radius**2
[docs] def space_to_space_access( r1: npt.NDArray[np.floating], r2: npt.NDArray[np.floating], body_radius: float = EARTH_MEAN_RADIUS, ) -> npt.NDArray[np.bool_]: """Determine which positions have an unobstructed line-of-sight. Checks whether the straight-line segment between each pair of positions clears the central body, modelled as a sphere of radius ``body_radius``. Both position arrays must be in the same reference frame with the central body centre at the origin (ECI and ECEF both satisfy this). .. note:: A spherical obstruction model is used. The maximum error relative to the WGS84 ellipsoid is ≤ 21 km (< 0.4 % of a typical orbit radius), which is negligible for access analysis. **Choosing** ``body_radius``: - ``EARTH_MEAN_RADIUS`` (default, 6 371 008 m) — best representative average; minimises neither false positives nor false negatives. - ``EARTH_SEMI_MAJOR_AXIS`` (6 378 137 m) — *conservative* for equatorial and mid-latitude paths. A larger sphere is more likely to flag LOS as blocked near the Earth's limb, so this choice under-reports access (safe side for link planning). - A smaller radius such as ``EARTH_SEMI_MINOR_AXIS`` — conservative in the opposite direction; may slightly over-report access near the poles. Parameters ---------- r1 : npt.NDArray[np.floating] First spacecraft position(s) (m), shape ``(N, 3)`` or ``(3,)``. r2 : npt.NDArray[np.floating] Second spacecraft position(s) (m), shape ``(N, 3)`` or ``(3,)``. Must be matched element-wise with ``r1``. body_radius : float, optional Radius of the obstructing central body sphere (m). Default: ``EARTH_MEAN_RADIUS`` (6 371 008 m). Returns ------- npt.NDArray[np.bool_] Shape ``(N,)`` — ``True`` where the LOS is not blocked by the central body. Returns a plain ``bool`` for scalar ``(3,)`` inputs. """ r1a = np.asarray(r1, dtype=np.float64) r2a = np.asarray(r2, dtype=np.float64) scalar = r1a.ndim == 1 result = _los_clear(np.atleast_2d(r1a), np.atleast_2d(r2a), body_radius) return bool(result[0]) if scalar else result
[docs] def space_to_space_access_intervals( t_start: np.datetime64, t_end: np.datetime64, keplerian_params_1: dict, keplerian_params_2: dict, body_radius: float = EARTH_MEAN_RADIUS, propagator_type_1: str = "twobody", propagator_type_2: str = "twobody", max_step: np.timedelta64 = np.timedelta64(30, "s"), refine_tol: np.timedelta64 = np.timedelta64(1, "s"), batch_size: int = 10_000, ) -> list[tuple[np.datetime64, np.datetime64]]: """Find time intervals when two spacecraft have unobstructed line-of-sight. Performs a coarse scan at ``max_step`` cadence to detect access windows, then refines each rising/falling edge with binary search to within ``refine_tol``. .. warning:: Passes shorter than ``max_step`` may be missed entirely. Set ``max_step`` to at most half the shortest expected pass duration. Parameters ---------- t_start : np.datetime64 Start of the search window (``datetime64[us]``). t_end : np.datetime64 End of the search window (``datetime64[us]``). keplerian_params_1 : dict Orbital elements of spacecraft 1. Same format as :func:`earth_access_intervals`. keplerian_params_2 : dict Orbital elements of spacecraft 2. body_radius : float, optional Radius of the obstructing central body sphere (m). Default: ``EARTH_MEAN_RADIUS``. propagator_type_1 : str, optional Propagator for spacecraft 1: ``'twobody'`` (default) or ``'j2'``. propagator_type_2 : str, optional Propagator for spacecraft 2: ``'twobody'`` (default) or ``'j2'``. max_step : np.timedelta64, optional Maximum coarse scan step size. Defaults to 30 s. refine_tol : np.timedelta64, optional Binary-search convergence tolerance for edge times. Defaults to 1 s. batch_size : int, optional Number of scan steps per propagation batch. Defaults to 10 000. Returns ------- list[tuple[np.datetime64, np.datetime64]] List of ``(start, end)`` pairs in ``datetime64[us]``, one per continuous access window. Empty list if no access occurs. """ t_start = np.asarray(t_start, dtype="datetime64[us]") def _access_batch(off_arr: npt.NDArray[np.int64]) -> npt.NDArray[np.bool_]: t_arr = t_start + off_arr.astype("timedelta64[us]") r1, _ = cached_propagate_analytical( t_arr, **keplerian_params_1, propagator_type=propagator_type_1 ) r2, _ = cached_propagate_analytical( t_arr, **keplerian_params_2, propagator_type=propagator_type_2 ) return space_to_space_access(r1, r2, body_radius) return _find_intervals( _access_batch, t_start, t_end, max_step, refine_tol, batch_size )