Resampling with XESMF (S3 storage, NetCDF file, Zarr reader, icechunk virtualization, and pre-generated weights)

import argparse

import numpy as np
import xarray as xr
import xesmf as xe
from icechunk import IcechunkStore, StorageConfig
def _reconstruct_xesmf_weights(ds_w):
    """
    Reconstruct weights into format that xESMF understands

    Notes
    -----
    From ndpyramid - https://github.com/carbonplan/ndpyramid
    """
    import sparse
    import xarray as xr

    col = ds_w["col"].values - 1
    row = ds_w["row"].values - 1
    s = ds_w["S"].values
    n_out, n_in = ds_w.attrs["n_out"], ds_w.attrs["n_in"]
    crds = np.stack([row, col])
    return xr.DataArray(
        sparse.COO(crds, s, (n_out, n_in)), dims=("out_dim", "in_dim"), name="weights"
    )
def regrid(dataset, zoom=0):
    from common import earthaccess_args  # noqa: 402

    args = earthaccess_args[dataset]
    # Load pre-generated weights and target dataset
    weights_storage = StorageConfig.s3_from_env(
        bucket="nasa-veda-scratch",
        prefix=f"resampling/test-weight-caching/{dataset}-weights-{zoom}",
        region="us-west-2",
    )
    target_storage = StorageConfig.s3_from_env(
        bucket="nasa-veda-scratch",
        prefix=f"resampling/test-weight-caching/{dataset}-target-{zoom}",
        region="us-west-2",
    )
    weights_store = IcechunkStore.open_existing(storage=weights_storage, mode="r")
    target_store = IcechunkStore.open_existing(storage=target_storage, mode="r")
    weights = _reconstruct_xesmf_weights(
        xr.open_zarr(weights_store, zarr_format=3, consolidated=False)
    )
    grid = xr.open_zarr(target_store, zarr_format=3, consolidated=False)
    # Open dataset
    storage = StorageConfig.s3_from_env(
        bucket="nasa-veda-scratch",
        prefix=f"resampling/icechunk/{dataset}-reference",
        region="us-west-2",
    )
    store = IcechunkStore.open_existing(storage=storage, mode="r")
    da = xr.open_zarr(store, zarr_format=3, consolidated=False)[args["variable"]]
    # Create XESMF regridder
    regridder = xe.Regridder(
        da,
        grid,
        "nearest_s2d",
        periodic=True,
        extrap_method="nearest_s2d",
        ignore_degenerate=True,
        reuse_weights=True,
        weights=weights,
    )
    # Regrid dataset
    return regridder(da)
if __name__ == "__main__":
    if "get_ipython" in dir():
        # Just call warp_resample if running as a Jupyter Notebook
        da = regrid("gpm_imerg")
    else:
        # Configure dataset via argpase if running via CLI
        parser = argparse.ArgumentParser(description="Set environment for the script.")
        parser.add_argument(
            "--dataset",
            default="gpm_imerg",
            help="Dataset to resample.",
            choices=["gpm_imerg", "mursst"],
        )
        parser.add_argument(
            "--zoom",
            default=0,
            help="Zoom level for tile extent.",
        )
        user_args = parser.parse_args()
        da = regrid(user_args.dataset, int(user_args.zoom))