Tiny data chunks¶
Summary¶
Using tiny data chunks leads to poor performance.
Details¶
There are four primary reasons why you should avoid chunk sizes that are too small in your datacubes:
- Inefficient compression since most compression algorithms leverage correlations within a chunk.
- Inefficient data loading when querying large subsets of the data cube, because the read requires numerous high-latency GET requests. The excessive GET requests also increase costs.
- Inefficient encoding/decoding due to the number of chunks greatly exceeding available parallelism.
- Issues with parallel computing frameworks like Dask that have a 1:1 mapping between tasks and chunks.
Please note that the issue of too many GET requests can be mitigated by using Zarr V3 sharding or a cloud-native file format that stores multiple chunks in a single file.
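As a rough illustration of the sharding mitigation, here is a minimal sketch assuming zarr-python ≥ 3.0, where zarr.create_array accepts a shards argument (the shapes below are made up for illustration): small chunks remain the unit of compression and partial reads, while each shard bundles many chunks into a single stored object, so reads and writes touch far fewer objects.
import zarr

# Minimal sharding sketch (assumes zarr-python >= 3.0; shapes are illustrative).
sharded_store = zarr.storage.MemoryStore()
sharded_arr = zarr.create_array(
    store=sharded_store,
    shape=(965, 360, 720),
    dtype="float32",
    chunks=(19, 19, 19),     # tiny ~27 KB chunks: fine-grained partial reads
    shards=(190, 190, 190),  # each shard stores 10 x 10 x 10 chunks as one object
)
print(sharded_arr.chunks, sharded_arr.shards)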
Demonstration¶
import datacube_benchmark
import zarr
import xarray as xr
import pandas as pd
import hvplot.pandas # noqa
from pint import Quantity
from typing import Any
Set constants to use when comparing datacubes
config = datacube_benchmark.Config
config.target_array_size = "1 GB"
# config.credential_provider = AzureCredentialProvider(
# credential=DefaultAzureCredential()
# )
config.create_data = False
config.warmup_samples = 1
Set the concurrency to use for the Zarr Python library
zarr.config.set({"async.concurrency": config.zarr_concurrency})
<donfig.config_obj.ConfigSet at 0x10a6b2350>
Demonstrating storage inefficiencies of chunks that are too small¶
Create (or reuse) a Blosc-compressed array with 25 KB chunks
url_for_tiny_chunks = "https://datacubeguide.blob.core.windows.net/performance-testing/performance-testing/tiny-chunks.zarr"
tiny_chunk_arr = datacube_benchmark.create_or_open_zarr_array(
url_for_tiny_chunks, target_chunk_size="25 kilobyte", config=config
)
Create (or reuse) a Blosc-compressed array with 25 MB chunks
url_for_reg_chunks = "https://datacubeguide.blob.core.windows.net/performance-testing/performance-testing/reg-chunks.zarr"
reg_chunk_arr = datacube_benchmark.create_or_open_zarr_array(
url_for_reg_chunks, target_chunk_size="25 megabyte", config=config
)
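As an aside, the chunk shapes that result from these target sizes can be estimated with simple arithmetic. The sketch below is a plausible reconstruction (not necessarily datacube_benchmark's exact logic): for cubic chunks of float32 values, the edge length follows from the target chunk size.
import math

def cubic_chunk_edge(target_bytes: int, itemsize: int = 4) -> int:
    # Smallest cube edge length whose chunk reaches the target size.
    return math.ceil((target_bytes / itemsize) ** (1 / 3))

print(cubic_chunk_edge(25_000))      # 19  -> (19, 19, 19) chunks of ~27 KB
print(cubic_chunk_edge(25_000_000))  # 185 -> (185, 185, 185) chunks of ~25 MB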
Compare the storage size of the two arrays.
def storage_statistics(arr: zarr.Array) -> dict[str, Any]:
storage_size = Quantity(
datacube_benchmark.utils.array_storage_size(arr), "bytes"
).to("GB")
compression_ratio = arr.nbytes / storage_size.to("bytes").magnitude
return {"storage_size": storage_size, "compression_ratio": compression_ratio}
reg_chunk_storage_stats = storage_statistics(reg_chunk_arr)
tiny_chunks_storage_stats = storage_statistics(tiny_chunk_arr)
print("Storage size of a 10 GB array in object storage:")
print(f"\t25 KB chunks: {tiny_chunks_storage_stats['storage_size']:.2f}")
print(f"\t25 MB chunks: {reg_chunk_storage_stats['storage_size']:.2f}")
print("Compression ratio of a 10 GB array in object storage:")
print(f"\t25 KB chunks: {tiny_chunks_storage_stats['compression_ratio']:.1f}")
print(f"\t25 MB chunks: {reg_chunk_storage_stats['compression_ratio']:.1f}")
Storage size of a 10 GB array in object storage: 25 KB chunks: 0.05 gigabyte 25 MB chunks: 0.02 gigabyte Compression ratio of a 10 GB array in object storage: 25 KB chunks: 20.8 25 MB chunks: 53.8
Notice the much better compression ratio for a datacube with 25 MB chunks relative to a datacube with 25 KB chunks.
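The effect can also be reproduced without object storage. The sketch below is a standalone illustration using numcodecs directly, with a synthetic smooth field standing in for real data: the same bytes are compressed once as a single block and once as many ~25 KB blocks, and because each block is compressed independently, the tiny blocks typically achieve a worse ratio.
import numpy as np
from numcodecs import Blosc

codec = Blosc(cname="zstd", clevel=5, shuffle=Blosc.SHUFFLE)

# Synthetic spatially correlated field (a stand-in for real geospatial data).
data = np.sin(np.linspace(0, 100, 2_000_000)).astype("float32")  # ~8 MB

one_block = len(codec.encode(data))
tiny_blocks = sum(len(codec.encode(block)) for block in np.array_split(data, 320))

print(f"Compression ratio, single 8 MB block:  {data.nbytes / one_block:.1f}")
print(f"Compression ratio, 320 x 25 KB blocks: {data.nbytes / tiny_blocks:.1f}")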
Demonstrating performance inefficiencies of chunks that are too small¶
Test the time required to load a random point, a time series, a spatial slice, or a full scan of each array.
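For orientation, these access patterns roughly correspond to the following selections on the arrays created above (illustrative indices; the actual sampling logic lives in datacube_benchmark):
point = tiny_chunk_arr[500, 180, 360]      # Random point: a single value
time_series = tiny_chunk_arr[:, 180, 360]  # Time series: every step at one location
spatial_slice = tiny_chunk_arr[500, :, :]  # Spatial slice: one full 2D slice
full_scan = tiny_chunk_arr[:, :, :]        # Full scan: the entire array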
tiny_chunks_results = datacube_benchmark.benchmark_access_patterns(
tiny_chunk_arr,
num_samples=config.num_samples,
warmup_samples=config.warmup_samples,
).reset_index(drop=True)
reg_chunks_results = datacube_benchmark.benchmark_access_patterns(
reg_chunk_arr,
num_samples=config.num_samples,
warmup_samples=config.warmup_samples,
).reset_index(drop=True)
df = pd.concat([tiny_chunks_results, reg_chunks_results])
df["access_pattern"] = df["access_pattern"].replace(
{
"point": "Random point",
"time_series": "Time series",
"spatial_slice": "Spatial slice",
"full": "Full scan",
}
)
df["mean_time"] = df.apply(lambda row: float(row["mean_time"].magnitude), axis=1)
df["chunk_size"] = df.apply(
lambda row: f"{row['chunk_size'].to("MB").magnitude:,.2f} (MB)", axis=1
)
df
| | mean_time | median_time | std_time | min_time | max_time | total_samples | access_pattern | array_shape | chunk_shape | chunk_size | nchunks | shard_shape | array_dtype | array_size_memory | array_size_storage | array_compressors | compression_ratio | zarr_concurrency |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0.083279 | 0.08327929200004291 second | 0.0 second | 0.08327929200004291 second | 0.08327929200004291 second | 1 | Random point | (965, 360, 720) | (19, 19, 19) | 0.03 (MB) | 36822 | None | float32 | 1.000512 gigabyte | 0.048189051000000004 gigabyte | (BloscCodec(_tunable_attrs=set(), typesize=4, ... | 20.76:1 | 128 |
| 1 | 0.417142 | 0.4171420409993516 second | 0.0 second | 0.4171420409993516 second | 0.4171420409993516 second | 1 | Time series | (965, 360, 720) | (19, 19, 19) | 0.03 (MB) | 36822 | None | float32 | 1.000512 gigabyte | 0.048189051000000004 gigabyte | (BloscCodec(_tunable_attrs=set(), typesize=4, ... | 20.76:1 | 128 |
| 2 | 0.538106 | 0.5381062500000553 second | 0.0 second | 0.5381062500000553 second | 0.5381062500000553 second | 1 | Spatial slice | (965, 360, 720) | (19, 19, 19) | 0.03 (MB) | 36822 | None | float32 | 1.000512 gigabyte | 0.048189051000000004 gigabyte | (BloscCodec(_tunable_attrs=set(), typesize=4, ... | 20.76:1 | 128 |
| 3 | 22.218361 | 22.218361375000313 second | 0.0 second | 22.218361375000313 second | 22.218361375000313 second | 1 | Full scan | (965, 360, 720) | (19, 19, 19) | 0.03 (MB) | 36822 | None | float32 | 1.000512 gigabyte | 0.048189051000000004 gigabyte | (BloscCodec(_tunable_attrs=set(), typesize=4, ... | 20.76:1 | 128 |
| 0 | 0.101287 | 0.10128733300007298 second | 0.0 second | 0.10128733300007298 second | 0.10128733300007298 second | 1 | Random point | (965, 360, 720) | (185, 185, 185) | 25.33 (MB) | 48 | None | float32 | 1.000512 gigabyte | 0.018597328 gigabyte | (BloscCodec(_tunable_attrs=set(), typesize=4, ... | 53.80:1 | 128 |
| 1 | 0.735506 | 0.7355062080005155 second | 0.0 second | 0.7355062080005155 second | 0.7355062080005155 second | 1 | Time series | (965, 360, 720) | (185, 185, 185) | 25.33 (MB) | 48 | None | float32 | 1.000512 gigabyte | 0.018597328 gigabyte | (BloscCodec(_tunable_attrs=set(), typesize=4, ... | 53.80:1 | 128 |
| 2 | 0.501272 | 0.5012718750003842 second | 0.0 second | 0.5012718750003842 second | 0.5012718750003842 second | 1 | Spatial slice | (965, 360, 720) | (185, 185, 185) | 25.33 (MB) | 48 | None | float32 | 1.000512 gigabyte | 0.018597328 gigabyte | (BloscCodec(_tunable_attrs=set(), typesize=4, ... | 53.80:1 | 128 |
| 3 | 2.540260 | 2.540260417000354 second | 0.0 second | 2.540260417000354 second | 2.540260417000354 second | 1 | Full scan | (965, 360, 720) | (185, 185, 185) | 25.33 (MB) | 48 | None | float32 | 1.000512 gigabyte | 0.018597328 gigabyte | (BloscCodec(_tunable_attrs=set(), typesize=4, ... | 53.80:1 | 128 |
title = "Duration to load data for difference access patterns"
plt = df.hvplot.bar(
x="chunk_size",
y="mean_time",
by="access_pattern",
width=1000,
rot=45,
title=title,
ylabel="Duration (s)",
xlabel="Chunk Size, Query type",
)
plt
Note that while random point access is slightly faster for datacubes with smaller chunks, any access pattern that touches many chunks, such as the full scan, is dramatically slower. This performance impact is even more noticeable when writing data.
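A back-of-the-envelope estimate shows why the full scan suffers so much: with a hypothetical per-GET latency and the concurrency configured above, the number of chunks alone puts a rough floor on the scan time.
assumed_latency_s = 0.05  # hypothetical per-request latency to object storage
for label, nchunks in [("25 KB chunks", 36_822), ("25 MB chunks", 48)]:
    latency_floor = nchunks / config.zarr_concurrency * assumed_latency_s
    print(f"{label}: roughly {latency_floor:.2f} s spent on request latency alone")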
Demonstrating the Dask task graph issue with chunks that are too small¶
def xarray_array_from_zarr_array(
    array: zarr.Array, config=datacube_benchmark.Config
) -> xr.DataArray:
    # Re-open the underlying Zarr store with xarray so the variable is backed by Dask.
    store = array.store
    ds = xr.open_zarr(store=store, zarr_format=3, consolidated=True)
    da = ds[config.data_var]
    return da
result_tiny_chunks = xarray_array_from_zarr_array(tiny_chunk_arr) * 3
result_tiny_chunks.data
(Dask array repr omitted: the tiny-chunk array is backed by 36,822 Dask chunks of roughly 27 KB each.)
result_reg_chunks = xarray_array_from_zarr_array(reg_chunk_arr) * 3
result_reg_chunks.data
(Dask array repr omitted: the 25 MB-chunk array is backed by just 48 Dask chunks.)
Note how many chunks Dask needs to track for the first dataset. With so many tasks, Dask struggles: the task graph becomes very large, the fixed overhead of each task becomes significant, and a lot of extra network transfer is needed between the client and the worker nodes just to distribute the graph. This can be circumvented by opening the data with different Dask chunks than the chunks in storage (sketched below), but most users do not configure this and will get frustrated with Dask struggling to perform computations on the data.
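A minimal sketch of that workaround, assuming xarray's usual chunks argument (190 is chosen here as a multiple of the 19-element storage chunks): ask Dask for ~27 MB chunks even though the store keeps tiny chunks, which shrinks the task graph at the cost of each task issuing many small GET requests.
ds_rechunked = xr.open_zarr(
    tiny_chunk_arr.store,
    zarr_format=3,
    consolidated=True,
    chunks=190,  # larger Dask chunks than the (19, 19, 19) storage chunks
)
print(ds_rechunked[config.data_var].data.npartitions)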