Basic JWT minting and verification service.
Source code in swarmauri_tokens_dpopboundjwt/JWTTokenService.py
| def __init__(
self, key_provider: IKeyProvider, *, default_issuer: Optional[str] = None
) -> None:
self._keys = key_provider
self._issuer = default_issuer
|
type
class-attribute
instance-attribute
supports
Source code in swarmauri_tokens_dpopboundjwt/JWTTokenService.py
23
24
25
26
27
28
29
30
31
32
33 | def supports(self) -> Mapping[str, Iterable[JWAAlg]]:
return {
"formats": ("JWT",),
"algs": (
JWAAlg.HS256,
JWAAlg.RS256,
JWAAlg.PS256,
JWAAlg.ES256,
JWAAlg.EDDSA,
),
}
|
mint
async
mint(
claims,
*,
alg,
kid=None,
key_version=None,
headers=None,
lifetime_s=3600,
issuer=None,
subject=None,
audience=None,
scope=None,
)
Source code in swarmauri_tokens_dpopboundjwt/JWTTokenService.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71 | async def mint(
self,
claims: dict[str, object],
*,
alg: JWAAlg,
kid: str | None = None,
key_version: int | None = None,
headers: Optional[dict[str, object]] = None,
lifetime_s: Optional[int] = 3600,
issuer: Optional[str] = None,
subject: Optional[str] = None,
audience: Optional[str | list[str]] = None,
scope: Optional[str] = None,
) -> str:
now = int(time.time())
payload = dict(claims)
if issuer or self._issuer:
payload["iss"] = issuer or self._issuer
if subject:
payload["sub"] = subject
if audience:
payload["aud"] = audience
if scope:
payload["scope"] = scope
if lifetime_s is not None:
payload["iat"] = now
payload["exp"] = now + int(lifetime_s)
kid = kid or "default"
keyref = await self._keys.get_key(kid, version=key_version, include_secret=True)
secret = keyref.material or b""
return jwt.encode(
payload,
secret,
algorithm=alg.value,
headers={"kid": kid, **(headers or {})},
)
|
verify
async
verify(token, *, issuer=None, audience=None, leeway_s=60)
Source code in swarmauri_tokens_dpopboundjwt/JWTTokenService.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93 | async def verify(
self,
token: str,
*,
issuer: Optional[str] = None,
audience: Optional[str | list[str]] = None,
leeway_s: int = 60,
) -> dict[str, object]:
hdr = jwt.get_unverified_header(token)
alg = JWAAlg(hdr.get("alg"))
kid = hdr.get("kid", "default")
keyref = await self._keys.get_key(kid, include_secret=True)
secret = keyref.material or b""
return jwt.decode(
token,
key=secret,
algorithms=[alg.value],
issuer=issuer or self._issuer,
audience=audience,
leeway=leeway_s,
)
|