Skip to content

Cql2ValidateResponseBodyMiddleware

Middleware to validate the response body with a CQL2 filter for single-record endpoints.

Cql2ValidateResponseBodyMiddleware dataclass

ASGI middleware to validate the response body with a CQL2 filter for single-record endpoints.

Parameters:

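| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `app` | `Callable[list, Awaitable[None]]` | The ASGI application wrapped by this middleware. | required |
| `state_key` | `str` | Name of the `request.state` attribute that holds the CQL2 filter. | `'cql2_filter'` |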
Source code in src/stac_auth_proxy/middleware/Cql2ValidateResponseBodyMiddleware.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
@required_conformance(
    r"http://www.opengis.net/spec/cql2/1.0/conf/basic-cql2",
    r"http://www.opengis.net/spec/cql2/1.0/conf/cql2-text",
    r"http://www.opengis.net/spec/cql2/1.0/conf/cql2-json",
)
@dataclass
class Cql2ValidateResponseBodyMiddleware:
    """ASGI middleware to validate the response body with a CQL2 filter for single-record endpoints.

    For HTTP requests whose path matches one of ``single_record_endpoints`` and
    whose ``request.state`` carries a truthy filter under ``state_key``, the
    downstream response is buffered in full and evaluated against that filter.
    A matching record is forwarded unchanged; a non-matching record is replaced
    with a JSON 404, and a body that is not valid JSON is replaced with a JSON
    502. All other requests pass straight through to ``app``.
    """

    app: ASGIApp
    state_key: str = "cql2_filter"

    # Unannotated, so ``@dataclass`` leaves this as a plain class attribute
    # rather than turning it into an ``__init__`` field.
    single_record_endpoints = [
        r"^/collections/([^/]+)/items/([^/]+)$",
        r"^/collections/([^/]+)$",
    ]

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Validate the response body with a CQL2 filter for single-record endpoints.

        Args:
            scope: ASGI connection scope.
            receive: ASGI receive callable, handed to the wrapped app untouched.
            send: ASGI send callable used to emit the (possibly replaced) response.
        """
        if scope["type"] != "http":
            return await self.app(scope, receive, send)

        request = Request(scope)
        cql2_filter: Optional[Expr] = getattr(request.state, self.state_key, None)
        if not cql2_filter:
            return await self.app(scope, receive, send)

        if not any(
            re.match(expr, request.url.path) for expr in self.single_record_endpoints
        ):
            return await self.app(scope, receive, send)

        # Intercept the response: nothing is forwarded until the whole body has
        # been seen, because the status line may need to be swapped for a 404/502.
        response_start = None
        body_chunks = []

        async def send_wrapper(message: Message):
            nonlocal response_start
            if message["type"] == "http.response.start":
                # Hold the start message back until the body has been validated.
                response_start = message
            elif message["type"] == "http.response.body":
                body_chunks.append(message.get("body", b""))
                if not message.get("more_body", False):
                    await self._process_and_send_response(
                        response_start, body_chunks, send, cql2_filter
                    )
            else:
                await send(message)

        await self.app(scope, receive, send_wrapper)

    async def _process_and_send_response(
        self, response_start, body_chunks, send, cql2_filter
    ):
        """Evaluate the buffered body against the filter and emit the final response.

        Args:
            response_start: The buffered ``http.response.start`` message.
            body_chunks: Body fragments collected from the wrapped app, in order.
            send: ASGI send callable.
            cql2_filter: Object whose ``matches(record)`` result decides whether
                the record is returned.
        """
        body = b"".join(body_chunks)
        try:
            body_json = json.loads(body)
        except json.JSONDecodeError:
            logger.warning("Failed to parse response body as JSON")
            await self._send_json_response(
                send,
                status=502,
                content={
                    "code": "ParseError",
                    "description": "Failed to parse response body as JSON",
                },
            )
            return

        try:
            cql2_matches = cql2_filter.matches(body_json)
        except Exception as e:
            # Fail closed: a filter that cannot be evaluated is treated as a
            # non-match so the record is never returned by accident.
            cql2_matches = False
            logger.warning("Failed to apply filter: %s", e)

        if cql2_matches:
            logger.debug("Response matches filter, returning record")
            # Send the original response start
            await send(response_start)
            # Forward the original bytes untouched. Re-serializing the parsed
            # JSON can change the byte length (separators, non-ASCII escaping)
            # and would no longer agree with the content-length header carried
            # by the original response start.
            await send(
                {
                    "type": "http.response.body",
                    "body": body,
                    "more_body": False,
                }
            )
        else:
            logger.debug("Response did not match filter, returning 404")
            await self._send_json_response(
                send,
                status=404,
                content={"code": "NotFoundError", "description": "Record not found."},
            )

    async def _send_json_response(self, send, status, content):
        """Send a complete JSON response (start + single body message).

        Args:
            send: ASGI send callable.
            status: HTTP status code for the response.
            content: JSON-serializable payload.
        """
        response_bytes = json.dumps(content).encode("utf-8")
        await send(
            {
                "type": "http.response.start",
                "status": status,
                "headers": [
                    (b"content-type", b"application/json"),
                    (b"content-length", str(len(response_bytes)).encode("latin1")),
                ],
            }
        )
        await send(
            {
                "type": "http.response.body",
                "body": response_bytes,
                "more_body": False,
            }
        )

__call__(scope: Scope, receive: Receive, send: Send) -> None async

Validate the response body with a CQL2 filter for single-record endpoints.

Source code in src/stac_auth_proxy/middleware/Cql2ValidateResponseBodyMiddleware.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
    """Buffer single-record responses and hand them off for CQL2 validation.

    Non-HTTP scopes, requests without a truthy filter stored on
    ``request.state`` under ``self.state_key``, and paths that match none of
    ``self.single_record_endpoints`` are delegated to the wrapped app as-is.
    Otherwise the response is collected in full and passed to
    ``self._process_and_send_response``.
    """
    if scope["type"] != "http":
        return await self.app(scope, receive, send)

    request = Request(scope)
    cql2_filter: Optional[Expr] = getattr(request.state, self.state_key, None)
    if not cql2_filter:
        return await self.app(scope, receive, send)

    path = request.url.path
    is_single_record = any(
        re.match(pattern, path) for pattern in self.single_record_endpoints
    )
    if not is_single_record:
        return await self.app(scope, receive, send)

    # Buffered response state, filled in by the wrapper below.
    start_message = None
    chunks = []

    async def buffering_send(message: Message):
        nonlocal start_message
        kind = message["type"]

        if kind == "http.response.start":
            # Held back until the complete body is available.
            start_message = message
            return

        if kind != "http.response.body":
            # Anything other than start/body is forwarded immediately.
            await send(message)
            return

        chunks.append(message.get("body", b""))
        if message.get("more_body", False):
            return

        # Final body chunk received: process the complete response.
        await self._process_and_send_response(
            start_message, chunks, send, cql2_filter
        )

    await self.app(scope, receive, buffering_send)