DPoPBoundJWTTokenService(
key_provider,
*,
default_issuer=None,
dpop_ctx_getter=None,
proof_max_age_s=300,
replay_check=None,
enforce_proof=True,
)
Bases: JWTTokenService
DPoP-bound JWTs per RFC 9449 using the 'cnf' claim with the JWK thumbprint:
cnf: { "jkt": "<base64url SHA-256 thumbprint of the client's public JWK>" }
Verification requires access to the current request's DPoP proof JWT and request
context (HTTP method + URI, optional nonce). Provide a getter that returns:
{
"proof": "<DPoP proof JWT>",
"htm": "GET",
"htu": "https://api.example.com/resource",
"nonce": "<expected nonce, optional>"
}
Source code in swarmauri_tokens_dpopboundjwt/DPoPBoundJWTTokenService.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def __init__(
    self,
    key_provider: IKeyProvider,
    *,
    default_issuer: Optional[str] = None,
    dpop_ctx_getter: Optional[Callable[[], Optional[Dict[str, object]]]] = None,
    proof_max_age_s: int = 300,
    replay_check: Optional[Callable[[str], bool]] = None,
    enforce_proof: bool = True,
) -> None:
    """Configure the service and its DPoP proof-verification options.

    Args:
        key_provider: Forwarded to the base class initializer.
        default_issuer: Forwarded to the base class initializer.
        dpop_ctx_getter: Zero-argument callable returning the current
            request's DPoP context mapping, or ``None``.
        proof_max_age_s: Coerced with ``int()`` and stored; ``verify``
            uses it as the maximum accepted age of a proof's ``iat``,
            in seconds.
        replay_check: Callable given a proof ``jti``. It should return
            ``True`` when that JTI is fresh (i.e. not seen before).
            When ``None``, replay protection is skipped.
        enforce_proof: Toggle for strict RFC 9449 verification of the
            DPoP proof.
    """
    super().__init__(key_provider, default_issuer=default_issuer)

    # Proof-verification policy.
    self._enforce = enforce_proof
    self._max_age = int(proof_max_age_s)

    # Caller-supplied hooks: request context lookup and JTI freshness check.
    self._get_ctx = dpop_ctx_getter
    self._replay_check = replay_check
|
type
class-attribute
instance-attribute
type = 'DPoPBoundJWTTokenService'
supports
Source code in swarmauri_tokens_dpopboundjwt/DPoPBoundJWTTokenService.py
def supports(self) -> Mapping[str, Iterable[JWAAlg]]:
    """Return the base service's capabilities with ``"JWT"`` appended to its formats.

    The ``"algs"`` entry is passed through from the base class unchanged.
    """
    inherited = super().supports()
    formats = tuple(inherited["formats"]) + ("JWT",)
    return {"formats": formats, "algs": inherited["algs"]}
|
mint
async
mint(
claims,
*,
alg,
kid=None,
key_version=None,
headers=None,
lifetime_s=3600,
issuer=None,
subject=None,
audience=None,
scope=None,
)
Source code in swarmauri_tokens_dpopboundjwt/DPoPBoundJWTTokenService.py
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
async def mint(
    self,
    claims: Dict[str, object],
    *,
    alg: JWAAlg,
    kid: str | None = None,
    key_version: int | None = None,
    headers: Optional[Dict[str, object]] = None,
    lifetime_s: Optional[int] = 3600,
    issuer: Optional[str] = None,
    subject: Optional[str] = None,
    audience: Optional[str | list[str]] = None,
    scope: Optional[str] = None,
) -> str:
    """Mint a JWT, adding a ``cnf.jkt`` confirmation claim when possible.

    A ``cnf`` claim already present in ``claims`` is left untouched.
    Otherwise, if a context getter is configured and the context it
    returns is a dict holding a dict under ``"jwk"``, ``cnf`` is set to
    ``{"jkt": jwk_thumbprint_sha256(jwk)}`` on a copy of ``claims``
    (the caller's mapping is not modified). If no such JWK is
    available, the claims are forwarded unchanged, so the caller is
    expected to have supplied ``claims["cnf"]["jkt"]`` itself.

    All other arguments are forwarded as-is to the base class ``mint``,
    whose result is returned.
    """
    if "cnf" not in claims:
        getter = self._get_ctx
        context = getter() if getter else None
        public_jwk = context.get("jwk") if isinstance(context, dict) else None
        if isinstance(public_jwk, dict):
            # Build a new dict so the caller's claims are never mutated.
            claims = {**claims, "cnf": {"jkt": jwk_thumbprint_sha256(public_jwk)}}

    minted = await super().mint(
        claims,
        alg=alg,
        kid=kid,
        key_version=key_version,
        headers=headers,
        lifetime_s=lifetime_s,
        issuer=issuer,
        subject=subject,
        audience=audience,
        scope=scope,
    )
    return minted
