Skip to content

opa

Integration with Open Policy Agent (OPA) to generate CQL2 filters for requests to a STAC API.

Opa dataclass

Call Open Policy Agent (OPA) to generate CQL2 filters from request context.

Parameters:

Name Type Description Default
host str
required
decision str
required
cache_key str
'req.headers.authorization'
cache_ttl float
5.0

Attributes:

Name Type Description
client AsyncClient
cache MemoryCache
Source code in src/stac_auth_proxy/filters/opa.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@dataclass
class Opa:
    """Call Open Policy Agent (OPA) to generate CQL2 filters from request context."""

    host: str
    decision: str

    client: httpx.AsyncClient = field(init=False)
    cache: MemoryCache = field(init=False)
    cache_key: str = "req.headers.authorization"
    cache_ttl: float = 5.0

    def __post_init__(self):
        """Initialize the client."""
        self.client = httpx.AsyncClient(base_url=self.host)
        self.cache = MemoryCache(ttl=self.cache_ttl)

    async def __call__(self, context: dict[str, Any]) -> str:
        """Generate a CQL2 filter for the request."""
        token = get_value_by_path(context, self.cache_key)
        try:
            expr_str = self.cache[token]
        except KeyError:
            expr_str = await self._fetch(context)
            self.cache[token] = expr_str
        return expr_str

    async def _fetch(self, context: dict[str, Any]) -> str:
        """Fetch the CQL2 filter from OPA."""
        response = await self.client.post(
            f"/v1/data/{self.decision}",
            json={"input": context},
        )
        return response.raise_for_status().json()["result"]

__call__(context: dict[str, Any]) -> str async

Generate a CQL2 filter for the request.

Source code in src/stac_auth_proxy/filters/opa.py
28
29
30
31
32
33
34
35
36
async def __call__(self, context: dict[str, Any]) -> str:
    """Generate a CQL2 filter for the request."""
    token = get_value_by_path(context, self.cache_key)
    try:
        expr_str = self.cache[token]
    except KeyError:
        expr_str = await self._fetch(context)
        self.cache[token] = expr_str
    return expr_str

__post_init__()

Initialize the client.

Source code in src/stac_auth_proxy/filters/opa.py
23
24
25
26
def __post_init__(self):
    """Initialize the client."""
    self.client = httpx.AsyncClient(base_url=self.host)
    self.cache = MemoryCache(ttl=self.cache_ttl)