Skip to content

Class swarmauri_tokens_tlsboundjwt.TlsBoundJWTTokenService.TlsBoundJWTTokenService

swarmauri_tokens_tlsboundjwt.TlsBoundJWTTokenService.TlsBoundJWTTokenService

TlsBoundJWTTokenService(
    key_provider,
    *,
    default_issuer=None,
    client_cert_der_getter=None,
    client_cert_x5tS256_getter=None,
)

Bases: JWTTokenService

mTLS-bound JWTs per RFC 8705 using the 'cnf' claim: cnf: { "x5t#S256": "" }

Verification requires access to the current request's client certificate. Provide a getter that returns the DER bytes, or pre-computed x5t#S256.

Source code in swarmauri_tokens_tlsboundjwt/TlsBoundJWTTokenService.py
32
33
34
35
36
37
38
39
40
41
42
def __init__(
    self,
    key_provider: IKeyProvider,
    *,
    default_issuer: Optional[str] = None,
    client_cert_der_getter: Optional[Callable[[], Optional[bytes]]] = None,
    client_cert_x5tS256_getter: Optional[Callable[[], Optional[str]]] = None,
) -> None:
    super().__init__(key_provider, default_issuer=default_issuer)
    self._get_der = client_cert_der_getter
    self._get_x5t = client_cert_x5tS256_getter

type class-attribute instance-attribute

type = 'TlsBoundJWTTokenService'

supports

supports()
Source code in swarmauri_tokens_tlsboundjwt/TlsBoundJWTTokenService.py
44
45
46
def supports(self) -> Mapping[str, Iterable[JWAAlg]]:
    base = super().supports()
    return {"formats": (*base["formats"], "JWT"), "algs": base["algs"]}

mint async

mint(
    claims,
    *,
    alg,
    kid=None,
    key_version=None,
    headers=None,
    lifetime_s=3600,
    issuer=None,
    subject=None,
    audience=None,
    scope=None,
)
Source code in swarmauri_tokens_tlsboundjwt/TlsBoundJWTTokenService.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
async def mint(
    self,
    claims: Dict[str, object],
    *,
    alg: JWAAlg,
    kid: str | None = None,
    key_version: int | None = None,
    headers: Optional[Dict[str, object]] = None,
    lifetime_s: Optional[int] = 3600,
    issuer: Optional[str] = None,
    subject: Optional[str] = None,
    audience: Optional[str | list[str]] = None,
    scope: Optional[str] = None,
) -> str:
    # Respect explicit cnf if caller already set it.
    if "cnf" not in claims:
        x5t = self._live_x5tS256()
        if x5t:
            claims = dict(claims)
            claims["cnf"] = {"x5t#S256": x5t}
    return await super().mint(
        claims,
        alg=alg,
        kid=kid,
        key_version=key_version,
        headers=headers,
        lifetime_s=lifetime_s,
        issuer=issuer,
        subject=subject,
        audience=audience,
        scope=scope,
    )

verify async

verify(token, *, issuer=None, audience=None, leeway_s=60)
Source code in swarmauri_tokens_tlsboundjwt/TlsBoundJWTTokenService.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
async def verify(
    self,
    token: str,
    *,
    issuer: Optional[str] = None,
    audience: Optional[str | list[str]] = None,
    leeway_s: int = 60,
) -> Dict[str, object]:
    claims = await super().verify(
        token, issuer=issuer, audience=audience, leeway_s=leeway_s
    )
    cnf = claims.get("cnf") if isinstance(claims, dict) else None
    bound = cnf.get("x5t#S256") if isinstance(cnf, dict) else None
    if not bound:
        raise ValueError("mTLS-bound token missing cnf.x5t#S256")

    live = self._live_x5tS256()
    if not live:
        raise ValueError("mTLS verification requires the live client certificate")
    if live != bound:
        raise ValueError("mTLS binding mismatch (x5t#S256)")
    return claims