Skip to content

Class swarmauri_tokens_jwt.JWTTokenService.JWTTokenService

swarmauri_tokens_jwt.JWTTokenService.JWTTokenService

JWTTokenService(key_provider, *, default_issuer=None)

Bases: TokenServiceBase

JWS/JWT issuer and verifier backed by a key provider.

key_provider (IKeyProvider): Source of signing keys. default_issuer (str, optional): Default issuer claim for minted tokens; also used as the expected issuer during verification when no explicit issuer is given.

Initialize the token service.

key_provider (IKeyProvider): Provider used to fetch signing keys. default_issuer (str): Optional issuer applied when minting tokens.

Source code in swarmauri_tokens_jwt/JWTTokenService.py
64
65
66
67
68
69
70
71
72
73
74
75
76
def __init__(
    self,
    key_provider: IKeyProvider,
    *,
    default_issuer: Optional[str] = None,
) -> None:
    """Set up the service with its key source and default issuer.

    key_provider (IKeyProvider): Provider the service asks for signing
        keys and for the JWKS document.
    default_issuer (str): Optional issuer used when a call does not
        supply one explicitly.
    """

    super().__init__()
    # One signer/verifier instance is created here and reused by every
    # mint/verify call on this service.
    self._jws = JwsSignerVerifier()
    self._iss = default_issuer
    self._kp = key_provider

type class-attribute instance-attribute

type = 'JWTTokenService'

supports

supports()

Describe supported formats and algorithms.

RETURNS (Mapping[str, Iterable[JWAAlg]]): Supported formats and algorithms for minting and verification.

Source code in swarmauri_tokens_jwt/JWTTokenService.py
78
79
80
81
82
83
84
85
def supports(self) -> Mapping[str, Iterable[JWAAlg]]:
    """Report the token formats and signing algorithms this service handles.

    RETURNS (Mapping[str, Iterable[JWAAlg]]): Mapping with a ``formats``
        entry and an ``algs`` entry; the algorithms are the keys of
        ``ALG_MAP_SIGN``.
    """

    algorithms = tuple(ALG_MAP_SIGN.keys())
    formats = ("JWT", "JWS")
    return {"formats": formats, "algs": algorithms}

mint async

mint(
    claims,
    *,
    alg,
    kid=None,
    key_version=None,
    headers=None,
    lifetime_s=3600,
    issuer=None,
    subject=None,
    audience=None,
    scope=None,
)

Mint a signed JWT.

claims (Dict[str, Any]): Claims to embed in the token. alg (JWAAlg): Signing algorithm. kid (str): Identifier of the signing key. key_version (int): Optional version of the signing key. headers (Dict[str, Any]): Extra protected headers. lifetime_s (int): Token lifetime in seconds. issuer (str): Issuer claim to include. subject (str): Subject claim to include. audience (str or list[str]): Audience claim. scope (str): Scope claim.

RETURNS (str): Encoded JWT string.

RAISES (ValueError): If no kid is provided. RAISES (RuntimeError): If signing key material cannot be exported.

