RemoteOIDCTokenService(
issuer,
*,
jwks_url=None,
cache_ttl_s=300,
request_timeout_s=5,
user_agent="RemoteOIDCTokenService/1.0",
expected_alg_whitelist=None,
accept_unsigned=False,
)
Bases: TokenServiceBase
Verify-only OIDC token service backed by remote discovery + JWKS.
Features
- Resolves OIDC discovery: /.well-known/openid-configuration → jwks_uri.
(You may also pass jwks_url directly; that bypasses discovery.)
- Caches discovery + JWKS in-memory with TTL; thread-safe refresh; honors
ETag / Last-Modified for conditional GETs.
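- Strict issuer check (iss must equal the configured issuer; note that the constructor strips any trailing "/" from the configured value before it is compared).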
- Audience validation (optional); clock skew leeway configurable.
- Supports any JWS algs announced by the issuer; falls back to common algs.
- Exposes jwks() for debugging/inspection.
- No mint() (raises NotImplementedError).
Constructor
RemoteOIDCTokenService(
issuer: str,
*,
jwks_url: Optional[str] = None,
cache_ttl_s: int = 300,
request_timeout_s: int = 5,
user_agent: str = "RemoteOIDCTokenService/1.0",
expected_alg_whitelist: Optional[Iterable[str]] = None,
accept_unsigned: bool = False, # for test envs only; strongly discouraged
)
Source code in swarmauri_tokens_remoteoidc/RemoteOIDCTokenService.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def __init__(
    self,
    issuer: str,
    *,
    jwks_url: Optional[str] = None,
    cache_ttl_s: int = 300,
    request_timeout_s: int = 5,
    user_agent: str = "RemoteOIDCTokenService/1.0",
    expected_alg_whitelist: Optional[Iterable[str]] = None,
    accept_unsigned: bool = False,
) -> None:
    """Store configuration, reset both caches, and try discovery once.

    Args:
        issuer: Issuer identifier; required. Trailing "/" characters are
            stripped before it is stored.
        jwks_url: Optional explicit JWKS location, stored as given.
        cache_ttl_s: Cache lifetime setting, coerced to ``int``.
        request_timeout_s: Request timeout setting, coerced to ``int``.
        user_agent: User-agent string, stored as given.
        expected_alg_whitelist: Optional iterable of algorithm names. A
            non-empty value is frozen into a tuple; an empty or missing
            value is stored as ``None``.
        accept_unsigned: Coerced to ``bool`` and stored.

    Raises:
        ValueError: If ``issuer`` is empty or otherwise falsy.
    """
    super().__init__()
    if not issuer:
        raise ValueError("issuer is required")

    # --- static configuration -------------------------------------------
    self._issuer = issuer.rstrip("/")
    self._jwks_url_config = jwks_url
    self._cache_ttl_s = int(cache_ttl_s)
    self._timeout_s = int(request_timeout_s)
    self._ua = user_agent
    self._accept_unsigned = bool(accept_unsigned)

    # --- discovery cache: document, fetch timestamp, validators ---------
    self._disc_obj: Optional[dict] = None
    self._disc_at: float = 0.0
    self._disc_etag = self._disc_lastmod = None

    # --- JWKS cache: same layout as the discovery cache -----------------
    self._jwks_obj: Optional[dict] = None
    self._jwks_at: float = 0.0
    self._jwks_etag = self._jwks_lastmod = None

    # --- accepted algorithms --------------------------------------------
    if expected_alg_whitelist:
        pinned_algs: Optional[Tuple[str, ...]] = tuple(expected_alg_whitelist)
    else:
        pinned_algs = None
    self._allowed_algs = pinned_algs

    # Warm the discovery cache, but never let a failure here abort
    # construction: verify() runs the same discovery step again.
    # NOTE(review): this call is made without taking self._lock --
    # presumably fine because the instance is not shared yet; confirm.
    try:
        self._ensure_discovery_locked(force=False)
    except Exception:
        pass
|
type
class-attribute
instance-attribute
type = 'RemoteOIDCTokenService'
supports
Source code in swarmauri_tokens_remoteoidc/RemoteOIDCTokenService.py
153
154
155
156
157
158
159
160
161
def supports(self) -> Mapping[str, Iterable[str]]:
    """Describe the token formats and signature algorithms handled.

    Returns:
        A mapping with two keys: ``"formats"`` (always ``("JWT", "JWS")``)
        and ``"algs"`` (the configured algorithm whitelist when one was
        given, otherwise a built-in default tuple).
    """
    if self._allowed_algs:
        alg_names = self._allowed_algs
    else:
        # Sensible defaults; may be narrowed by discovery later
        alg_names = ("RS256", "PS256", "ES256", "EdDSA")
    return {"formats": ("JWT", "JWS"), "algs": alg_names}
|
mint
async
mint(
claims,
*,
alg,
kid=None,
key_version=None,
headers=None,
lifetime_s=3600,
issuer=None,
subject=None,
audience=None,
scope=None,
)
Source code in swarmauri_tokens_remoteoidc/RemoteOIDCTokenService.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180 | async def mint(
self,
claims: Dict[str, Any],
*,
alg: str,
kid: str | None = None,
key_version: int | None = None,
headers: Optional[Dict[str, Any]] = None,
lifetime_s: Optional[int] = 3600,
issuer: Optional[str] = None,
subject: Optional[str] = None,
audience: Optional[str | list[str]] = None,
scope: Optional[str] = None,
) -> str: # pragma: no cover - mint not implemented
raise NotImplementedError(
"RemoteOIDCTokenService is verification-only (no mint)"
)
|
verify
async
verify(
token,
*,
issuer=None,
audience=None,
leeway_s=60,
max_age_s=None,
nonce=None,
)
Verify a JWT/JWS against the configured OIDC issuer and remote JWKS.
Checks performed
- JWS signature using remote JWKS (by 'kid' header).
- 'iss' must equal configured issuer (or explicit 'issuer' arg if provided).
- 'aud' validated if provided by caller.
- 'exp','nbf','iat' validated with 'leeway_s'.
- Optional 'nonce' and 'auth_time'/'max_age' checks if provided.
Source code in swarmauri_tokens_remoteoidc/RemoteOIDCTokenService.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
async def verify(
    self,
    token: str,
    *,
    issuer: Optional[str] = None,
    audience: Optional[str | list[str]] = None,
    leeway_s: int = 60,
    max_age_s: Optional[int] = None,  # optional OIDC 'max_age' enforcement
    nonce: Optional[str] = None,  # if you flow through /authorize with nonce
) -> Dict[str, Any]:
    """
    Verify a JWT/JWS against the configured OIDC issuer and remote JWKS.

    Checks performed:
      - JWS signature using remote JWKS (by 'kid' header).
      - 'iss' must equal configured issuer (or explicit 'issuer' arg if provided).
      - 'aud' validated if provided by caller.
      - 'exp','nbf','iat' validated with 'leeway_s'; 'exp' and 'iat' are required.
      - Optional 'nonce' and 'auth_time'/'max_age' checks if provided.

    Args:
        token: Compact-serialized token to verify.
        issuer: Expected 'iss'; defaults to the configured issuer.
        audience: Expected 'aud'; audience is only checked when given.
        leeway_s: Clock-skew allowance, also applied to the max_age check.
        max_age_s: When given, 'auth_time' must be present and numeric, and
            no older than ``max_age_s + leeway_s`` seconds.
        nonce: When given, the 'nonce' claim must equal it.

    Returns:
        The decoded claims.

    Raises:
        jwt.InvalidAlgorithmError: Token declares alg "none" and unsigned
            tokens are not accepted.
        jwt.InvalidTokenError: No usable JWKS key for the token, nonce
            mismatch, or 'auth_time' missing/invalid while ``max_age_s``
            is set. ``jwt.decode`` errors propagate unchanged.
        jwt.ExpiredSignatureError: 'auth_time' is older than allowed.
    """
    # Refresh caches if stale, then snapshot everything derived from them
    # while the lock is still held.
    # NOTE(review): the refresh helpers run synchronously inside this
    # coroutine; if they do network I/O they block the event loop -- confirm.
    with self._lock:
        self._ensure_discovery_locked(force=False)
        self._ensure_jwks_locked(force=False)
        iss_expected = issuer or self._issuer
        allowed_algs = list(self._derive_allowed_algs_locked())
        jwks = self._jwks_obj or {"keys": []}

    def _resolve_key(kid):  # pragma: no cover - internal
        """Return a verification key for *kid* from the JWKS snapshot, or None."""
        if not kid:
            return None
        for jwk in jwks.get("keys", []):
            if jwk.get("kid") != kid:
                continue
            kty = jwk.get("kty")
            if kty == "RSA":
                return algorithms.RSAAlgorithm.from_jwk(jwk)
            if kty == "EC":
                return algorithms.ECAlgorithm.from_jwk(jwk)
            if kty == "OKP":
                return algorithms.Ed25519Algorithm.from_jwk(jwk)
            if kty == "oct":
                return algorithms.HMACAlgorithm.from_jwk(jwk)
            # Matching kid with an unsupported kty: keep scanning.
        return None

    header = jwt.get_unverified_header(token)
    alg = header.get("alg")
    if alg == "none":
        # Previously the exception object was *returned* and then handed to
        # jwt.decode() as if it were a key; raise it instead.
        if not self._accept_unsigned:
            raise jwt.InvalidAlgorithmError("Unsigned tokens are not accepted")
        # NOTE(review): with key=None the decode below still needs "none"
        # among the allowed algorithms; confirm that accept_unsigned
        # actually lets unsigned tokens through end to end.
        key_obj = None
    else:
        key_obj = _resolve_key(header.get("kid"))
        # Only complain about the key when the algorithm itself is allowed;
        # otherwise let jwt.decode() reject the algorithm as it did before.
        if key_obj is None and alg in allowed_algs:
            raise jwt.InvalidTokenError(
                "No matching JWKS key found for token 'kid'"
            )

    claims = jwt.decode(
        token,
        key=key_obj,
        algorithms=allowed_algs,
        audience=audience,
        issuer=iss_expected,
        leeway=leeway_s,
        options={
            "verify_aud": audience is not None,
            "require": ["exp", "iat"],
        },
    )

    if nonce is not None and claims.get("nonce") != nonce:
        raise jwt.InvalidTokenError("OIDC nonce mismatch")

    if max_age_s is not None:
        auth_time = claims.get("auth_time")
        # Fail closed: a caller asking for max_age enforcement must not have
        # it skipped because 'auth_time' is absent or malformed. bool is an
        # int subclass, so exclude it explicitly.
        if isinstance(auth_time, bool) or not isinstance(auth_time, (int, float)):
            raise jwt.InvalidTokenError("OIDC auth_time claim missing or invalid")
        if int(_now()) > auth_time + int(max_age_s) + int(leeway_s):
            raise jwt.ExpiredSignatureError("OIDC max_age exceeded")

    return claims
|
jwks
async
Source code in swarmauri_tokens_remoteoidc/RemoteOIDCTokenService.py
async def jwks(self) -> dict:
    """Return a shallow copy of the cached JWKS document.

    Both cache-refresh steps are run (non-forced) under the instance lock
    first. When nothing is cached, ``{"keys": []}`` is returned.
    """
    with self._lock:
        self._ensure_discovery_locked(force=False)
        self._ensure_jwks_locked(force=False)
        cached = self._jwks_obj
        if not cached:
            cached = {"keys": []}
        # Copy so callers cannot rebind top-level entries of the cache.
        return dict(cached)
|
refresh
Synchronous refresh of discovery + JWKS caches. Safe to call
at process start or when you receive a rotation signal.
Source code in swarmauri_tokens_remoteoidc/RemoteOIDCTokenService.py
274
275
276
277
278
279
280
def refresh(self, *, force: bool = False) -> None:
    """
    Refresh the discovery cache and then the JWKS cache, synchronously.

    Both steps run under the instance lock and receive the same ``force``
    flag. Intended for process start-up or in response to a key-rotation
    signal.
    """
    with self._lock:
        # Discovery first, JWKS second.
        for ensure_step in (self._ensure_discovery_locked, self._ensure_jwks_locked):
            ensure_step(force=force)
|