Skip to content

Class tigrbl_auth.adapters.remote_adapter.RemoteAuthNAdapter

tigrbl_auth.adapters.remote_adapter.RemoteAuthNAdapter

RemoteAuthNAdapter(
    *,
    base_url,
    timeout=0.4,
    cache_ttl=10,
    cache_size=10000,
    client=None,
)

Bases: AuthNProvider

Authenticate every request by calling a remote AuthN service.

Parameters

- base_url: Full origin of the AuthN deployment, without a trailing slash, e.g. "https://authn.internal:8080".
- timeout: Timeout in seconds (float) for the outbound HTTP POST (default 0.4 s).
- cache_ttl: TTL in seconds for the in-process API-key cache (default 10 s).
- cache_size: Maximum number of distinct API keys to cache (default 10,000).
- client: Optional pre-configured httpx.AsyncClient. If omitted, one is created.

Source code in tigrbl_auth/adapters/remote_adapter.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def __init__(
    self,
    *,
    base_url: str,
    timeout: float = 0.4,
    cache_ttl: int = 10,
    cache_size: int = 10_000,
    client: httpx.AsyncClient | None = None,
) -> None:
    """
    Configure the adapter's endpoint, HTTP client and in-process cache.

    Parameters
    ----------
    base_url:
        Origin of the AuthN deployment. Any trailing ``/`` is stripped
        before the ``/introspect`` endpoint URL is derived from it.
    timeout:
        Timeout handed to the ``httpx.AsyncClient`` created here. It is
        not applied when a ``client`` is supplied by the caller.
    cache_ttl:
        Stored as ``self._ttl``; presumably the cache entry lifetime in
        seconds (the cache helpers are not shown here -- confirm there).
    cache_size:
        Stored as ``self._max``; presumably the maximum number of cached
        keys (confirm against the cache helpers).
    client:
        Pre-configured ``httpx.AsyncClient`` to reuse. When falsy, a new
        client is created with ``timeout`` and a fixed User-Agent header.
    """
    # Normalise once so the derived URL never contains a double slash.
    origin = base_url.rstrip("/")
    self.base_url = origin
    self._introspect = f"{origin}/introspect"

    # Reuse the caller's client when one is given; otherwise build our own.
    if client:
        self._client = client
    else:
        default_headers = {"User-Agent": "autoauthn-remote-adapter"}
        self._client = httpx.AsyncClient(timeout=timeout, headers=default_headers)

    self._ttl: Final[int] = cache_ttl
    self._max: Final[int] = cache_size
    # api_key -> (principal, ts)
    self._cache: dict[str, tuple[dict, float]] = {}

base_url instance-attribute

base_url = base_url.rstrip('/')

get_principal async

get_principal(request, api_key=Security(_API_KEY_REQUIRED))

Resolve and return the principal for a required API key.

Raises a 401 when the key is invalid or expired. A missing header is handled by the security scheme (auto_error=True).

Source code in tigrbl_auth/adapters/remote_adapter.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
async def get_principal(  # strict variant: an invalid key yields a 401
    self,
    request: Request,
    api_key: str = Security(_API_KEY_REQUIRED),
) -> dict:
    """
    Return the principal associated with a mandatory API key.

    The in-process cache is consulted first; on a miss the key is sent to
    ``self._introspect_key`` and a successful result is cached. The
    resolved principal is then published on ``request.state.principal``,
    in ``principal_var`` and through ``set_auth_context``.

    Raises
    ------
    HTTPException
        With status 401 when introspection returns ``None`` (invalid or
        expired key). Per the original docstring, a missing header is
        rejected by the security scheme itself (auto_error=True).
    """
    cached = self._cache_get(api_key)
    if cached is not None:
        principal = cached
    else:
        # Cache miss: ask the remote service, and only cache real principals.
        principal = await self._introspect_key(api_key)
        if principal is None:
            raise HTTPException(
                status.HTTP_401_UNAUTHORIZED, "Invalid or expired API key"
            )
        self._cache_put(api_key, principal)

    # Expose the principal to downstream code through all three channels.
    request.state.principal = principal
    principal_var.set(principal)
    set_auth_context(request, principal)
    return principal

get_principal_optional async

get_principal_optional(
    request, api_key=Security(_API_KEY_OPTIONAL)
)

Resolve and return the principal when the API key header is optional.

Returns None when no API key is provided or when introspection fails. Never raises because of a missing header (auto_error=False).

Source code in tigrbl_auth/adapters/remote_adapter.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
async def get_principal_optional(  # lenient variant: the header may be absent
    self,
    request: Request,
    api_key: str | None = Security(_API_KEY_OPTIONAL),
) -> dict | None:
    """
    Return the principal for an optional API key, or ``None`` for anonymous.

    ``None`` is returned when ``api_key`` is empty/absent or when
    ``self._introspect_key`` yields ``None``; no exception is raised in
    either case so anonymous flows can continue. Whatever value is
    resolved (principal or ``None``) is published on
    ``request.state.principal``, in ``principal_var`` and through
    ``set_auth_context``. Per the original docstring, a missing header
    never raises here (auto_error=False).
    """
    resolved = None
    if api_key:
        resolved = self._cache_get(api_key)
        if resolved is None:
            # Cache miss: introspect remotely; a failed lookup is not cached.
            resolved = await self._introspect_key(api_key)
            if resolved is not None:
                self._cache_put(api_key, resolved)

    # Single publish point for both the authenticated and anonymous outcome.
    request.state.principal = resolved
    principal_var.set(resolved)
    set_auth_context(request, resolved)
    return resolved