# exactexact wrappers
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Literal
import geopandas as gpd
import numpy as np
import sparse
import xarray as xr
from exactextract import exact_extract
from exactextract.raster import NumPyRasterSource
from ..rasterize.core import _get_affine
from ..utils import get_grid_mapping_var
from .utils import clip_to_bbox, geometries_as_dask_array, is_in_memory
if TYPE_CHECKING:
import dask.array
import dask_geopandas
MIN_CHUNK_SIZE = 2 # exactextract cannot handle arrays of size 1.
GEOM_AXIS = 0
Y_AXIS = 1
X_AXIS = 2
DEFAULT_STRATEGY = "feature-sequential"
Strategy = Literal["feature-sequential", "raster-sequential", "raster-parallel"]
CoverageWeights = Literal["area_spherical_m2", "area_cartesian", "area_spherical_km2", "fraction", "none"]
__all__ = ["coverage"]
def affine_to_raster_source(affine, shape: tuple[int, int], *, srs_wkt: str | None) -> NumPyRasterSource:
"""Convert affine transform and shape directly to a NumPyRasterSource.
Parameters
----------
affine : Affine
Affine transform for the raster grid (top-left corner convention).
shape : tuple[int, int]
Shape of the raster (nrows, ncols).
srs_wkt : str or None
Well-known text representation of the spatial reference system.
Returns
-------
NumPyRasterSource
A raster source with the specified extent for use with exactextract.
"""
nrows, ncols = shape
# Compute the four corners using the affine transform
# The affine maps pixel coordinates to world coordinates
# Top-left: (0, 0), Top-right: (ncols, 0), Bottom-left: (0, nrows), Bottom-right: (ncols, nrows)
x0, y0 = affine * (0, 0) # top-left
x1, y1 = affine * (ncols, nrows) # bottom-right
xmin, xmax = min(x0, x1), max(x0, x1)
ymin, ymax = min(y0, y1), max(y0, y1)
return NumPyRasterSource(
np.broadcast_to([1], shape),
xmin=xmin,
xmax=xmax,
ymin=ymin,
ymax=ymax,
srs_wkt=srs_wkt,
)
def xy_to_raster_source(x: np.ndarray, y: np.ndarray, *, srs_wkt: str | None) -> NumPyRasterSource:
assert x.ndim == 1
assert y.ndim == 1
xsize = x.size
ysize = y.size
# we need the top left corner, and the bottom right corner
dx0 = (x[1] - x[0]) / 2
dx1 = (x[-1] - x[-2]) / 2
dy0 = np.abs(y[1] - y[0]) / 2
dy1 = np.abs(y[-1] - y[-2]) / 2
if y[0] > y[-1]:
dy0, dy1 = dy1, dy0
shape = (ysize, xsize)
raster = NumPyRasterSource(
np.broadcast_to([1], shape),
xmin=x.min() - dx0,
xmax=x.max() + dx1,
ymin=y.min() - dy0,
ymax=y.max() + dy1,
srs_wkt=srs_wkt,
)
return raster
def get_dtype(coverage_weight: CoverageWeights, geometries):
if coverage_weight.lower() == "none":
dtype = np.uint8
else:
dtype = np.float64
return dtype
def np_coverage(
affine,
*,
shape: tuple[int, int],
geometries: gpd.GeoDataFrame,
strategy: Strategy = DEFAULT_STRATEGY,
coverage_weight: CoverageWeights = "fraction",
) -> np.ndarray[Any, Any]:
dtype = get_dtype(coverage_weight, geometries)
if len(geometries.columns) > 1:
raise ValueError("Require a single geometries column or a GeoSeries.")
raster = affine_to_raster_source(affine, shape, srs_wkt=geometries.crs.to_wkt())
result = exact_extract(
rast=raster,
vec=geometries,
ops=["cell_id", f"coverage(coverage_weight={coverage_weight})"],
output="pandas",
# max_cells_in_memory=2*x.size * y.size
)
# Vectorized construction of COO sparse array
# Notes on GCXS vs COO: For N data points in 263 geoms by 4000 x by 4000 y
# 1. GCXS cannot compress _all_ axes. This is relevant here.
# 2. GCXS: indptr is 4000*4000 + 1, N per indices & N per data
# 3. COO: 4*N
# It is not obvious that there is much improvement to GCXS at least currently
cell_id_arrays = result.cell_id.values
coverage_arrays = result.coverage.values
lens = np.array([len(c) for c in cell_id_arrays])
if lens.sum() == 0:
# No coverage - return empty sparse array
return sparse.COO(
(np.array([], dtype=np.int64), np.array([], dtype=np.int64), np.array([], dtype=np.int64)),
data=np.array([], dtype=dtype),
sorted=True,
fill_value=0,
shape=(len(geometries), *shape),
)
# Vectorized: repeat geometry indices, concatenate cell_ids and coverage values
geom_idxs = np.repeat(np.arange(len(geometries)), lens)
xy_idxs = np.concatenate([c for c in cell_id_arrays if len(c) > 0])
data = np.concatenate([c for c in coverage_arrays if len(c) > 0]).astype(dtype)
return sparse.COO(
(geom_idxs, *np.unravel_index(xy_idxs, shape=shape)),
data=data,
sorted=True,
fill_value=0,
shape=(len(geometries), *shape),
)
def coverage_np_dask_wrapper(
geom_array: np.ndarray,
x_offsets: np.ndarray,
y_offsets: np.ndarray,
x_sizes: np.ndarray,
y_sizes: np.ndarray,
affine,
coverage_weight: CoverageWeights,
strategy: Strategy,
crs,
) -> np.ndarray:
shape = (y_sizes.item(), x_sizes.item())
chunk_affine = affine * affine.translation(x_offsets.item(), y_offsets.item())
return np_coverage(
chunk_affine,
shape=shape,
geometries=gpd.GeoDataFrame(geometry=geom_array.squeeze(axis=(X_AXIS, Y_AXIS)), crs=crs),
coverage_weight=coverage_weight,
)
def dask_coverage(
affine,
*,
chunks: tuple[tuple[int, ...], tuple[int, ...]],
geom_array: dask.array.Array,
coverage_weight: CoverageWeights = "fraction",
strategy: Strategy = DEFAULT_STRATEGY,
crs: Any,
) -> dask.array.Array:
import dask.array
from dask.array import from_array
y_chunks, x_chunks = chunks
if any(c == 1 for c in x_chunks) or any(c == 1 for c in y_chunks):
raise ValueError("exactextract does not support a chunksize of 1. Please rechunk to avoid this")
x_sizes = from_array(x_chunks, chunks=1)
y_sizes = from_array(y_chunks, chunks=1)
x_offsets = from_array(np.cumulative_sum(x_chunks[:-1], include_initial=True), chunks=1)
y_offsets = from_array(np.cumulative_sum(y_chunks[:-1], include_initial=True), chunks=1)
out = dask.array.map_blocks(
coverage_np_dask_wrapper,
geom_array[:, np.newaxis, np.newaxis],
x_offsets[np.newaxis, np.newaxis, :],
y_offsets[np.newaxis, :, np.newaxis],
x_sizes[np.newaxis, np.newaxis, :],
y_sizes[np.newaxis, :, np.newaxis],
affine=affine,
crs=crs,
coverage_weight=coverage_weight,
strategy=strategy,
chunks=(*geom_array.chunks, tuple(y_chunks), tuple(x_chunks)),
meta=sparse.COO(
[], data=np.array([], dtype=get_dtype(coverage_weight, geom_array)), shape=(0, 0, 0), fill_value=0
),
)
return out
[docs]
def coverage(
obj: xr.Dataset | xr.DataArray,
geometries: gpd.GeoDataFrame | dask_geopandas.GeoDataFrame,
*,
xdim="x",
ydim="y",
strategy: Strategy = "feature-sequential",
coverage_weight: CoverageWeights = "fraction",
clip: bool = False,
) -> xr.DataArray:
"""Calculate pixel coverage fractions for geometries using exactextract.
This function computes how much of each raster pixel is covered by each geometry,
returning precise fractional coverage values or area measurements. It supports
both in-memory and dask-based computation for large datasets.
Parameters
----------
obj : xarray.DataArray or xarray.Dataset
Xarray object defining the raster grid. If a grid mapping coordinate
variable (e.g. ``spatial_ref``) is present, it will be propagated to
the output.
geometries : geopandas.GeoDataFrame or dask_geopandas.GeoDataFrame
Vector geometries for which to calculate coverage. CRS should match
the raster object (though this is not currently enforced).
xdim : str, default "x"
Name of the x (longitude/easting) dimension in the raster object.
ydim : str, default "y"
Name of the y (latitude/northing) dimension in the raster object.
strategy : {"feature-sequential", "raster-sequential", "raster-parallel"}, default "feature-sequential"
Processing strategy passed to exactextract. Controls how computation
is parallelized and memory is managed.
coverage_weight : {"fraction", "none", "area_cartesian", "area_spherical_m2", "area_spherical_km2"}, default "fraction"
Type of coverage measurement to compute:
- "fraction": Fractional coverage (0-1)
- "none": Binary coverage (0 or 1)
- "area_cartesian": Area in map units squared
- "area_spherical_m2": Spherical area in square meters
- "area_spherical_km2": Spherical area in square kilometers
clip: bool
If True, clip raster to the bounding box of the geometries.
Ignored for dask-geopandas geometries.
Returns
-------
xarray.DataArray
3D DataArray with dimensions (geometry, y, x) containing coverage values.
Data type depends on coverage_weight: uint8 for "none", float64 otherwise.
Includes appropriate units and long_name attributes for area measurements.
Raises
------
ValueError
If exactextract encounters chunks of size 1 (not supported by exactextract).
See Also
--------
exactextract.exact_extract : Underlying exactextract function used for coverage calculation
Examples
--------
Calculate fractional coverage:
>>> import rasterix.rasterize.exact as exact
>>> import xarray as xr
>>> import geopandas as gpd
>>> # Load raster data with CRS info
>>> raster = xr.open_dataarray("data.tif")
>>> # Load vector geometries
>>> geometries = gpd.read_file("polygons.shp")
>>> # Calculate coverage fractions
>>> coverage = exact.coverage(raster, geometries)
>>> print(coverage.dims) # ('geometry', 'y', 'x')
Calculate area in square meters:
>>> area_coverage = exact.coverage(raster, geometries, coverage_weight="area_spherical_m2")
>>> print(area_coverage.units) # 'm2'
"""
# FIXME: assert obj.crs == geometries.crs
if clip:
obj = clip_to_bbox(obj, geometries, xdim=xdim, ydim=ydim)
affine = _get_affine(obj, x_dim=xdim, y_dim=ydim)
shape = (obj.sizes[ydim], obj.sizes[xdim])
if is_in_memory(obj=obj, geometries=geometries):
out = np_coverage(
affine,
shape=shape,
geometries=geometries,
coverage_weight=coverage_weight,
strategy=strategy,
)
geom_array = geometries.to_numpy().squeeze(axis=1)
else:
geom_dask_array = geometries_as_dask_array(geometries)
chunks = (
obj.chunksizes.get(ydim, (obj.sizes[ydim],)),
obj.chunksizes.get(xdim, (obj.sizes[xdim],)),
)
out = dask_coverage(
affine,
chunks=chunks,
geom_array=geom_dask_array,
crs=geometries.crs,
coverage_weight=coverage_weight,
strategy=strategy,
)
if isinstance(geometries, gpd.GeoDataFrame):
geom_array = geometries.to_numpy().squeeze(axis=1)
else:
geom_array = geom_dask_array
name = "coverage"
attrs = {}
if "area" in coverage_weight:
name = "area"
if "_m2" in coverage_weight or coverage_weight == "area_cartesian":
attrs["long_name"] = coverage_weight.removesuffix("_m2")
attrs["units"] = "m2"
elif "_km2" in coverage_weight:
attrs["long_name"] = coverage_weight.removesuffix("_km2")
attrs["units"] = "km2"
xy_coords = [
xr.Coordinates.from_xindex(obj.xindexes.get(dim))
for dim in (xdim, ydim)
if obj.xindexes.get(dim) is not None
]
grid_mapping_coords: dict = {}
if (grid_mapping := get_grid_mapping_var(obj)) is not None:
grid_mapping_coords[grid_mapping.name] = grid_mapping
coords = xr.Coordinates(
coords={
**grid_mapping_coords,
"geometry": geom_array,
},
indexes={},
)
if xy_coords:
for c in xy_coords:
coords = coords.merge(c)
coords = coords.coords
coverage = xr.DataArray(dims=("geometry", ydim, xdim), data=out, coords=coords, attrs=attrs, name=name)
return coverage
# ============================================================================
# Engine functions for use with rasterize() in core.py
# These provide the same interface as rasterio.py and rusterize.py engines
# ============================================================================
def rasterize_geometries(
geometries,
*,
dtype: np.dtype,
shape: tuple[int, int],
affine,
offset: int,
all_touched: bool = False,
merge_alg: str = "replace",
fill: Any = 0,
**kwargs,
) -> np.ndarray:
"""
Rasterize geometries using exactextract for precise coverage.
Uses exactextract to compute which pixels are covered by each geometry,
then reduces to geometry indices using the specified merge algorithm.
Parameters
----------
geometries : Sequence[Geometry]
Shapely geometries to rasterize.
dtype : np.dtype
Output data type.
shape : tuple[int, int]
Output shape (nrows, ncols).
affine : Affine
Affine transform for the output grid.
offset : int
Starting value for geometry indices.
all_touched : bool
If True, all pixels touched by geometries will be burned in.
Note: exactextract does not support this parameter directly.
merge_alg : str
Merge algorithm: "replace" or "add".
fill : Any
Fill value for pixels not covered by any geometry.
**kwargs
Additional arguments (ignored for compatibility).
Returns
-------
np.ndarray
Rasterized array with shape (nrows, ncols).
"""
if all_touched:
raise NotImplementedError(
"all_touched=True is not supported by the exactextract engine. "
"Use engine='rusterize' or engine='rasterio' if you need all_touched support."
)
if len(geometries) == 0:
return np.full(shape, fill, dtype=dtype)
# Create GeoDataFrame (exactextract requires it)
# Use a dummy CRS - exactextract needs one but the algorithm doesn't depend on it
gdf = gpd.GeoDataFrame(geometry=list(geometries), crs="EPSG:4326")
# Use exactextract to get coverage (binary mode for efficiency)
raster = affine_to_raster_source(affine, shape, srs_wkt=gdf.crs.to_wkt())
result = exact_extract(
rast=raster,
vec=gdf,
ops=["cell_id", "coverage(coverage_weight=none)"],
output="pandas",
)
# Initialize output with fill value
out = np.full(shape, fill, dtype=dtype)
# Vectorized merging: concatenate all cell_ids and corresponding values
cell_id_arrays = result.cell_id.values
lens = np.array([len(c) for c in cell_id_arrays])
if lens.sum() == 0:
return out
all_cell_ids = np.concatenate([c for c in cell_id_arrays if len(c) > 0])
all_values = np.repeat(np.arange(len(geometries), dtype=dtype) + offset, lens)
# Process based on merge algorithm
if merge_alg == "replace":
# Later geometries overwrite earlier ones (last write wins)
np.put(out, all_cell_ids, all_values)
elif merge_alg == "add":
# Sum values where geometries overlap (unbuffered addition)
np.add.at(out.ravel(), all_cell_ids, all_values)
else:
raise ValueError(f"Unsupported merge_alg: {merge_alg}. Must be 'replace' or 'add'.")
return out
def dask_rasterize_wrapper(
geom_array: np.ndarray,
x_offsets: np.ndarray,
y_offsets: np.ndarray,
x_sizes: np.ndarray,
y_sizes: np.ndarray,
offset_array: np.ndarray,
*,
fill: Any,
affine,
all_touched: bool,
merge_alg: str,
dtype_: np.dtype,
**kwargs,
) -> np.ndarray:
"""Dask wrapper for exactextract rasterization."""
offset = offset_array.item()
return rasterize_geometries(
geom_array[:, 0, 0].tolist(),
affine=affine * affine.translation(x_offsets.item(), y_offsets.item()),
shape=(y_sizes.item(), x_sizes.item()),
offset=offset,
all_touched=all_touched,
merge_alg=merge_alg,
fill=fill,
dtype=dtype_,
)[np.newaxis, :, :]
def np_geometry_mask(
geometries,
*,
shape: tuple[int, int],
affine,
all_touched: bool = False,
invert: bool = False,
**kwargs,
) -> np.ndarray:
"""
Create a geometry mask using exactextract.
Parameters
----------
geometries : Sequence[Geometry]
Shapely geometries for masking.
shape : tuple[int, int]
Output shape (nrows, ncols).
affine : Affine
Affine transform for the output grid.
all_touched : bool
If True, all pixels touched by geometries will be included.
Note: exactextract does not support this parameter directly.
invert : bool
If True, pixels inside geometries are True (unmasked).
If False (default), pixels inside geometries are False (masked).
**kwargs
Additional arguments (ignored for compatibility).
Returns
-------
np.ndarray
Boolean mask array with shape (nrows, ncols).
"""
if all_touched:
raise NotImplementedError(
"all_touched=True is not supported by the exactextract engine. "
"Use engine='rasterio' or engine='rusterize' if you need all_touched support."
)
if len(geometries) == 0:
# No geometries: all pixels are outside (masked=True) or inside (masked=False)
return np.full(shape, not invert, dtype=bool)
# Create GeoDataFrame (exactextract requires it)
gdf = gpd.GeoDataFrame(geometry=list(geometries), crs="EPSG:4326")
# Use exactextract to get coverage
raster = affine_to_raster_source(affine, shape, srs_wkt=gdf.crs.to_wkt())
result = exact_extract(
rast=raster,
vec=gdf,
ops=["cell_id", "coverage(coverage_weight=none)"],
output="pandas",
)
# Collect all covered cell IDs
all_cell_ids = set()
for i in range(len(geometries)):
cell_ids = result.cell_id.values[i]
all_cell_ids.update(cell_ids)
# Create mask: True = outside geometry (masked), False = inside
inside = np.zeros(shape, dtype=bool)
if all_cell_ids:
np.put(inside, list(all_cell_ids), True)
if invert:
return inside # True = inside
else:
return ~inside # True = outside (masked)
def dask_mask_wrapper(
geom_array: np.ndarray,
x_offsets: np.ndarray,
y_offsets: np.ndarray,
x_sizes: np.ndarray,
y_sizes: np.ndarray,
*,
affine,
**kwargs,
) -> np.ndarray:
"""Dask wrapper for exactextract geometry masking."""
res = np_geometry_mask(
geom_array[:, 0, 0].tolist(),
shape=(y_sizes.item(), x_sizes.item()),
affine=affine * affine.translation(x_offsets.item(), y_offsets.item()),
**kwargs,
)
return res[np.newaxis, :, :]