Skip to content

Class swarmauri_tokens_dpopboundjwt.JWTTokenService.JWTTokenService

swarmauri_tokens_dpopboundjwt.JWTTokenService.JWTTokenService

JWTTokenService(key_provider, *, default_issuer=None)

Basic JWT minting and verification service.

Source code in swarmauri_tokens_dpopboundjwt/JWTTokenService.py
17
18
19
20
21
def __init__(
    self, key_provider: IKeyProvider, *, default_issuer: Optional[str] = None
) -> None:
    """Store the collaborators used by the other methods of the service.

    Args:
        key_provider: Object whose ``get_key`` coroutine is awaited to fetch
            key material when minting and verifying tokens.
        default_issuer: Fallback ``iss`` value used whenever a call does not
            supply its own issuer. ``None`` means no default.
    """
    # Fallback issuer, consulted by mint() and verify().
    self._issuer = default_issuer
    # Source of signing / verification keys.
    self._keys = key_provider

type class-attribute instance-attribute

type = 'JWTTokenService'

supports

supports()
Source code in swarmauri_tokens_dpopboundjwt/JWTTokenService.py
23
24
25
26
27
28
29
30
31
32
33
def supports(self) -> Mapping[str, Iterable[JWAAlg]]:
    """Describe the token formats and signing algorithms this service offers.

    Returns:
        A mapping with two entries: ``"formats"`` (a one-element tuple
        holding ``"JWT"``) and ``"algs"`` (a tuple of ``JWAAlg`` members).
    """
    token_formats = ("JWT",)
    signing_algs = (
        JWAAlg.HS256,
        JWAAlg.RS256,
        JWAAlg.PS256,
        JWAAlg.ES256,
        JWAAlg.EDDSA,
    )
    return {"formats": token_formats, "algs": signing_algs}

mint async

mint(
    claims,
    *,
    alg,
    kid=None,
    key_version=None,
    headers=None,
    lifetime_s=3600,
    issuer=None,
    subject=None,
    audience=None,
    scope=None,
)
Source code in swarmauri_tokens_dpopboundjwt/JWTTokenService.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
async def mint(
    self,
    claims: dict[str, object],
    *,
    alg: JWAAlg,
    kid: str | None = None,
    key_version: int | None = None,
    headers: Optional[dict[str, object]] = None,
    lifetime_s: Optional[int] = 3600,
    issuer: Optional[str] = None,
    subject: Optional[str] = None,
    audience: Optional[str | list[str]] = None,
    scope: Optional[str] = None,
) -> str:
    """Build a claim set and return it as a signed JWT.

    Args:
        claims: Base claims. The mapping is copied, so the caller's object
            is not mutated. The registered claims set below overwrite any
            same-named entries.
        alg: Signing algorithm; its ``value`` is handed to ``jwt.encode``.
        kid: Identifier of the signing key. Falls back to ``"default"``
            when missing or empty.
        key_version: Forwarded to the key provider as ``version``.
        headers: Extra JOSE header entries. They are merged after ``kid``,
            so an entry with the same name replaces the default one.
        lifetime_s: When not ``None``, ``iat`` is set to the current time
            and ``exp`` to ``iat + lifetime_s``. ``None`` leaves both
            claims untouched.
        issuer: ``iss`` claim; the service's default issuer is used when
            this is falsy. The claim is omitted when both are falsy.
        subject: ``sub`` claim, set only when truthy.
        audience: ``aud`` claim, set only when truthy.
        scope: ``scope`` claim, set only when truthy.

    Returns:
        The encoded token produced by ``jwt.encode``.

    Raises:
        jwt.InvalidKeyError: If the key provider returns no key material.
    """
    now = int(time.time())
    payload = dict(claims)

    effective_issuer = issuer or self._issuer
    if effective_issuer:
        payload["iss"] = effective_issuer
    if subject:
        payload["sub"] = subject
    if audience:
        payload["aud"] = audience
    if scope:
        payload["scope"] = scope
    if lifetime_s is not None:
        payload["iat"] = now
        payload["exp"] = now + int(lifetime_s)

    kid = kid or "default"
    keyref = await self._keys.get_key(kid, version=key_version, include_secret=True)
    secret = keyref.material
    if not secret:
        # Never fall back to an empty key: a token signed with b"" can be
        # forged by anyone, so a missing secret must be a hard failure.
        raise jwt.InvalidKeyError(
            f"key {kid!r} has no material; refusing to sign with an empty key"
        )
    return jwt.encode(
        payload,
        secret,
        algorithm=alg.value,
        headers={"kid": kid, **(headers or {})},
    )

verify async

verify(token, *, issuer=None, audience=None, leeway_s=60)
Source code in swarmauri_tokens_dpopboundjwt/JWTTokenService.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
async def verify(
    self,
    token: str,
    *,
    issuer: Optional[str] = None,
    audience: Optional[str | list[str]] = None,
    leeway_s: int = 60,
) -> dict[str, object]:
    """Verify a JWT's signature and claims, then return the decoded result.

    The key is looked up by the ``kid`` found in the token header
    (``"default"`` when the header has none); no key version is passed to
    the provider.

    Args:
        token: The encoded JWT.
        issuer: Expected issuer; the service's default issuer is used when
            this is falsy.
        audience: Expected audience, forwarded to ``jwt.decode`` unchanged.
        leeway_s: Forwarded to ``jwt.decode`` as ``leeway``.

    Returns:
        Whatever ``jwt.decode`` returns for the verified token.

    Raises:
        ValueError: If the header ``alg`` is not a ``JWAAlg`` member
            (raised by the ``JWAAlg(...)`` lookup).
        jwt.InvalidAlgorithmError: If the header ``alg`` is not among the
            algorithms advertised by ``supports()``.
        jwt.InvalidKeyError: If the key provider returns no key material.
    """
    hdr = jwt.get_unverified_header(token)
    alg = JWAAlg(hdr.get("alg"))

    # The header is attacker-controlled, so its "alg" may only select among
    # algorithms this service has explicitly opted into.
    allowed_algs = tuple(self.supports().get("algs", ()))
    if alg not in allowed_algs:
        raise jwt.InvalidAlgorithmError(
            f"algorithm {alg.value!r} is not supported by this service"
        )

    kid = hdr.get("kid", "default")
    keyref = await self._keys.get_key(kid, include_secret=True)
    secret = keyref.material
    if not secret:
        # Verifying against an empty key would accept any token that was
        # HMAC-signed with b"", so a missing secret must be a hard failure.
        raise jwt.InvalidKeyError(
            f"key {kid!r} has no material; refusing to verify with an empty key"
        )
    return jwt.decode(
        token,
        key=secret,
        algorithms=[alg.value],
        issuer=issuer or self._issuer,
        audience=audience,
        leeway=leeway_s,
    )