Tiny data chunks¶
There are four primary reasons why you should avoid chunk sizes that are too small in your datacubes:
- Inefficient compression, since most compression algorithms leverage correlations within a chunk.
- Inefficient data loading when querying large subsets of the datacube, due to numerous GET requests with high latency. The excessive GET requests also increase costs.
- Inefficient encoding/decoding due to the number of chunks greatly exceeding available parallelism.
- Issues with parallel computing frameworks like Dask that have a 1:1 mapping between tasks and chunks.
Please note that the issue of too many GET requests can be mitigated by using Zarr V3 sharding or a cloud-native file format that allows storing multiple chunks in a single file.
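To make the effect of sharding on object counts concrete, here is a minimal sketch that only does the arithmetic. It uses the array shape and tiny chunk shape from the benchmark on this page; the shard shape (10 chunks per axis) and the helper name `n_blocks` are assumptions chosen for illustration, not settings used in the benchmark.

```python
import math


def n_blocks(shape, block_shape):
    """Number of blocks needed to tile `shape`, counting partial edge blocks."""
    return math.prod(math.ceil(n / b) for n, b in zip(shape, block_shape))


array_shape = (965, 360, 720)  # array shape used in the benchmark on this page
chunk_shape = (19, 19, 19)     # roughly 25 KB of float32 values per chunk

# Hypothetical shard shape: 10 chunks per axis, so up to 1000 chunks per shard.
shard_shape = tuple(10 * c for c in chunk_shape)

objects_without_sharding = n_blocks(array_shape, chunk_shape)
objects_with_sharding = n_blocks(array_shape, shard_shape)

print(f"Stored objects without sharding: {objects_without_sharding}")
print(f"Stored objects with sharding:    {objects_with_sharding}")
```

Under these assumptions the number of stored objects drops by almost three orders of magnitude. How many requests a given read needs still depends on how the reader fetches chunks from within a shard.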
import datacube_benchmark
import zarr
import xarray as xr
import pandas as pd
import hvplot.pandas # noqa
from pint import Quantity
from typing import Any
from azure.identity import DefaultAzureCredential
from obstore.auth.azure import AzureCredentialProvider
Set constants to use when comparing datacubes
config = datacube_benchmark.Config
config.target_array_size = "1 GB"
config.credential_provider = AzureCredentialProvider(
    credential=DefaultAzureCredential()
)
config.create_data = False
config.warmup_samples = 1
Set the concurrency to use for the Zarr Python library
zarr.config.set({"async.concurrency": config.zarr_concurrency})
Demonstrating storage inefficiencies of chunks that are too small¶
Create (or reuse) a Blosc-compressed array with 25 KB chunks
url_for_tiny_chunks = (
    "https://datacubeguide.blob.core.windows.net/performance-testing/tiny-chunks.zarr"
)
tiny_chunk_arr = datacube_benchmark.create_or_open_zarr_array(
    url_for_tiny_chunks, target_chunk_size="25 kilobyte", config=config
)
Create (or reuse) a Blosc-compressed array with 25 MB chunks
url_for_reg_chunks = (
    "https://datacubeguide.blob.core.windows.net/performance-testing/reg-chunks.zarr"
)
reg_chunk_arr = datacube_benchmark.create_or_open_zarr_array(
    url_for_reg_chunks, target_chunk_size="25 megabyte", config=config
)
Compare the storage size of the two arrays.
def storage_statistics(arr: zarr.Array) -> dict[str, Any]:
    # Size of the stored (compressed) array, converted from bytes to GB
    storage_size = Quantity(
        datacube_benchmark.utils.array_storage_size(arr), "bytes"
    ).to("GB")
    # Ratio of the uncompressed in-memory size to the stored size
    compression_ratio = arr.nbytes / storage_size.to("bytes").magnitude
    return {"storage_size": storage_size, "compression_ratio": compression_ratio}
reg_chunk_storage_stats = storage_statistics(reg_chunk_arr)
tiny_chunks_storage_stats = storage_statistics(tiny_chunk_arr)
print("Storage size of a 1 GB array in object storage:")
print(f"\t25 KB chunks: {tiny_chunks_storage_stats['storage_size']:.2f}")
print(f"\t25 MB chunks: {reg_chunk_storage_stats['storage_size']:.2f}")
print("Compression ratio of a 1 GB array in object storage:")
print(f"\t25 KB chunks: {tiny_chunks_storage_stats['compression_ratio']:.1f}")
print(f"\t25 MB chunks: {reg_chunk_storage_stats['compression_ratio']:.1f}")
Storage size of a 1 GB array in object storage:
    25 KB chunks: 0.05 gigabyte
    25 MB chunks: 0.02 gigabyte
Compression ratio of a 1 GB array in object storage:
    25 KB chunks: 20.8
    25 MB chunks: 53.8
Notice the much better compression ratio for the datacube with 25 MB chunks (about 53.8:1) relative to the datacube with 25 KB chunks (about 20.8:1).
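The reason chunk size affects compression can be illustrated with a small, self-contained sketch. This is not the benchmark's Blosc/Zstd configuration: it uses `zlib` from the standard library and a synthetic repeating sine signal, both of which are assumptions made for illustration. Each chunk is compressed independently, so repetition that spans chunk boundaries is only visible to the compressor when the chunk is large enough to contain it.

```python
import math
import struct
import zlib


def compressed_size(data: bytes, chunk_bytes: int) -> int:
    """Total size when every chunk of `chunk_bytes` is compressed independently."""
    return sum(
        len(zlib.compress(data[i : i + chunk_bytes]))
        for i in range(0, len(data), chunk_bytes)
    )


# Synthetic float32 signal: one sine cycle of 1024 samples (4096 bytes),
# repeated 64 times. The repetition is only visible across 4096-byte spans.
period = 1024
one_cycle = struct.pack(
    f"<{period}f", *(math.sin(2 * math.pi * i / period) for i in range(period))
)
data = one_cycle * 64

ratio_tiny = len(data) / compressed_size(data, 256)         # 256-byte chunks
ratio_large = len(data) / compressed_size(data, len(data))  # one single chunk

print(f"256 B chunks: {ratio_tiny:.1f}:1")
print(f"Single chunk: {ratio_large:.1f}:1")
```

The tiny chunks never contain a full cycle, so the compressor cannot exploit the repetition and also pays a fixed per-chunk overhead. Real datacubes are less regular than this toy signal, but the same mechanism applies.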
Demonstrating performance inefficiencies of chunks that are too small¶
Test the time required to load a random point, a time series, a spatial slice, or the full array.
tiny_chunks_results = datacube_benchmark.benchmark_access_patterns(
    tiny_chunk_arr,
    num_samples=config.num_samples,
    warmup_samples=config.warmup_samples,
).reset_index(drop=True)
reg_chunks_results = datacube_benchmark.benchmark_access_patterns(
    reg_chunk_arr,
    num_samples=config.num_samples,
    warmup_samples=config.warmup_samples,
).reset_index(drop=True)
df = pd.concat([tiny_chunks_results, reg_chunks_results])
df["access_pattern"] = df["access_pattern"].replace(
    {
        "point": "Random point",
        "time_series": "Time series",
        "spatial_slice": "Spatial slice",
        "full": "Full scan",
    }
)
# Convert pint quantities to plain floats and labels for plotting
df["mean_time"] = df.apply(lambda row: float(row["mean_time"].magnitude), axis=1)
df["chunk_size"] = df.apply(
    lambda row: f"{row['chunk_size'].to('MB').magnitude:,.2f} (MB)", axis=1
)
df
 | mean_time | median_time | std_time | min_time | max_time | total_samples | access_pattern | array_shape | chunk_shape | chunk_size | nchunks | shard_shape | array_dtype | array_size_memory | array_size_storage | array_compressors | compression_ratio | zarr_concurrency |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 0.006557 | 0.006557253000210039 second | 0.0 second | 0.006557253000210039 second | 0.006557253000210039 second | 1 | Random point | (965, 360, 720) | (19, 19, 19) | 0.03 (MB) | 36822 | None | float32 | 1.000512 gigabyte | 0.048189051000000004 gigabyte | (BloscCodec(typesize=4, cname=<BloscCname.zstd... | 20.76:1 | 128 |
1 | 0.036745 | 0.03674469600082375 second | 0.0 second | 0.03674469600082375 second | 0.03674469600082375 second | 1 | Time series | (965, 360, 720) | (19, 19, 19) | 0.03 (MB) | 36822 | None | float32 | 1.000512 gigabyte | 0.048189051000000004 gigabyte | (BloscCodec(typesize=4, cname=<BloscCname.zstd... | 20.76:1 | 128 |
2 | 0.391419 | 0.39141855200068676 second | 0.0 second | 0.39141855200068676 second | 0.39141855200068676 second | 1 | Spatial slice | (965, 360, 720) | (19, 19, 19) | 0.03 (MB) | 36822 | None | float32 | 1.000512 gigabyte | 0.048189051000000004 gigabyte | (BloscCodec(typesize=4, cname=<BloscCname.zstd... | 20.76:1 | 128 |
3 | 21.369652 | 21.369652307999786 second | 0.0 second | 21.369652307999786 second | 21.369652307999786 second | 1 | Full scan | (965, 360, 720) | (19, 19, 19) | 0.03 (MB) | 36822 | None | float32 | 1.000512 gigabyte | 0.048189051000000004 gigabyte | (BloscCodec(typesize=4, cname=<BloscCname.zstd... | 20.76:1 | 128 |
0 | 0.077645 | 0.07764525400125422 second | 0.0 second | 0.07764525400125422 second | 0.07764525400125422 second | 1 | Random point | (965, 360, 720) | (185, 185, 185) | 25.33 (MB) | 48 | None | float32 | 1.000512 gigabyte | 0.018597328 gigabyte | (BloscCodec(typesize=4, cname=<BloscCname.zstd... | 53.80:1 | 128 |
1 | 0.077246 | 0.07724585100004333 second | 0.0 second | 0.07724585100004333 second | 0.07724585100004333 second | 1 | Time series | (965, 360, 720) | (185, 185, 185) | 25.33 (MB) | 48 | None | float32 | 1.000512 gigabyte | 0.018597328 gigabyte | (BloscCodec(typesize=4, cname=<BloscCname.zstd... | 53.80:1 | 128 |
2 | 0.047293 | 0.04729349899935187 second | 0.0 second | 0.04729349899935187 second | 0.04729349899935187 second | 1 | Spatial slice | (965, 360, 720) | (185, 185, 185) | 25.33 (MB) | 48 | None | float32 | 1.000512 gigabyte | 0.018597328 gigabyte | (BloscCodec(typesize=4, cname=<BloscCname.zstd... | 53.80:1 | 128 |
3 | 0.663260 | 0.6632603899997775 second | 0.0 second | 0.6632603899997775 second | 0.6632603899997775 second | 1 | Full scan | (965, 360, 720) | (185, 185, 185) | 25.33 (MB) | 48 | None | float32 | 1.000512 gigabyte | 0.018597328 gigabyte | (BloscCodec(typesize=4, cname=<BloscCname.zstd... | 53.80:1 | 128 |
title = "Duration to load data for different access patterns"
plt = df.hvplot.bar(
    x="chunk_size",
    y="mean_time",
    by="access_pattern",
    width=1000,
    rot=45,
    title=title,
    ylabel="Duration (s)",
    xlabel="Chunk Size, Query type",
)
plt
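Note that while random point access is faster for the datacube with smaller chunks (about 0.007 s versus 0.078 s in this single-sample run), the time for loading many chunks is dramatically worse: the full scan took about 21.4 s with 25 KB chunks versus 0.66 s with 25 MB chunks. This performance impact is even more noticeable for writing data.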
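The timings above track the number of chunks each query has to fetch. The sketch below counts the chunks intersected by each access pattern for the two chunk shapes in the table. It assumes the axes are ordered (time, y, x) and that each query starts at index 0; the benchmark's actual query locations are not shown here, and `chunks_touched` is a hypothetical helper rather than part of `datacube_benchmark`.

```python
import math


def chunks_touched(selection, chunk_shape):
    """Count chunks intersecting a selection given as (start, stop) per axis."""
    return math.prod(
        math.ceil(stop / c) - start // c
        for (start, stop), c in zip(selection, chunk_shape)
    )


# Array shape from the results table; axis order (time, y, x) is an assumption.
nt, ny, nx = 965, 360, 720
patterns = {
    "Random point": ((0, 1), (0, 1), (0, 1)),
    "Time series": ((0, nt), (0, 1), (0, 1)),
    "Spatial slice": ((0, 1), (0, ny), (0, nx)),
    "Full scan": ((0, nt), (0, ny), (0, nx)),
}
chunk_shapes = {"25 KB chunks": (19, 19, 19), "25 MB chunks": (185, 185, 185)}

counts = {
    name: {label: chunks_touched(sel, shape) for label, shape in chunk_shapes.items()}
    for name, sel in patterns.items()
}
for name, per_layout in counts.items():
    print(f"{name:>13}: {per_layout}")
```

A point read touches one chunk in either layout, so the smaller chunk wins because less data is transferred and decoded. The spatial slice and full scan touch hundreds to tens of thousands of tiny chunks, which is where request latency dominates.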
Demonstrating Dask task graph issues with chunks that are too small¶
def xarray_array_from_zarr_array(
    array: zarr.Array, config=datacube_benchmark.Config
) -> xr.DataArray:
    # Open the array's store with xarray and return the data variable
    store = array.store
    ds = xr.open_zarr(store=store, zarr_format=3, consolidated=True)
    da = ds[config.data_var]
    return da
result_tiny_chunks = xarray_array_from_zarr_array(tiny_chunk_arr) * 3
result_tiny_chunks.data
result_reg_chunks = xarray_array_from_zarr_array(reg_chunk_arr) * 3
result_reg_chunks.data
Note how many chunks Dask needs to track in the first dataset: 36,822, compared with 48 in the second. With one task per chunk, the task graph becomes very large, the overhead associated with each task becomes burdensome, and there is a lot of extra network transfer between the client and worker nodes. This can be circumvented by using Dask chunks that differ from the chunks in storage, but most users do not configure this and get frustrated when Dask struggles to perform computations on the data.
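One way to choose Dask chunks that differ from the storage chunks is to grow them in whole multiples of the storage chunk shape, so that every Dask chunk stays aligned with chunk boundaries on disk. The sketch below shows only that arithmetic. The 100 MB target and the helper names `aligned_chunks` and `n_blocks` are assumptions for illustration, not part of Dask, xarray, or `datacube_benchmark`.

```python
import math


def n_blocks(shape, block_shape):
    """Number of blocks needed to tile `shape`, counting partial edge blocks."""
    return math.prod(math.ceil(n / b) for n, b in zip(shape, block_shape))


def aligned_chunks(shape, storage_chunks, itemsize, target_bytes):
    """Largest whole multiple of the storage chunks that fits in `target_bytes`."""
    best = tuple(storage_chunks)
    factor = 1
    while True:
        # Try the next multiple, capped at the array extent along each axis.
        candidate = tuple(
            min(c * (factor + 1), n) for c, n in zip(storage_chunks, shape)
        )
        too_big = math.prod(candidate) * itemsize > target_bytes
        if too_big or candidate == best:
            return best
        best = candidate
        factor += 1


shape = (965, 360, 720)        # array shape from the benchmark
storage_chunks = (19, 19, 19)  # tiny storage chunks
dask_chunks = aligned_chunks(
    shape, storage_chunks, itemsize=4, target_bytes=100_000_000
)

print(f"Dask chunks: {dask_chunks}")
print(f"Tasks with storage chunks: {n_blocks(shape, storage_chunks)}")
print(f"Tasks with aligned chunks: {n_blocks(shape, dask_chunks)}")
```

Chunk lengths like these could be mapped to the dataset's dimension names and passed through the `chunks` argument of `xr.open_zarr`. This shrinks the task graph only; every storage chunk still has to be fetched and decoded, so it does not remove the request and compression costs shown earlier.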