Working With COG - At Scale
For this demo we will use the Ozone Monitoring Instrument (OMI) / Aura NO2 Tropospheric Column Density dataset hosted on AWS PDS: https://registry.opendata.aws/omi-no2-nasa/
Requirements
- AWS credentials
- rasterio
- folium
- httpx
- tqdm
# Uncomment this line if you need to install the dependencies
# !pip install rasterio boto3 folium httpx tqdm
import os
import json
import urllib.parse
from io import BytesIO
from concurrent import futures

import httpx
import numpy
import matplotlib.pyplot as plt
from boto3.session import Session as boto3_session
from rasterio.features import bounds as featureBounds
from tqdm.notebook import tqdm
from folium import Map, TileLayer

%matplotlib inline
titiler_endpoint = "https://titiler.xyz"  # Developmentseed Demo endpoint. Please be kind.
Define your area of interest (AOI)
# use geojson.io
geojson = {
    "type": "FeatureCollection",
    "features": [
        {
            "type": "Feature",
            "properties": {},
            "geometry": {
                "type": "Polygon",
                "coordinates": [
                    [
                        [-74.1796875, 45.18978009667531],
                        [-73.092041015625, 45.18978009667531],
                        [-73.092041015625, 46.00459325574482],
                        [-74.1796875, 46.00459325574482],
                        [-74.1796875, 45.18978009667531],
                    ]
                ],
            },
        }
    ],
}

bounds = featureBounds(geojson)
Map(
    tiles="OpenStreetMap",
    location=((bounds[1] + bounds[3]) / 2, (bounds[0] + bounds[2]) / 2),
    zoom_start=6,
)
List available files on AWS S3
# To be able to run this notebook you'll need AWS credentials available in the environment
# import os
# os.environ["AWS_ACCESS_KEY_ID"] = "YOUR AWS ACCESS ID HERE"
# os.environ["AWS_SECRET_ACCESS_KEY"] = "YOUR AWS ACCESS KEY HERE"
session = boto3_session(region_name="us-west-2")
client = session.client("s3")

bucket = "omi-no2-nasa"  # https://registry.opendata.aws/omi-no2-nasa/


def list_objects(bucket, prefix):
    """List objects under a prefix in an S3 bucket."""
    paginator = client.get_paginator("list_objects_v2")

    files = []
    for subset in paginator.paginate(Bucket=bucket, Prefix=prefix):
        files.extend(subset.get("Contents", []))

    return [r["Key"] for r in files]


files = list_objects(bucket, "OMI-Aura_L3")
print(f"Found: {len(files)} files")
print(files[0:10])
The file name structure is "OMI-Aura_L3-OMNO2d_{YEAR}m{MONTH:02}{DAY:02}...", so we can easily filter by date, e.g.:
files_2019 = list(filter(lambda x: x.split("_")[2][0:4] == "2019", files))
print(len(files_2019))
files_Oct5 = list(filter(lambda x: (x.split("_")[2][5:7] == "10") and (x.split("_")[2][7:9] == "05"), files))
print(len(files_Oct5))
print(files_Oct5)
Data Endpoints

{endpoint}/cog/tiles/{z}/{x}/{y}.{format}?url={cog}&{other query params}

{endpoint}/cog/crop/{minx},{miny},{maxx},{maxy}.{format}?url={cog}&{other query params}

{endpoint}/cog/point/{lon},{lat}?url={cog}&{other query params}
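For example, a tile URL template for the first file can be assembled by hand from those patterns. This is a minimal sketch (in practice the tilejson.json endpoint used below returns the template for you; the rescale value is the one used later in this notebook):

# Sketch: build a /cog/tiles URL template by hand from the patterns above
query = urllib.parse.urlencode(
    {
        "url": f"s3://omi-no2-nasa/{files[0]}",
        "rescale": "0,3000000000000000",
        "colormap_name": "viridis",
    }
)
# Double braces keep {z}/{x}/{y} as literal placeholders for the map client
tile_url_template = f"{titiler_endpoint}/cog/tiles/{{z}}/{{x}}/{{y}}.png?{query}"
print(tile_url_template)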
Visualize One Item
def _url(src_path):
    return f"s3://omi-no2-nasa/{src_path}"
# Fetch file metadata to get min/max rescaling values (because the file is stored as float32)
r = httpx.get(
    f"{titiler_endpoint}/cog/statistics",
    params={
        "url": _url(files[0]),
    },
).json()
print(json.dumps(r, indent=4))
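Rather than hard-coding a rescale range as in the next cell, you could derive it from this statistics response. A minimal sketch, assuming the response maps band names (e.g. "b1") to objects with "min" and "max" fields, as in recent titiler versions:

# Sketch: derive a rescale range from the statistics response
# (assumes band stats are keyed by name, e.g. "b1", with "min"/"max" fields)
band_stats = r.get("b1", {})
if band_stats:
    rescale = f"{band_stats['min']},{band_stats['max']}"
    print(rescale)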
r = httpx.get(
    f"{titiler_endpoint}/cog/tilejson.json",
    params={
        "url": _url(files[2]),
        "rescale": "0,3000000000000000",
        "colormap_name": "viridis",
    },
).json()

m = Map(
    location=((bounds[1] + bounds[3]) / 2, (bounds[0] + bounds[2]) / 2),
    zoom_start=6,
)

aod_layer = TileLayer(
    tiles=r["tiles"][0],
    opacity=1,
    attr="NASA",
)
aod_layer.add_to(m)
m
Create a time series of NO2
def _stats(data, mask):
    """Compute min/max/mean/std over the valid (unmasked) pixels."""
    arr = numpy.ma.array(data)
    arr.mask = mask == 0
    return arr.min().item(), arr.max().item(), arr.mean().item(), arr.std().item()


xmin, ymin, xmax, ymax = bounds


def fetch_bbox(file):
    """Crop the COG to our AOI and return (max value, date token)."""
    url = f"{titiler_endpoint}/cog/crop/{xmin},{ymin},{xmax},{ymax}.npy"
    params = {
        "url": _url(file),
        "bidx": "1",
        "max_size": 128,
    }
    r = httpx.get(url, params=params)
    data = numpy.load(BytesIO(r.content))
    # The .npy response stacks the data band(s) and the mask band
    s = _stats(data[0:-1], data[-1])
    return s[1], file.split("_")[2]


# small tool to filter invalid responses from the API
def _filter_futures(tasks):
    for future in tasks:
        try:
            yield future.result()
        except Exception:
            pass
Get NO2 max for the 15th of each month
# The 15th of each month, across all years
files_15 = list(filter(lambda x: (x.split("_")[2][7:9] == "15"), files))
with futures.ThreadPoolExecutor(max_workers=10) as executor:
    future_work = [executor.submit(fetch_bbox, file) for file in files_15]

    for f in tqdm(futures.as_completed(future_work), total=len(future_work)):
        pass

values, dates = zip(*list(_filter_futures(future_work)))

fig, ax1 = plt.subplots(dpi=150)
fig.autofmt_xdate()

ax1.plot(dates, values, label="NO2")
ax1.set_xlabel("Dates")
ax1.set_ylabel("NO2")

ax1.legend()
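Note that the x-axis above plots the raw date tokens (strings like "2019m0115"). A minimal sketch of parsing them into datetime objects for a cleaner axis, assuming the token is exactly "{YEAR}m{MONTH}{DAY}" as described earlier:

# Parse the "{YEAR}m{MONTH}{DAY}" date tokens into datetime objects
from datetime import datetime

parsed_dates = [datetime.strptime(d, "%Ym%m%d") for d in dates]

fig, ax1 = plt.subplots(dpi=150)
fig.autofmt_xdate()
ax1.plot(parsed_dates, values, label="NO2")
ax1.set_xlabel("Dates")
ax1.set_ylabel("NO2")
ax1.legend()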
Same, but for every day over the last 16 years
with futures.ThreadPoolExecutor(max_workers=50) as executor:
    future_work = [executor.submit(fetch_bbox, file) for file in files]

    for f in tqdm(futures.as_completed(future_work), total=len(future_work)):
        pass

values, dates = zip(*list(_filter_futures(future_work)))

fig, ax1 = plt.subplots(dpi=150)
fig.autofmt_xdate()

ax1.plot(dates, values, label="NO2")
ax1.set_xlabel("Dates")
ax1.set_ylabel("NO2")

ax1.legend()