Skip to content

AuthenticationExtensionMiddleware

Middleware to add auth information to item responses served by the upstream API.

AuthenticationExtensionMiddleware dataclass

Bases: JsonResponseMiddleware

Middleware to add the authentication extension to the response.

Parameters:

Name Type Description Default
app Callable[list, Awaitable[None]]
required
default_public bool
required
private_endpoints dict[str, Sequence[Literal[GET, POST, PUT, DELETE, PATCH]]]
required
public_endpoints dict[str, Sequence[Literal[GET, POST, PUT, DELETE, PATCH]]]
required
oidc_discovery_url str
required
auth_scheme_name str
'oidc'
auth_scheme dict[str, Any]

Defaults to a new empty dictionary (the field is declared with `default_factory=dict`).

<class 'dict'>
extension_url str
'https://stac-extensions.github.io/authentication/v1.1.0/schema.json'
json_content_type_expr str
'application/(geo\\+)?json'
Source code in src/stac_auth_proxy/middleware/AuthenticationExtensionMiddleware.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
@dataclass
class AuthenticationExtensionMiddleware(JsonResponseMiddleware):
    """Middleware to add the authentication extension to the response.

    Successful JSON responses on STAC catalog, collection, item, and search
    paths are augmented with the extension URL, an ``auth:schemes`` entry
    describing the OpenID Connect scheme, and ``auth:refs`` annotations on
    links whose target is a private endpoint.
    """

    app: ASGIApp

    # Endpoint visibility configuration; forwarded unchanged to ``find_match``
    # when deciding whether a link target is private.
    default_public: bool
    private_endpoints: EndpointMethods
    public_endpoints: EndpointMethods

    # Written to ``openIdConnectUrl`` of the generated scheme definition.
    oidc_discovery_url: str
    # Key of the scheme in ``auth:schemes`` and the value added to ``auth:refs``.
    auth_scheme_name: str = "oidc"
    # NOTE(review): not referenced by the methods visible in this class; the
    # scheme definition written by ``transform_json`` is hard-coded. Confirm
    # whether this field is meant to override or extend that definition.
    auth_scheme: dict[str, Any] = field(default_factory=dict)
    extension_url: str = (
        "https://stac-extensions.github.io/authentication/v1.1.0/schema.json"
    )

    # Regular expression matched against the response ``content-type`` header.
    json_content_type_expr: str = r"application/(geo\+)?json"

    def should_transform_response(self, request: Request, scope: Scope) -> bool:
        """Determine if the response should be transformed.

        Returns ``True`` only when the request path is a STAC catalog,
        collection, item, or search URL, the ``content-type`` header matches
        ``json_content_type_expr``, and ``scope["status"]`` is a 2xx code.
        """
        # catalog, collections, collection, items, item, search
        stac_path_expr = r"^(/|/collections(/[^/]+(/items(/[^/]+)?)?)?|/search)$"

        # Resolve all inputs before matching so the header lookup always runs,
        # regardless of whether the path matches.
        path = request.url.path
        content_type_expr = self.json_content_type_expr
        content_type = Headers(scope=scope).get("content-type", "")

        if not re.match(stac_path_expr, path):
            return False
        if not re.match(content_type_expr, content_type):
            return False
        return 200 <= scope["status"] < 300

    def transform_json(self, data: dict[str, Any], request: Request) -> dict[str, Any]:
        """Augment the STAC document with auth information.

        ``data`` is modified in place and also returned. ``request`` is not
        used by this implementation.
        """
        extensions = data.setdefault("stac_extensions", [])
        if self.extension_url not in extensions:
            extensions.append(self.extension_url)

        # auth:schemes
        # ---
        # A property that contains all of the scheme definitions used by Assets and
        # Links in the STAC Item or Collection.
        # - Catalogs
        # - Collections
        # - Item Properties

        # Documents with a "properties" member carry the schemes there; all
        # others carry them at the top level.
        scheme_loc = data["properties"] if "properties" in data else data
        schemes = scheme_loc.setdefault("auth:schemes", {})
        schemes[self.auth_scheme_name] = {
            "type": "openIdConnect",
            "openIdConnectUrl": self.oidc_discovery_url,
        }

        # auth:refs
        # ---
        # Annotate links with "auth:refs": [auth_scheme]
        for link in get_links(data):
            if "href" not in link:
                logger.warning("Link %s has no href", link)
                continue
            # Only the path component of the href is matched, always as a GET.
            match = find_match(
                path=urlparse(link["href"]).path,
                method="GET",
                private_endpoints=self.private_endpoints,
                public_endpoints=self.public_endpoints,
                default_public=self.default_public,
            )
            if match.is_private:
                refs = link.setdefault("auth:refs", [])
                # Skip links that already reference this scheme so the list
                # never gains a duplicate entry.
                if self.auth_scheme_name not in refs:
                    refs.append(self.auth_scheme_name)

        return data

