Skip to content

lifespan

Health check implementations for lifespan events.

check_conformance(middleware_classes: list[Middleware], api_url: str, attr_name: str = '__required_conformances__', endpoint: str = '/conformance') async

Check if the upstream API supports a given conformance class.

Source code in src/stac_auth_proxy/utils/lifespan.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
async def check_conformance(
    middleware_classes: list[Middleware],
    api_url: str,
    attr_name: str = "__required_conformances__",
    endpoint: str = "/conformance",
):
    """Check if the upstream API supports a given conformance class."""
    required_conformances: dict[str, list[str]] = {}
    for middleware in middleware_classes:

        for conformance in getattr(middleware.cls, attr_name, []):
            required_conformances.setdefault(conformance, []).append(
                middleware.cls.__name__
            )

    async with httpx.AsyncClient(base_url=api_url) as client:
        response = await client.get(endpoint)
        response.raise_for_status()
        api_conforms_to = response.json().get("conformsTo", [])

    missing = [
        req_conformance
        for req_conformance in required_conformances.keys()
        if not any(
            re.match(req_conformance, conformance) for conformance in api_conforms_to
        )
    ]

    def conformance_str(conformance: str) -> str:
        return f" - {conformance} [{','.join(required_conformances[conformance])}]"

    if missing:
        missing_str = [conformance_str(c) for c in missing]
        raise RuntimeError(
            "\n".join(
                [
                    "Upstream catalog is missing the following conformance classes:",
                    *missing_str,
                ]
            )
        )
    logger.info(
        "Upstream catalog conforms to the following required conformance classes: \n%s",
        "\n".join([conformance_str(c) for c in required_conformances]),
    )

check_server_health(url: str | HttpUrl, max_retries: int = 10, retry_delay: float = 1.0, retry_delay_max: float = 5.0, timeout: float = 5.0) -> None async

Wait for upstream API to become available.

Source code in src/stac_auth_proxy/utils/lifespan.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
async def check_server_health(
    url: str | HttpUrl,
    max_retries: int = 10,
    retry_delay: float = 1.0,
    retry_delay_max: float = 5.0,
    timeout: float = 5.0,
) -> None:
    """Wait for upstream API to become available."""
    # Convert url to string if it's a HttpUrl
    if isinstance(url, HttpUrl):
        url = str(url)

    async with httpx.AsyncClient(
        base_url=url, timeout=timeout, follow_redirects=True
    ) as client:
        for attempt in range(max_retries):
            try:
                response = await client.get("/")
                response.raise_for_status()
                logger.info(f"Upstream API {url!r} is healthy")
                return
            except httpx.ConnectError as e:
                logger.warning(f"Upstream health check for {url!r} failed: {e}")
                retry_in = min(retry_delay * (2**attempt), retry_delay_max)
                logger.warning(
                    f"Upstream API {url!r} not healthy, retrying in {retry_in:.1f}s "
                    f"(attempt {attempt + 1}/{max_retries})"
                )
                await asyncio.sleep(retry_in)

    raise RuntimeError(
        f"Upstream API {url!r} failed to respond after {max_retries} attempts"
    )