Source code for rasterix.rasterize.exact

# exactexact wrappers
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Literal

import geopandas as gpd
import numpy as np
import sparse
import xarray as xr
from exactextract import exact_extract
from exactextract.raster import NumPyRasterSource

from ..rasterize.core import _get_affine
from ..utils import get_grid_mapping_var
from .utils import clip_to_bbox, geometries_as_dask_array, is_in_memory

if TYPE_CHECKING:
    import dask.array
    import dask_geopandas

MIN_CHUNK_SIZE = 2  # exactextract cannot handle arrays of size 1.
GEOM_AXIS = 0
Y_AXIS = 1
X_AXIS = 2

DEFAULT_STRATEGY = "feature-sequential"
Strategy = Literal["feature-sequential", "raster-sequential", "raster-parallel"]
CoverageWeights = Literal["area_spherical_m2", "area_cartesian", "area_spherical_km2", "fraction", "none"]

__all__ = ["coverage"]


def affine_to_raster_source(affine, shape: tuple[int, int], *, srs_wkt: str | None) -> NumPyRasterSource:
    """Convert affine transform and shape directly to a NumPyRasterSource.

    Parameters
    ----------
    affine : Affine
        Affine transform for the raster grid (top-left corner convention).
    shape : tuple[int, int]
        Shape of the raster (nrows, ncols).
    srs_wkt : str or None
        Well-known text representation of the spatial reference system.

    Returns
    -------
    NumPyRasterSource
        A raster source with the specified extent for use with exactextract.
    """
    nrows, ncols = shape
    # Compute the four corners using the affine transform
    # The affine maps pixel coordinates to world coordinates
    # Top-left: (0, 0), Top-right: (ncols, 0), Bottom-left: (0, nrows), Bottom-right: (ncols, nrows)
    x0, y0 = affine * (0, 0)  # top-left
    x1, y1 = affine * (ncols, nrows)  # bottom-right

    xmin, xmax = min(x0, x1), max(x0, x1)
    ymin, ymax = min(y0, y1), max(y0, y1)

    return NumPyRasterSource(
        np.broadcast_to([1], shape),
        xmin=xmin,
        xmax=xmax,
        ymin=ymin,
        ymax=ymax,
        srs_wkt=srs_wkt,
    )


def xy_to_raster_source(x: np.ndarray, y: np.ndarray, *, srs_wkt: str | None) -> NumPyRasterSource:
    assert x.ndim == 1
    assert y.ndim == 1

    xsize = x.size
    ysize = y.size

    # we need the top left corner, and the bottom right corner
    dx0 = (x[1] - x[0]) / 2
    dx1 = (x[-1] - x[-2]) / 2
    dy0 = np.abs(y[1] - y[0]) / 2
    dy1 = np.abs(y[-1] - y[-2]) / 2
    if y[0] > y[-1]:
        dy0, dy1 = dy1, dy0

    shape = (ysize, xsize)
    raster = NumPyRasterSource(
        np.broadcast_to([1], shape),
        xmin=x.min() - dx0,
        xmax=x.max() + dx1,
        ymin=y.min() - dy0,
        ymax=y.max() + dy1,
        srs_wkt=srs_wkt,
    )

    return raster


def get_dtype(coverage_weight: CoverageWeights, geometries):
    if coverage_weight.lower() == "none":
        dtype = np.uint8
    else:
        dtype = np.float64
    return dtype


def np_coverage(
    affine,
    *,
    shape: tuple[int, int],
    geometries: gpd.GeoDataFrame,
    strategy: Strategy = DEFAULT_STRATEGY,
    coverage_weight: CoverageWeights = "fraction",
) -> np.ndarray[Any, Any]:
    dtype = get_dtype(coverage_weight, geometries)

    if len(geometries.columns) > 1:
        raise ValueError("Require a single geometries column or a GeoSeries.")

    raster = affine_to_raster_source(affine, shape, srs_wkt=geometries.crs.to_wkt())
    result = exact_extract(
        rast=raster,
        vec=geometries,
        ops=["cell_id", f"coverage(coverage_weight={coverage_weight})"],
        output="pandas",
        # max_cells_in_memory=2*x.size * y.size
    )

    # Vectorized construction of COO sparse array
    # Notes on GCXS vs COO: For N data points in 263 geoms by 4000 x by 4000 y
    # 1. GCXS cannot compress _all_ axes. This is relevant here.
    # 2. GCXS: indptr is 4000*4000 + 1, N per indices & N per data
    # 3. COO: 4*N
    # It is not obvious that there is much improvement to GCXS at least currently
    cell_id_arrays = result.cell_id.values
    coverage_arrays = result.coverage.values
    lens = np.array([len(c) for c in cell_id_arrays])

    if lens.sum() == 0:
        # No coverage - return empty sparse array
        return sparse.COO(
            (np.array([], dtype=np.int64), np.array([], dtype=np.int64), np.array([], dtype=np.int64)),
            data=np.array([], dtype=dtype),
            sorted=True,
            fill_value=0,
            shape=(len(geometries), *shape),
        )

    # Vectorized: repeat geometry indices, concatenate cell_ids and coverage values
    geom_idxs = np.repeat(np.arange(len(geometries)), lens)
    xy_idxs = np.concatenate([c for c in cell_id_arrays if len(c) > 0])
    data = np.concatenate([c for c in coverage_arrays if len(c) > 0]).astype(dtype)

    return sparse.COO(
        (geom_idxs, *np.unravel_index(xy_idxs, shape=shape)),
        data=data,
        sorted=True,
        fill_value=0,
        shape=(len(geometries), *shape),
    )


