TlsBoundJWTTokenService(
key_provider,
*,
default_issuer=None,
client_cert_der_getter=None,
client_cert_x5tS256_getter=None,
)
Bases: JWTTokenService
mTLS-bound JWTs per RFC 8705 using the 'cnf' claim:
cnf: { "x5t#S256": "" }
Verification requires access to the current request's client certificate.
Provide a getter that returns the DER bytes, or pre-computed x5t#S256.
Source code in swarmauri_tokens_tlsboundjwt/TlsBoundJWTTokenService.py
32
33
34
35
36
37
38
39
40
41
42 | def __init__(
self,
key_provider: IKeyProvider,
*,
default_issuer: Optional[str] = None,
client_cert_der_getter: Optional[Callable[[], Optional[bytes]]] = None,
client_cert_x5tS256_getter: Optional[Callable[[], Optional[str]]] = None,
) -> None:
super().__init__(key_provider, default_issuer=default_issuer)
self._get_der = client_cert_der_getter
self._get_x5t = client_cert_x5tS256_getter
|
type
class-attribute
instance-attribute
type = 'TlsBoundJWTTokenService'
supports
Source code in swarmauri_tokens_tlsboundjwt/TlsBoundJWTTokenService.py
| def supports(self) -> Mapping[str, Iterable[JWAAlg]]:
base = super().supports()
return {"formats": (*base["formats"], "JWT"), "algs": base["algs"]}
|
mint
async
mint(
claims,
*,
alg,
kid=None,
key_version=None,
headers=None,
lifetime_s=3600,
issuer=None,
subject=None,
audience=None,
scope=None,
)
Source code in swarmauri_tokens_tlsboundjwt/TlsBoundJWTTokenService.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91 | async def mint(
self,
claims: Dict[str, object],
*,
alg: JWAAlg,
kid: str | None = None,
key_version: int | None = None,
headers: Optional[Dict[str, object]] = None,
lifetime_s: Optional[int] = 3600,
issuer: Optional[str] = None,
subject: Optional[str] = None,
audience: Optional[str | list[str]] = None,
scope: Optional[str] = None,
) -> str:
# Respect explicit cnf if caller already set it.
if "cnf" not in claims:
x5t = self._live_x5tS256()
if x5t:
claims = dict(claims)
claims["cnf"] = {"x5t#S256": x5t}
return await super().mint(
claims,
alg=alg,
kid=kid,
key_version=key_version,
headers=headers,
lifetime_s=lifetime_s,
issuer=issuer,
subject=subject,
audience=audience,
scope=scope,
)
|
verify
async
verify(token, *, issuer=None, audience=None, leeway_s=60)
Source code in swarmauri_tokens_tlsboundjwt/TlsBoundJWTTokenService.py
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114 | async def verify(
self,
token: str,
*,
issuer: Optional[str] = None,
audience: Optional[str | list[str]] = None,
leeway_s: int = 60,
) -> Dict[str, object]:
claims = await super().verify(
token, issuer=issuer, audience=audience, leeway_s=leeway_s
)
cnf = claims.get("cnf") if isinstance(claims, dict) else None
bound = cnf.get("x5t#S256") if isinstance(cnf, dict) else None
if not bound:
raise ValueError("mTLS-bound token missing cnf.x5t#S256")
live = self._live_x5tS256()
if not live:
raise ValueError("mTLS verification requires the live client certificate")
if live != bound:
raise ValueError("mTLS binding mismatch (x5t#S256)")
return claims
|