Source code in swarmauri_tokens_jwt/JWTTokenService.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
async def mint(
    self,
    claims: Dict[str, Any],
    *,
    alg: JWAAlg,
    kid: str | None = None,
    key_version: int | None = None,
    headers: Optional[Dict[str, Any]] = None,
    lifetime_s: Optional[int] = 3600,
    issuer: Optional[str] = None,
    subject: Optional[str] = None,
    audience: Optional[str | list[str]] = None,
    scope: Optional[str] = None,
) -> str:
    """Build the claim set, sign it and return a compact JWT.

    Values already present in ``claims`` always win: every default below
    is applied with ``setdefault`` on a copy of ``claims``.

    claims (Dict[str, Any]): Claims to embed in the token (not mutated).
    alg (JWAAlg): Signing algorithm.
    kid (str): Identifier of the signing key; required.
    key_version (int): Optional version of the signing key.
    headers (Dict[str, Any]): Extra protected headers. A ``kid`` entry
        here overrides the default ``"<kid>.<version>"`` header value.
    lifetime_s (int): Token lifetime in seconds; a falsy value means no
        ``exp`` claim is added.
    issuer (str): Issuer claim; falls back to the service default.
    subject (str): Subject claim to include.
    audience (str or list[str]): Audience claim.
    scope (str): Scope claim.

    RETURNS (str): Encoded JWT string.

    RAISES (ValueError): If no ``kid`` is provided.
    RAISES (RuntimeError): If signing key material cannot be exported.
    """

    issued_at = int(time.time())
    payload = dict(claims)
    for time_claim in ("iat", "nbf"):
        payload.setdefault(time_claim, issued_at)
    if lifetime_s:
        payload.setdefault("exp", issued_at + int(lifetime_s))
    if effective_issuer := (issuer or self._iss):
        payload.setdefault("iss", effective_issuer)
    # Remaining optional claims are only added when a truthy value is given.
    for claim_name, claim_value in (
        ("sub", subject),
        ("aud", audience),
        ("scope", scope),
    ):
        if claim_value:
            payload.setdefault(claim_name, claim_value)

    if not kid:
        raise ValueError("mint requires 'kid' of a signing key")
    ref = await self._kp.get_key(kid, key_version, include_secret=True)
    if ref.material is None:
        raise RuntimeError("Signing key is not exportable under current policy")

    # Split the caller's headers into the ``kid`` value and everything else;
    # the ``kid`` defaults to "<kid>.<version>" of the resolved key.
    default_kid = f"{ref.kid}.{ref.version}"
    extra_headers = dict(headers or {})
    header_kid = extra_headers.pop("kid", default_kid)

    signing_key = _ref_to_signing_key(ref, alg)
    return await self._jws.sign_compact(
        payload=payload,
        alg=alg,
        key=signing_key,
        kid=header_kid,
        header_extra=extra_headers,
        typ="JWT",
    )

verify async

verify(token, *, issuer=None, audience=None, leeway_s=60)

Verify a JWT and return its claims.

token (str): Encoded JWT to verify. issuer (str): Expected issuer claim. audience (str or list[str]): Expected audience claim. leeway_s (int): Allowed clock skew in seconds.

RETURNS (Dict[str, Any]): Verified token claims.

RAISES (ValueError): If token is invalid or verification fails.

Source code in swarmauri_tokens_jwt/JWTTokenService.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
async def verify(
    self,
    token: str,
    *,
    issuer: Optional[str] = None,
    audience: Optional[str | list[str]] = None,
    leeway_s: int = 60,
) -> Dict[str, Any]:
    """Verify a JWT and return its claims.

    token (str): Encoded JWT to verify.
    issuer (str): Expected issuer claim.
    audience (str or list[str]): Expected audience claim.
    leeway_s (int): Allowed clock skew in seconds.

    RETURNS (Dict[str, Any]): Verified token claims.

    RAISES (ValueError): If token is invalid or verification fails.
    """

    jwks = await self._kp.jwks()

    def _resolver(kid: str | None, alg: str) -> dict[str, Any] | None:
        if not kid:
            return None
        for jwk in jwks.get("keys", []):
            if jwk.get("kid") == kid:
                return jwk
        return None

    res = await self._jws.verify_compact(
        token,
        jwks_resolver=lambda k, a: _resolver(k, a),
        alg_allowlist=self.supports()["algs"],
    )
    payload = json.loads(res.payload.decode("utf-8"))

    now = int(time.time())
    exp = payload.get("exp")
    if exp is not None and now > int(exp) + leeway_s:
        raise ValueError("token is expired")
    nbf = payload.get("nbf")
    if nbf is not None and now + leeway_s < int(nbf):
        raise ValueError("token not yet valid")
    iss_expected = issuer or self._iss
    if iss_expected and payload.get("iss") != iss_expected:
        raise ValueError("invalid issuer")
    if audience is not None:
        aud_claim = payload.get("aud")
        if isinstance(aud_claim, list):
            if audience not in aud_claim:
                raise ValueError("invalid audience")
        elif aud_claim != audience:
            raise ValueError("invalid audience")
    return payload

jwks async

jwks()

Return the JSON Web Key Set for public key discovery.

RETURNS (dict): JWKS document containing available public keys.

Source code in swarmauri_tokens_jwt/JWTTokenService.py
210
211
212
213
214
215
216
async def jwks(self) -> dict:
    """Fetch the JSON Web Key Set from the underlying key provider.

    RETURNS (dict): JWKS document exactly as returned by the provider.
    """

    key_set = await self._kp.jwks()
    return key_set