Skip to content

Class tigrbl_auth.jwtoken.JWTCoder

tigrbl_auth.jwtoken.JWTCoder

JWTCoder(arg1, arg2)

Stateless JWT helper backed by JWTTokenService.

JWTCoder historically accepted a private/public key pair and constructed its own :class:`JWTTokenService`. A recent refactoring switched the constructor to require an already-configured service instance. The tests for RFC 9068 still rely on the original behaviour, so the initializer now supports both invocation styles:

JWTCoder(service, kid) -- use the provided service directly.

JWTCoder(private_key_pem, public_key_pem) -- build an ephemeral service from the PEM encoded Ed25519 key pair.

Source code in tigrbl_auth/jwtoken.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def __init__(self, arg1: JWTTokenService | bytes, arg2: str | bytes):
    """Initialise the coder from a ready service or from a raw key pair.

    Two call shapes are accepted:

    * ``(JWTTokenService, str)`` -- the service is stored as-is and the
      string is kept as the key id used when signing.
    * ``(bytes-like, bytes-like)`` -- the first value is imported into a
      fresh ``LocalKeyProvider`` as Ed25519 key material with the second
      value supplied as its public part; a new ``JWTTokenService`` is then
      built on that provider and the key id is taken from the imported
      key reference.

    Raises:
        TypeError: if the arguments match neither shape.
    """
    binary_types = (bytes, bytearray)
    given_service = isinstance(arg1, JWTTokenService) and isinstance(arg2, str)
    given_key_pair = isinstance(arg1, binary_types) and isinstance(
        arg2, binary_types
    )

    if not (given_service or given_key_pair):
        raise TypeError(
            "JWTCoder requires (JWTTokenService, kid) or (private_pem, public_pem)"
        )

    if given_service:
        self._svc, self._kid = arg1, arg2
        return

    # Key-pair form: import the material into a throwaway provider and wrap
    # that provider in a new service.
    provider = LocalKeyProvider()
    key_spec = KeySpec(
        klass=KeyClass.asymmetric,
        alg=KeyAlg.ED25519,
        uses=(KeyUse.SIGN, KeyUse.VERIFY),
        export_policy=ExportPolicy.SECRET_WHEN_ALLOWED,
        label="jwt_ed25519",
    )
    # import_key is awaited through the synchronous _run helper.
    key_ref = _run(provider.import_key(key_spec, arg1, public=arg2))
    self._svc = JWTTokenService(provider)
    self._kid = key_ref.kid

default classmethod

default()
Source code in tigrbl_auth/jwtoken.py
129
130
131
132
@classmethod
def default(cls) -> "JWTCoder":
    """Build a coder from the ``(service, kid)`` pair returned by ``_svc()``."""
    service, key_id = _svc()
    return cls(service, key_id)

async_sign async

async_sign(
    *,
    sub,
    tid=None,
    ttl=_ACCESS_TTL,
    typ="access",
    issuer=None,
    audience=None,
    cert_thumbprint=None,
    **extra,
)
Source code in tigrbl_auth/jwtoken.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
async def async_sign(
    self,
    *,
    sub: str,
    tid: Optional[str] = None,
    ttl: timedelta = _ACCESS_TTL,
    typ: str = "access",
    issuer: Optional[str] = None,
    audience: Optional[Iterable[str] | str] = None,
    cert_thumbprint: Optional[str] = None,
    **extra: Any,
) -> str:
    """Mint a signed token for *sub* and return it.

    The claim set starts with ``sub``, ``typ``, ``iat`` (current UTC time)
    and ``exp`` (``iat`` plus *ttl*); entries in *extra* are merged on top
    and therefore win on key collisions. ``tid`` is added only when given.

    Behaviour switched on by ``settings``:

    * ``enable_rfc8705`` -- *cert_thumbprint* is mandatory and is stored
      under ``cnf["x5t#S256"]``.
    * ``enable_rfc9068`` -- *issuer* and *audience* are mandatory and the
      claims are passed through ``add_rfc9068_claims``.
    * ``enable_rfc7662`` -- the minted token is handed to
      ``register_token`` together with its ``sub``, ``kind`` and, when
      present, ``tid``.

    Raises:
        ValueError: if a value required by an enabled RFC switch is missing.
    """
    issued_at = datetime.now(timezone.utc)
    token_claims: Dict[str, Any] = {
        "sub": sub,
        "typ": typ,
        "iat": int(issued_at.timestamp()),
        "exp": int((issued_at + ttl).timestamp()),
    }
    # Caller-supplied claims are applied last so they override the defaults.
    token_claims.update(extra)

    if tid is not None:
        token_claims["tid"] = tid

    if settings.enable_rfc8705:
        if cert_thumbprint is None:
            raise ValueError(
                "cert_thumbprint required when RFC 8705 support is enabled",
            )
        token_claims["cnf"] = {"x5t#S256": cert_thumbprint}

    if settings.enable_rfc9068:
        if audience is None or issuer is None:
            raise ValueError(
                "issuer and audience required when RFC 9068 support is enabled",
            )
        # Imported lazily, only when the RFC 9068 path is actually taken.
        from .rfc.rfc9068 import add_rfc9068_claims

        token_claims = add_rfc9068_claims(
            token_claims, issuer=issuer, audience=audience
        )

    signed = await self._svc.mint(
        token_claims,
        alg=JWAAlg.EDDSA,
        kid=self._kid,
        lifetime_s=int(ttl.total_seconds()),
        subject=sub,
        issuer=issuer,
        audience=audience,
    )

    if settings.enable_rfc7662:
        registry_entry = {"sub": sub, "kind": typ}
        if tid is not None:
            registry_entry["tid"] = tid
        register_token(signed, registry_entry)

    return signed

