lazymerge datafusion backend¶
Same mosaic as the conventions demo, but using the datafusion=True merge path.
Source arrays are organized in a zarr-datafusion-search store layout: a /meta group
with columnar metadata arrays, and source groups containing band subgroups with
multiscale overviews.
Instead of building an in-memory ScanIndex upfront, merge(datafusion=True) issues
per-chunk spatial SQL queries against the /meta group to find intersecting sources.
This scales to stores with many sources.
The store parameter in merge() accepts any zarr-compatible store (zarr Store,
Icechunk Session) or an obstore object store (LocalStore, S3Store). When
datafusion=True, the store must be obstore-compatible since zarr-datafusion-search
queries via obstore.
import tempfile
import numpy as np
import xarray as xr
import zarr
from affine import Affine
from obstore.store import LocalStore
from rasterix import RasterIndex
from lazymerge.conventions import (
ProjAttrs,
SpatialAttrs,
read_multiscales,
read_spatial,
write_proj,
write_spatial,
)
from lazymerge.merge import merge
from lazymerge.sources import select_overview
1. Create synthetic source arrays with DataFusion store layout¶
Four tiles arranged in a 2x2 grid. The left two tiles are in UTM 18N (EPSG:32618), and the right two are in UTM 17N (EPSG:32617) — covering adjacent geographic areas but stored in different coordinate reference systems.
Each source is a group containing a data/ band subgroup with base (10m) and
overview (20m) arrays, plus the zarr multiscales convention.
We write to a local directory store so we can pass an obstore LocalStore to merge().
DataFusion queries require an obstore-compatible store.
store_path/
meta/ # columnar metadata for DataFusion queries
utm18n_tile_a/
data/
multiscales # convention attr
0/ # base (10m)
1/ # overview (20m)
...
tmpdir = tempfile.mkdtemp()
store = zarr.storage.LocalStore(tmpdir)
root = zarr.open_group(store, mode="w")
sources = [
# UTM 18N tiles (left side)
{
"name": "utm18n_tile_a",
"crs": "EPSG:32618",
"epsg": 32618,
"bbox": (500000.0, 5990000.0, 505000.0, 5995000.0),
"resolution": 10.0,
"fill": 100.0,
},
{
"name": "utm18n_tile_c",
"crs": "EPSG:32618",
"epsg": 32618,
"bbox": (500000.0, 5995000.0, 505000.0, 6000000.0),
"resolution": 10.0,
"fill": 300.0,
},
# UTM 17N tiles (right side — same geographic area as UTM 18N
# x=[503000, 508000], but expressed in UTM zone 17N coordinates)
{
"name": "utm17n_tile_b",
"crs": "EPSG:32617",
"epsg": 32617,
"bbox": (895094.0, 6006920.0, 900511.0, 6012337.0),
"resolution": 10.0,
"fill": 200.0,
},
{
"name": "utm17n_tile_d",
"crs": "EPSG:32617",
"epsg": 32617,
"bbox": (894669.0, 6011912.0, 900086.0, 6017329.0),
"resolution": 10.0,
"fill": 400.0,
},
]
for src in sources:
xmin, ymin, xmax, ymax = src["bbox"]
res = src["resolution"]
width = int((xmax - xmin) / res)
height = int((ymax - ymin) / res)
scene_group = root.create_group(src["name"])
band_group = scene_group.create_group("data")
# Base resolution array (level 0)
base = band_group.create_array("0", shape=(height, width), dtype="f4", chunks=(256, 256))
base[:] = src["fill"]
write_spatial(
base,
SpatialAttrs(
dimensions=["y", "x"],
transform=(res, 0.0, xmin, 0.0, -res, ymax),
bbox=(xmin, ymin, xmax, ymax),
shape=(height, width),
),
)
write_proj(base, ProjAttrs(code=src["crs"]))
# 2x overview array (level 1)
ovr_width = width // 2
ovr_height = height // 2
ovr_res = res * 2
ovr = band_group.create_array("1", shape=(ovr_height, ovr_width), dtype="f4", chunks=(256, 256))
ovr[:] = src["fill"]
write_spatial(
ovr,
SpatialAttrs(
dimensions=["y", "x"],
transform=(ovr_res, 0.0, xmin, 0.0, -ovr_res, ymax),
bbox=(xmin, ymin, xmax, ymax),
shape=(ovr_height, ovr_width),
),
)
write_proj(ovr, ProjAttrs(code=src["crs"]))
# Set multiscales convention on band group
band_group.attrs["multiscales"] = {
"layout": [
{"asset": "0", "transform": {"scale": [1.0, 1.0], "translation": [0.0, 0.0]}},
{
"asset": "1",
"derived_from": "0",
"transform": {"scale": [2.0, 2.0], "translation": [0.5, 0.5]},
},
],
}
# Create obstore LocalStore pointing at the same directory for DataFusion queries
ob_store = LocalStore(tmpdir)
2. Populate the /meta group¶
The /meta group holds columnar arrays that DataFusion queries against.
Each row corresponds to one source scene. By convention, bbox values are
stored in EPSG:4326 so queries work in a single CRS.
import struct
from pyproj import Transformer
def bbox_to_wkb(xmin, ymin, xmax, ymax):
"""Encode a bounding box as a WKB Polygon (little-endian)."""
points = [(xmin, ymin), (xmax, ymin), (xmax, ymax), (xmin, ymax), (xmin, ymin)]
wkb = struct.pack("<BII", 1, 3, 1) # LE, Polygon, 1 ring
wkb += struct.pack("<I", 5) # 5 points
for x, y in points:
wkb += struct.pack("<dd", x, y)
return wkb
meta = root.create_group("meta")
ids = np.array([s["name"] for s in sources])
epsgs = np.array([s["epsg"] for s in sources], dtype="int64")
# Affine transform components: (a=res, b=0, c=xmin, d=0, e=-res, f=ymax)
t0 = np.array([s["resolution"] for s in sources])
t1 = np.zeros(len(sources))
t2 = np.array([s["bbox"][0] for s in sources]) # xmin
t3 = np.zeros(len(sources))
t4 = np.array([-s["resolution"] for s in sources])
t5 = np.array([s["bbox"][3] for s in sources]) # ymax
shape_x = np.array(
[int((s["bbox"][2] - s["bbox"][0]) / s["resolution"]) for s in sources], dtype="int64"
)
shape_y = np.array(
[int((s["bbox"][3] - s["bbox"][1]) / s["resolution"]) for s in sources], dtype="int64"
)
# Compute bboxes in EPSG:4326 and encode as WKB (DataFusion convention)
bbox_wkb = []
for s in sources:
xmin, ymin, xmax, ymax = s["bbox"]
transformer = Transformer.from_crs(s["crs"], "EPSG:4326", always_xy=True)
xs = [xmin, xmin, xmax, xmax]
ys = [ymin, ymax, ymin, ymax]
tx, ty = transformer.transform(xs, ys)
lon_min, lon_max = min(tx), max(tx)
lat_min, lat_max = min(ty), max(ty)
bbox_wkb.append(bbox_to_wkb(lon_min, lat_min, lon_max, lat_max))
bbox_wkb_arr = np.array(bbox_wkb, dtype=object)
# Use dtype='string' for variable-length UTF-8 (zarr VariableLengthUTF8)
id_arr = meta.create_array("id", shape=ids.shape, dtype="string")
id_arr[:] = ids
# Store bbox as variable-length bytes (WKB polygons in EPSG:4326)
bbox_arr = meta.create_array("bbox", shape=bbox_wkb_arr.shape, dtype="variable_length_bytes")
bbox_arr[:] = bbox_wkb_arr
meta.create_array("proj:epsg", data=epsgs)
meta.create_array("transform_0", data=t0)
meta.create_array("transform_1", data=t1)
meta.create_array("transform_2", data=t2)
meta.create_array("transform_3", data=t3)
meta.create_array("transform_4", data=t4)
meta.create_array("transform_5", data=t5)
meta.create_array("shape_x", data=shape_x)
meta.create_array("shape_y", data=shape_y)
/Users/seanharkins/projects/lazymerge/docs/examples/.venv/lib/python3.13/site-packages/zarr/core/dtype/npy/bytes.py:1143: UnstableSpecificationWarning: The data type (VariableLengthBytes()) does not have a Zarr V3 specification. That means that the representation of arrays saved with this data type may change without warning in a future version of Zarr Python. Arrays stored with this data type may be unreadable by other Zarr libraries. Use this data type at your own risk! Check https://github.com/zarr-developers/zarr-extensions/tree/main/data-types for the status of data type specifications for Zarr V3. v3_unstable_dtype_warning(self)
<Array file:///var/folders/mp/33cxt8xj36bbxj0jqdwbfwyc0000gn/T/tmp900t4_0d/meta/shape_y shape=(4,) dtype=int64>
3. Overview selection¶
Before running the full merge, let's demonstrate how overview selection works.
This part uses only lazymerge (no DataFusion dependency).
band_group = root["utm18n_tile_a/data"]
overviews = read_multiscales(band_group)
base_spatial = read_spatial(band_group["0"])
native_res = abs(base_spatial.transform[0])
for _ov in overviews:
pass
# At 10m target, no overview selected (use base)
sel_10 = select_overview(overviews, target_res=10.0, native_res=native_res)
# At 20m target, overview 1 is selected
sel_20 = select_overview(overviews, target_res=20.0, native_res=native_res)
4. Merge with DataFusion¶
With datafusion=True, merge() does not need a pre-built ScanIndex.
Instead, each target chunk issues a DataFusion SQL query against the /meta
group to find intersecting sources by bbox.
The store must be obstore-compatible (e.g. LocalStore, S3Store) when using
the DataFusion path. For the convention path, any zarr-compatible store works
(zarr Store, Icechunk Session, or a string path).
The sortby parameter controls the compositing order — sources returned earlier
take priority when filling NaN pixels.
# No scan_store() call needed!
result_arr, result_spatial, result_proj = merge(
store=ob_store, # obstore LocalStore for DataFusion queries
crs="EPSG:32618",
bbox=(500000.0, 5990000.0, 508000.0, 6000000.0),
resolution=10.0,
chunk_size=(256, 256),
bands="data", # navigate into data/ band group
datafusion=True, # use DataFusion for source discovery
sortby="id", # order sources by id
)
3. Lazy merge¶
# Requires zarr-datafusion-search and obstore
result_arr, result_spatial, result_proj, time_coords = merge(
store=ob_store,
crs="EPSG:32618",
bbox=(500000.0, 5990000.0, 508000.0, 6000000.0),
resolution=10.0,
chunk_size=(256, 256),
bands="data",
datafusion=True,
sortby="id",
)
4. Open the lazy Cubed array with xarray¶
da = xr.DataArray(result_arr, dims=["y", "x"])
# Build georeferenced coordinates using rasterix
affine = Affine(*result_spatial.transform)
raster_idx = RasterIndex.from_transform(
affine=affine,
width=da.sizes["x"],
height=da.sizes["y"],
x_dim="x",
y_dim="y",
crs=result_proj.code,
)
coords = xr.Coordinates.from_xindex(raster_idx)
da = da.assign_coords(coords)
da = da.proj.assign_crs(spatial_ref=result_proj.code, allow_override=True)
da
<xarray.DataArray 'array-003' (y: 1000, x: 800)> Size: 3MB
cubed.Array<array-003, shape=(1000, 800), dtype=float32, chunks=((256, 256, 256, 232), (256, 256, 256, 32))>
Coordinates:
* y (y) float64 8kB 6e+06 6e+06 6e+06 ... 5.99e+06 5.99e+06
* x (x) float64 6kB 5e+05 5e+05 5e+05 ... 5.08e+05 5.08e+05
* spatial_ref int64 8B 0
Indexes:
┌ x RasterIndex (crs=EPSG:32618)
└ y
spatial_ref CRSIndex (crs=EPSG:32618)5. Compute and visualize¶
Compute the result and plot it.
da = da.compute()
da
/Users/seanharkins/projects/lazymerge/lazymerge/sources.py:183: RuntimeWarning: Successfully reconstructed a store defined in another Python module. Connection pooling will not be shared across store instances. zarr_table = await ZarrTable.from_obstore(store, "/meta")
<xarray.DataArray 'array-003' (y: 1000, x: 800)> Size: 3MB
array([[300., 300., 300., ..., 400., 400., 400.],
[300., 300., 300., ..., 400., 400., 400.],
[300., 300., 300., ..., 400., 400., 400.],
...,
[100., 100., 100., ..., 200., 200., 200.],
[100., 100., 100., ..., 200., 200., nan],
[100., 100., 100., ..., 200., 200., nan]],
shape=(1000, 800), dtype=float32)
Coordinates:
* y (y) float64 8kB 6e+06 6e+06 6e+06 ... 5.99e+06 5.99e+06
* x (x) float64 6kB 5e+05 5e+05 5e+05 ... 5.08e+05 5.08e+05
* spatial_ref int64 8B 0
Indexes:
┌ x RasterIndex (crs=EPSG:32618)
└ y
spatial_ref CRSIndex (crs=EPSG:32618)da.plot(figsize=(12, 6), cmap="viridis", add_colorbar=True);
6. Overview selection at 20m¶
Merge at 20m to trigger the 2x overview. merge() automatically
selects the coarsest overview whose resolution is still <= the target.
result_20m, result_spatial_20m, result_proj_20m, time_coords = merge(
store=ob_store,
crs="EPSG:32618",
bbox=(500000.0, 5990000.0, 508000.0, 6000000.0),
resolution=20.0,
chunk_size=(128, 128),
bands="data",
datafusion=True,
sortby="id",
)
affine_20m = Affine(*result_spatial_20m.transform)
da_20m = xr.DataArray(result_20m, dims=["y", "x"])
raster_idx_20m = RasterIndex.from_transform(
affine=affine_20m,
width=da_20m.sizes["x"],
height=da_20m.sizes["y"],
x_dim="x",
y_dim="y",
crs=result_proj_20m.code,
)
coords_20m = xr.Coordinates.from_xindex(raster_idx_20m)
da_20m = da_20m.assign_coords(coords_20m)
da_20m = da_20m.proj.assign_crs(spatial_ref=result_proj_20m.code, allow_override=True)
da_20m
<xarray.DataArray 'array-009' (y: 500, x: 400)> Size: 800kB
cubed.Array<array-009, shape=(500, 400), dtype=float32, chunks=((128, 128, 128, 116), (128, 128, 128, 16))>
Coordinates:
* y (y) float64 4kB 6e+06 6e+06 6e+06 ... 5.99e+06 5.99e+06
* x (x) float64 3kB 5e+05 5e+05 5e+05 ... 5.08e+05 5.08e+05
* spatial_ref int64 8B 0
Indexes:
┌ x RasterIndex (crs=EPSG:32618)
└ y
spatial_ref CRSIndex (crs=EPSG:32618)import matplotlib.pyplot as plt
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 5))
da.plot(ax=ax1, cmap="viridis", vmin=100, vmax=400, add_colorbar=False)
ax1.set_title(f"10m — base ({da.sizes['y']}x{da.sizes['x']})")
ax1.set_aspect("equal")
da_20m.plot(ax=ax2, cmap="viridis", vmin=100, vmax=400, add_colorbar=False)
ax2.set_title(f"20m — overview ({da_20m.sizes['y']}x{da_20m.sizes['x']})")
ax2.set_aspect("equal")
fig.suptitle("Overview selection: 10m uses base, 20m uses 2x overview", fontsize=14)
fig.tight_layout();
/Users/seanharkins/projects/lazymerge/lazymerge/sources.py:183: RuntimeWarning: Successfully reconstructed a store defined in another Python module. Connection pooling will not be shared across store instances. zarr_table = await ZarrTable.from_obstore(store, "/meta")