the Ability To Duplicate
This is a proof-of-concept that brings together a few ideas:
This documentation was generated directly from the source using pycco. The code is hosted at https://github.com/developmentseed/atd.
from __future__ import annotations
import asyncio
import datetime
import urllib.parse
from asyncio import Queue, TaskGroup
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Iterator, Literal, TypedDict
import click
import humanize
obstore is a modern Python library that uses Rust under the hood for fast cross-cloud operations: https://developmentseed.org/obstore/
import obstore
import obstore.store
If you know more about your assets, you might create more advanced STAC item generation. For this general case, we use rio-stac.
import rio_stac
stacrs uses the same underlying Rust library as obstore for cross-cloud operations, and can read and write stac-geoparquet.
import stacrs
import tqdm
from multiformats import multihash
from pystac import Asset, Item
from rasterio import MemoryFile
if TYPE_CHECKING:
ObjectStore and ObjectMeta aren’t actual Python classes; they’re just type hints. We’re working on making the documentation clearer on this point: https://github.com/developmentseed/obstore/issues/277
    from obstore import ObjectMeta
    from obstore.store import ObjectStore
This is a simple dataclass to represent a geospatial file that we will copy to our destination. More advanced implementations might have multiple files, e.g. in the case of a “granule”.
@dataclass(frozen=True)
class SourceFile:
    href: str
    path: str
    extension: str | None
    id: str | None
    size: int
Creates a source file from an entry in an obstore list operation.
    @classmethod
    def from_entry(cls: type[SourceFile], source: str, entry: ObjectMeta) -> SourceFile:
        path = entry["path"]
        if "." in path:
            id, extension = path.rsplit(".", 1)
        else:
            id = None
            extension = None
        href = source.rstrip("/") + "/" + path
        return SourceFile(
            href=href, path=path, extension=extension, id=id, size=entry["size"]
        )
You could expand this to other file extensions, or do other checks.
    def should_be_copied(self) -> bool:
        return self.extension is not None and self.extension in ("tif", "tiff")
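One way to expand the check to other extensions is to keep the allowed set in a constant. This is a minimal sketch, not part of atd: `COPYABLE_EXTENSIONS` and `has_copyable_extension` are hypothetical names, the extra `jp2` entry is only an example, and the case-insensitive comparison is a change from the case-sensitive check above.

```python
from __future__ import annotations

# Hypothetical set of extensions; adjust it for your own data.
COPYABLE_EXTENSIONS = frozenset({"tif", "tiff", "jp2"})


def has_copyable_extension(extension: str | None) -> bool:
    """Return True if the extension is in the allowed set, ignoring case."""
    return extension is not None and extension.lower() in COPYABLE_EXTENSIONS
```

A method like should_be_copied could then delegate to this helper with self.extension.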
A message, for progress reporting.
class Message(TypedDict):
We don’t use the path in this progress implementation, but it seems like a good thing to include, just in case.
    path: str
We don’t know the size of our stac-geoparquet file before we create it, but we know the size of the rest of the files we touch thanks to obstore.
    size: int | None
    state: Literal["listed", "got", "created", "put"]
A more advanced CLI would provide options for customizing the behavior of the operations. For now, we keep it simple.
@click.command
@click.argument("source")
@click.argument("destination")
def cli(source: str, destination: str) -> None:
    timeout = datetime.timedelta(minutes=10)
    source = to_url(source)
    source_store = obstore.store.from_url(url=source, timeout=timeout)  # type: ignore
    destination = to_url(destination)
    destination_store = obstore.store.from_url(destination)
    messages: Queue[Message | None] = Queue()
There are plugins for click that simplify the ergonomics of async usage, but this internal function is good enough for our simple case.
    async def run() -> None:
        progress_task = asyncio.create_task(progress_worker(messages))
        tasks = []
        size = 0
        async with TaskGroup() as task_group:
            for source_file in get_source_files(source, source_store):
                size += source_file.size
                tasks.append(
                    task_group.create_task(
                        copy(
                            source_file,
                            source_store,
                            destination,
                            destination_store,
                            messages,
                        )
                    )
                )
        items = [task.result().to_dict() for task in tasks]
The stac-geoparquet file is organized in a way that allows for easy search and discovery. stacrs uses DuckDB under the hood to enable STAC API queries:
items = await stacrs.search(
"s3://my-bucket/items.parquet",
intersects=...
)
You can also use stacrs’s command-line interface to serve the items in a simple STAC API server:
$ stacrs serve s3://my-bucket/items.parquet
You could then browse the items with stac-browser. See the README for a complete walkthrough.
        geoparquet_path = destination.rstrip("/") + "/" + "items.parquet"
        await stacrs.write(geoparquet_path, items)
        await messages.put({"state": "put", "path": geoparquet_path, "size": None})
We use None to indicate that our message printing task should exit gracefully.
        await messages.put(None)
        put = await progress_task
        put.write(
            f"{humanize.naturalsize(size)} written to {destination}\nItems available at "
            + geoparquet_path
        )

    asyncio.run(run())
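The None sentinel that stops the progress task can be shown in isolation. This is a minimal sketch with plain strings instead of Message dictionaries; `worker` and `main` are hypothetical names, not part of atd.

```python
from __future__ import annotations

import asyncio


async def worker(queue: asyncio.Queue[str | None]) -> list[str]:
    """Collect messages until the None sentinel arrives."""
    seen: list[str] = []
    while True:
        message = await queue.get()
        if message is None:
            return seen
        seen.append(message)


async def main() -> list[str]:
    queue: asyncio.Queue[str | None] = asyncio.Queue()
    task = asyncio.create_task(worker(queue))
    for message in ("listed", "got", "put"):
        await queue.put(message)
    await queue.put(None)  # sentinel: tells the worker to stop
    return await task


result = asyncio.run(main())
```

Because the queue is first-in, first-out, the worker sees every earlier message before it sees the sentinel, so nothing is dropped on shutdown.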
In this case, we search for GeoTIFF files. If you know more about your domain, you might expand the type of file you’re looking for, or have some other discovery mechanism.
def get_source_files(source: str, store: ObjectStore) -> Iterator[SourceFile]:
    for page in obstore.list(store):
        for entry in page:
            source_file = SourceFile.from_entry(source, entry)
            if source_file.should_be_copied():
                yield source_file
obstore doesn’t like relative paths, so we convert them to file:// URLs.
def to_url(s: str) -> str:
    if urllib.parse.urlparse(s).scheme:
        return s
    else:
This will fail for local filesystems if the path doesn’t exist. If you want to create the destination directory, add an option here to create the directory.
        return "file://" + str(Path(s).absolute())
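The option to create the destination directory could look something like the following. This is a sketch under assumptions: `to_url_creating` and its `create` flag are hypothetical and do not exist in atd.

```python
import urllib.parse
from pathlib import Path


def to_url_creating(s: str, create: bool = False) -> str:
    """Like to_url, but optionally creates the local directory first."""
    if urllib.parse.urlparse(s).scheme:
        # Already a URL (e.g. s3://...), so there is nothing to create locally.
        return s
    path = Path(s).absolute()
    if create:
        path.mkdir(parents=True, exist_ok=True)
    return "file://" + str(path)
```

Leaving `create` off by default keeps the existing behavior, so a mistyped destination still fails instead of silently producing a new directory.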
Copying is a three-step process: get the source bytes, create a STAC item from them, and put the bytes at the destination.
Because we know the size of the source object from the original obstore list operation, we could implement a memory cap on the amount of data we have locally. We haven’t done that here.
async def copy(
    source_file: SourceFile,
    source_store: ObjectStore,
    destination: str,
    destination_store: ObjectStore,
    messages: Queue[Message | None],
) -> Item:
    await messages.put(
        {"path": source_file.path, "size": source_file.size, "state": "listed"}
    )
A real-world implementation would have some sort of throttling to ensure we don’t request too many things at once.
    response = await obstore.get_async(source_store, source_file.path)
We use async whenever we can to allow the scheduler to run other tasks while we’re doing IO. Here, we’re naïvely copying all the bytes to memory. For large files, this might be bad, and you might want to use a temporary file instead.
    data = bytes(await response.bytes_async())
    await messages.put(
        {"path": source_file.path, "size": source_file.size, "state": "got"}
    )
    destination_href = destination.rstrip("/") + "/" + source_file.path
    item = create_item(data, source_file, destination_href)
    await messages.put(
        {"path": source_file.path, "size": source_file.size, "state": "created"}
    )
    await obstore.put_async(destination_store, source_file.path, data)
    await messages.put(
        {"path": source_file.path, "size": source_file.size, "state": "put"}
    )
    return item
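The throttling mentioned above could be done with an asyncio.Semaphore that limits how many copies are in flight at once. This is a minimal sketch, not what atd does: `throttled` and `fake_copy` are hypothetical, `fake_copy` only sleeps in place of real IO, and the limit of two is arbitrary.

```python
from __future__ import annotations

import asyncio
from typing import Any, Coroutine


async def throttled(semaphore: asyncio.Semaphore, coroutine: Coroutine[Any, Any, int]) -> int:
    """Await the coroutine only once a semaphore slot is free."""
    async with semaphore:
        return await coroutine


async def main() -> tuple[list[int], int]:
    semaphore = asyncio.Semaphore(2)  # at most two transfers in flight
    running = 0
    peak = 0

    async def fake_copy(i: int) -> int:
        # Stand-in for the real copy coroutine; it only sleeps.
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)
        running -= 1
        return i

    results = await asyncio.gather(
        *(throttled(semaphore, fake_copy(i)) for i in range(5))
    )
    return list(results), peak


results, peak = asyncio.run(main())
```

A coroutine object does no work until it is awaited, so each fake_copy body starts only after its wrapper has acquired the semaphore. The same wrapper could be applied to the coroutines handed to the task group.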
Creates a STAC item. This example is simple, but you could do a lot more work here, e.g. what is done in stactools-packages.
def create_item(data: bytes, source_file: SourceFile, destination_href: str) -> Item:
    with MemoryFile(data) as memory_file:
        with memory_file.open() as dataset:
            item = rio_stac.create_stac_item(
                dataset,
                asset_href=destination_href,
                asset_roles=["data"],
                id=source_file.id,
We don’t add raster information because it requires scanning the whole file again.
                with_eo=True,
                with_proj=True,
                with_raster=False,
            )
    checksum = multihash.digest(data, "sha2-256").hex()
    item.ext.add("file")
    asset = item.assets["asset"]
    asset.ext.file.checksum = checksum
    original = Asset(href=source_file.href)
    original.ext.file.checksum = checksum
    item.assets = {"data": asset, "original": original}
    return item
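The checksum stored on each asset is a hex-encoded multihash, which is a plain digest prefixed with a code for the hash function and the digest length. To show what that string contains, here is a standard-library sketch for sha2-256 only; `sha2_256_multihash_hex` is a hypothetical helper, and atd itself uses the multiformats package as shown above.

```python
import hashlib


def sha2_256_multihash_hex(data: bytes) -> str:
    """Hex-encode a sha2-256 multihash: code 0x12, length 0x20, then the digest."""
    digest = hashlib.sha256(data).digest()
    return (bytes([0x12, len(digest)]) + digest).hex()


checksum = sha2_256_multihash_hex(b"hello")
```

The leading `1220` in the result is what makes the hash self-identifying: a reader can tell the algorithm and digest length from the string alone.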
A worker that manages three progress bars, and returns the last one when done.
async def progress_worker(queue: Queue[Message | None]) -> tqdm.tqdm:
    got = tqdm.tqdm(
        total=0, position=0, desc="Got", leave=False, unit="B", unit_scale=True
    )
    created = tqdm.tqdm(total=0, position=1, desc="Created", leave=False)
There’s one more file to put, the stac-geoparquet file. Since we don’t know its size beforehand, we don’t track bytes for the put progress bar.
    put = tqdm.tqdm(total=1, position=2, desc="Put", leave=False)
    while True:
        message = await queue.get()
        if message is None:
            queue.task_done()
            return put
        elif message["state"] == "listed":
            got.total += message["size"] or 0
            created.total += 1
            put.total += 1
        elif message["state"] == "got":
            got.update(message["size"])
        elif message["state"] == "created":
            created.update()
        elif message["state"] == "put":
            put.update()
        else:
            raise Exception("unreachable")
if __name__ == "__main__":
    cli()