def coverage_np_dask_wrapper(
    geom_array: np.ndarray,
    x_offsets: np.ndarray,
    y_offsets: np.ndarray,
    x_sizes: np.ndarray,
    y_sizes: np.ndarray,
    affine,
    coverage_weight: CoverageWeights,
    strategy: Strategy,
    crs,
) -> np.ndarray:
    shape = (y_sizes.item(), x_sizes.item())
    chunk_affine = affine * affine.translation(x_offsets.item(), y_offsets.item())
    return np_coverage(
        chunk_affine,
        shape=shape,
        geometries=gpd.GeoDataFrame(geometry=geom_array.squeeze(axis=(X_AXIS, Y_AXIS)), crs=crs),
        coverage_weight=coverage_weight,
    )


def dask_coverage(
    affine,
    *,
    chunks: tuple[tuple[int, ...], tuple[int, ...]],
    geom_array: dask.array.Array,
    coverage_weight: CoverageWeights = "fraction",
    strategy: Strategy = DEFAULT_STRATEGY,
    crs: Any,
) -> dask.array.Array:
    import dask.array
    from dask.array import from_array

    y_chunks, x_chunks = chunks

    if any(c == 1 for c in x_chunks) or any(c == 1 for c in y_chunks):
        raise ValueError("exactextract does not support a chunksize of 1. Please rechunk to avoid this")

    x_sizes = from_array(x_chunks, chunks=1)
    y_sizes = from_array(y_chunks, chunks=1)
    x_offsets = from_array(np.cumulative_sum(x_chunks[:-1], include_initial=True), chunks=1)
    y_offsets = from_array(np.cumulative_sum(y_chunks[:-1], include_initial=True), chunks=1)

    out = dask.array.map_blocks(
        coverage_np_dask_wrapper,
        geom_array[:, np.newaxis, np.newaxis],
        x_offsets[np.newaxis, np.newaxis, :],
        y_offsets[np.newaxis, :, np.newaxis],
        x_sizes[np.newaxis, np.newaxis, :],
        y_sizes[np.newaxis, :, np.newaxis],
        affine=affine,
        crs=crs,
        coverage_weight=coverage_weight,
        strategy=strategy,
        chunks=(*geom_array.chunks, tuple(y_chunks), tuple(x_chunks)),
        meta=sparse.COO(
            [], data=np.array([], dtype=get_dtype(coverage_weight, geom_array)), shape=(0, 0, 0), fill_value=0
        ),
    )
    return out


