Download and virtualize dataset

import os
from pathlib import Path

import earthaccess
import rasterio
import rioxarray  # noqa
import s3fs
import xarray as xr
import zarr
from common import earthaccess_args
from icechunk import (
    IcechunkStore,
    S3Credentials,
    StorageConfig,
    StoreConfig,
    VirtualRefConfig,
)
from rio_cogeo.cogeo import cog_translate
from rio_cogeo.profiles import cog_profiles
from virtualizarr import open_virtual_dataset
from virtualizarr.writers.icechunk import dataset_to_icechunk

Set up earthaccess query parameters

dataset = "mursst"
dataset_args = earthaccess_args[dataset]
concept_id = dataset_args["concept_id"]
filename = dataset_args["filename"]
variable = dataset_args["variable"]
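
The common module is a project-local helper that is not shown here. Judging from the keys accessed in this notebook, earthaccess_args maps each dataset name to its query parameters; a minimal sketch with placeholder values might look like:

# Hypothetical sketch of common.py; the real values live in that module
earthaccess_args = {
    "mursst": {
        "concept_id": "<CMR collection concept id>",
        "filename": "<granule filename>",
        "variable": "<netCDF variable name, e.g. analysed_sst>",
        "daac": "<DAAC short name, e.g. PODAAC>",
    },
    "gpm_imerg": {
        # same four keys for the GPM IMERG granule
    },
}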

Authenticate via earthaccess

earthaccess.login()

Download dataset

results = earthaccess.search_data(
    concept_id=concept_id, count=1, temporal=("2002-06-01", "2002-06-01")
)
fp = earthaccess.download(results, "earthaccess_data")[0]
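
Before virtualizing, it is worth a quick sanity check that the granule downloaded intact and contains the expected variable (assuming a NetCDF granule):

# Optional: confirm the file opens and exposes the expected variable
with xr.open_dataset(fp) as check:
    assert variable in check.variables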

Virtualize dataset

def virtualize_dataset(local_fp):
    """Create a virtual reference file for a dataset"""

    def local_to_s3_url(old_local_path: str) -> str:
        """Replace the local path with the corresponding s3 URI for all chunks"""

        # s3_uri looks like s3://bucket/prefix/file; splitting on "/" and
        # dropping the scheme and filename leaves /bucket/prefix
        new_s3_bucket_url = Path("/".join(s3_uri.split("/")[1:-1]))
        filename = Path(old_local_path).name
        # Prepending "s3:/" restores the scheme: s3://bucket/prefix/filename
        new_path = f"s3:/{new_s3_bucket_url / filename}"
        return new_path

    s3_uri = results[0].data_links(access="direct")[0]
    virtual_ds = open_virtual_dataset(str(local_fp), indexes={})
    virtual_ds = virtual_ds.virtualize.rename_paths(local_to_s3_url)
    virtual_ds = virtual_ds[[variable]]
    return virtual_ds.drop_vars("time")


virtual_ds = virtualize_dataset(fp)
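
To confirm that rename_paths took effect, inspect the chunk manifest: every chunk entry should now point at an s3:// URI rather than the local download path (this assumes virtualizarr's ChunkManifest.dict() accessor):

# Spot-check one chunk reference after renaming
manifest = virtual_ds[variable].data.manifest
print(next(iter(manifest.dict().items())))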

Store virtual dataset as kerchunk reference

s3_uri = results[0].data_links(access="direct")[0]
if dataset == "gpm_merg":
    output_fp = f"earthaccess_data/{s3_uri.split('/')[-1][:-4]}.json"
else:
    output_fp = f"earthaccess_data/{s3_uri.split('/')[-1][:-3]}.json"
virtual_ds.virtualize.to_kerchunk(output_fp, format="json")
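
The JSON reference can be read back through fsspec's reference filesystem. Chunk reads are forwarded to the DAAC's S3 bucket, so this only works in-region with valid Earthdata S3 credentials; the snippet below is one common pattern, assuming a reader compatible with the Zarr v2 metadata that kerchunk emits.

import fsspec

# Open the reference file; chunk requests are proxied to the original granule
fs = fsspec.filesystem("reference", fo=output_fp, remote_protocol="s3")
roundtrip = xr.open_dataset(fs.get_mapper(), engine="zarr", consolidated=False)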

Store virtual dataset using icechunk

s3_creds = earthaccess.get_s3_credentials(daac=dataset_args["daac"])
credentials = S3Credentials(
    access_key_id=s3_creds["accessKeyId"],
    secret_access_key=s3_creds["secretAccessKey"],
    session_token=s3_creds["sessionToken"],
)
storage = StorageConfig.s3_from_env(
    bucket="nasa-veda-scratch",
    prefix=f"resampling/icechunk/{dataset}-reference",
    region="us-west-2",
)
config = StoreConfig(
    virtual_ref_config=VirtualRefConfig.s3_from_config(credentials=credentials),
)
virtual_store = IcechunkStore.open_or_create(storage=storage, config=config, mode="w")
dataset_to_icechunk(virtual_ds, virtual_store)
virtual_store.commit("Create reference dataset")

Store dataset using Zarr V3 and icechunk

virtual_storage = StorageConfig.s3_from_env(
    bucket="nasa-veda-scratch",
    prefix=f"resampling/icechunk/{dataset}-reference",
    region="us-west-2",
)
virtual_store = IcechunkStore.open_existing(storage=virtual_storage, mode="r")
ds = xr.open_zarr(virtual_store, zarr_format=3, consolidated=False).load()
ds = ds.drop_encoding()
ds = ds.squeeze()
if dataset == "gpm_imerg":
    ds = ds.transpose("lat", "lon")
encoding = {
    variable: {
        "codecs": [zarr.codecs.BytesCodec(), zarr.codecs.ZstdCodec()],
    }
}
storage = StorageConfig.s3_from_env(
    bucket="nasa-veda-scratch",
    prefix=f"resampling/icechunk/{dataset}",
    region="us-west-2",
)
store = IcechunkStore.open_or_create(storage=storage, mode="w")
ds.to_zarr(store, zarr_format=3, consolidated=False, encoding=encoding)
store.commit("Add dataset")
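
As a quick check, the committed snapshot can be reopened read-only with the same pattern used for the virtual store above:

# Re-open the materialized store read-only to confirm the commit landed
check_store = IcechunkStore.open_existing(storage=storage, mode="r")
check_ds = xr.open_zarr(check_store, zarr_format=3, consolidated=False)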

Store dataset as COG

def _translate(src_path, dst_path, profile="zstd", profile_options={}, **options):
    """
    Convert image to COG.

    From https://cogeotiff.github.io/rio-cogeo/API/
    """
    # Format creation option (see gdalwarp `-co` option)
    output_profile = cog_profiles.get(profile)
    output_profile.update(dict(BIGTIFF="IF_SAFER"))
    output_profile.update(profile_options)

    # Dataset Open option (see gdalwarp `-oo` option)
    config = dict(
        GDAL_NUM_THREADS="ALL_CPUS",
        GDAL_TIFF_INTERNAL_MASK=True,
        GDAL_TIFF_OVR_BLOCKSIZE="128",
    )

    cog_translate(
        src_path,
        dst_path,
        output_profile,
        config=config,
        in_memory=False,
        quiet=True,
        **options,
    )
    return True


# Only store MUR SST since it has the expected (time, y, x) axis order
if dataset == "mursst":
    args = earthaccess_args[dataset]
    src = f'NETCDF:earthaccess_data/{args["filename"]}:{args["variable"]}'
    dst = f"earthaccess_data/{dataset}.tif"
    # Generate local COG
    with rasterio.open(src) as da:
        _translate(da, dst)
    # Upload to S3
    remote_uri = f"s3://nasa-veda-scratch/resampling/{dataset}.tif"
    fs = s3fs.S3FileSystem()
    fs.put(dst, remote_uri)
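
rio-cogeo also ships a validator, which is a cheap way to confirm that the output really is a spec-compliant COG before relying on it downstream:

from rio_cogeo.cogeo import cog_validate

# Validate the generated file against the COG spec (mursst only, as above)
if dataset == "mursst":
    is_valid, errors, warnings = cog_validate(f"earthaccess_data/{dataset}.tif")
    assert is_valid, errors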

Store overviews as Zarr

local = False
if local:
    dst = f"{os.getcwd()}/earthaccess_data/{dataset}-overviews.zarr"
    storage = StorageConfig.filesystem(dst)
else:
    storage = StorageConfig.s3_from_env(
        bucket="nasa-veda-scratch",
        prefix=f"resampling/icechunk/{dataset}-overviews.zarr",
        region="us-west-2",
    )
store = IcechunkStore.open_or_create(storage=storage, mode="w")
src = f"earthaccess_data/{dataset}.tif"
# Open the COG twice: rasterio exposes the overview levels, rioxarray the data
cog = rasterio.open(src)
data = rioxarray.open_rasterio(src)
scale = float(data.attrs["scale_factor"])
offset = float(data.attrs["add_offset"])
data = data.drop_encoding()
data.attrs = {}
encoding = {"var": {"codecs": [zarr.codecs.BytesCodec(), zarr.codecs.ZstdCodec()]}}

data = data.to_dataset(name="var")
data.to_zarr(store, group="data", mode="w", encoding=encoding, consolidated=False)
for ind, level in enumerate(cog.overviews(1)):
    overview = rioxarray.open_rasterio(src, overview_level=ind)
    overview = overview.load() * scale + offset
    overview = overview.drop_encoding()
    overview.attrs = {}
    overview = overview.to_dataset(name="var")
    overview.to_zarr(
        store, group=str(level), mode="w", encoding=encoding, consolidated=False
    )
store.commit("Add overviews")