should_transform_response(request: Request, scope: Scope) -> bool

Determine if the response should be transformed.

Source code in src/stac_auth_proxy/middleware/AuthenticationExtensionMiddleware.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def should_transform_response(self, request: Request, scope: Scope) -> bool:
    """Report whether the response qualifies for transformation.

    The answer is ``True`` only when the request path is a STAC catalog,
    collection, item, or search URL, the ``content-type`` header matches
    ``json_content_type_expr``, and ``scope["status"]`` is a 2xx code.
    """
    # catalog, collections, collection, items, item, search
    stac_path_expr = r"^(/|/collections(/[^/]+(/items(/[^/]+)?)?)?|/search)$"

    # Resolve all inputs before matching so the header lookup always runs,
    # regardless of whether the path matches.
    path = request.url.path
    content_type_expr = self.json_content_type_expr
    content_type = Headers(scope=scope).get("content-type", "")

    if not re.match(stac_path_expr, path):
        return False
    if not re.match(content_type_expr, content_type):
        return False
    return 200 <= scope["status"] < 300

transform_json(data: dict[str, Any], request: Request) -> dict[str, Any]

Augment the STAC Item with auth information.

Source code in src/stac_auth_proxy/middleware/AuthenticationExtensionMiddleware.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def transform_json(self, data: dict[str, Any], request: Request) -> dict[str, Any]:
    """Augment the STAC document with auth information.

    ``data`` is modified in place and also returned. ``request`` is not used
    by this implementation.
    """
    extensions = data.setdefault("stac_extensions", [])
    if self.extension_url not in extensions:
        extensions.append(self.extension_url)

    # auth:schemes
    # ---
    # A property that contains all of the scheme definitions used by Assets and
    # Links in the STAC Item or Collection.
    # - Catalogs
    # - Collections
    # - Item Properties

    # Documents with a "properties" member carry the schemes there; all others
    # carry them at the top level.
    scheme_loc = data["properties"] if "properties" in data else data
    schemes = scheme_loc.setdefault("auth:schemes", {})
    schemes[self.auth_scheme_name] = {
        "type": "openIdConnect",
        "openIdConnectUrl": self.oidc_discovery_url,
    }

    # auth:refs
    # ---
    # Annotate links with "auth:refs": [auth_scheme]
    for link in get_links(data):
        if "href" not in link:
            logger.warning("Link %s has no href", link)
            continue
        # Only the path component of the href is matched, always as a GET.
        match = find_match(
            path=urlparse(link["href"]).path,
            method="GET",
            private_endpoints=self.private_endpoints,
            public_endpoints=self.public_endpoints,
            default_public=self.default_public,
        )
        if match.is_private:
            refs = link.setdefault("auth:refs", [])
            # Skip links that already reference this scheme so the list never
            # gains a duplicate entry.
            if self.auth_scheme_name not in refs:
                refs.append(self.auth_scheme_name)

    return data