[docs] def coverage( obj: xr.Dataset | xr.DataArray, geometries: gpd.GeoDataFrame | dask_geopandas.GeoDataFrame, *, xdim="x", ydim="y", strategy: Strategy = "feature-sequential", coverage_weight: CoverageWeights = "fraction", clip: bool = False, ) -> xr.DataArray: """Calculate pixel coverage fractions for geometries using exactextract. This function computes how much of each raster pixel is covered by each geometry, returning precise fractional coverage values or area measurements. It supports both in-memory and dask-based computation for large datasets. Parameters ---------- obj : xarray.DataArray or xarray.Dataset Xarray object defining the raster grid. If a grid mapping coordinate variable (e.g. ``spatial_ref``) is present, it will be propagated to the output. geometries : geopandas.GeoDataFrame or dask_geopandas.GeoDataFrame Vector geometries for which to calculate coverage. CRS should match the raster object (though this is not currently enforced). xdim : str, default "x" Name of the x (longitude/easting) dimension in the raster object. ydim : str, default "y" Name of the y (latitude/northing) dimension in the raster object. strategy : {"feature-sequential", "raster-sequential", "raster-parallel"}, default "feature-sequential" Processing strategy passed to exactextract. Controls how computation is parallelized and memory is managed. coverage_weight : {"fraction", "none", "area_cartesian", "area_spherical_m2", "area_spherical_km2"}, default "fraction" Type of coverage measurement to compute: - "fraction": Fractional coverage (0-1) - "none": Binary coverage (0 or 1) - "area_cartesian": Area in map units squared - "area_spherical_m2": Spherical area in square meters - "area_spherical_km2": Spherical area in square kilometers clip: bool If True, clip raster to the bounding box of the geometries. Ignored for dask-geopandas geometries. Returns ------- xarray.DataArray 3D DataArray with dimensions (geometry, y, x) containing coverage values. Data type depends on coverage_weight: uint8 for "none", float64 otherwise. Includes appropriate units and long_name attributes for area measurements. Raises ------ ValueError If exactextract encounters chunks of size 1 (not supported by exactextract). See Also -------- exactextract.exact_extract : Underlying exactextract function used for coverage calculation Examples -------- Calculate fractional coverage: >>> import rasterix.rasterize.exact as exact >>> import xarray as xr >>> import geopandas as gpd >>> # Load raster data with CRS info >>> raster = xr.open_dataarray("data.tif") >>> # Load vector geometries >>> geometries = gpd.read_file("polygons.shp") >>> # Calculate coverage fractions >>> coverage = exact.coverage(raster, geometries) >>> print(coverage.dims) # ('geometry', 'y', 'x') Calculate area in square meters: >>> area_coverage = exact.coverage(raster, geometries, coverage_weight="area_spherical_m2") >>> print(area_coverage.units) # 'm2' """ # FIXME: assert obj.crs == geometries.crs if clip: obj = clip_to_bbox(obj, geometries, xdim=xdim, ydim=ydim) affine = _get_affine(obj, x_dim=xdim, y_dim=ydim) shape = (obj.sizes[ydim], obj.sizes[xdim]) if is_in_memory(obj=obj, geometries=geometries): out = np_coverage( affine, shape=shape, geometries=geometries, coverage_weight=coverage_weight, strategy=strategy, ) geom_array = geometries.to_numpy().squeeze(axis=1) else: geom_dask_array = geometries_as_dask_array(geometries) chunks = ( obj.chunksizes.get(ydim, (obj.sizes[ydim],)), obj.chunksizes.get(xdim, (obj.sizes[xdim],)), ) out = dask_coverage( affine, chunks=chunks, geom_array=geom_dask_array, crs=geometries.crs, coverage_weight=coverage_weight, strategy=strategy, ) if isinstance(geometries, gpd.GeoDataFrame): geom_array = geometries.to_numpy().squeeze(axis=1) else: geom_array = geom_dask_array name = "coverage" attrs = {} if "area" in coverage_weight: name = "area" if "_m2" in coverage_weight or coverage_weight == "area_cartesian": attrs["long_name"] = coverage_weight.removesuffix("_m2") attrs["units"] = "m2" elif "_km2" in coverage_weight: attrs["long_name"] = coverage_weight.removesuffix("_km2") attrs["units"] = "km2" xy_coords = [ xr.Coordinates.from_xindex(obj.xindexes.get(dim)) for dim in (xdim, ydim) if obj.xindexes.get(dim) is not None ] grid_mapping_coords: dict = {} if (grid_mapping := get_grid_mapping_var(obj)) is not None: grid_mapping_coords[grid_mapping.name] = grid_mapping coords = xr.Coordinates( coords={ **grid_mapping_coords, "geometry": geom_array, }, indexes={}, ) if xy_coords: for c in xy_coords: coords = coords.merge(c) coords = coords.coords coverage = xr.DataArray(dims=("geometry", ydim, xdim), data=out, coords=coords, attrs=attrs, name=name) return coverage
# ============================================================================ # Engine functions for use with rasterize() in core.py # These provide the same interface as rasterio.py and rusterize.py engines # ============================================================================ def rasterize_geometries( geometries, *, dtype: np.dtype, shape: tuple[int, int], affine, offset: int, all_touched: bool = False, merge_alg: str = "replace", fill: Any = 0, **kwargs, ) -> np.ndarray: """ Rasterize geometries using exactextract for precise coverage. Uses exactextract to compute which pixels are covered by each geometry, then reduces to geometry indices using the specified merge algorithm. Parameters ---------- geometries : Sequence[Geometry] Shapely geometries to rasterize. dtype : np.dtype Output data type. shape : tuple[int, int] Output shape (nrows, ncols). affine : Affine Affine transform for the output grid. offset : int Starting value for geometry indices. all_touched : bool If True, all pixels touched by geometries will be burned in. Note: exactextract does not support this parameter directly. merge_alg : str Merge algorithm: "replace" or "add". fill : Any Fill value for pixels not covered by any geometry. **kwargs Additional arguments (ignored for compatibility). Returns ------- np.ndarray Rasterized array with shape (nrows, ncols). """ if all_touched: raise NotImplementedError( "all_touched=True is not supported by the exactextract engine. " "Use engine='rusterize' or engine='rasterio' if you need all_touched support." ) if len(geometries) == 0: return np.full(shape, fill, dtype=dtype) # Create GeoDataFrame (exactextract requires it) # Use a dummy CRS - exactextract needs one but the algorithm doesn't depend on it gdf = gpd.GeoDataFrame(geometry=list(geometries), crs="EPSG:4326") # Use exactextract to get coverage (binary mode for efficiency) raster = affine_to_raster_source(affine, shape, srs_wkt=gdf.crs.to_wkt()) result = exact_extract( rast=raster, vec=gdf, ops=["cell_id", "coverage(coverage_weight=none)"], output="pandas", ) # Initialize output with fill value out = np.full(shape, fill, dtype=dtype) # Vectorized merging: concatenate all cell_ids and corresponding values cell_id_arrays = result.cell_id.values lens = np.array([len(c) for c in cell_id_arrays]) if lens.sum() == 0: return out all_cell_ids = np.concatenate([c for c in cell_id_arrays if len(c) > 0]) all_values = np.repeat(np.arange(len(geometries), dtype=dtype) + offset, lens) # Process based on merge algorithm if merge_alg == "replace": # Later geometries overwrite earlier ones (last write wins) np.put(out, all_cell_ids, all_values) elif merge_alg == "add": # Sum values where geometries overlap (unbuffered addition) np.add.at(out.ravel(), all_cell_ids, all_values) else: raise ValueError(f"Unsupported merge_alg: {merge_alg}. Must be 'replace' or 'add'.") return out def dask_rasterize_wrapper( geom_array: np.ndarray, x_offsets: np.ndarray, y_offsets: np.ndarray, x_sizes: np.ndarray, y_sizes: np.ndarray, offset_array: np.ndarray, *, fill: Any, affine, all_touched: bool, merge_alg: str, dtype_: np.dtype, **kwargs, ) -> np.ndarray: """Dask wrapper for exactextract rasterization.""" offset = offset_array.item() return rasterize_geometries( geom_array[:, 0, 0].tolist(), affine=affine * affine.translation(x_offsets.item(), y_offsets.item()), shape=(y_sizes.item(), x_sizes.item()), offset=offset, all_touched=all_touched, merge_alg=merge_alg, fill=fill, dtype=dtype_, )[np.newaxis, :, :] def np_geometry_mask( geometries, *, shape: tuple[int, int], affine, all_touched: bool = False, invert: bool = False, **kwargs, ) -> np.ndarray: """ Create a geometry mask using exactextract. Parameters ---------- geometries : Sequence[Geometry] Shapely geometries for masking. shape : tuple[int, int] Output shape (nrows, ncols). affine : Affine Affine transform for the output grid. all_touched : bool If True, all pixels touched by geometries will be included. Note: exactextract does not support this parameter directly. invert : bool If True, pixels inside geometries are True (unmasked). If False (default), pixels inside geometries are False (masked). **kwargs Additional arguments (ignored for compatibility). Returns ------- np.ndarray Boolean mask array with shape (nrows, ncols). """ if all_touched: raise NotImplementedError( "all_touched=True is not supported by the exactextract engine. " "Use engine='rasterio' or engine='rusterize' if you need all_touched support." ) if len(geometries) == 0: # No geometries: all pixels are outside (masked=True) or inside (masked=False) return np.full(shape, not invert, dtype=bool) # Create GeoDataFrame (exactextract requires it) gdf = gpd.GeoDataFrame(geometry=list(geometries), crs="EPSG:4326") # Use exactextract to get coverage raster = affine_to_raster_source(affine, shape, srs_wkt=gdf.crs.to_wkt()) result = exact_extract( rast=raster, vec=gdf, ops=["cell_id", "coverage(coverage_weight=none)"], output="pandas", ) # Collect all covered cell IDs all_cell_ids = set() for i in range(len(geometries)): cell_ids = result.cell_id.values[i] all_cell_ids.update(cell_ids) # Create mask: True = outside geometry (masked), False = inside inside = np.zeros(shape, dtype=bool) if all_cell_ids: np.put(inside, list(all_cell_ids), True) if invert: return inside # True = inside else: return ~inside # True = outside (masked) def dask_mask_wrapper( geom_array: np.ndarray, x_offsets: np.ndarray, y_offsets: np.ndarray, x_sizes: np.ndarray, y_sizes: np.ndarray, *, affine, **kwargs, ) -> np.ndarray: """Dask wrapper for exactextract geometry masking.""" res = np_geometry_mask( geom_array[:, 0, 0].tolist(), shape=(y_sizes.item(), x_sizes.item()), affine=affine * affine.translation(x_offsets.item(), y_offsets.item()), **kwargs, ) return res[np.newaxis, :, :]