|
verify
async
verify(token, *, issuer=None, audience=None, leeway_s=60)
Source code in swarmauri_tokens_dpopboundjwt/DPoPBoundJWTTokenService.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
async def verify(
    self,
    token: str,
    *,
    issuer: Optional[str] = None,
    audience: Optional[str | list[str]] = None,
    leeway_s: int = 60,
) -> Dict[str, object]:
    """Verify a DPoP-bound JWT and the DPoP proof presented with it.

    The token itself is first verified by the base class. When proof
    enforcement is disabled, the resulting claims are returned without
    any further checks. Otherwise the proof obtained from the context
    getter is validated as follows:

    1. The proof header has ``typ == "dpop+jwt"`` and embeds a public
       ``jwk`` (RSA, EC or OKP, with no private ``d`` member); the
       proof signature verifies under that key.
    2. The thumbprint of the embedded JWK equals the token's
       ``cnf.jkt``.
    3. The proof's ``htm`` and ``htu`` equal the context values, its
       ``iat`` lies within ``[now - proof_max_age_s, now + leeway_s]``,
       its ``nonce`` matches when the context supplies one, it carries
       a string ``jti``, and, if it carries ``ath``, that value is the
       base64url SHA-256 hash of ``token``.
    4. If a replay checker is configured, it reports the ``jti`` as
       fresh.

    Args:
        token: The access token (JWT) to verify.
        issuer: Forwarded to the base class ``verify``.
        audience: Forwarded to the base class ``verify``.
        leeway_s: Forwarded to the base class ``verify``; also the
            tolerance for a proof ``iat`` that lies in the future.

    Returns:
        The claims returned by the base class ``verify``.

    Raises:
        ValueError: If any of the checks above fails, the token has no
            ``cnf.jkt``, no context getter is configured, or the
            context lacks ``proof``/``htm``/``htu`` strings.
            Exceptions raised by the base class or by the ``jwt``
            library calls are not caught here.
    """
    import base64
    import hashlib

    claims = await super().verify(
        token, issuer=issuer, audience=audience, leeway_s=leeway_s
    )
    if not self._enforce:
        return claims

    cnf = claims.get("cnf") if isinstance(claims, dict) else None
    jkt = cnf.get("jkt") if isinstance(cnf, dict) else None
    if not jkt:
        raise ValueError("DPoP-bound token missing cnf.jkt")
    if not self._get_ctx:
        raise ValueError("DPoP verification requires a context getter")

    ctx = self._get_ctx() or {}
    proof_jwt = ctx.get("proof")
    htm = ctx.get("htm")
    htu = ctx.get("htu")
    nonce = ctx.get("nonce")
    if (
        not isinstance(proof_jwt, str)
        or not isinstance(htm, str)
        or not isinstance(htu, str)
    ):
        raise ValueError("Missing DPoP context (proof, htm, htu)")

    # 1) Extract JWK from the DPoP proof header and verify the proof signature
    hdr = jwt.get_unverified_header(proof_jwt)
    if hdr.get("typ") != "dpop+jwt":
        raise ValueError("DPoP proof header 'typ' must be 'dpop+jwt'")
    jwk = hdr.get("jwk")
    if not isinstance(jwk, dict):
        raise ValueError("DPoP proof missing 'jwk' in header")
    # RFC 9449 section 4.3: the embedded JWK must be a public key. "d" is
    # the private member for RSA, EC and OKP keys alike; without this
    # check a private JWK would be loaded and used as the verification key.
    if "d" in jwk:
        raise ValueError("DPoP proof 'jwk' must not contain a private key")

    kty = jwk.get("kty")
    if kty == "RSA":
        key = algorithms.RSAAlgorithm.from_jwk(jwk)
    elif kty == "EC":
        key = algorithms.ECAlgorithm.from_jwk(jwk)
    elif kty == "OKP":
        key = algorithms.OKPAlgorithm.from_jwk(jwk)
    else:
        raise ValueError("Unsupported DPoP JWK kty")

    proof_claims = jwt.decode(
        proof_jwt,
        key=key,
        # Asymmetric algorithms only: the proof is verified with a key it
        # carries itself, so symmetric algorithms must never be accepted.
        algorithms=[
            alg.value
            for alg in (JWAAlg.RS256, JWAAlg.PS256, JWAAlg.ES256, JWAAlg.EDDSA)
        ],
        options={"verify_aud": False, "verify_iss": False},
    )

    # 2) Check the JWK thumbprint matches the token's cnf.jkt
    thumb = jwk_thumbprint_sha256(jwk)
    if thumb != jkt:
        raise ValueError("DPoP binding mismatch (cnf.jkt != proof JWK thumbprint)")

    # 3) Validate method, URL, iat, nonce (if present), jti and ath (if present)
    if proof_claims.get("htm") != htm:
        raise ValueError("DPoP proof 'htm' mismatch")
    if proof_claims.get("htu") != htu:
        raise ValueError("DPoP proof 'htu' mismatch")
    # A missing iat becomes 0 and is rejected by the window check below
    # for any realistic proof_max_age_s.
    iat = int(proof_claims.get("iat", 0))
    now = int(time.time())
    if not (now - self._max_age <= iat <= now + leeway_s):
        raise ValueError("DPoP proof 'iat' out of acceptable window")
    if nonce is not None and proof_claims.get("nonce") != nonce:
        raise ValueError("DPoP proof 'nonce' mismatch")
    jti = proof_claims.get("jti")
    if not isinstance(jti, str):
        raise ValueError("DPoP proof missing 'jti'")

    # RFC 9449 ties a proof to one access token through "ath", the
    # unpadded base64url SHA-256 hash of the token. A proof that carries
    # it must match the presented token. A proof without it is still
    # accepted so existing clients keep working.
    # NOTE(review): RFC 9449 section 4.3 requires "ath" when a proof
    # accompanies an access token; consider making it mandatory.
    ath = proof_claims.get("ath")
    if ath is not None:
        token_hash = hashlib.sha256(token.encode("ascii")).digest()
        expected_ath = base64.urlsafe_b64encode(token_hash).rstrip(b"=").decode("ascii")
        if ath != expected_ath:
            raise ValueError("DPoP proof 'ath' mismatch")

    # 4) Replay protection (optional but recommended): require fresh JTI.
    # This runs last so the checker only sees proofs that passed every
    # other check.
    if self._replay_check and not self._replay_check(jti):
        raise ValueError("DPoP proof replay detected")
    return claims
|