Skip to content

Class swarmauri_tokens_remoteoidc.RemoteOIDCTokenService.RemoteOIDCTokenService

swarmauri_tokens_remoteoidc.RemoteOIDCTokenService.RemoteOIDCTokenService

RemoteOIDCTokenService(
    issuer,
    *,
    jwks_url=None,
    cache_ttl_s=300,
    request_timeout_s=5,
    user_agent="RemoteOIDCTokenService/1.0",
    expected_alg_whitelist=None,
    accept_unsigned=False,
)

Bases: TokenServiceBase

Verify-only OIDC token service backed by remote discovery + JWKS.

Features

  • Resolves OIDC discovery: /.well-known/openid-configuration → jwks_uri. (You may also pass jwks_url directly; that bypasses discovery.)
  • Caches discovery + JWKS in-memory with TTL; thread-safe refresh; honors ETag / Last-Modified for conditional GETs.
  • Strict issuer check (iss must equal configured issuer).
  • Audience validation (optional); clock skew leeway configurable.
  • Supports any JWS algs announced by the issuer; falls back to common algs.
  • Exposes jwks() for debugging/inspection.
  • No mint() (raises NotImplementedError).

Constructor

RemoteOIDCTokenService( issuer: str, *, jwks_url: Optional[str] = None, cache_ttl_s: int = 300, request_timeout_s: int = 5, user_agent: str = "RemoteOIDCTokenService/1.0", expected_alg_whitelist: Optional[Iterable[str]] = None, accept_unsigned: bool = False, # for test envs only; strongly discouraged )

Source code in swarmauri_tokens_remoteoidc/RemoteOIDCTokenService.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def __init__(
    self,
    issuer: str,
    *,
    jwks_url: Optional[str] = None,
    cache_ttl_s: int = 300,
    request_timeout_s: int = 5,
    user_agent: str = "RemoteOIDCTokenService/1.0",
    expected_alg_whitelist: Optional[Iterable[str]] = None,
    accept_unsigned: bool = False,
) -> None:
    super().__init__()
    if not issuer:
        raise ValueError("issuer is required")
    self._issuer = issuer.rstrip("/")
    self._jwks_url_config = jwks_url
    self._cache_ttl_s = int(cache_ttl_s)
    self._timeout_s = int(request_timeout_s)
    self._ua = user_agent
    self._accept_unsigned = bool(accept_unsigned)

    # Discovery cache
    self._disc_obj: Optional[dict] = None
    self._disc_at: float = 0.0
    self._disc_etag: Optional[str] = None
    self._disc_lastmod: Optional[str] = None

    # JWKS cache
    self._jwks_obj: Optional[dict] = None
    self._jwks_at: float = 0.0
    self._jwks_etag: Optional[str] = None
    self._jwks_lastmod: Optional[str] = None

    # accepted algs
    self._allowed_algs: Optional[Tuple[str, ...]] = (
        tuple(expected_alg_whitelist) if expected_alg_whitelist else None
    )

    # Pre-resolve discovery on init (best-effort)
    try:
        self._ensure_discovery_locked(force=False)
    except Exception:
        # non-fatal at construction; will retry on first verify()
        pass

type class-attribute instance-attribute

type = 'RemoteOIDCTokenService'

supports

supports()
Source code in swarmauri_tokens_remoteoidc/RemoteOIDCTokenService.py
153
154
155
156
157
158
159
160
161
162
def supports(self) -> Mapping[str, Iterable[str]]:
    algs = self._allowed_algs or (
        # Sensible defaults; may be narrowed by discovery later
        "RS256",
        "PS256",
        "ES256",
        "EdDSA",
    )
    fmts = ("JWT", "JWS")
    return {"formats": fmts, "algs": algs}

mint async

mint(
    claims,
    *,
    alg,
    kid=None,
    key_version=None,
    headers=None,
    lifetime_s=3600,
    issuer=None,
    subject=None,
    audience=None,
    scope=None,
)
Source code in swarmauri_tokens_remoteoidc/RemoteOIDCTokenService.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
async def mint(
    self,
    claims: Dict[str, Any],
    *,
    alg: str,
    kid: str | None = None,
    key_version: int | None = None,
    headers: Optional[Dict[str, Any]] = None,
    lifetime_s: Optional[int] = 3600,
    issuer: Optional[str] = None,
    subject: Optional[str] = None,
    audience: Optional[str | list[str]] = None,
    scope: Optional[str] = None,
) -> str:  # pragma: no cover - mint not implemented
    raise NotImplementedError(
        "RemoteOIDCTokenService is verification-only (no mint)"
    )

verify async

verify(
    token,
    *,
    issuer=None,
    audience=None,
    leeway_s=60,
    max_age_s=None,
    nonce=None,
)

Verify a JWT/JWS against the configured OIDC issuer and remote JWKS.

Checks performed
  • JWS signature using remote JWKS (by 'kid' header).
  • 'iss' must equal configured issuer (or explicit 'issuer' arg if provided).
  • 'aud' validated if provided by caller.
  • 'exp','nbf','iat' validated with 'leeway_s'.
  • Optional 'nonce' and 'auth_time'/'max_age' checks if provided.
Source code in swarmauri_tokens_remoteoidc/RemoteOIDCTokenService.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
async def verify(
    self,
    token: str,
    *,
    issuer: Optional[str] = None,
    audience: Optional[str | list[str]] = None,
    leeway_s: int = 60,
    max_age_s: Optional[int] = None,  # optional OIDC 'max_age' enforcement
    nonce: Optional[str] = None,  # if you flow through /authorize with nonce
) -> Dict[str, Any]:
    """
    Verify a JWT/JWS against the configured OIDC issuer and remote JWKS.

    Checks performed:
      - JWS signature using remote JWKS (by 'kid' header).
      - 'iss' must equal configured issuer (or explicit 'issuer' arg if provided).
      - 'aud' validated if provided by caller.
      - 'exp','nbf','iat' validated with 'leeway_s'.
      - Optional 'nonce' and 'auth_time'/'max_age' checks if provided.
    """
    # Refresh caches if stale
    with self._lock:
        self._ensure_discovery_locked(force=False)
        self._ensure_jwks_locked(force=False)

        iss_expected = issuer or self._issuer

        # Choose allowed algorithms
        allowed = self._derive_allowed_algs_locked()

        # Build a key resolver that picks verification key from cached JWKS by kid
        jwks = self._jwks_obj or {"keys": []}

    def _resolve_key(header, payload):  # pragma: no cover - internal
        kid = header.get("kid")
        if header.get("alg") == "none":
            return (
                None
                if self._accept_unsigned
                else jwt.InvalidAlgorithmError("Unsigned tokens are not accepted")
            )
        if not kid:
            return None
        for jwk in jwks.get("keys", []):
            if jwk.get("kid") == kid:
                kty = jwk.get("kty")
                if kty == "RSA":
                    return algorithms.RSAAlgorithm.from_jwk(jwk)
                if kty == "EC":
                    return algorithms.ECAlgorithm.from_jwk(jwk)
                if kty == "OKP":
                    return algorithms.Ed25519Algorithm.from_jwk(jwk)
                if kty == "oct":
                    return algorithms.HMACAlgorithm.from_jwk(jwk)
        return None

    options = {
        "verify_aud": audience is not None,
        "require": ["exp", "iat"],
    }

    header = jwt.get_unverified_header(token)
    key_obj = _resolve_key(header, None)
    claims = jwt.decode(
        token,
        key=key_obj,
        algorithms=list(allowed),
        audience=audience,
        issuer=iss_expected,
        leeway=leeway_s,
        options=options,
    )

    if nonce is not None:
        if claims.get("nonce") != nonce:
            raise jwt.InvalidTokenError("OIDC nonce mismatch")

    if max_age_s is not None:
        auth_time = claims.get("auth_time")
        now = int(_now())
        if isinstance(auth_time, int):
            if now > (auth_time + int(max_age_s) + int(leeway_s)):
                raise jwt.ExpiredSignatureError("OIDC max_age exceeded")

    return claims

jwks async

jwks()
Source code in swarmauri_tokens_remoteoidc/RemoteOIDCTokenService.py
268
269
270
271
272
async def jwks(self) -> dict:
    with self._lock:
        self._ensure_discovery_locked(force=False)
        self._ensure_jwks_locked(force=False)
        return dict(self._jwks_obj or {"keys": []})

refresh

refresh(*, force=False)

Synchronous refresh of discovery + JWKS caches. Safe to call at process start or when you receive a rotation signal.

Source code in swarmauri_tokens_remoteoidc/RemoteOIDCTokenService.py
274
275
276
277
278
279
280
281
def refresh(self, *, force: bool = False) -> None:
    """
    Synchronous refresh of discovery + JWKS caches. Safe to call
    at process start or when you receive a rotation signal.
    """
    with self._lock:
        self._ensure_discovery_locked(force=force)
        self._ensure_jwks_locked(force=force)