sign

sign(
    *,
    sub,
    tid=None,
    ttl=_ACCESS_TTL,
    typ="access",
    issuer=None,
    audience=None,
    cert_thumbprint=None,
    **extra,
)
Source code in tigrbl_auth/jwtoken.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def sign(
    self,
    *,
    sub: str,
    tid: Optional[str] = None,
    ttl: timedelta = _ACCESS_TTL,
    typ: str = "access",
    issuer: Optional[str] = None,
    audience: Optional[Iterable[str] | str] = None,
    cert_thumbprint: Optional[str] = None,
    **extra: Any,
) -> str:
    """Synchronous counterpart of :meth:`async_sign`.

    Every argument is forwarded unchanged to :meth:`async_sign`; the
    resulting coroutine is executed through ``_run`` and its result is
    returned.
    """
    forwarded = {
        "sub": sub,
        "tid": tid,
        "ttl": ttl,
        "typ": typ,
        "issuer": issuer,
        "audience": audience,
        "cert_thumbprint": cert_thumbprint,
        **extra,
    }
    return _run(self.async_sign(**forwarded))

async_sign_pair async

async_sign_pair(*, sub, tid, cert_thumbprint=None, **extra)
Source code in tigrbl_auth/jwtoken.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
async def async_sign_pair(
    self, *, sub: str, tid: str, cert_thumbprint: Optional[str] = None, **extra: Any
) -> Tuple[str, str]:
    """Mint an access token and a refresh token for the same subject.

    Both tokens are produced by :meth:`async_sign` with identical ``sub``,
    ``tid``, ``cert_thumbprint`` and *extra* claims. The access token uses
    :meth:`async_sign`'s defaults; the refresh token is signed with
    ``ttl=_REFRESH_TTL`` and ``typ="refresh"``.

    Returns:
        ``(access_token, refresh_token)``; the access token is minted first.
    """
    shared = dict(sub=sub, tid=tid, cert_thumbprint=cert_thumbprint, **extra)
    access_token = await self.async_sign(**shared)
    refresh_token = await self.async_sign(
        ttl=_REFRESH_TTL, typ="refresh", **shared
    )
    return access_token, refresh_token

sign_pair

sign_pair(*, sub, tid, cert_thumbprint=None, **extra)
Source code in tigrbl_auth/jwtoken.py
228
229
230
231
232
233
234
235
236
237
238
def sign_pair(
    self, *, sub: str, tid: str, cert_thumbprint: Optional[str] = None, **extra: Any
) -> Tuple[str, str]:
    """Synchronous counterpart of :meth:`async_sign_pair`.

    Arguments are forwarded unchanged; the coroutine is executed through
    ``_run`` and its ``(access, refresh)`` result is returned.
    """
    pending = self.async_sign_pair(
        sub=sub, tid=tid, cert_thumbprint=cert_thumbprint, **extra
    )
    return _run(pending)

async_decode async

