Source code for missiontools.coverage.coverage

import numpy as np
import numpy.typing as npt

from .sampling import (
    sample_aoi,
    sample_region,
    sample_shapefile,
    sample_from_geometry,
    load_shapefile_geometry,
    sample_geography,
    geography_geometry,
)
from .visibility import (
    CoverageConstraints,
    SensorSpec,
    make_sensor_spec,
    make_sensor_spec_from_fov,
    _compute_vis_batch_multi,
    _build_gs,
    _make_offsets,
    collect_access_intervals_multi,
)


[docs] def coverage_fraction_multi( lat: npt.NDArray, lon: npt.NDArray, sensor_specs: list, t_start: np.datetime64, t_end: np.datetime64, alt: float, constraints: CoverageConstraints, max_step: np.timedelta64, batch_size: int, ) -> dict: lat = np.asarray(lat, dtype=np.float64) lon = np.asarray(lon, dtype=np.float64) M = len(lat) if M == 0: raise ValueError("lat/lon arrays must not be empty") gs_ecef, up = _build_gs(lat, lon, alt) offs, t_start_us = _make_offsets(t_start, t_end, max_step) N = len(offs) if N == 0: empty = np.array([], dtype=np.float32) return { "t": np.array([], dtype="datetime64[us]"), "fraction": empty, "cumulative": empty, "mean_fraction": float("nan"), "final_cumulative": float("nan"), } t_out = t_start_us + offs.astype("timedelta64[us]") frac_out = np.empty(N, dtype=np.float32) cum_out = np.empty(N, dtype=np.float32) ever_covered = np.zeros(M, dtype=np.bool_) n_covered = 0 for b0 in range(0, N, batch_size): b1 = min(b0 + batch_size, N) t_batch = t_out[b0:b1] vis = _compute_vis_batch_multi( t_batch, sensor_specs, gs_ecef, up, constraints, ) frac_out[b0:b1] = vis.mean(axis=1) for local_t in range(b1 - b0): new = vis[local_t] & ~ever_covered if new.any(): ever_covered |= new n_covered += int(new.sum()) cum_out[b0 + local_t] = n_covered / M return { "t": t_out, "fraction": frac_out, "cumulative": cum_out, "mean_fraction": float(np.mean(frac_out)), "final_cumulative": float(cum_out[-1]), }
[docs] def pointwise_coverage_multi( lat: npt.NDArray, lon: npt.NDArray, sensor_specs: list, t_start: np.datetime64, t_end: np.datetime64, alt: float, constraints: CoverageConstraints, max_step: np.timedelta64, batch_size: int, ) -> dict: lat = np.asarray(lat, dtype=np.float64) lon = np.asarray(lon, dtype=np.float64) M = len(lat) if M == 0: raise ValueError("lat/lon arrays must not be empty") gs_ecef, up = _build_gs(lat, lon, alt) offs, t_start_us = _make_offsets(t_start, t_end, max_step) N = len(offs) t_out = t_start_us + offs.astype("timedelta64[us]") if N == 0: return { "t": np.array([], dtype="datetime64[us]"), "lat": lat, "lon": lon, "alt": float(alt), "visible": np.empty((0, M), dtype=np.bool_), } visible = np.empty((N, M), dtype=np.bool_) for b0 in range(0, N, batch_size): b1 = min(b0 + batch_size, N) t_batch = t_out[b0:b1] visible[b0:b1] = _compute_vis_batch_multi( t_batch, sensor_specs, gs_ecef, up, constraints, ) return { "t": t_out, "lat": lat, "lon": lon, "alt": float(alt), "visible": visible, }
def _collect_access_intervals( lat: npt.NDArray, lon: npt.NDArray, keplerian_params: dict, t_start: np.datetime64, t_end: np.datetime64, alt: float, constraints: CoverageConstraints, propagator_type: str, max_step: np.timedelta64, batch_size: int, fov_pointing_lvlh, fov_half_angle, *, close_at_end: bool, ) -> list[list[tuple[np.datetime64, np.datetime64]]]: spec = make_sensor_spec( keplerian_params, propagator_type, fov_pointing_lvlh, fov_half_angle ) return collect_access_intervals_multi( lat, lon, [spec], t_start, t_end, alt, constraints, max_step, batch_size, close_at_end=close_at_end, )
[docs] def coverage_fraction( lat: npt.NDArray[np.floating], lon: npt.NDArray[np.floating], keplerian_params: dict, t_start: np.datetime64, t_end: np.datetime64, alt: float | np.floating = 0.0, el_min: float | np.floating = 0.0, propagator_type: str = "twobody", max_step: np.timedelta64 = np.timedelta64(30, "s"), batch_size: int = 1_000, fov_pointing_lvlh: npt.NDArray | None = None, fov_half_angle: float | None = None, sza_max: float | None = None, sza_min: float | None = None, ) -> dict: spec = make_sensor_spec( keplerian_params, propagator_type, fov_pointing_lvlh, fov_half_angle ) constraints = CoverageConstraints.from_angles(el_min, sza_max, sza_min) return coverage_fraction_multi( lat, lon, [spec], t_start, t_end, alt, constraints, max_step, batch_size, )
[docs] def revisit_time( lat: npt.NDArray[np.floating], lon: npt.NDArray[np.floating], keplerian_params: dict, t_start: np.datetime64, t_end: np.datetime64, alt: float | np.floating = 0.0, el_min: float | np.floating = 0.0, propagator_type: str = "twobody", max_step: np.timedelta64 = np.timedelta64(30, "s"), batch_size: int = 1_000, fov_pointing_lvlh: npt.NDArray | None = None, fov_half_angle: float | None = None, sza_max: float | None = None, sza_min: float | None = None, ) -> dict: lat = np.asarray(lat, dtype=np.float64) lon = np.asarray(lon, dtype=np.float64) M = len(lat) constraints = CoverageConstraints.from_angles(el_min, sza_max, sza_min) intervals = _collect_access_intervals( lat, lon, keplerian_params, t_start, t_end, alt, constraints, propagator_type, max_step, batch_size, fov_pointing_lvlh, fov_half_angle, close_at_end=False, ) max_revisit = np.full(M, np.nan) mean_revisit = np.full(M, np.nan) for m, ivals in enumerate(intervals): if len(ivals) >= 2: gaps_us = np.array( [ int((ivals[i + 1][0] - ivals[i][1]) / np.timedelta64(1, "us")) for i in range(len(ivals) - 1) ], dtype=np.float64, ) max_revisit[m] = gaps_us.max() / 1e6 mean_revisit[m] = gaps_us.mean() / 1e6 has_gaps = ~np.isnan(max_revisit) return { "max_revisit": max_revisit, "mean_revisit": mean_revisit, "global_max": float(np.nanmax(max_revisit)) if has_gaps.any() else float("nan"), "global_mean": float(np.nanmean(mean_revisit)) if has_gaps.any() else float("nan"), }
[docs] def pointwise_coverage( lat: npt.NDArray[np.floating], lon: npt.NDArray[np.floating], keplerian_params: dict, t_start: np.datetime64, t_end: np.datetime64, alt: float | np.floating = 0.0, el_min: float | np.floating = 0.0, propagator_type: str = "twobody", max_step: np.timedelta64 = np.timedelta64(30, "s"), batch_size: int = 1_000, fov_pointing_lvlh: npt.NDArray | None = None, fov_half_angle: float | None = None, sza_max: float | None = None, sza_min: float | None = None, ) -> dict: spec = make_sensor_spec( keplerian_params, propagator_type, fov_pointing_lvlh, fov_half_angle ) constraints = CoverageConstraints.from_angles(el_min, sza_max, sza_min) return pointwise_coverage_multi( lat, lon, [spec], t_start, t_end, alt, constraints, max_step, batch_size, )
[docs] def access_pointwise( lat: npt.NDArray[np.floating], lon: npt.NDArray[np.floating], keplerian_params: dict, t_start: np.datetime64, t_end: np.datetime64, alt: float | np.floating = 0.0, el_min: float | np.floating = 0.0, propagator_type: str = "twobody", max_step: np.timedelta64 = np.timedelta64(30, "s"), batch_size: int = 1_000, fov_pointing_lvlh: npt.NDArray | None = None, fov_half_angle: float | None = None, sza_max: float | None = None, sza_min: float | None = None, ) -> list[list[tuple[np.datetime64, np.datetime64]]]: constraints = CoverageConstraints.from_angles(el_min, sza_max, sza_min) return _collect_access_intervals( lat, lon, keplerian_params, t_start, t_end, alt, constraints, propagator_type, max_step, batch_size, fov_pointing_lvlh, fov_half_angle, close_at_end=True, )
[docs] def revisit_pointwise( lat: npt.NDArray[np.floating], lon: npt.NDArray[np.floating], keplerian_params: dict, t_start: np.datetime64, t_end: np.datetime64, alt: float | np.floating = 0.0, el_min: float | np.floating = 0.0, propagator_type: str = "twobody", max_step: np.timedelta64 = np.timedelta64(30, "s"), batch_size: int = 1_000, fov_pointing_lvlh: npt.NDArray | None = None, fov_half_angle: float | None = None, sza_max: float | None = None, sza_min: float | None = None, ) -> list[npt.NDArray[np.timedelta64]]: constraints = CoverageConstraints.from_angles(el_min, sza_max, sza_min) intervals = _collect_access_intervals( lat, lon, keplerian_params, t_start, t_end, alt, constraints, propagator_type, max_step, batch_size, fov_pointing_lvlh, fov_half_angle, close_at_end=False, ) result = [] for ivals in intervals: if len(ivals) < 2: result.append(np.array([], dtype="timedelta64[us]")) else: gaps = np.array( [ivals[i + 1][0] - ivals[i][1] for i in range(len(ivals) - 1)], dtype="timedelta64[us]", ) result.append(gaps) return result