import re as _re
from pathlib import Path
from typing import NamedTuple
import numpy as np
import numpy.typing as npt
from matplotlib.path import Path as _MplPath
from ..orbit.constants import EARTH_MEAN_RADIUS
_GEODATA_DIR = Path(__file__).parent / "geodata"
_NE_ADM0 = _GEODATA_DIR / "ne_map_units" / "ne_50m_admin_0_map_units.shp"
_NE_ADM1 = _GEODATA_DIR / "ne_states_provinces" / "ne_50m_admin_1_states_provinces.shp"
def _fibonacci_sphere(n: int) -> tuple[npt.NDArray, npt.NDArray]:
if n == 1:
return np.array([0.0]), np.array([0.0])
phi = (1.0 + np.sqrt(5.0)) / 2.0
i = np.arange(n, dtype=np.float64)
lat = np.arcsin(np.clip(2.0 * i / (n - 1) - 1.0, -1.0, 1.0))
lon = (2.0 * np.pi * i / phi) % (2.0 * np.pi) - np.pi
return lat, lon
def _pip(
polygon: npt.NDArray, lat: npt.NDArray, lon: npt.NDArray
) -> npt.NDArray[np.bool_]:
path = _MplPath(polygon[:, ::-1])
return path.contains_points(np.column_stack([lon, lat]))
[docs]
def sample_aoi(
polygon: npt.NDArray[np.floating],
n: int,
) -> tuple[npt.NDArray[np.floating], npt.NDArray[np.floating]]:
polygon = np.asarray(polygon, dtype=np.float64)
if polygon.ndim != 2 or polygon.shape[1] != 2:
raise ValueError("polygon must have shape (V, 2) with columns [lat, lon]")
if n < 1:
raise ValueError(f"n must be at least 1, got {n}")
n_pilot = max(n * 10, 5_000)
lat_p, lon_p = _fibonacci_sphere(n_pilot)
frac = float(_pip(polygon, lat_p, lon_p).mean())
if frac < 1e-6:
raise ValueError(
"AoI polygon encloses too few global lattice points — "
"check that coordinates are in radians and the polygon is not "
"degenerate."
)
n_global = int(np.ceil(n / frac * 1.3))
lat_all, lon_all = _fibonacci_sphere(n_global)
inside = _pip(polygon, lat_all, lon_all)
lat_in = lat_all[inside]
lon_in = lon_all[inside]
if len(lat_in) > n:
idx = np.round(np.linspace(0, len(lat_in) - 1, n)).astype(int)
lat_in = lat_in[idx]
lon_in = lon_in[idx]
return lat_in, lon_in
[docs]
def sample_region(
lat_min: float | None = None,
lat_max: float | None = None,
lon_min: float | None = None,
lon_max: float | None = None,
point_density: float = 1e5,
) -> tuple[npt.NDArray[np.floating], npt.NDArray[np.floating]]:
if (lon_min is None) != (lon_max is None):
raise ValueError(
"lon_min and lon_max must both be None (all longitudes) or both "
"be specified; got lon_min={} lon_max={}".format(lon_min, lon_max)
)
if point_density <= 0:
raise ValueError(f"point_density must be positive, got {point_density}")
lat_lo = float(lat_min) if lat_min is not None else -np.pi / 2.0
lat_hi = float(lat_max) if lat_max is not None else np.pi / 2.0
full_lon = lon_min is None
if lat_lo >= lat_hi:
raise ValueError(
f"lat_min ({lat_lo:.6f} rad) must be less than lat_max ({lat_hi:.6f} rad)"
)
lon_lo = float(lon_min) if lon_min is not None else 0.0
lon_hi = float(lon_max) if lon_max is not None else 0.0
antimeridian = (not full_lon) and (lon_lo > lon_hi)
if full_lon:
lon_frac = 1.0
elif antimeridian:
lon_frac = (2.0 * np.pi - (lon_lo - lon_hi)) / (2.0 * np.pi)
else:
lon_frac = (lon_hi - lon_lo) / (2.0 * np.pi)
area = (
4.0
* np.pi
* EARTH_MEAN_RADIUS**2
* (np.sin(lat_hi) - np.sin(lat_lo))
/ 2.0
* lon_frac
)
pd_m2 = point_density * 1e6
n = max(1, int(np.round(area / pd_m2)))
area_fraction = area / (4.0 * np.pi * EARTH_MEAN_RADIUS**2)
n_global = max(n * 5, int(np.ceil(n / area_fraction * 1.3)))
lat_all, lon_all = _fibonacci_sphere(n_global)
lat_ok = (lat_all >= lat_lo) & (lat_all <= lat_hi)
if full_lon:
lon_ok = np.ones(n_global, dtype=np.bool_)
elif antimeridian:
lon_ok = (lon_all >= lon_lo) | (lon_all <= lon_hi)
else:
lon_ok = (lon_all >= lon_lo) & (lon_all <= lon_hi)
lat_in = lat_all[lat_ok & lon_ok]
lon_in = lon_all[lat_ok & lon_ok]
if len(lat_in) > n:
idx = np.round(np.linspace(0, len(lat_in) - 1, n)).astype(int)
lat_in = lat_in[idx]
lon_in = lon_in[idx]
return lat_in, lon_in
_SHP_POLYGON_TYPES = frozenset({5, 15, 25})
def _unwrap_ring(coords: list) -> tuple[list, bool]:
lons_raw = [c[0] for c in coords]
lats = [c[1] for c in coords]
lons_u = [lons_raw[0]]
crosses = False
for j in range(1, len(lons_raw)):
dl = lons_raw[j] - lons_u[-1]
if dl > 180.0:
lons_u.append(lons_raw[j] - 360.0)
crosses = True
elif dl < -180.0:
lons_u.append(lons_raw[j] + 360.0)
crosses = True
else:
lons_u.append(lons_raw[j])
return list(zip(lons_u, lats)), crosses
[docs]
def load_shapefile_geometry(path, feature_index):
import shapefile as _pyshp
from shapely.geometry import shape as _shape, Polygon as _Polygon
from shapely.geometry import MultiPolygon as _MultiPolygon
from shapely.ops import unary_union as _unary_union
sf = _pyshp.Reader(str(path))
if feature_index is not None:
raw_shapes = [sf.shape(feature_index)]
else:
raw_shapes = sf.shapes()
crosses_am = False
geoms = []
for shp in raw_shapes:
if shp.shapeType not in _SHP_POLYGON_TYPES:
continue
geo = _shape(shp.__geo_interface__)
polys = list(geo.geoms) if isinstance(geo, _MultiPolygon) else [geo]
for poly in polys:
ext_coords, cam = _unwrap_ring(list(poly.exterior.coords))
crosses_am = crosses_am or cam
int_coords_list = []
for interior in poly.interiors:
ic, cam = _unwrap_ring(list(interior.coords))
crosses_am = crosses_am or cam
int_coords_list.append(ic)
geoms.append(_Polygon(ext_coords, int_coords_list))
if not geoms:
raise ValueError(
"No polygon features found in the shapefile. "
"Only shapeTypes 5 (Polygon), 15 (PolygonZ), and 25 (PolygonM) "
"are supported."
)
geom = _unary_union(geoms) if len(geoms) > 1 else geoms[0]
return geom, crosses_am
[docs]
def sample_from_geometry(
geom,
crosses_am: bool,
point_density: float,
) -> tuple[npt.NDArray[np.floating], npt.NDArray[np.floating]]:
import shapely as _shapely
_SPHERE_AREA = 4.0 * np.pi * EARTH_MEAN_RADIUS**2
pd_m2 = point_density * 1e6
n_global = max(5_000, int(np.ceil(_SPHERE_AREA / pd_m2 * 1.3)))
lat_r, lon_r = _fibonacci_sphere(n_global)
lon_deg = np.degrees(lon_r)
lat_deg = np.degrees(lat_r)
inside = _shapely.contains_xy(geom, lon_deg, lat_deg)
if crosses_am:
inside |= _shapely.contains_xy(geom, lon_deg + 360.0, lat_deg)
inside |= _shapely.contains_xy(geom, lon_deg - 360.0, lat_deg)
lat_in = lat_r[inside]
lon_in = lon_r[inside]
if len(lat_in) == 0:
raise ValueError(
"No sample points fell inside the geometry — "
"check that coordinates are in geographic degrees (WGS84 / EPSG:4326)."
)
return lat_in, lon_in
[docs]
def sample_shapefile(
path: str,
*,
feature_index: int | None = None,
point_density: float = 1e5,
) -> tuple[npt.NDArray[np.floating], npt.NDArray[np.floating]]:
if point_density <= 0:
raise ValueError(f"point_density must be positive, got {point_density}")
geom, crosses_am = load_shapefile_geometry(path, feature_index)
return sample_from_geometry(geom, crosses_am, point_density)
def _load_ne_features(
path: str,
indices: list[int],
) -> tuple:
from shapely.ops import unary_union as _unary_union
geoms: list = []
crosses_am = False
for i in indices:
g, cam = load_shapefile_geometry(path, i)
geoms.append(g)
crosses_am = crosses_am or cam
if not geoms:
raise ValueError("No features matched the requested indices.")
return (_unary_union(geoms) if len(geoms) > 1 else geoms[0]), crosses_am
def _find_ne_indices(geography: str) -> tuple[str, list[int]]:
import shapefile as _pyshp
g = geography.strip()
if "/" in g:
country, subdivision = (s.strip() for s in g.split("/", 1))
cl = country.lower()
sl = subdivision.lower()
sf1 = _pyshp.Reader(str(_NE_ADM1))
idx = [
i
for i, r in enumerate(sf1.records())
if r.as_dict()["admin"].lower() == cl
and (
r.as_dict()["name"].lower() == sl
or r.as_dict()["name_en"].lower() == sl
)
]
if not idx:
raise ValueError(
f"Subdivision {subdivision!r} not found in {country!r}. "
f"Sub-national (admin-1) data is available only for: "
f"Australia, Brazil, Canada, China, India, Indonesia, "
f"Russia, South Africa, United States of America."
)
return str(_NE_ADM1), idx
if _re.match(r"^[A-Z]{2}-[A-Z0-9]{1,3}$", g):
sf1 = _pyshp.Reader(str(_NE_ADM1))
idx = [i for i, r in enumerate(sf1.records()) if r.as_dict()["iso_3166_2"] == g]
if idx:
return str(_NE_ADM1), idx
if len(g) == 2 and g.isupper():
sf0 = _pyshp.Reader(str(_NE_ADM0))
idx = [i for i, r in enumerate(sf0.records()) if r.as_dict()["ISO_A2"] == g]
if idx:
return str(_NE_ADM0), idx
if len(g) == 3 and g.isupper():
sf0 = _pyshp.Reader(str(_NE_ADM0))
idx = [i for i, r in enumerate(sf0.records()) if r.as_dict()["ISO_A3"] == g]
if idx:
return str(_NE_ADM0), idx
gl = g.lower()
sf0 = _pyshp.Reader(str(_NE_ADM0))
idx = [i for i, r in enumerate(sf0.records()) if r.as_dict()["NAME"].lower() == gl]
if idx:
return str(_NE_ADM0), idx
sf1 = _pyshp.Reader(str(_NE_ADM1))
idx = [
i
for i, r in enumerate(sf1.records())
if r.as_dict()["name"].lower() == gl or r.as_dict()["name_en"].lower() == gl
]
if idx:
return str(_NE_ADM1), idx
raise ValueError(
f"Geography not found: {geography!r}. "
f"Accepted formats: country name ('Canada'), "
f"'Country/Subdivision' ('Canada/Quebec'), "
f"ISO A2 ('CA'), ISO A3 ('CAN'), ISO 3166-2 ('CA-QC')."
)
[docs]
def sample_geography(
geography: str,
*,
point_density: float = 1e5,
) -> tuple[npt.NDArray[np.floating], npt.NDArray[np.floating], object]:
if point_density <= 0:
raise ValueError(f"point_density must be positive, got {point_density}")
path, indices = _find_ne_indices(geography)
geom, crosses_am = _load_ne_features(path, indices)
lat, lon = sample_from_geometry(geom, crosses_am, point_density)
return lat, lon, geom
[docs]
def geography_geometry(geography: str) -> tuple[object, bool]:
path, indices = _find_ne_indices(geography)
return _load_ne_features(path, indices)