async_decode(
    token,
    verify_exp=True,
    issuer=None,
    audience=None,
    cert_thumbprint=None,
)
Source code in tigrbl_auth/jwtoken.py
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
async def async_decode(
    self,
    token: str,
    verify_exp: bool = True,
    issuer: Optional[str] = None,
    audience: Optional[Iterable[str] | str] = None,
    cert_thumbprint: Optional[str] = None,
) -> Dict[str, Any]:
    """Verify *token* and return its claims.

    Checks run in this order:

    1. the token must not be reported as revoked by ``is_revoked``;
    2. the header algorithm reported by ``_header_alg`` must be neither
       empty nor ``"none"``;
    3. the backing service must verify the token (any exception it raises
       is re-raised as ``InvalidTokenError``);
    4. when *verify_exp* is true and an ``exp`` claim is present, it must
       not lie before the current UTC time;
    5. with ``settings.enable_rfc8705``, *cert_thumbprint* is required and
       the claims are checked by ``validate_certificate_binding``;
    6. with ``settings.enable_rfc9068``, *issuer* and *audience* are
       required and the claims are checked by ``validate_rfc9068_claims``.

    Raises:
        InvalidTokenError: for revoked, unsigned, unverifiable or expired
            tokens.
        ValueError: if a value required by an enabled RFC switch is missing.
    """
    if is_revoked(token):
        raise InvalidTokenError("token has been revoked")

    header_alg = _header_alg(token)
    if header_alg in {"", "none"}:
        raise InvalidTokenError("unsigned JWTs are not accepted")

    try:
        claims = await self._svc.verify(token, issuer=issuer, audience=audience)
    except Exception as exc:
        # Surface every verification failure as the domain-specific error.
        raise InvalidTokenError("unable to verify token") from exc

    expiry = claims.get("exp") if verify_exp else None
    if expiry is not None:
        expires_at = int(expiry)
        current = int(datetime.now(timezone.utc).timestamp())
        if expires_at < current:
            raise InvalidTokenError("token is expired")

    if settings.enable_rfc8705:
        if cert_thumbprint is None:
            raise ValueError(
                "certificate thumbprint required for mTLS per RFC 8705",
            )
        validate_certificate_binding(claims, cert_thumbprint)

    if settings.enable_rfc9068:
        if audience is None or issuer is None:
            raise ValueError(
                "issuer and audience required for JWT access tokens per RFC 9068",
            )
        # Imported lazily, only when the RFC 9068 path is actually taken.
        from .rfc.rfc9068 import validate_rfc9068_claims

        validate_rfc9068_claims(claims, issuer=issuer, audience=audience)

    return claims

decode

decode(
    token,
    verify_exp=True,
    issuer=None,
    audience=None,
    cert_thumbprint=None,
)
Source code in tigrbl_auth/jwtoken.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
def decode(
    self,
    token: str,
    verify_exp: bool = True,
    issuer: Optional[str] = None,
    audience: Optional[Iterable[str] | str] = None,
    cert_thumbprint: Optional[str] = None,
) -> Dict[str, Any]:
    """Synchronous counterpart of :meth:`async_decode`.

    Arguments are forwarded unchanged; the coroutine is executed through
    ``_run`` and the decoded claims are returned.
    """
    pending = self.async_decode(
        token,
        verify_exp=verify_exp,
        issuer=issuer,
        audience=audience,
        cert_thumbprint=cert_thumbprint,
    )
    return _run(pending)

refresh

refresh(refresh_token)
Source code in tigrbl_auth/jwtoken.py
297
298
299
300
301
302
303
304
def refresh(self, refresh_token: str) -> Tuple[str, str]:
    """Exchange a refresh token for a new access/refresh pair.

    The token is checked with :meth:`decode`; any error it raises
    propagates. The decoded claims, minus those that belong to one
    specific token, are then passed as keyword arguments to
    :meth:`sign_pair`.

    Returns:
        Whatever :meth:`sign_pair` returns for the carried-over claims.

    Raises:
        ValueError: if the decoded ``typ`` claim is not ``"refresh"``.
    """
    payload = self.decode(refresh_token)
    if payload.get("typ") != "refresh":
        raise ValueError("token is not a refresh token")
    # These claims describe one specific token and must be regenerated, not
    # copied: a reused "jti" would give both new tokens the old token's
    # identifier (RFC 7519 requires it to be unique) and a copied "nbf"
    # would be stale. "typ" is set per token by the signer.
    per_token_claims = {"iat", "exp", "nbf", "jti", "typ"}
    carried_claims = {
        name: value
        for name, value in payload.items()
        if name not in per_token_claims
    }
    return self.sign_pair(**carried_claims)