import os
from pathlib import Path
import earthaccess
import rasterio
import rioxarray # noqa
import s3fs
import xarray as xr
import zarr
from common import earthaccess_args
from icechunk import (
    IcechunkStore,
    S3Credentials,
    StorageConfig,
    StoreConfig,
    VirtualRefConfig,
)
from rio_cogeo.cogeo import cog_translate
from rio_cogeo.profiles import cog_profiles
from virtualizarr import open_virtual_dataset
from virtualizarr.writers.icechunk import dataset_to_icechunk
Download and virtualize dataset
Set up earthaccess query parameters
= "mursst"
dataset = earthaccess_args[dataset]
dataset_args = dataset_args["concept_id"]
concept_id = dataset_args["filename"]
filename = dataset_args["variable"] variable
Authenticate via earthaccess
earthaccess.login()
Download dataset
results = earthaccess.search_data(
    concept_id=concept_id, count=1, temporal=("2002-06-01", "2002-06-01")
)
fp = earthaccess.download(results, "earthaccess_data")[0]
Virtualize dataset
def virtualize_dataset(local_fp):
    """Create a virtual reference file for a dataset"""

    def local_to_s3_url(old_local_path: str) -> str:
        """Replace the local path with the S3 URI for all chunks"""
        # s3_uri.split("/")[1:-1] keeps an empty leading element, so the join
        # yields "/bucket/prefix" and prepending "s3:/" produces a full s3:// URL
        new_s3_bucket_url = Path("/".join(s3_uri.split("/")[1:-1]))
        filename = Path(old_local_path).name
        new_path = f"s3:/{str(new_s3_bucket_url / filename)}"
        return new_path

    s3_uri = results[0].data_links(access="direct")[0]
    virtual_ds = open_virtual_dataset(str(local_fp), indexes={})
    virtual_ds = virtual_ds.virtualize.rename_paths(local_to_s3_url)
    virtual_ds = virtual_ds[[variable]]
    return virtual_ds.drop_vars("time")


virtual_ds = virtualize_dataset(fp)
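Before writing any references, it is worth confirming that the path rewrite took effect. A minimal inspection sketch (not required by the workflow; with VirtualiZarr the variable should be backed by a ManifestArray rather than loaded into memory):
# Sanity check: the repr should show manifest-backed variables, and the
# underlying array type should be virtualizarr's ManifestArray
print(virtual_ds)
print(type(virtual_ds[variable].data))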
Store virtual dataset as kerchunk reference
s3_uri = results[0].data_links(access="direct")[0]
if dataset == "gpm_imerg":
    # Trim the four-character file extension before appending ".json"
    output_fp = f"earthaccess_data/{s3_uri.split('/')[-1][:-4]}.json"
else:
    # Trim the three-character ".nc" extension before appending ".json"
    output_fp = f"earthaccess_data/{s3_uri.split('/')[-1][:-3]}.json"
virtual_ds.virtualize.to_kerchunk(output_fp, format="json")
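To sanity-check the reference file, fsspec's reference filesystem can open it lazily against the remote chunks. This is a sketch, not part of the pipeline: it assumes direct in-region S3 access with valid NASA credentials, and ref_fs/kerchunk_ds are illustrative names.
import fsspec

# Open the kerchunk JSON as a reference filesystem layered over S3
ref_fs = fsspec.filesystem(
    "reference",
    fo=output_fp,
    remote_protocol="s3",
    remote_options={"anon": False},
)
kerchunk_ds = xr.open_dataset(ref_fs.get_mapper(""), engine="zarr", consolidated=False)
print(kerchunk_ds[variable])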
Store virtual dataset using icechunk
s3_creds = earthaccess.get_s3_credentials(daac=dataset_args["daac"])
credentials = S3Credentials(
    access_key_id=s3_creds["accessKeyId"],
    secret_access_key=s3_creds["secretAccessKey"],
    session_token=s3_creds["sessionToken"],
)
storage = StorageConfig.s3_from_env(
    bucket="nasa-veda-scratch",
    prefix=f"resampling/icechunk/{dataset}-reference",
    region="us-west-2",
)
config = StoreConfig(
    virtual_ref_config=VirtualRefConfig.s3_from_config(credentials=credentials),
)
virtual_store = IcechunkStore.open_or_create(storage=storage, config=config, mode="w")
dataset_to_icechunk(virtual_ds, virtual_store)
virtual_store.commit("Create reference dataset")
Store dataset using Zarr V3 and icechunk
virtual_storage = StorageConfig.s3_from_env(
    bucket="nasa-veda-scratch",
    prefix=f"resampling/icechunk/{dataset}-reference",
    region="us-west-2",
)
virtual_store = IcechunkStore.open_existing(storage=virtual_storage, mode="r")
ds = xr.open_zarr(virtual_store, zarr_format=3, consolidated=False).load()
ds = ds.drop_encoding()
ds = ds.squeeze()
if dataset == "gpm_imerg":
    ds = ds.transpose("lat", "lon")
encoding = {
    variable: {
        "codecs": [zarr.codecs.BytesCodec(), zarr.codecs.ZstdCodec()],
    }
}
storage = StorageConfig.s3_from_env(
    bucket="nasa-veda-scratch",
    prefix=f"resampling/icechunk/{dataset}",
    region="us-west-2",
)
store = IcechunkStore.open_or_create(storage=storage, mode="w")
ds.to_zarr(store, zarr_format=3, consolidated=False, encoding=encoding)
store.commit("Add dataset")
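To confirm the materialized copy round-trips, the new store can be reopened read-only with the same calls used for the reference store above. A minimal sketch; check_store and check_ds are illustrative names, not part of the benchmark:
# Reopen the just-committed store read-only and inspect the variable
check_store = IcechunkStore.open_existing(storage=storage, mode="r")
check_ds = xr.open_zarr(check_store, zarr_format=3, consolidated=False)
print(check_ds[variable])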
Store dataset as COG
def _translate(src_path, dst_path, profile="zstd", profile_options={}, **options):
    """
    Convert image to COG.
    From https://cogeotiff.github.io/rio-cogeo/API/
    """
    # Format creation options (see gdalwarp `-co` option)
    output_profile = cog_profiles.get(profile)
    output_profile.update(dict(BIGTIFF="IF_SAFER"))
    output_profile.update(profile_options)

    # Dataset open options (see gdalwarp `-oo` option)
    config = dict(
        GDAL_NUM_THREADS="ALL_CPUS",
        GDAL_TIFF_INTERNAL_MASK=True,
        GDAL_TIFF_OVR_BLOCKSIZE="128",
    )

    cog_translate(
        src_path,
        dst_path,
        output_profile,
        config=config,
        in_memory=False,
        quiet=True,
        **options,
    )
    return True
# Only store MUR SST since it has the expected (time, y, x) axis order
if dataset == "mursst":
    args = earthaccess_args[dataset]
    src = f'NETCDF:earthaccess_data/{args["filename"]}:{args["variable"]}'
    dst = f"earthaccess_data/{dataset}.tif"
    # Generate local COG
    with rasterio.open(src) as da:
        _translate(da, dst)
    # Upload to S3
    remote_uri = f"s3://nasa-veda-scratch/resampling/{dataset}.tif"
    fs = s3fs.S3FileSystem()
    fs.put(dst, remote_uri)
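Before benchmarking against the uploaded copy, it is cheap to confirm the file really satisfies the COG layout rules. A sketch using rio-cogeo's validator, which returns an (is_valid, errors, warnings) tuple:
from rio_cogeo.cogeo import cog_validate

if dataset == "mursst":
    # errors is a non-empty list when validation fails
    is_valid, val_errors, val_warnings = cog_validate(dst)
    assert is_valid, val_errors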
Store overviews as Zarr
local = False
if local:
    dst = f"{os.getcwd()}/earthaccess_data/{dataset}-overviews.zarr"
    storage = StorageConfig.filesystem(dst)
else:
    storage = StorageConfig.s3_from_env(
        bucket="nasa-veda-scratch",
        prefix=f"resampling/icechunk/{dataset}-overviews.zarr",
        region="us-west-2",
    )
store = IcechunkStore.open_or_create(storage=storage, mode="w")
src = f"earthaccess_data/{dataset}.tif"
cog = rasterio.open(src)
# Read the scale/offset attributes before stripping metadata
data = rioxarray.open_rasterio(src)
scale = float(data.attrs["scale_factor"])
offset = float(data.attrs["add_offset"])
data = data.drop_encoding()
data.attrs = {}
encoding = {"var": {"codecs": [zarr.codecs.BytesCodec(), zarr.codecs.ZstdCodec()]}}

data = data.to_dataset(name="var")
data.to_zarr(store, group="data", mode="w", encoding=encoding)
for ind, level in enumerate(cog.overviews(1)):
    overview = rioxarray.open_rasterio(src, overview_level=ind)
    overview = overview.load() * scale + offset
    overview = overview.drop_encoding()
    overview.attrs = {}
    overview = overview.to_dataset(name="var")
    overview.to_zarr(store, group=str(level), mode="w", encoding=encoding)
store.commit("Add overviews")
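As a final check, each overview group can be read back from the committed store. The sketch below opens the coarsest level (check_store, coarsest, and check_overview are illustrative names; it assumes the storage config defined above is still in scope):
# Group names are the COG decimation levels; the last one is the coarsest
check_store = IcechunkStore.open_existing(storage=storage, mode="r")
coarsest = str(cog.overviews(1)[-1])
check_overview = xr.open_zarr(check_store, group=coarsest, zarr_format=3, consolidated=False)
print(check_overview["var"])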