import os
from pathlib import Path
import earthaccess
import rasterio
import rioxarray # noqa
import s3fs
import xarray as xr
import zarr
from common import earthaccess_args
from icechunk import (
    IcechunkStore,
    S3Credentials,
    StorageConfig,
    StoreConfig,
    VirtualRefConfig,
)
from rio_cogeo.cogeo import cog_translate
from rio_cogeo.profiles import cog_profiles
from virtualizarr import open_virtual_dataset
from virtualizarr.writers.icechunk import dataset_to_icechunk
Download and virtualize dataset
Set up earthaccess query parameters
dataset = "mursst"
dataset_args = earthaccess_args[dataset]
concept_id = dataset_args["concept_id"]
filename = dataset_args["filename"]
variable = dataset_args["variable"]
Authenticate via earthaccess
earthaccess.login()
Download dataset
results = earthaccess.search_data(
    concept_id=concept_id, count=1, temporal=("2002-06-01", "2002-06-01")
)
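# Download the single matching granule to a local directory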
fp = earthaccess.download(results, "earthaccess_data")[0]
Virtualize dataset
def virtualize_dataset(local_fp):
    """Create a virtual dataset whose chunk references point at the original file"""

    def local_to_s3_url(old_local_path: str) -> str:
        """Replace the local path with the S3 URI for all chunks"""
        # s3_uri looks like "s3://bucket/prefix/file"; rebuild it with the
        # local file's name so every chunk reference points at the S3 object
        new_s3_bucket_url = Path("/".join(s3_uri.split("/")[1:-1]))
        filename = Path(old_local_path).name
        new_path = f"s3:/{str(new_s3_bucket_url / filename)}"
        return new_path

    s3_uri = results[0].data_links(access="direct")[0]
    virtual_ds = open_virtual_dataset(str(local_fp), indexes={})
    virtual_ds = virtual_ds.virtualize.rename_paths(local_to_s3_url)
    virtual_ds = virtual_ds[[variable]]
    return virtual_ds.drop_vars("time")


virtual_ds = virtualize_dataset(fp)
Store virtual dataset as kerchunk reference
s3_uri = results[0].data_links(access="direct")[0]
# Strip the original file extension (4 characters for gpm_imerg, 3 otherwise)
if dataset == "gpm_imerg":
    output_fp = f"earthaccess_data/{s3_uri.split('/')[-1][:-4]}.json"
else:
    output_fp = f"earthaccess_data/{s3_uri.split('/')[-1][:-3]}.json"
virtual_ds.virtualize.to_kerchunk(output_fp, format="json")
Store virtual dataset using icechunk
s3_creds = earthaccess.get_s3_credentials(daac=dataset_args["daac"])
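# Wrap the temporary DAAC S3 credentials so icechunk can read the
# referenced chunks directly from the DAAC bucket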
credentials = S3Credentials(
    access_key_id=s3_creds["accessKeyId"],
    secret_access_key=s3_creds["secretAccessKey"],
    session_token=s3_creds["sessionToken"],
)
storage = StorageConfig.s3_from_env(
    bucket="nasa-veda-scratch",
    prefix=f"resampling/icechunk/{dataset}-reference",
    region="us-west-2",
)
config = StoreConfig(
    virtual_ref_config=VirtualRefConfig.s3_from_config(credentials=credentials),
)
virtual_store = IcechunkStore.open_or_create(storage=storage, config=config, mode="w")
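# dataset_to_icechunk writes only the chunk references and metadata;
# no array bytes are copied into the store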
dataset_to_icechunk(virtual_ds, virtual_store)
virtual_store.commit("Create reference dataset")
Store dataset using Zarr V3 and icechunk
virtual_storage = StorageConfig.s3_from_env(
    bucket="nasa-veda-scratch",
    prefix=f"resampling/icechunk/{dataset}-reference",
    region="us-west-2",
)
virtual_store = IcechunkStore.open_existing(storage=virtual_storage, mode="r")
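# Reads resolve the virtual references back to the original NetCDF object
# on S3; .load() pulls the full array into memory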
ds = xr.open_zarr(virtual_store, zarr_format=3, consolidated=False).load()
ds = ds.drop_encoding()
ds = ds.squeeze()
if dataset == "gpm_imerg":
    ds = ds.transpose("lat", "lon")
encoding = {
    variable: {
        "codecs": [zarr.codecs.BytesCodec(), zarr.codecs.ZstdCodec()],
    }
}
storage = StorageConfig.s3_from_env(
    bucket="nasa-veda-scratch",
    prefix=f"resampling/icechunk/{dataset}",
    region="us-west-2",
)
store = IcechunkStore.open_or_create(storage=storage, mode="w")
ds.to_zarr(store, zarr_format=3, consolidated=False, encoding=encoding)
store.commit("Add dataset")Store dataset as COG
def _translate(src_path, dst_path, profile="zstd", profile_options={}, **options):
    """
    Convert image to COG.
    From https://cogeotiff.github.io/rio-cogeo/API/
    """
    # Format creation option (see gdalwarp `-co` option)
    output_profile = cog_profiles.get(profile)
    output_profile.update(dict(BIGTIFF="IF_SAFER"))
    output_profile.update(profile_options)
    # Dataset Open option (see gdalwarp `-oo` option)
    config = dict(
        GDAL_NUM_THREADS="ALL_CPUS",
        GDAL_TIFF_INTERNAL_MASK=True,
        GDAL_TIFF_OVR_BLOCKSIZE="128",
    )
    cog_translate(
        src_path,
        dst_path,
        output_profile,
        config=config,
        in_memory=False,
        quiet=True,
        **options,
    )
    return True
# Only store MUR SST since it has the expected (time, y, x) axis order
if dataset == "mursst":
    args = earthaccess_args[dataset]
    src = f'NETCDF:earthaccess_data/{args["filename"]}:{args["variable"]}'
    dst = f"earthaccess_data/{dataset}.tif"
    # Generate local COG
    with rasterio.open(src) as da:
        _translate(da, dst)
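    # Optional sanity check (a sketch, not part of the original workflow):
    # rio-cogeo's cog_validate reports whether the output is a valid COG
    from rio_cogeo.cogeo import cog_validate

    is_valid, cog_errors, cog_warnings = cog_validate(dst)
    assert is_valid, cog_errors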
    # Upload to S3
    remote_uri = f"s3://nasa-veda-scratch/resampling/{dataset}.tif"
    fs = s3fs.S3FileSystem()
    fs.put(dst, remote_uri)
Store overviews as Zarr
local = False
if local:
    dst = f"{os.getcwd()}/earthaccess_data/{dataset}-overviews.zarr"
    storage = StorageConfig.filesystem(dst)
else:
    storage = StorageConfig.s3_from_env(
        bucket="nasa-veda-scratch",
        prefix=f"resampling/icechunk/{dataset}-overviews.zarr",
        region="us-west-2",
    )
store = IcechunkStore.open_or_create(storage=storage, mode="w")
src = f"earthaccess_data/{dataset}.tif"
cog = rasterio.open(src)
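# Keep a rasterio handle to enumerate the COG's overview levels below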
data = rioxarray.open_rasterio(src)
# Unpack the stored integers with the COG's scale and offset so the
# full-resolution group matches the overview groups written below
scale = float(data.attrs["scale_factor"])
offset = float(data.attrs["add_offset"])
data = data.load() * scale + offset
data = data.drop_encoding()
data.attrs = {}
encoding = {"var": {"codecs": [zarr.codecs.BytesCodec(), zarr.codecs.ZstdCodec()]}}
data = data.to_dataset(name="var")
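# Write the full-resolution array to its own "data" group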
data.to_zarr(store, group="data", mode="w", encoding=encoding)
# cog.overviews(1) lists the decimation factors for band 1 (e.g. 2, 4, 8, ...)
for ind, level in enumerate(cog.overviews(1)):
    overview = rioxarray.open_rasterio(src, overview_level=ind)
    overview = overview.load() * scale + offset
    overview = overview.drop_encoding()
    overview.attrs = {}
    overview = overview.to_dataset(name="var")
    overview.to_zarr(store, group=str(level), mode="w", encoding=encoding)
store.commit("Add overviews")