Stateless JWT helper backed by JWTTokenService
.
JWTCoder
historically accepted a private/public key pair and
constructed its own :class:JWTTokenService
. Recent refactoring switched
the constructor to require an already configured service instance. The
tests for RFC 9068 still rely on the original behaviour, so the
initializer now supports both invocation styles:
JWTCoder(service, kid)
-- use the provided service directly.
JWTCoder(private_key_pem, public_key_pem)
-- build an ephemeral
service from the PEM encoded Ed25519 key pair.
Source code in tigrbl_auth/jwtoken.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127 | def __init__(self, arg1: JWTTokenService | bytes, arg2: str | bytes):
if isinstance(arg1, JWTTokenService) and isinstance(arg2, str):
self._svc = arg1
self._kid = arg2
return
if isinstance(arg1, (bytes, bytearray)) and isinstance(
arg2, (bytes, bytearray)
):
kp = LocalKeyProvider()
spec = KeySpec(
klass=KeyClass.asymmetric,
alg=KeyAlg.ED25519,
uses=(KeyUse.SIGN, KeyUse.VERIFY),
export_policy=ExportPolicy.SECRET_WHEN_ALLOWED,
label="jwt_ed25519",
)
ref = _run(kp.import_key(spec, arg1, public=arg2))
self._svc = JWTTokenService(kp)
self._kid = ref.kid
return
raise TypeError(
"JWTCoder requires (JWTTokenService, kid) or (private_pem, public_pem)"
)
|
default
classmethod
Source code in tigrbl_auth/jwtoken.py
| @classmethod
def default(cls) -> "JWTCoder":
svc, kid = _svc()
return cls(svc, kid)
|
async_sign
async
async_sign(
*,
sub,
tid=None,
ttl=_ACCESS_TTL,
typ="access",
issuer=None,
audience=None,
cert_thumbprint=None,
**extra,
)
Source code in tigrbl_auth/jwtoken.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185 | async def async_sign(
self,
*,
sub: str,
tid: Optional[str] = None,
ttl: timedelta = _ACCESS_TTL,
typ: str = "access",
issuer: Optional[str] = None,
audience: Optional[Iterable[str] | str] = None,
cert_thumbprint: Optional[str] = None,
**extra: Any,
) -> str:
now = datetime.now(timezone.utc)
payload: Dict[str, Any] = {
"sub": sub,
"typ": typ,
"iat": int(now.timestamp()),
"exp": int((now + ttl).timestamp()),
**extra,
}
if tid is not None:
payload["tid"] = tid
if settings.enable_rfc8705:
if cert_thumbprint is None:
raise ValueError(
"cert_thumbprint required when RFC 8705 support is enabled",
)
payload["cnf"] = {"x5t#S256": cert_thumbprint}
if settings.enable_rfc9068:
if issuer is None or audience is None:
raise ValueError(
"issuer and audience required when RFC 9068 support is enabled",
)
from .rfc.rfc9068 import add_rfc9068_claims
payload = add_rfc9068_claims(payload, issuer=issuer, audience=audience)
token = await self._svc.mint(
payload,
alg=JWAAlg.EDDSA,
kid=self._kid,
lifetime_s=int(ttl.total_seconds()),
subject=sub,
issuer=issuer,
audience=audience,
)
if settings.enable_rfc7662:
claims = {"sub": sub, "kind": typ}
if tid is not None:
claims["tid"] = tid
register_token(token, claims)
return token
|
sign
sign(
*,
sub,
tid=None,
ttl=_ACCESS_TTL,
typ="access",
issuer=None,
audience=None,
cert_thumbprint=None,
**extra,
)
Source code in tigrbl_auth/jwtoken.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210 | def sign(
self,
*,
sub: str,
tid: Optional[str] = None,
ttl: timedelta = _ACCESS_TTL,
typ: str = "access",
issuer: Optional[str] = None,
audience: Optional[Iterable[str] | str] = None,
cert_thumbprint: Optional[str] = None,
**extra: Any,
) -> str:
return _run(
self.async_sign(
sub=sub,
tid=tid,
ttl=ttl,
typ=typ,
issuer=issuer,
audience=audience,
cert_thumbprint=cert_thumbprint,
**extra,
),
)
|
async_sign_pair
async
async_sign_pair(*, sub, tid, cert_thumbprint=None, **extra)
Source code in tigrbl_auth/jwtoken.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226 | async def async_sign_pair(
self, *, sub: str, tid: str, cert_thumbprint: Optional[str] = None, **extra: Any
) -> Tuple[str, str]:
access = await self.async_sign(
sub=sub, tid=tid, cert_thumbprint=cert_thumbprint, **extra
)
refresh = await self.async_sign(
sub=sub,
tid=tid,
ttl=_REFRESH_TTL,
typ="refresh",
cert_thumbprint=cert_thumbprint,
**extra,
)
return access, refresh
|
sign_pair
sign_pair(*, sub, tid, cert_thumbprint=None, **extra)
Source code in tigrbl_auth/jwtoken.py
228
229
230
231
232
233
234
235
236
237
238 | def sign_pair(
self, *, sub: str, tid: str, cert_thumbprint: Optional[str] = None, **extra: Any
) -> Tuple[str, str]:
return _run(
self.async_sign_pair(
sub=sub,
tid=tid,
cert_thumbprint=cert_thumbprint,
**extra,
),
)
|
async_decode
async
async_decode(
token,
verify_exp=True,
issuer=None,
audience=None,
cert_thumbprint=None,
)
Source code in tigrbl_auth/jwtoken.py
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277 | async def async_decode(
self,
token: str,
verify_exp: bool = True,
issuer: Optional[str] = None,
audience: Optional[Iterable[str] | str] = None,
cert_thumbprint: Optional[str] = None,
) -> Dict[str, Any]:
if is_revoked(token):
raise InvalidTokenError("token has been revoked")
if _header_alg(token) in {"", "none"}:
raise InvalidTokenError("unsigned JWTs are not accepted")
try:
payload = await self._svc.verify(token, issuer=issuer, audience=audience)
except Exception as exc:
# Delegate any verification issues to our own domain specific error
raise InvalidTokenError("unable to verify token") from exc
if verify_exp:
exp = payload.get("exp")
if exp is not None and int(exp) < int(
datetime.now(timezone.utc).timestamp()
):
raise InvalidTokenError("token is expired")
if settings.enable_rfc8705:
if cert_thumbprint is None:
raise ValueError(
"certificate thumbprint required for mTLS per RFC 8705",
)
validate_certificate_binding(payload, cert_thumbprint)
if settings.enable_rfc9068:
if issuer is None or audience is None:
raise ValueError(
"issuer and audience required for JWT access tokens per RFC 9068",
)
from .rfc.rfc9068 import validate_rfc9068_claims
validate_rfc9068_claims(payload, issuer=issuer, audience=audience)
return payload
|
decode
decode(
token,
verify_exp=True,
issuer=None,
audience=None,
cert_thumbprint=None,
)
Source code in tigrbl_auth/jwtoken.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295 | def decode(
self,
token: str,
verify_exp: bool = True,
issuer: Optional[str] = None,
audience: Optional[Iterable[str] | str] = None,
cert_thumbprint: Optional[str] = None,
) -> Dict[str, Any]:
return _run(
self.async_decode(
token,
verify_exp=verify_exp,
issuer=issuer,
audience=audience,
cert_thumbprint=cert_thumbprint,
),
)
|
refresh
Source code in tigrbl_auth/jwtoken.py
297
298
299
300
301
302
303
304 | def refresh(self, refresh_token: str) -> Tuple[str, str]:
payload = self.decode(refresh_token)
if payload.get("typ") != "refresh":
raise ValueError("token is not a refresh token")
base_claims = {
k: v for k, v in payload.items() if k not in {"iat", "exp", "typ"}
}
return self.sign_pair(**base_claims)
|