import re as _re
from pathlib import Path
import numpy as np
import numpy.typing as npt
from matplotlib.path import Path as _MplPath
from ..orbit.frames import geodetic_to_ecef, eci_to_ecef, lvlh_to_eci, sun_vec_eci
from ..orbit.propagation import propagate_analytical
from ..cache import cached_propagate_analytical
from ..orbit.constants import EARTH_MEAN_RADIUS
# ---------------------------------------------------------------------------
# Bundled Natural Earth geodata paths
# ---------------------------------------------------------------------------
_GEODATA_DIR = Path(__file__).parent / 'geodata'
_NE_ADM0 = _GEODATA_DIR / 'ne_map_units' / 'ne_50m_admin_0_map_units.shp'
_NE_ADM1 = _GEODATA_DIR / 'ne_states_provinces' / 'ne_50m_admin_1_states_provinces.shp'
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _fibonacci_sphere(n: int) -> tuple[npt.NDArray, npt.NDArray]:
"""Equal-area Fibonacci lattice: n points on the unit sphere (radians)."""
if n == 1:
return np.array([0.0]), np.array([0.0])
phi = (1.0 + np.sqrt(5.0)) / 2.0
i = np.arange(n, dtype=np.float64)
lat = np.arcsin(np.clip(2.0 * i / (n - 1) - 1.0, -1.0, 1.0))
lon = (2.0 * np.pi * i / phi) % (2.0 * np.pi) - np.pi # → (−π, π]
return lat, lon
def _pip(polygon: npt.NDArray,
lat: npt.NDArray,
lon: npt.NDArray) -> npt.NDArray[np.bool_]:
"""Planar point-in-polygon test in lat/lon space (radians)."""
# MplPath uses (x, y) = (lon, lat)
path = _MplPath(polygon[:, ::-1])
return path.contains_points(np.column_stack([lon, lat]))
def _build_gs(lat: npt.NDArray,
lon: npt.NDArray,
alt: float | npt.NDArray,
) -> tuple[npt.NDArray, npt.NDArray]:
"""Return ground-point ECEF positions (M,3) and geodetic up-vectors (M,3)."""
lat = np.asarray(lat, dtype=np.float64)
lon = np.asarray(lon, dtype=np.float64)
gs_ecef = geodetic_to_ecef(lat, lon, alt) # (M, 3)
up = np.stack([np.cos(lat) * np.cos(lon),
np.cos(lat) * np.sin(lon),
np.sin(lat)], axis=-1) # (M, 3)
return gs_ecef, up
def _sun_ecef(t_arr: npt.NDArray) -> npt.NDArray: # (T, 3)
"""Sun unit vector in ECEF at each timestep."""
return eci_to_ecef(sun_vec_eci(t_arr), t_arr)
def _parse_constraints(
fov_pointing_lvlh, fov_half_angle, sza_max, sza_min,
) -> tuple:
"""Validate FOV and SZA parameters; return precomputed values."""
_fov_given = (fov_pointing_lvlh is not None, fov_half_angle is not None)
if any(_fov_given) and not all(_fov_given):
raise ValueError("fov_pointing_lvlh and fov_half_angle must both be "
"provided together")
use_fov = all(_fov_given)
pointing_lvlh_norm = cos_fov = None
if use_fov:
pointing_lvlh_norm = (np.asarray(fov_pointing_lvlh, dtype=np.float64)
/ np.linalg.norm(fov_pointing_lvlh))
cos_fov = float(np.cos(fov_half_angle))
use_sza = sza_max is not None or sza_min is not None
cos_sza_max_ = float(np.cos(sza_max)) if sza_max is not None else None
cos_sza_min_ = float(np.cos(sza_min)) if sza_min is not None else None
return use_fov, pointing_lvlh_norm, cos_fov, use_sza, cos_sza_max_, cos_sza_min_
[docs]
def make_sensor_spec(
keplerian_params: dict,
propagator_type: str,
fov_pointing_lvlh: npt.NDArray | None,
fov_half_angle: float | None,
) -> tuple:
"""Convert per-sensor params into a compact spec tuple.
Returns ``(keplerian_params, propagator_type, use_fov, pointing_lvlh_norm, cos_fov)``.
"""
_fov_given = (fov_pointing_lvlh is not None, fov_half_angle is not None)
if any(_fov_given) and not all(_fov_given):
raise ValueError("fov_pointing_lvlh and fov_half_angle must both be "
"provided together")
if fov_pointing_lvlh is not None and fov_half_angle is not None:
pointing_norm = (np.asarray(fov_pointing_lvlh, dtype=np.float64)
/ np.linalg.norm(fov_pointing_lvlh))
cos_fov = float(np.cos(fov_half_angle))
use_fov = True
else:
pointing_norm = None
cos_fov = None
use_fov = False
return (keplerian_params, propagator_type, use_fov, pointing_norm, cos_fov)
def _compute_vis_batch_multi(
t_batch: npt.NDArray,
sensor_specs: list,
gs_ecef: npt.NDArray,
up: npt.NDArray,
sin_el_min: float,
use_sza: bool,
cos_sza_max_: float | None,
cos_sza_min_: float | None,
) -> npt.NDArray[np.bool_]: # (T, M)
"""Combined visibility for multiple sensors (union of FOVs).
For each sensor spec, propagates its spacecraft and computes elevation + FOV
visibility. The per-sensor results are OR-reduced; then the global SZA
constraint (if any) is applied once to the union.
"""
T = len(t_batch)
M = gs_ecef.shape[0]
vis = np.zeros((T, M), dtype=np.bool_)
for kp, prop_type, use_fov, pointing_norm, cos_fov in sensor_specs:
r, v = cached_propagate_analytical(t_batch, **kp, propagator_type=prop_type)
pt_ecef = (_pointing_ecef(pointing_norm, r, v, t_batch)
if use_fov else None)
vis |= _visibility(r, t_batch, gs_ecef, up, sin_el_min,
pointing_ecef=pt_ecef,
cos_fov=cos_fov if use_fov else None)
if use_sza:
sun_e = _sun_ecef(t_batch)
cos_sza = np.einsum('ti,mi->tm', sun_e, up)
if cos_sza_max_ is not None:
vis &= cos_sza >= cos_sza_max_
if cos_sza_min_ is not None:
vis &= cos_sza <= cos_sza_min_
return vis
def _compute_vis_batch(
t_batch,
keplerian_params: dict,
propagator_type: str,
gs_ecef: npt.NDArray,
up: npt.NDArray,
sin_el_min: float,
use_fov: bool,
pointing_lvlh_norm: npt.NDArray | None,
cos_fov: float | None,
use_sza: bool,
cos_sza_max_: float | None,
cos_sza_min_: float | None,
) -> npt.NDArray[np.bool_]: # (T, M)
"""Propagate one batch and return the visibility matrix (single sensor)."""
spec = (keplerian_params, propagator_type, use_fov, pointing_lvlh_norm, cos_fov)
return _compute_vis_batch_multi(t_batch, [spec], gs_ecef, up, sin_el_min,
use_sza, cos_sza_max_, cos_sza_min_)
def _pointing_ecef(pointing_lvlh: npt.NDArray, # (3,) unit vector in LVLH
r_eci: npt.NDArray, # (T, 3)
v_eci: npt.NDArray, # (T, 3)
t_arr: npt.NDArray, # (T,) datetime64[us]
) -> npt.NDArray: # (T, 3)
"""Convert a fixed LVLH pointing direction to ECEF at each timestep."""
T = len(t_arr)
p_eci = lvlh_to_eci(np.tile(pointing_lvlh, (T, 1)), r_eci, v_eci)
return eci_to_ecef(p_eci, t_arr)
def _visibility(r_eci: npt.NDArray, # (T, 3)
t_arr: npt.NDArray, # (T,) datetime64[us]
gs_ecef: npt.NDArray, # (M, 3)
up: npt.NDArray, # (M, 3)
sin_el_min: float,
pointing_ecef: npt.NDArray | None = None, # (T, 3)
cos_fov: float | None = None,
sun_ecef: npt.NDArray | None = None, # (T, 3)
cos_sza_max: float | None = None,
cos_sza_min: float | None = None,
) -> npt.NDArray[np.bool_]: # (T, M)
"""Vectorised visibility: T satellite positions × M ground points.
Optional constraints (each ANDed with the elevation mask):
* ``pointing_ecef`` / ``cos_fov`` — sensor FOV cone in ECEF
* ``sun_ecef`` / ``cos_sza_max`` / ``cos_sza_min`` — solar zenith angle
"""
r_ecef = eci_to_ecef(r_eci, t_arr) # (T, 3)
los = r_ecef[:, np.newaxis, :] - gs_ecef[np.newaxis, :, :] # (T, M, 3)
norm = np.maximum(np.linalg.norm(los, axis=2, keepdims=True), 1e-10)
los_hat = los / norm # (T, M, 3)
sin_el = np.einsum('tmi,mi->tm', los_hat, up) # (T, M)
vis = sin_el >= sin_el_min # (T, M)
if pointing_ecef is not None:
# dot(sat→ground, pointing) = dot(-los_hat, pointing_ecef)
fov_cos = np.einsum('tmi,ti->tm', -los_hat, pointing_ecef) # (T, M)
vis &= fov_cos >= cos_fov
if sun_ecef is not None:
# cos(SZA) = dot(sun_ecef, up); cos decreasing so ≤ SZA ↔ ≥ cos
cos_sza = np.einsum('ti,mi->tm', sun_ecef, up) # (T, M)
if cos_sza_max is not None:
vis &= cos_sza >= cos_sza_max
if cos_sza_min is not None:
vis &= cos_sza <= cos_sza_min
return vis
def _make_offsets(t_start: np.datetime64,
t_end: np.datetime64,
max_step: np.timedelta64,
) -> tuple[npt.NDArray[np.int64], np.datetime64]:
"""Integer µs offsets from t_start, always including t_end."""
t_start = np.asarray(t_start, dtype='datetime64[us]')
t_end = np.asarray(t_end, dtype='datetime64[us]')
total_us = int((t_end - t_start) / np.timedelta64(1, 'us'))
step_us = int(max_step / np.timedelta64(1, 'us'))
if total_us <= 0 or step_us <= 0:
return np.array([], dtype=np.int64), t_start
offs = np.arange(0, total_us + 1, step_us, dtype=np.int64)
if offs[-1] != total_us:
offs = np.append(offs, np.int64(total_us))
return offs, t_start
def _detect_transitions(
vis_batch: npt.NDArray[np.bool_], # (T, M)
in_access: npt.NDArray[np.bool_], # (M,)
t_batch: npt.NDArray, # (T,) datetime64[us]
) -> list[tuple[np.datetime64, int, bool]]:
"""Return time-ordered ``(t, point_idx, is_rising)`` for every state change."""
augmented = np.vstack([in_access[np.newaxis],
vis_batch.astype(np.int8)]).astype(np.int8)
diffs = np.diff(augmented, axis=0) # (T, M) +1=AOS -1=LOS
rising_t, rising_m = np.where(diffs > 0)
falling_t, falling_m = np.where(diffs < 0)
all_t = np.concatenate([t_batch[rising_t], t_batch[falling_t]])
all_m = np.concatenate([rising_m, falling_m])
all_r = np.concatenate([np.ones(len(rising_t), dtype=bool),
np.zeros(len(falling_t), dtype=bool)])
order = np.argsort(all_t, kind='stable')
return [(all_t[k], int(all_m[k]), bool(all_r[k])) for k in order]
[docs]
def collect_access_intervals_multi(
lat: npt.NDArray,
lon: npt.NDArray,
sensor_specs: list,
t_start: np.datetime64,
t_end: np.datetime64,
alt: float,
el_min: float,
sza_max,
sza_min,
max_step: np.timedelta64,
batch_size: int,
*,
close_at_end: bool,
) -> list[list[tuple[np.datetime64, np.datetime64]]]:
"""Collect per-point ``(AOS, LOS)`` intervals for multiple sensors.
Visibility at each timestep is the union of all sensor FOVs; SZA is
applied globally after the union.
Parameters
----------
close_at_end : bool
If ``True``, any interval still open at ``t_end`` is closed with
``t_end``. If ``False``, open-ended intervals are discarded.
"""
use_sza = sza_max is not None or sza_min is not None
cos_sza_max_ = float(np.cos(sza_max)) if sza_max is not None else None
cos_sza_min_ = float(np.cos(sza_min)) if sza_min is not None else None
lat = np.asarray(lat, dtype=np.float64)
lon = np.asarray(lon, dtype=np.float64)
M = len(lat)
if M == 0:
raise ValueError("lat/lon arrays must not be empty")
gs_ecef, up = _build_gs(lat, lon, alt)
sin_el_min = float(np.sin(el_min))
offs, t_start_us = _make_offsets(t_start, t_end, max_step)
N = len(offs)
intervals: list[list[tuple[np.datetime64, np.datetime64]]] = [[] for _ in range(M)]
current_aos: list[np.datetime64 | None] = [None] * M
in_access = np.zeros(M, dtype=np.bool_)
if N == 0:
return intervals
t_out = t_start_us + offs.astype('timedelta64[us]')
for b0 in range(0, N, batch_size):
b1 = min(b0 + batch_size, N)
t_batch = t_out[b0:b1]
vis = _compute_vis_batch_multi(t_batch, sensor_specs,
gs_ecef, up, sin_el_min,
use_sza, cos_sza_max_, cos_sza_min_)
for t_evt, m, is_rising in _detect_transitions(vis, in_access, t_batch):
if is_rising:
current_aos[m] = t_evt
else:
if current_aos[m] is not None:
intervals[m].append((current_aos[m], t_evt))
current_aos[m] = None
in_access = vis[-1]
if close_at_end:
t_end_dt = np.datetime64(t_end, 'us')
for m in range(M):
if in_access[m] and current_aos[m] is not None:
intervals[m].append((current_aos[m], t_end_dt))
return intervals
def _collect_access_intervals(
lat: npt.NDArray,
lon: npt.NDArray,
keplerian_params: dict,
t_start: np.datetime64,
t_end: np.datetime64,
alt: float,
el_min: float,
propagator_type: str,
max_step: np.timedelta64,
batch_size: int,
fov_pointing_lvlh, fov_half_angle,
sza_max, sza_min,
*,
close_at_end: bool,
) -> list[list[tuple[np.datetime64, np.datetime64]]]:
"""Collect per-point ``(AOS, LOS)`` ``datetime64[us]`` interval pairs (single sensor).
Delegates to :func:`collect_access_intervals_multi` with a one-element
sensor spec list.
"""
spec = make_sensor_spec(keplerian_params, propagator_type,
fov_pointing_lvlh, fov_half_angle)
return collect_access_intervals_multi(
lat, lon, [spec], t_start, t_end, alt, el_min,
sza_max, sza_min, max_step, batch_size, close_at_end=close_at_end,
)
[docs]
def coverage_fraction_multi(
lat: npt.NDArray,
lon: npt.NDArray,
sensor_specs: list,
t_start: np.datetime64,
t_end: np.datetime64,
alt: float,
el_min: float,
sza_max,
sza_min,
max_step: np.timedelta64,
batch_size: int,
) -> dict:
"""Multi-sensor implementation of :func:`coverage_fraction`."""
use_sza = sza_max is not None or sza_min is not None
cos_sza_max_ = float(np.cos(sza_max)) if sza_max is not None else None
cos_sza_min_ = float(np.cos(sza_min)) if sza_min is not None else None
lat = np.asarray(lat, dtype=np.float64)
lon = np.asarray(lon, dtype=np.float64)
M = len(lat)
if M == 0:
raise ValueError("lat/lon arrays must not be empty")
gs_ecef, up = _build_gs(lat, lon, alt)
sin_el_min = float(np.sin(el_min))
offs, t_start_us = _make_offsets(t_start, t_end, max_step)
N = len(offs)
if N == 0:
empty = np.array([], dtype=np.float32)
return {
't': np.array([], dtype='datetime64[us]'),
'fraction': empty, 'cumulative': empty,
'mean_fraction': float('nan'), 'final_cumulative': float('nan'),
}
t_out = t_start_us + offs.astype('timedelta64[us]')
frac_out = np.empty(N, dtype=np.float32)
cum_out = np.empty(N, dtype=np.float32)
ever_covered = np.zeros(M, dtype=np.bool_)
n_covered = 0
for b0 in range(0, N, batch_size):
b1 = min(b0 + batch_size, N)
t_batch = t_out[b0:b1]
vis = _compute_vis_batch_multi(t_batch, sensor_specs,
gs_ecef, up, sin_el_min,
use_sza, cos_sza_max_, cos_sza_min_)
frac_out[b0:b1] = vis.mean(axis=1)
for local_t in range(b1 - b0):
new = vis[local_t] & ~ever_covered
if new.any():
ever_covered |= new
n_covered += int(new.sum())
cum_out[b0 + local_t] = n_covered / M
return {
't': t_out,
'fraction': frac_out,
'cumulative': cum_out,
'mean_fraction': float(np.mean(frac_out)),
'final_cumulative': float(cum_out[-1]),
}
[docs]
def pointwise_coverage_multi(
lat: npt.NDArray,
lon: npt.NDArray,
sensor_specs: list,
t_start: np.datetime64,
t_end: np.datetime64,
alt: float,
el_min: float,
sza_max,
sza_min,
max_step: np.timedelta64,
batch_size: int,
) -> dict:
"""Multi-sensor implementation of :func:`pointwise_coverage`."""
use_sza = sza_max is not None or sza_min is not None
cos_sza_max_ = float(np.cos(sza_max)) if sza_max is not None else None
cos_sza_min_ = float(np.cos(sza_min)) if sza_min is not None else None
lat = np.asarray(lat, dtype=np.float64)
lon = np.asarray(lon, dtype=np.float64)
M = len(lat)
if M == 0:
raise ValueError("lat/lon arrays must not be empty")
gs_ecef, up = _build_gs(lat, lon, alt)
sin_el_min = float(np.sin(el_min))
offs, t_start_us = _make_offsets(t_start, t_end, max_step)
N = len(offs)
t_out = t_start_us + offs.astype('timedelta64[us]')
if N == 0:
return {
't': np.array([], dtype='datetime64[us]'),
'lat': lat,
'lon': lon,
'alt': float(alt),
'visible': np.empty((0, M), dtype=np.bool_),
}
visible = np.empty((N, M), dtype=np.bool_)
for b0 in range(0, N, batch_size):
b1 = min(b0 + batch_size, N)
t_batch = t_out[b0:b1]
visible[b0:b1] = _compute_vis_batch_multi(
t_batch, sensor_specs, gs_ecef, up, sin_el_min,
use_sza, cos_sza_max_, cos_sza_min_,
)
return {
't': t_out,
'lat': lat,
'lon': lon,
'alt': float(alt),
'visible': visible,
}
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
[docs]
def sample_aoi(
polygon: npt.NDArray[np.floating],
n: int,
) -> tuple[npt.NDArray[np.floating], npt.NDArray[np.floating]]:
"""Sample *n* approximately equal-area points inside an AoI polygon.
Uses a Fibonacci lattice to generate a near-uniform global distribution,
then filters to the points inside the polygon. The returned count may
differ slightly from *n* depending on AoI shape; use a larger *n* for
denser or irregular regions.
.. note::
The polygon test is planar (lat/lon space in radians). Results are
accurate for regions that do not span the anti-meridian or enclose a
pole. Split such regions into separate polygons and combine samples.
Parameters
----------
polygon : npt.NDArray[np.floating]
Polygon vertices, shape ``(V, 2)``, each row ``[lat, lon]`` in
**radians**.
n : int
Target number of sample points.
Returns
-------
lat : npt.NDArray[np.floating]
Sample latitudes (rad), shape ``(M,)``.
lon : npt.NDArray[np.floating]
Sample longitudes (rad), shape ``(M,)``.
"""
polygon = np.asarray(polygon, dtype=np.float64)
if polygon.ndim != 2 or polygon.shape[1] != 2:
raise ValueError("polygon must have shape (V, 2) with columns [lat, lon]")
if n < 1:
raise ValueError(f"n must be at least 1, got {n}")
# Estimate AoI fraction from a pilot lattice
n_pilot = max(n * 10, 5_000)
lat_p, lon_p = _fibonacci_sphere(n_pilot)
frac = float(_pip(polygon, lat_p, lon_p).mean())
if frac < 1e-6:
raise ValueError(
"AoI polygon encloses too few global lattice points — "
"check that coordinates are in radians and the polygon is not "
"degenerate."
)
# Generate enough global points to get approximately n inside
n_global = int(np.ceil(n / frac * 1.3))
lat_all, lon_all = _fibonacci_sphere(n_global)
inside = _pip(polygon, lat_all, lon_all)
lat_in = lat_all[inside]
lon_in = lon_all[inside]
# Evenly subsample if we ended up with more than n points
if len(lat_in) > n:
idx = np.round(np.linspace(0, len(lat_in) - 1, n)).astype(int)
lat_in = lat_in[idx]
lon_in = lon_in[idx]
return lat_in, lon_in
[docs]
def sample_region(
lat_min: float | None = None,
lat_max: float | None = None,
lon_min: float | None = None,
lon_max: float | None = None,
point_density: float = 1e11,
) -> tuple[npt.NDArray[np.floating], npt.NDArray[np.floating]]:
"""Sample an approximately equal-area Fibonacci lattice over a lat/lon band.
A convenience wrapper around the Fibonacci lattice for rectangular
regions. The number of points is derived from the spherical zone area
and the requested ``point_density``, so the grid automatically becomes
denser for small regions and sparser for large ones.
Unlike :func:`sample_aoi`, this function filters the lattice directly by
coordinate bounds, so it handles anti-meridian-crossing regions and global
or banded coverage correctly.
Parameters
----------
lat_min : float | None, optional
Southern boundary (rad). ``None`` extends to the South Pole (−π/2).
lat_max : float | None, optional
Northern boundary (rad). ``None`` extends to the North Pole (+π/2).
lon_min : float | None, optional
Western boundary (rad). Must be paired with ``lon_max``; ``None``
(together with ``lon_max=None``) includes all longitudes.
lon_max : float | None, optional
Eastern boundary (rad). Must be paired with ``lon_min``; ``None``
(together with ``lon_min=None``) includes all longitudes.
May be less than ``lon_min`` to indicate a region that crosses the
anti-meridian (e.g. ``lon_min=np.radians(170)``,
``lon_max=np.radians(-170)``).
point_density : float, optional
Approximate area represented by each sample point (m²).
Defaults to 1×10¹¹ m² (~100 000 km² per point, ~5 100 points
globally).
Returns
-------
lat : npt.NDArray[np.floating]
Sample latitudes (rad), shape ``(M,)``.
lon : npt.NDArray[np.floating]
Sample longitudes (rad), shape ``(M,)``.
Raises
------
ValueError
If exactly one of ``lon_min`` / ``lon_max`` is ``None``, if
``lat_min >= lat_max``, or if ``point_density`` is not positive.
Examples
--------
Global coverage at ~100 000 km²/point::
lat, lon = sample_region()
Europe (approximate)::
lat, lon = sample_region(np.radians(35), np.radians(70),
np.radians(-10), np.radians(40),
point_density=1e9)
Pacific Ocean band crossing the anti-meridian::
lat, lon = sample_region(np.radians(-30), np.radians(30),
np.radians(150), np.radians(-120),
point_density=1e10)
"""
# --- validate longitude pairing ---
if (lon_min is None) != (lon_max is None):
raise ValueError(
"lon_min and lon_max must both be None (all longitudes) or both "
"be specified; got lon_min={} lon_max={}".format(lon_min, lon_max)
)
if point_density <= 0:
raise ValueError(
f"point_density must be positive, got {point_density}"
)
# --- resolve defaults ---
lat_lo = float(lat_min) if lat_min is not None else -np.pi / 2.0
lat_hi = float(lat_max) if lat_max is not None else np.pi / 2.0
full_lon = lon_min is None
if lat_lo >= lat_hi:
raise ValueError(
f"lat_min ({lat_lo:.6f} rad) must be less than "
f"lat_max ({lat_hi:.6f} rad)"
)
lon_lo = float(lon_min) if lon_min is not None else 0.0
lon_hi = float(lon_max) if lon_max is not None else 0.0
antimeridian = (not full_lon) and (lon_lo > lon_hi)
# --- compute AoI area (spherical zone formula) ---
# Area = 2π R² Δ(sin lat) × (lon span / 2π)
if full_lon:
lon_frac = 1.0
elif antimeridian:
lon_frac = (2.0 * np.pi - (lon_lo - lon_hi)) / (2.0 * np.pi)
else:
lon_frac = (lon_hi - lon_lo) / (2.0 * np.pi)
area = (4.0 * np.pi * EARTH_MEAN_RADIUS**2
* (np.sin(lat_hi) - np.sin(lat_lo)) / 2.0
* lon_frac)
n = max(1, int(np.round(area / point_density)))
# --- generate a global Fibonacci lattice and filter ---
# Oversample globally so that after filtering we have ≥ n points.
area_fraction = area / (4.0 * np.pi * EARTH_MEAN_RADIUS**2)
n_global = max(n * 5, int(np.ceil(n / area_fraction * 1.3)))
lat_all, lon_all = _fibonacci_sphere(n_global)
# Latitude filter
lat_ok = (lat_all >= lat_lo) & (lat_all <= lat_hi)
# Longitude filter
if full_lon:
lon_ok = np.ones(n_global, dtype=np.bool_)
elif antimeridian:
lon_ok = (lon_all >= lon_lo) | (lon_all <= lon_hi)
else:
lon_ok = (lon_all >= lon_lo) & (lon_all <= lon_hi)
lat_in = lat_all[lat_ok & lon_ok]
lon_in = lon_all[lat_ok & lon_ok]
# Evenly subsample if we have more points than requested
if len(lat_in) > n:
idx = np.round(np.linspace(0, len(lat_in) - 1, n)).astype(int)
lat_in = lat_in[idx]
lon_in = lon_in[idx]
return lat_in, lon_in
[docs]
def coverage_fraction(
lat: npt.NDArray[np.floating],
lon: npt.NDArray[np.floating],
keplerian_params: dict,
t_start: np.datetime64,
t_end: np.datetime64,
alt: float | np.floating = 0.0,
el_min: float | np.floating = 0.0,
propagator_type: str = 'twobody',
max_step: np.timedelta64 = np.timedelta64(30, 's'),
batch_size: int = 1_000,
fov_pointing_lvlh: npt.NDArray | None = None,
fov_half_angle: float | None = None,
sza_max: float | None = None,
sza_min: float | None = None,
) -> dict:
"""Compute instantaneous and cumulative coverage fraction over a time window.
For each sample epoch, the *instantaneous* fraction is the proportion of
ground points with the satellite above ``el_min``. The *cumulative*
fraction is the proportion of ground points seen **at least once** up to
that epoch.
Parameters
----------
lat, lon : npt.NDArray[np.floating]
Ground-point latitudes/longitudes (rad), shape ``(M,)``. Typically
from :func:`sample_aoi`.
keplerian_params : dict
Orbital elements dict, e.g. from :func:`~missiontools.orbit.propagation.sun_synchronous_orbit`.
t_start, t_end : np.datetime64
Analysis window (``datetime64[us]``).
alt : float | np.floating, optional
Ground-point altitude above WGS84 (m). Defaults to 0.
el_min : float | np.floating, optional
Minimum elevation angle (rad). Defaults to 0 (horizon).
propagator_type : str, optional
``'twobody'`` (default) or ``'j2'``.
max_step : np.timedelta64, optional
Scan time step. Defaults to 30 s.
batch_size : int, optional
Time steps per propagation batch. Defaults to 1 000.
Returns
-------
dict
``t`` : ``(N,)`` ``datetime64[us]`` — sample timestamps.
``fraction`` : ``(N,)`` float — instantaneous coverage fraction.
``cumulative`` : ``(N,)`` float — cumulative coverage fraction.
``mean_fraction`` : float — time-averaged instantaneous fraction.
``final_cumulative`` : float — fraction of points covered ≥ once.
"""
spec = make_sensor_spec(keplerian_params, propagator_type,
fov_pointing_lvlh, fov_half_angle)
return coverage_fraction_multi(
lat, lon, [spec], t_start, t_end,
alt, el_min, sza_max, sza_min, max_step, batch_size,
)
[docs]
def revisit_time(
lat: npt.NDArray[np.floating],
lon: npt.NDArray[np.floating],
keplerian_params: dict,
t_start: np.datetime64,
t_end: np.datetime64,
alt: float | np.floating = 0.0,
el_min: float | np.floating = 0.0,
propagator_type: str = 'twobody',
max_step: np.timedelta64 = np.timedelta64(30, 's'),
batch_size: int = 1_000,
fov_pointing_lvlh: npt.NDArray | None = None,
fov_half_angle: float | None = None,
sza_max: float | None = None,
sza_min: float | None = None,
) -> dict:
"""Compute per-point revisit time statistics over a time window.
The *revisit time* for a ground point is the gap between loss of signal
(LOS, last visible step) and the next acquisition of signal (AOS, first
visible step on the following pass). The initial gap from ``t_start`` to
the first AOS is not included.
.. note::
Accuracy is limited to ``max_step``. For exact edge times on
individual points of interest use
:func:`~missiontools.orbit.access.earth_access_intervals`.
Parameters
----------
lat, lon : npt.NDArray[np.floating]
Ground-point latitudes/longitudes (rad), shape ``(M,)``.
keplerian_params : dict
Orbital elements dict.
t_start, t_end : np.datetime64
Analysis window.
alt : float | np.floating, optional
Ground-point altitude above WGS84 (m). Defaults to 0.
el_min : float | np.floating, optional
Minimum elevation angle (rad). Defaults to 0 (horizon).
propagator_type : str, optional
``'twobody'`` or ``'j2'``.
max_step : np.timedelta64, optional
Scan step. Defaults to 30 s.
batch_size : int, optional
Time steps per propagation batch. Defaults to 1 000.
Returns
-------
dict
``max_revisit`` : ``(M,)`` float — per-point max revisit time (s).
``nan`` for points accessed fewer than twice.
``mean_revisit`` : ``(M,)`` float — per-point mean revisit time (s).
``nan`` for points accessed fewer than twice.
``global_max`` : float — worst-case revisit time across all points (s).
``global_mean`` : float — mean of per-point mean revisit times (s).
"""
lat = np.asarray(lat, dtype=np.float64)
lon = np.asarray(lon, dtype=np.float64)
M = len(lat)
_nan = np.full(M, np.nan)
intervals = _collect_access_intervals(
lat, lon, keplerian_params, t_start, t_end,
alt, el_min, propagator_type, max_step, batch_size,
fov_pointing_lvlh, fov_half_angle, sza_max, sza_min,
close_at_end=False,
)
max_revisit = np.full(M, np.nan)
mean_revisit = np.full(M, np.nan)
for m, ivals in enumerate(intervals):
if len(ivals) >= 2:
gaps_us = np.array([
int((ivals[i + 1][0] - ivals[i][1]) / np.timedelta64(1, 'us'))
for i in range(len(ivals) - 1)
], dtype=np.float64)
max_revisit[m] = gaps_us.max() / 1e6
mean_revisit[m] = gaps_us.mean() / 1e6
has_gaps = ~np.isnan(max_revisit)
return {
'max_revisit': max_revisit,
'mean_revisit': mean_revisit,
'global_max': float(np.nanmax(max_revisit)) if has_gaps.any() else float('nan'),
'global_mean': float(np.nanmean(mean_revisit)) if has_gaps.any() else float('nan'),
}
[docs]
def pointwise_coverage(
lat: npt.NDArray[np.floating],
lon: npt.NDArray[np.floating],
keplerian_params: dict,
t_start: np.datetime64,
t_end: np.datetime64,
alt: float | np.floating = 0.0,
el_min: float | np.floating = 0.0,
propagator_type: str = 'twobody',
max_step: np.timedelta64 = np.timedelta64(30, 's'),
batch_size: int = 1_000,
fov_pointing_lvlh: npt.NDArray | None = None,
fov_half_angle: float | None = None,
sza_max: float | None = None,
sza_min: float | None = None,
) -> dict:
"""Return the raw (N × M) visibility matrix for every timestep and ground point.
Unlike :func:`coverage_fraction` and :func:`revisit_time`, which reduce
the visibility matrix to summary statistics, this function returns the
full boolean matrix so callers can apply their own post-processing.
Parameters
----------
lat, lon : npt.NDArray[np.floating]
Ground-point latitudes/longitudes (rad), shape ``(M,)``.
keplerian_params : dict
Orbital elements dict.
t_start, t_end : np.datetime64
Analysis window (``datetime64[us]``).
alt : float | np.floating, optional
Ground-point altitude above WGS84 (m). Defaults to 0.
el_min : float | np.floating, optional
Minimum elevation angle (rad). Defaults to 0 (horizon).
propagator_type : str, optional
``'twobody'`` (default) or ``'j2'``.
max_step : np.timedelta64, optional
Scan time step. Defaults to 30 s.
batch_size : int, optional
Time steps per propagation batch. Defaults to 1 000.
fov_pointing_lvlh : npt.NDArray | None, optional
Sensor pointing direction in LVLH frame. Must be paired with
``fov_half_angle``.
fov_half_angle : float | None, optional
FOV half-angle (rad). Must be paired with ``fov_pointing_lvlh``.
sza_max : float | None, optional
Maximum solar zenith angle (rad). Points where the SZA exceeds this
value are considered invisible (daytime constraint).
sza_min : float | None, optional
Minimum solar zenith angle (rad). Points where the SZA is below this
value are considered invisible (nighttime constraint).
Returns
-------
dict
``t`` : ``(N,)`` ``datetime64[us]`` — sample timestamps.
``lat`` : ``(M,)`` float — ground-point latitudes (rad), echoed from input.
``lon`` : ``(M,)`` float — ground-point longitudes (rad), echoed from input.
``alt`` : float — ground-point altitude (m), echoed from input.
``visible`` : ``(N, M)`` bool — ``True`` where the satellite has
line-of-sight to the ground point at that timestep.
Notes
-----
Memory usage scales as ``N × M`` booleans. For a 30-day window at a
20 s step (N ≈ 130 000) with M = 5 000 points this is roughly 650 MB.
Prefer :func:`coverage_fraction` or :func:`revisit_time` for summary
statistics over large grids.
"""
spec = make_sensor_spec(keplerian_params, propagator_type,
fov_pointing_lvlh, fov_half_angle)
return pointwise_coverage_multi(
lat, lon, [spec], t_start, t_end,
alt, el_min, sza_max, sza_min, max_step, batch_size,
)
[docs]
def access_pointwise(
lat: npt.NDArray[np.floating],
lon: npt.NDArray[np.floating],
keplerian_params: dict,
t_start: np.datetime64,
t_end: np.datetime64,
alt: float | np.floating = 0.0,
el_min: float | np.floating = 0.0,
propagator_type: str = 'twobody',
max_step: np.timedelta64 = np.timedelta64(30, 's'),
batch_size: int = 1_000,
fov_pointing_lvlh: npt.NDArray | None = None,
fov_half_angle: float | None = None,
sza_max: float | None = None,
sza_min: float | None = None,
) -> list[list[tuple[np.datetime64, np.datetime64]]]:
"""Return per-point access intervals over a time window.
Each element of the returned list corresponds to one ground point and
contains a list of ``(AOS, LOS)`` pairs — the times at which the
satellite acquired and lost line-of-sight to that point.
.. note::
Edge times are accurate to ``max_step``. For sub-step accuracy on
individual points of interest use
:func:`~missiontools.orbit.access.earth_access_intervals`.
Parameters
----------
lat, lon : npt.NDArray[np.floating]
Ground-point latitudes/longitudes (rad), shape ``(M,)``.
keplerian_params : dict
Orbital elements dict.
t_start, t_end : np.datetime64
Analysis window.
alt : float | np.floating, optional
Ground-point altitude above WGS84 (m). Defaults to 0.
el_min : float | np.floating, optional
Minimum elevation angle (rad). Defaults to 0 (horizon).
propagator_type : str, optional
``'twobody'`` or ``'j2'``.
max_step : np.timedelta64, optional
Scan step. Defaults to 30 s.
batch_size : int, optional
Time steps per propagation batch. Defaults to 1 000.
fov_pointing_lvlh : npt.NDArray | None, optional
Sensor pointing direction in LVLH frame.
fov_half_angle : float | None, optional
FOV half-angle (rad).
sza_max : float | None, optional
Maximum solar zenith angle (rad).
sza_min : float | None, optional
Minimum solar zenith angle (rad).
Returns
-------
list[list[tuple[datetime64, datetime64]]]
``result[m]`` is a list of ``(AOS, LOS)`` ``datetime64[us]`` pairs
for point ``m``. An interval still open at ``t_end`` is closed with
``t_end``. Points with no access return an empty list.
"""
return _collect_access_intervals(
lat, lon, keplerian_params, t_start, t_end,
alt, el_min, propagator_type, max_step, batch_size,
fov_pointing_lvlh, fov_half_angle, sza_max, sza_min,
close_at_end=True,
)
[docs]
def revisit_pointwise(
lat: npt.NDArray[np.floating],
lon: npt.NDArray[np.floating],
keplerian_params: dict,
t_start: np.datetime64,
t_end: np.datetime64,
alt: float | np.floating = 0.0,
el_min: float | np.floating = 0.0,
propagator_type: str = 'twobody',
max_step: np.timedelta64 = np.timedelta64(30, 's'),
batch_size: int = 1_000,
fov_pointing_lvlh: npt.NDArray | None = None,
fov_half_angle: float | None = None,
sza_max: float | None = None,
sza_min: float | None = None,
) -> list[npt.NDArray[np.timedelta64]]:
"""Return per-point revisit gap arrays over a time window.
Each element of the returned list corresponds to one ground point and
contains an array of ``timedelta64[us]`` values — one per gap between
consecutive access windows (LOS of pass *i* to AOS of pass *i+1*).
.. note::
Accuracy is limited to ``max_step``.
Parameters
----------
lat, lon : npt.NDArray[np.floating]
Ground-point latitudes/longitudes (rad), shape ``(M,)``.
keplerian_params : dict
Orbital elements dict.
t_start, t_end : np.datetime64
Analysis window.
alt : float | np.floating, optional
Ground-point altitude above WGS84 (m). Defaults to 0.
el_min : float | np.floating, optional
Minimum elevation angle (rad). Defaults to 0 (horizon).
propagator_type : str, optional
``'twobody'`` or ``'j2'``.
max_step : np.timedelta64, optional
Scan step. Defaults to 30 s.
batch_size : int, optional
Time steps per propagation batch. Defaults to 1 000.
fov_pointing_lvlh : npt.NDArray | None, optional
Sensor pointing direction in LVLH frame.
fov_half_angle : float | None, optional
FOV half-angle (rad).
sza_max : float | None, optional
Maximum solar zenith angle (rad).
sza_min : float | None, optional
Minimum solar zenith angle (rad).
Returns
-------
list[NDArray[timedelta64]]
``result[m]`` is an array of ``timedelta64[us]`` gaps for point ``m``.
Points with fewer than two access windows return an empty array.
"""
intervals = _collect_access_intervals(
lat, lon, keplerian_params, t_start, t_end,
alt, el_min, propagator_type, max_step, batch_size,
fov_pointing_lvlh, fov_half_angle, sza_max, sza_min,
close_at_end=False,
)
result = []
for ivals in intervals:
if len(ivals) < 2:
result.append(np.array([], dtype='timedelta64[us]'))
else:
gaps = np.array(
[ivals[i + 1][0] - ivals[i][1] for i in range(len(ivals) - 1)],
dtype='timedelta64[us]',
)
result.append(gaps)
return result
# ---------------------------------------------------------------------------
# Shapefile helpers
# ---------------------------------------------------------------------------
#: ESRI shapetype codes for polygon variants (Polygon, PolygonZ, PolygonM)
_SHP_POLYGON_TYPES = frozenset({5, 15, 25})
def _unwrap_ring(coords: list) -> tuple[list, bool]:
"""Unwrap longitude jumps > 180° in a coordinate ring.
Returns the unwrapped coordinate list and whether any unwrapping occurred
(i.e. the ring crosses the antimeridian).
"""
lons_raw = [c[0] for c in coords]
lats = [c[1] for c in coords]
lons_u = [lons_raw[0]]
crosses = False
for j in range(1, len(lons_raw)):
dl = lons_raw[j] - lons_u[-1]
if dl > 180.0:
lons_u.append(lons_raw[j] - 360.0)
crosses = True
elif dl < -180.0:
lons_u.append(lons_raw[j] + 360.0)
crosses = True
else:
lons_u.append(lons_raw[j])
return list(zip(lons_u, lats)), crosses
[docs]
def load_shapefile_geometry(path, feature_index):
"""Read an ESRI Shapefile and return a shapely geometry.
Returns
-------
geom : shapely geometry
Union of all selected features (Polygon or MultiPolygon).
Rings are *unwrapped* so antimeridian-crossing polygons have
continuous (possibly out-of-[-180,180]) coordinates.
crosses_am : bool
True if any ring required longitude unwrapping, indicating that
the geometry may extend outside the [-180, 180] longitude range
and requires shifted-point PIP tests.
"""
import shapefile as _pyshp
from shapely.geometry import shape as _shape, Polygon as _Polygon
from shapely.geometry import MultiPolygon as _MultiPolygon
from shapely.ops import unary_union as _unary_union
sf = _pyshp.Reader(str(path))
if feature_index is not None:
raw_shapes = [sf.shape(feature_index)]
else:
raw_shapes = sf.shapes()
crosses_am = False
geoms = []
for shp in raw_shapes:
if shp.shapeType not in _SHP_POLYGON_TYPES:
continue
# Use pyshp's __geo_interface__ to obtain correct GeoJSON topology.
# This handles the ESRI multipart polygon convention (multiple exterior
# rings within a single feature) correctly, unlike a naive "first ring
# is exterior, rest are holes" approach.
geo = _shape(shp.__geo_interface__)
polys = list(geo.geoms) if isinstance(geo, _MultiPolygon) else [geo]
for poly in polys:
# Unwrap each ring for antimeridian-crossing polygons.
ext_coords, cam = _unwrap_ring(list(poly.exterior.coords))
crosses_am = crosses_am or cam
int_coords_list = []
for interior in poly.interiors:
ic, cam = _unwrap_ring(list(interior.coords))
crosses_am = crosses_am or cam
int_coords_list.append(ic)
geoms.append(_Polygon(ext_coords, int_coords_list))
if not geoms:
raise ValueError(
"No polygon features found in the shapefile. "
"Only shapeTypes 5 (Polygon), 15 (PolygonZ), and 25 (PolygonM) "
"are supported."
)
geom = _unary_union(geoms) if len(geoms) > 1 else geoms[0]
return geom, crosses_am
[docs]
def sample_from_geometry(
geom,
crosses_am: bool,
point_density: float,
) -> tuple[npt.NDArray[np.floating], npt.NDArray[np.floating]]:
"""Fibonacci-sphere sampling inside a shapely geometry.
Parameters
----------
geom : shapely Polygon or MultiPolygon
Target geometry in geographic degrees (may have extended longitudes
if the polygon crosses the antimeridian).
crosses_am : bool
If True, PIP tests are also performed at ±360° longitude shifts.
point_density : float
Approximate area per sample point (m²). Must be positive.
Returns
-------
lat : (M,) float64, radians
lon : (M,) float64, radians
"""
import shapely as _shapely
# Gonzalez (2009) Fibonacci sphere: N uniformly distributed sphere points
# give area per point ≈ 4πR²/N. To achieve point_density m²/point over
# any polygon of area A, generate N = 4πR²/point_density global points;
# the expected number inside the polygon is then A/point_density,
# independent of the polygon's shape or bounding box.
# A 1.3× oversampling factor guards against Poisson under-count at the
# polygon boundary. A floor of 5 000 ensures that even coarse densities
# on small polygons yield at least a few candidates to test.
_SPHERE_AREA = 4.0 * np.pi * EARTH_MEAN_RADIUS**2
n_global = max(5_000, int(np.ceil(_SPHERE_AREA / point_density * 1.3)))
lat_r, lon_r = _fibonacci_sphere(n_global)
# Shapely expects degrees (lon, lat) = (x, y)
lon_deg = np.degrees(lon_r)
lat_deg = np.degrees(lat_r)
inside = _shapely.contains_xy(geom, lon_deg, lat_deg)
if crosses_am:
# Also test points shifted by ±360° to reach the unwrapped polygon
inside |= _shapely.contains_xy(geom, lon_deg + 360.0, lat_deg)
inside |= _shapely.contains_xy(geom, lon_deg - 360.0, lat_deg)
lat_in = lat_r[inside]
lon_in = lon_r[inside]
if len(lat_in) == 0:
raise ValueError(
"No sample points fell inside the geometry — "
"check that coordinates are in geographic degrees (WGS84 / EPSG:4326)."
)
return lat_in, lon_in
[docs]
def sample_shapefile(
path: str,
*,
feature_index: int | None = None,
point_density: float = 1e11,
) -> tuple[npt.NDArray[np.floating], npt.NDArray[np.floating]]:
"""Sample approximately equal-area points from an ESRI Shapefile polygon.
Reads the polygon geometry from a ``.shp`` file and returns a Fibonacci
lattice filtered to the interior, using the same density convention as
:func:`sample_region`. Antimeridian-crossing polygons (e.g. Pacific
island groups, Russia) are handled correctly via coordinate unwrapping
and shifted-point PIP tests.
Parameters
----------
path : str
Path to the ``.shp`` file.
feature_index : int or None, optional
Index of the feature/shape to sample from. ``None`` (default)
unions all features in the layer.
point_density : float, optional
Approximate area represented by each sample point (m²).
Defaults to 1×10¹¹ m² (~100 000 km² per point).
Returns
-------
lat : npt.NDArray[np.floating]
Sample latitudes (rad), shape ``(M,)``.
lon : npt.NDArray[np.floating]
Sample longitudes (rad), shape ``(M,)``.
Raises
------
ValueError
If the file contains no polygon features, or if no lattice points
land inside the geometry (degenerate or very small region).
Notes
-----
Requires ``pyshp`` and ``shapely`` (both declared as package
dependencies).
"""
if point_density <= 0:
raise ValueError(f"point_density must be positive, got {point_density}")
geom, crosses_am = load_shapefile_geometry(path, feature_index)
return sample_from_geometry(geom, crosses_am, point_density)
# ---------------------------------------------------------------------------
# Natural Earth geography helpers
# ---------------------------------------------------------------------------
def _load_ne_features(
path: str,
indices: list[int],
) -> tuple:
"""Load and union a specific subset of features from a Natural Earth shapefile.
Parameters
----------
path : str
Path to the ``.shp`` file.
indices : list[int]
Feature indices to load and union.
Returns
-------
geom : shapely Polygon or MultiPolygon
crosses_am : bool
"""
from shapely.ops import unary_union as _unary_union
geoms: list = []
crosses_am = False
for i in indices:
g, cam = load_shapefile_geometry(path, i)
geoms.append(g)
crosses_am = crosses_am or cam
if not geoms:
raise ValueError("No features matched the requested indices.")
return (_unary_union(geoms) if len(geoms) > 1 else geoms[0]), crosses_am
def _find_ne_indices(geography: str) -> tuple[str, list[int]]:
"""Resolve a geography string to a shapefile path and feature indices.
Detection order
---------------
1. Slash pattern ``'Country/Subdivision'`` → admin-1 by name
2. ISO 3166-2 ``'CA-QC'`` → admin-1 ``iso_3166_2``
3. ISO A2 ``'CA'`` → admin-0 ``ISO_A2``
4. ISO A3 ``'CAN'`` → admin-0 ``ISO_A3``
5. Name ``'Canada'`` → admin-0 ``NAME`` (case-insensitive);
fallback to admin-1 ``name``
Returns
-------
path : str
Absolute path to the matched shapefile.
indices : list[int]
Feature indices (one or more) to union.
Raises
------
ValueError
If the geography string matches none of the above.
"""
import shapefile as _pyshp
g = geography.strip()
# ── 1. Slash: "Country/Subdivision" ─────────────────────────────────────
if '/' in g:
country, subdivision = (s.strip() for s in g.split('/', 1))
cl = country.lower()
sl = subdivision.lower()
sf1 = _pyshp.Reader(str(_NE_ADM1))
idx = [i for i, r in enumerate(sf1.records())
if r.as_dict()['admin'].lower() == cl
and (r.as_dict()['name'].lower() == sl
or r.as_dict()['name_en'].lower() == sl)]
if not idx:
raise ValueError(
f"Subdivision {subdivision!r} not found in {country!r}. "
f"Sub-national (admin-1) data is available only for: "
f"Australia, Brazil, Canada, China, India, Indonesia, "
f"Russia, South Africa, United States of America."
)
return str(_NE_ADM1), idx
# ── 2. ISO 3166-2: "CA-QC" ──────────────────────────────────────────────
if _re.match(r'^[A-Z]{2}-[A-Z0-9]{1,3}$', g):
sf1 = _pyshp.Reader(str(_NE_ADM1))
idx = [i for i, r in enumerate(sf1.records())
if r.as_dict()['iso_3166_2'] == g]
if idx:
return str(_NE_ADM1), idx
# ── 3. ISO A2: "CA" ─────────────────────────────────────────────────────
if len(g) == 2 and g.isupper():
sf0 = _pyshp.Reader(str(_NE_ADM0))
idx = [i for i, r in enumerate(sf0.records())
if r.as_dict()['ISO_A2'] == g]
if idx:
return str(_NE_ADM0), idx
# ── 4. ISO A3: "CAN" ────────────────────────────────────────────────────
if len(g) == 3 and g.isupper():
sf0 = _pyshp.Reader(str(_NE_ADM0))
idx = [i for i, r in enumerate(sf0.records())
if r.as_dict()['ISO_A3'] == g]
if idx:
return str(_NE_ADM0), idx
# ── 5. Name lookup (case-insensitive) ────────────────────────────────────
gl = g.lower()
sf0 = _pyshp.Reader(str(_NE_ADM0))
idx = [i for i, r in enumerate(sf0.records())
if r.as_dict()['NAME'].lower() == gl]
if idx:
return str(_NE_ADM0), idx
sf1 = _pyshp.Reader(str(_NE_ADM1))
idx = [i for i, r in enumerate(sf1.records())
if r.as_dict()['name'].lower() == gl
or r.as_dict()['name_en'].lower() == gl]
if idx:
return str(_NE_ADM1), idx
raise ValueError(
f"Geography not found: {geography!r}. "
f"Accepted formats: country name ('Canada'), "
f"'Country/Subdivision' ('Canada/Quebec'), "
f"ISO A2 ('CA'), ISO A3 ('CAN'), ISO 3166-2 ('CA-QC')."
)
[docs]
def sample_geography(
geography: str,
*,
point_density: float = 1e11,
) -> tuple[npt.NDArray[np.floating], npt.NDArray[np.floating], object]:
"""Sample approximately equal-area points from a Natural Earth geography.
Looks up the requested country or subdivision in the bundled Natural Earth
50 m dataset and returns a Fibonacci lattice filtered to its interior, using
the same density convention as :func:`sample_region`.
Parameters
----------
geography : str
The geography to sample. Accepted formats (auto-detected):
- Country name: ``'Canada'`` (case-insensitive, matched against the
admin-0 ``NAME`` field; falls back to admin-1 ``name`` if no match).
- ``'Country/Subdivision'``: ``'Canada/Quebec'``, ``'United States of
America/Alaska'``. Sub-national data is available only for
Australia, Brazil, Canada, China, India, Indonesia, Russia,
South Africa, and the United States of America.
- ISO 3166-1 alpha-2: ``'CA'``, ``'US'``
- ISO 3166-1 alpha-3: ``'CAN'``, ``'USA'``
- ISO 3166-2: ``'CA-QC'``, ``'US-AK'``
point_density : float, optional
Approximate area represented by each sample point (m²).
Defaults to 1×10¹¹ m² (~100 000 km² per point).
Returns
-------
lat : (M,) float64
Sample latitudes (rad).
lon : (M,) float64
Sample longitudes (rad).
geometry : shapely Polygon or MultiPolygon
Shapely geometry of the matched feature(s).
Raises
------
ValueError
If ``geography`` does not match any feature, ``point_density`` is
non-positive, or no lattice points fall inside the geometry.
Examples
--------
::
lat, lon, geom = sample_geography('Canada')
lat, lon, geom = sample_geography('Canada/Quebec')
lat, lon, geom = sample_geography('CA-QC')
lat, lon, geom = sample_geography('CAN')
"""
if point_density <= 0:
raise ValueError(f"point_density must be positive, got {point_density}")
path, indices = _find_ne_indices(geography)
geom, crosses_am = _load_ne_features(path, indices)
lat, lon = sample_from_geometry(geom, crosses_am, point_density)
return lat, lon, geom
[docs]
def geography_geometry(geography: str) -> tuple[object, bool]:
"""Return the Shapely geometry for a Natural Earth geography without sampling.
Parameters
----------
geography : str
Same format as :func:`sample_geography` — country name, ISO code,
``'Country/Subdivision'``, or ISO 3166-2 code.
Returns
-------
geom : shapely Polygon or MultiPolygon
Geometry in geographic degrees. May use extended longitudes
(> 180°) for antimeridian-crossing features (e.g. Russia).
crosses_am : bool
True if the geometry uses unwrapped longitude coordinates.
Raises
------
ValueError
If ``geography`` does not match any bundled feature.
"""
path, indices = _find_ne_indices(geography)
return _load_ne_features(path, indices)