Working With COG - At Scale¶
For this demo we will use the Ozone Monitoring Instrument (OMI) / Aura NO2 Tropospheric Column Density dataset hosted on the AWS Open Data registry: https://registry.opendata.aws/omi-no2-nasa/
Requirements¶
- AWS credentials
- rasterio
- boto3
- folium
- httpx
- tqdm
!pip install rasterio boto3 folium httpx tqdm
# Uncomment this line if you need to install the dependencies
# !pip install rasterio boto3 folium httpx tqdm
import os
import datetime
import json
import urllib.parse
from io import BytesIO
from concurrent import futures

import httpx
import numpy
from boto3.session import Session as boto3_session
from rasterio.features import bounds as featureBounds
from tqdm.notebook import tqdm

from folium import Map, TileLayer, GeoJson

import matplotlib.pyplot as plt
import matplotlib.dates as mdates
titiler_endpoint = "https://titiler.xyz"  # Development Seed demo endpoint. Please be kind.
Define your area of interest (AOI)¶
# use geojson.io
geojson = {
    "type": "FeatureCollection",
    "features": [
        {
            "type": "Feature",
            "properties": {},
            "geometry": {
                "type": "Polygon",
                "coordinates": [
                    [
                        [-74.1796875, 45.18978009667531],
                        [-73.092041015625, 45.18978009667531],
                        [-73.092041015625, 46.00459325574482],
                        [-74.1796875, 46.00459325574482],
                        [-74.1796875, 45.18978009667531]
                    ]
                ]
            }
        }
    ]
}

bounds = featureBounds(geojson)
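featureBounds returns the AOI's (minx, miny, maxx, maxy) tuple, which we reuse below for map centering and for the bbox endpoint:

print(bounds)  # (minx, miny, maxx, maxy)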
m = Map(
    tiles="OpenStreetMap",
    location=((bounds[1] + bounds[3]) / 2, (bounds[0] + bounds[2]) / 2),
    zoom_start=6
)

GeoJson(geojson).add_to(m)
m
List available files on AWS S3¶
# To be able to run this notebook you'll need AWS credentials available in the environment
# import os
# os.environ["AWS_ACCESS_KEY_ID"] = "YOUR AWS ACCESS ID HERE"
# os.environ["AWS_SECRET_ACCESS_KEY"] = "YOUR AWS ACCESS KEY HERE"
session = boto3_session(region_name="us-west-2")
client = session.client("s3")

bucket = "omi-no2-nasa"  # https://registry.opendata.aws/omi-no2-nasa/


def list_objects(bucket, prefix):
    """List all objects under a prefix in an S3 bucket (handles pagination)."""
    paginator = client.get_paginator("list_objects_v2")

    files = []
    for subset in paginator.paginate(Bucket=bucket, Prefix=prefix):
        files.extend(subset.get("Contents", []))

    return files


list_files = list_objects(bucket, "OMI-Aura_L3")

print("Archive Size")
files = [r["Key"] for r in list_files]
print(f"Found {len(files)} OMI-NO2 files")

size = sum(r["Size"] / 1_000_000 for r in list_files)
print(f"Size of the archive: {size:.1f} MB ({size / 1000:.1f} GB)")
print(files[0:10])
The file name structure is "OMI-Aura_L3-OMNO2d_{YEAR}m{MONTH:02}{DAY:02}...", so we can easily filter by date, e.g. all files from 2019:
files_2019 = list(filter(lambda x: x.split("_")[2][0:4] == "2019", files))
print(len(files_2019))
files_Oct5 = list(filter(lambda x: x.split("_")[2][5:7] == "10" and x.split("_")[2][7:9] == "05", files))
print(len(files_Oct5))
print(files_Oct5)
DATA Endpoint¶
{endpoint}/cog/tiles/{tileMatrixSetId}/{z}/{x}/{y}.{format}?url={cog}&{other query params}

{endpoint}/cog/bbox/{minx},{miny},{maxx},{maxy}.{format}?url={cog}&{other query params}

{endpoint}/cog/point/{minx},{miny}?url={cog}&{other query params}
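For example, a tile URL template can be assembled by hand from the first pattern. This sketch only illustrates the query-string assembly; the tilejson endpoint used below does this for you:

# Illustrative only: build a tile URL template from the pattern above
query = urllib.parse.urlencode(
    {
        "url": f"s3://omi-no2-nasa/{files[0]}",
        "rescale": "0,3000000000000000",
    }
)
tile_template = f"{titiler_endpoint}/cog/tiles/WebMercatorQuad/{{z}}/{{x}}/{{y}}.png?{query}"
print(tile_template)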
Visualize One Item¶
def _url(src_path):
    return f"s3://omi-no2-nasa/{src_path}"
# Fetch file metadata to get min/max rescaling values (the data is stored as float32)
r = httpx.get(
    f"{titiler_endpoint}/cog/statistics",
    params={
        "url": _url(files[0]),
    },
).json()

print(json.dumps(r, indent=4))
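The statistics response includes per-band min/max values, which could be used to build the rescale parameter dynamically instead of hard-coding it. A minimal sketch, assuming the band is keyed as "b1" as in the output above:

# Hedged sketch: derive a rescale string from the statistics response
# (assumes a per-band layout keyed by "b1" with "min"/"max" fields)
band_stats = r["b1"]
rescale = f"{band_stats['min']},{band_stats['max']}"
print(rescale)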
r = httpx.get(
    f"{titiler_endpoint}/cog/WebMercatorQuad/tilejson.json",
    params={
        "url": _url(files[2]),
        "rescale": "0,3000000000000000",
        "colormap_name": "viridis",
    },
).json()

m = Map(
    location=((bounds[1] + bounds[3]) / 2, (bounds[0] + bounds[2]) / 2),
    zoom_start=6
)

TileLayer(
    tiles=r["tiles"][0],
    opacity=1,
    attr="NASA"
).add_to(m)

GeoJson(geojson, style_function=lambda feature: {"fill": False, "color": "red"}).add_to(m)
m
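We can also sample a single pixel with the point endpoint listed above, e.g. at the center of our AOI (lon,lat order):

# Sample the value at the AOI center using the /cog/point endpoint
lon = (bounds[0] + bounds[2]) / 2
lat = (bounds[1] + bounds[3]) / 2
point = httpx.get(
    f"{titiler_endpoint}/cog/point/{lon},{lat}",
    params={"url": _url(files[2])},
).json()
print(point)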
Create time series of NO2¶
def _stats(data, mask):
    """Compute masked min/max/mean/std for one bbox response."""
    arr = numpy.ma.array(data)
    arr.mask = mask == 0
    return arr.min().item(), arr.max().item(), arr.mean().item(), arr.std().item()


xmin, ymin, xmax, ymax = bounds


def fetch_bbox(file):
    """Fetch the AOI as a numpy array and return its stats and acquisition date."""
    url = f"{titiler_endpoint}/cog/bbox/{xmin},{ymin},{xmax},{ymax}.npy"
    params = {
        "url": _url(file),
        "bidx": "1",
        "max_size": 128,
    }
    r = httpx.get(url, params=params)
    data = numpy.load(BytesIO(r.content))
    # the .npy response stacks the data bands first and the mask band last
    return (
        _stats(data[0:-1], data[-1]),
        datetime.datetime.strptime(file.split("_")[2].replace("m", ""), "%Y%m%d"),
    )


# small helper to filter out invalid responses from the API
def _filter_futures(tasks):
    for future in tasks:
        try:
            yield future.result()
        except Exception:
            pass
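Before fanning out over hundreds of files, it is worth sanity-checking fetch_bbox on a single file:

# Quick check on one file: _stats returns (min, max, mean, std)
(min_v, max_v, mean_v, std_v), date = fetch_bbox(files[0])
print(date, max_v)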
Get NO2 Max for the 15th of each month¶
# The 15th of each month, for all years
files_15 = list(filter(lambda x: (x.split("_")[2][7:9] == "15"), files))
with futures.ThreadPoolExecutor(max_workers=10) as executor:
    future_work = [
        executor.submit(fetch_bbox, file) for file in files_15
    ]

    for f in tqdm(futures.as_completed(future_work), total=len(future_work)):
        pass

values, dates = zip(*list(_filter_futures(future_work)))

# _stats returns (min, max, mean, std); index 1 is the max
max_values = [v[1] for v in values]

fig, ax1 = plt.subplots(dpi=300)
fig.autofmt_xdate()

ax1.plot(dates, max_values, label="NO2")
ax1.xaxis.set_major_locator(mdates.YearLocator(1, 7))
ax1.set_xlabel("Dates")
ax1.set_ylabel("NO2")
ax1.legend()
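Since each entry in values also carries the min, mean, and std, plotting a different statistic is a one-line change, e.g. the mean:

# Mean instead of max: index 2 of the (min, max, mean, std) tuple
mean_values = [v[2] for v in values]

fig, ax1 = plt.subplots(dpi=300)
fig.autofmt_xdate()
ax1.plot(dates, mean_values, label="NO2 (mean)")
ax1.legend()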
Same, but for every day over the last 16 years¶
with futures.ThreadPoolExecutor(max_workers=50) as executor:
    future_work = [
        executor.submit(fetch_bbox, file) for file in files
    ]

    for f in tqdm(futures.as_completed(future_work), total=len(future_work)):
        pass

values, dates = zip(*list(_filter_futures(future_work)))

max_values = [v[1] for v in values]

fig, ax1 = plt.subplots(dpi=150)
fig.autofmt_xdate()

ax1.plot(dates, max_values, label="NO2")
ax1.xaxis.set_major_locator(mdates.YearLocator())
ax1.set_xlabel("Dates")
ax1.set_ylabel("NO2")
ax1.legend()