
Tools

List of tools available to the agent.

geo_assistant.tools.get_place async

get_place(
    place_name: str, tool_call_id: Annotated[str, InjectedToolCallId] = ""
) -> Command

Get a place location from Overture Maps based on a user-provided place name.

Source code in src/geo_assistant/tools/overture.py
@tool
async def get_place(
    place_name: str,
    tool_call_id: Annotated[str, InjectedToolCallId] = "",
) -> Command:
    """Get place location from Overture Maps based on user input place name."""

    db_connection = create_database_connection()
    source = os.getenv("OVERTURE_SOURCE", "local")
    if source == "s3":
        data_path = os.getenv("OVERTURE_S3_PATH")
        db_connection.execute("SET s3_region='us-west-2';")
    else:
        data_path = os.getenv("OVERTURE_LOCAL_PATH")

    location_results = db_connection.execute(
        f"""
      LOAD spatial;

      SELECT
          id,
          jaro_winkler_similarity(LOWER(names.primary), LOWER('{place_name}')) AS similarity_score,
          names.primary AS name,
          confidence,
          CAST(socials AS JSON) AS socials,
          ST_AsGeoJSON(geometry) AS geometry,
      FROM read_parquet(
          '{data_path}',
          filename=true,
          hive_partitioning=1
      )
      WHERE jaro_winkler_similarity(LOWER(names.primary), LOWER('{place_name}')) > 0.5
      ORDER BY similarity_score DESC
      LIMIT 1;
  """,
    ).fetchall()

    db_connection.close()

    if not location_results:
        return Command(
            update={
                "messages": [
                    ToolMessage(
                        content=f"No Overture place found matching '{place_name}'.",
                        tool_call_id=tool_call_id,
                    ),
                ],
            },
        )

    geometry = json.loads(location_results[0][-1])

    feature = Feature(
        type="Feature",
        geometry=geometry,
        properties={
            "overture_id": location_results[0][0],
            "name": location_results[0][2],
            "socials": location_results[0][4],
        },
    )

    return Command(
        update={
            "place": feature,
            "messages": [
                ToolMessage(
                    content=f"Found place with Overture name: {location_results[0][2]} based on user query. Socials: {location_results[0][4]}",
                    tool_call_id=tool_call_id,
                ),
            ],
        },
    )
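
Example (illustrative): a minimal standalone sketch of the fuzzy-match query that get_place runs, assuming a hypothetical local Overture places extract at data/overture/places/*.parquet. Unlike the tool itself, which interpolates the place name into the SQL via an f-string, this sketch binds it as a query parameter.

import duckdb

place_name = "golden gate park"  # illustrative user input
data_path = "data/overture/places/*.parquet"  # hypothetical local Overture extract

con = duckdb.connect()
con.execute("INSTALL spatial;")
con.execute("LOAD spatial;")

# Rank candidate places by Jaro-Winkler similarity to the query string and
# keep the best match, mirroring the query used by get_place.
row = con.execute(
    f"""
    SELECT
        id,
        names.primary AS name,
        jaro_winkler_similarity(LOWER(names.primary), LOWER(?)) AS similarity_score,
        ST_AsGeoJSON(geometry) AS geometry
    FROM read_parquet('{data_path}', filename=true, hive_partitioning=1)
    WHERE jaro_winkler_similarity(LOWER(names.primary), LOWER(?)) > 0.5
    ORDER BY similarity_score DESC
    LIMIT 1;
    """,
    [place_name, place_name],
).fetchone()
con.close()

print(row)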

geo_assistant.tools.get_search_area async

get_search_area(
    buffer_size_km: float,
    state: Annotated[GeoAssistantState, InjectedState],
    tool_call_id: Annotated[str, InjectedToolCallId] = "",
) -> Command

Get a search area buffer in km around the place defined in the agent state.

Source code in src/geo_assistant/tools/buffer.py
@tool
async def get_search_area(
    buffer_size_km: float,
    state: Annotated[GeoAssistantState, InjectedState],
    tool_call_id: Annotated[str, InjectedToolCallId] = "",
) -> Command:
    """Get a search area buffer in km around the place defined in the agent state."""

    place_feature = state.get("place")

    if not place_feature:
        return Command(
            update={
                "messages": [
                    ToolMessage(
                        content="No place defined in the agent state to create a search area around.",
                        tool_call_id=tool_call_id,
                    ),
                ],
            },
        )

    # Convert GeoJSON feature to GeoDataFrame
    gdf = gpd.GeoDataFrame.from_features(features=[place_feature])
    gdf.crs = "EPSG:4326"

    gdf_m = gdf.to_crs(epsg=3857)  # latlon to Web Mercator for meter-based buffering

    gdf_m["geometry"] = gdf_m["geometry"].buffer(
        buffer_size_km * 1000,
    )  # Buffer in meters
    gdf = gdf_m.to_crs(epsg=4326)  # Back to WGS84

    # Convert back to GeoJSON feature
    if len(gdf) != 1:
        raise ValueError(
            f"{len(gdf)} features found after buffer operation, should be just 1. "
            "Was a Multi-Point/LineString/Polygon geometry passed in?",
        )
    buffer_feature = Feature(
        type="Feature",
        geometry=gdf.iloc[0].geometry.__geo_interface__,
        properties=place_feature.properties.copy(),
    )

    return Command(
        update={
            "search_area": buffer_feature,
            "messages": [
                ToolMessage(
                    content=f"Created search area geometry buffer of {buffer_size_km} km around the place.",
                    tool_call_id=tool_call_id,
                ),
            ],
        },
    )
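
Example (illustrative): a standalone sketch of the buffering step on a hypothetical point. Buffering in Web Mercator (EPSG:3857) keeps the units in metres but is only approximate, since Mercator scale distortion grows with latitude.

import geopandas as gpd
from shapely.geometry import Point

# Hypothetical place location (lon/lat), purely illustrative.
gdf = gpd.GeoDataFrame(geometry=[Point(-104.99, 39.74)], crs="EPSG:4326")

buffer_size_km = 2.5
gdf_m = gdf.to_crs(epsg=3857)  # reproject to a metre-based CRS
gdf_m["geometry"] = gdf_m.buffer(buffer_size_km * 1000)  # buffer in metres
buffered = gdf_m.to_crs(epsg=4326)  # back to WGS84

print(buffered.geometry.iloc[0].bounds)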

geo_assistant.tools.fetch_naip_img async

fetch_naip_img(
    start_date: str,
    end_date: str,
    state: Annotated[GeoAssistantState, InjectedState],
    tool_call_id: Annotated[str | None, InjectedToolCallId] = None,
) -> Command

Query Microsoft Planetary Computer for NAIP imagery intersecting an AOI and date range, load matching items into an xarray data cube using odc-stac (currently only the first item is loaded), and return a simple RGB composite as base64-encoded JPEG bytes.

Parameters:

  • start_date (str) –

    Start date (YYYY-MM-DD).

  • end_date (str) –

    End date (YYYY-MM-DD).

Source code in src/geo_assistant/tools/naip.py
@tool("fetch_naip_img")
async def fetch_naip_img(
    start_date: str,
    end_date: str,
    state: Annotated[GeoAssistantState, InjectedState],
    tool_call_id: Annotated[str | None, InjectedToolCallId] = None,
) -> Command:
    """
    Query Microsoft Planetary Computer for NAIP imagery intersecting an AOI and
    date range, load matching items into an xarray data cube using odc-stac
    (currently only the first item is loaded), and return a simple RGB composite
    as base64-encoded JPEG bytes.

    Args:
        start_date: Start date (YYYY-MM-DD).
        end_date: End date (YYYY-MM-DD).

    """
    if not state["search_area"]:
        return Command(
            update={
                "messages": [
                    ToolMessage(
                        content="No search area avilable yetmee",
                        tool_call_id=tool_call_id,
                    ),
                ],
                "naip_img_bytes": None,
            },
        )
    # --- 1. STAC search against the Microsoft Planetary Computer STAC API ---
    catalog = Client.open(DATA_URL)

    search = catalog.search(
        collections=["naip"],
        intersects=state["search_area"].geometry,
        datetime=f"{start_date}/{end_date}",
    )

    items = list(search.items())

    # This is a hack to add raster extension info to the items, since
    # the Planetary Computer STAC API adds the band information using the
    # eo:bands extension, but odc.stac expects the raster:bands extension.
    for item in items:
        item.assets["image"].ext.add("raster")
        item.assets["image"].ext.raster.bands = [
            RasterBand.create() for _ in ("red", "green", "blue", "nir")
        ]

    if len(items) == 0:
        return Command(
            update={
                "messages": [
                    ToolMessage(
                        content="No NAIP imagery found for the specified area and date range.",
                        tool_call_id=tool_call_id,
                    ),
                ],
                "naip_img_bytes": None,
            },
        )

    # --- 2. Load as xarray cube with odc.stac ---
    # NAIP in MPC: 4-band multi-band asset (R,G,B,NIR) in one asset named "image".
    # odc.stac exposes these as measurements 'red','green','blue','nir' for this collection
    # Limit to first item for now
    with ThreadPoolExecutor(max_workers=5) as executor:
        ds: xr.Dataset = stac_load(
            items[:1],
            bands=["red", "green", "blue"],  # use only RGB
            geopolygon=state["search_area"].geometry,
            resolution=1.0,  # NAIP native ~1 m
            executor=executor,
            crs=items[0].properties["proj:code"],
        )

    if ds.dims.get("time", 0) == 0:
        return Command(
            update={
                "messages": [
                    ToolMessage(
                        content="Unable to load NAIP RGB image, dataset has no time dimension",
                        tool_call_id=tool_call_id,
                    ),
                ],
                "naip_img_bytes": None,
            },
        )

    # Enforce max output size based on dataset sizes (y, x)
    sizes = dict(ds.sizes)
    h = int(sizes.get("y", 0))
    w = int(sizes.get("x", 0))
    if h > 512 or w > 512:
        return Command(
            update={
                "messages": [
                    ToolMessage(
                        content=f"NAIP RGB image {w}x{h} exceeds 512x512 limit. Skipping image output.",
                        tool_call_id=tool_call_id,
                    ),
                ],
                "naip_img_bytes": None,
            },
        )

    # --- 3. Build an RGB composite from the cube ---
    # For the JPEG, we'll just use the first time slice (you can swap in “latest”
    # or a temporal reduction if you prefer).
    red = ds["red"].isel(time=0)
    green = ds["green"].isel(time=0)
    blue = ds["blue"].isel(time=0)

    # Stack into (y, x, 3) array
    rgb = xr.concat([red, green, blue], dim="band")  # (band, y, x)
    rgb = rgb.transpose("y", "x", "band")  # (y, x, band)

    # Convert to uint8 for JPEG with a simple contrast stretch.
    arr = rgb.values.astype("float32")
    # Robust min/max to avoid a few hot pixels blowing out the stretch
    vmin = np.nanpercentile(arr, 2)
    vmax = np.nanpercentile(arr, 98)
    if vmax <= vmin:
        vmin, vmax = np.nanmin(arr), np.nanmax(arr)

    arr = np.clip((arr - vmin) / (vmax - vmin + 1e-6), 0, 1)
    arr_uint8 = (arr * 255).astype("uint8")

    # --- 4. Save image ---

    buf = BytesIO()
    plt.imsave(buf, arr_uint8, format="jpeg")
    buf.seek(0)
    img_base64 = base64.b64encode(buf.read()).decode("utf-8")

    return Command(
        update={
            "messages": [
                ToolMessage(
                    content="NAIP RGB image fetched and encoded as JPEG bytes.",
                    tool_call_id=tool_call_id,
                ),
            ],
            "naip_img_bytes": img_base64,
        },
    )
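
Example (illustrative): the robust 2-98 percentile contrast stretch used above, applied to a synthetic (y, x, band) array so it runs without any STAC or network access.

import numpy as np

rng = np.random.default_rng(0)
# Stand-in for the (y, x, band) float array built from the NAIP cube.
arr = rng.normal(loc=120.0, scale=40.0, size=(64, 64, 3)).astype("float32")

# Clip to the 2nd-98th percentile range so a few extreme pixels cannot
# dominate the scaling, then rescale to uint8 for JPEG output.
vmin, vmax = np.nanpercentile(arr, 2), np.nanpercentile(arr, 98)
if vmax <= vmin:
    vmin, vmax = np.nanmin(arr), np.nanmax(arr)

arr_uint8 = (np.clip((arr - vmin) / (vmax - vmin + 1e-6), 0, 1) * 255).astype("uint8")
print(arr_uint8.min(), arr_uint8.max())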

geo_assistant.tools.summarize_sat_img async

summarize_sat_img(
    state: Annotated[GeoAssistantState, InjectedState],
    tool_call_id: Annotated[str | None, InjectedToolCallId] = None,
) -> Command

Summarize the contents of a satellite image using an LLM.

Returns:

  • Command

    Command containing the image summary and metadata

Raises:

  • ValueError

    If the image URL is invalid or the image cannot be processed

Source code in src/geo_assistant/tools/summarize.py
@tool
async def summarize_sat_img(
    state: Annotated[GeoAssistantState, InjectedState],
    tool_call_id: Annotated[str | None, InjectedToolCallId] = None,
) -> Command:
    """Summarize the contents of a satellite image using an LLM.

    Returns:
        Command containing the image summary and metadata

    Raises:
        ValueError: If the image URL is invalid or the image cannot be processed
    """
    if not state["naip_img_bytes"]:
        return Command(
            update={
                "messages": [
                    ToolMessage(
                        content="No NAIP image bytes available yet",
                        tool_call_id=tool_call_id,
                    ),
                ],
            },
        )
    img_url = f"data:image/jpeg;base64,{state['naip_img_bytes']}"
    summary = _SUMMARIZER_AGENT(img_url)
    message_content = summary.answer
    return Command(
        update={
            "messages": [
                ToolMessage(content=message_content, tool_call_id=tool_call_id),
            ],
        },
    )
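
Example (illustrative): how the base64 JPEG string kept in state["naip_img_bytes"] can be wrapped in a data URL and attached to a multimodal chat message. The file path below is hypothetical, and the actual prompt and model behind _SUMMARIZER_AGENT are internal to geo_assistant and may differ.

import base64

from langchain_core.messages import HumanMessage

with open("naip_rgb.jpeg", "rb") as f:  # hypothetical JPEG on disk
    img_base64 = base64.b64encode(f.read()).decode("utf-8")

# Same data-URL scheme that summarize_sat_img builds from state["naip_img_bytes"].
img_url = f"data:image/jpeg;base64,{img_base64}"

# An OpenAI-style multimodal message; it could be sent to any vision-capable
# chat model via model.invoke([message]).
message = HumanMessage(
    content=[
        {"type": "text", "text": "Describe the land cover visible in this aerial image."},
        {"type": "image_url", "image_url": {"url": img_url}},
    ]
)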