Skip to content

Class swarmauri_signing_pgp.pgp_signer.PgpEnvelopeSigner

swarmauri_signing_pgp.pgp_signer.PgpEnvelopeSigner

Bases: SigningBase

Generate and verify detached OpenPGP signatures.

The signer operates on raw byte payloads or structured envelopes that are canonicalized to JSON or CBOR prior to signing.

supports

supports()

Describe the algorithms and canonicalizations supported by the signer.

RETURNS DESCRIPTION
Mapping[str, Iterable[str]]

Mapping of capability names to the values supported for each capability.

Source code in swarmauri_signing_pgp/pgp_signer.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def supports(self) -> Mapping[str, Iterable[str]]:
    """Report the capabilities advertised by this signer.

    JSON canonicalization is always listed; CBOR (and the matching
    ``structured-cbor`` envelope kind) is listed only when the module-level
    ``_CBOR_OK`` flag is truthy.

    Returns:
        Mapping[str, Iterable[str]]: Capability names mapped to tuples of
            supported values.
    """

    canons = ("json",)
    if _CBOR_OK:
        canons = ("json", "cbor")

    # One "structured-<canon>" envelope kind per available canonicalization.
    envelope_kinds = ["mapping", "detached-bytes"]
    for canon_name in canons:
        envelope_kinds.append(f"structured-{canon_name}")

    operations = ("bytes", "digest", "envelope", "stream")
    return {
        "signs": operations,
        "verifies": operations,
        "envelopes": tuple(envelope_kinds),
        "algs": ("OpenPGP",),
        "canons": canons,
        "features": ("multi", "detached_only"),
    }

sign_bytes async

sign_bytes(key, payload, *, alg=None, opts=None)

Create a detached OpenPGP signature for raw bytes.

PARAMETER DESCRIPTION
key

Reference to the private key used for signing.

TYPE: KeyRef

payload

Data to sign.

TYPE: bytes

alg

Requested algorithm, defaults to "OpenPGP".

TYPE: Optional[Alg] DEFAULT: None

opts

Additional options such as passphrase for locked keys.

TYPE: Optional[Mapping[str, object]] DEFAULT: None

RETURNS DESCRIPTION
Sequence[Signature]

A list containing the single generated detached signature.

RAISES DESCRIPTION
RuntimeError

If the private key is locked and no passphrase is provided.

ValueError

If an unsupported algorithm is requested.

Source code in swarmauri_signing_pgp/pgp_signer.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
async def sign_bytes(
    self,
    key: KeyRef,
    payload: bytes,
    *,
    alg: Optional[Alg] = None,
    opts: Optional[Mapping[str, object]] = None,
) -> Sequence[Signature]:
    """Create a detached OpenPGP signature for raw bytes.

    Args:
        key (KeyRef): Reference to the private key used for signing.
        payload (bytes): Data to sign.
        alg (Optional[Alg]): Requested algorithm, defaults to ``"OpenPGP"``.
        opts (Optional[Mapping[str, object]]): Additional options such as
            ``passphrase`` for locked keys.

    Returns:
        Sequence[Signature]: A one-element list. The signature mapping carries
            ``alg``, ``kid`` (the key fingerprint as a string), ``sig`` (binary
            signature) and ``armored`` (ASCII-armored signature).

    Raises:
        RuntimeError: If the private key is locked and no passphrase is provided.
        ValueError: If an unsupported algorithm is requested.
    """

    _ensure_pgpy()
    if alg not in (None, "OpenPGP"):
        raise ValueError("Unsupported alg for PgpEnvelopeSigner.")

    k = self._load_private_key(key, opts)
    # Keys without an ``is_unlocked`` attribute are treated as usable as-is.
    must_unlock = getattr(k, "is_unlocked", True) is False
    if must_unlock:
        pw = (opts or {}).get("passphrase")
        if not isinstance(pw, (str, bytes)):
            raise RuntimeError(
                "PGP private key is locked; supply opts['passphrase']."
            )
        # PGPKey.unlock() is a context manager: the secret key material is
        # only decrypted while the ``with`` block is active, so signing must
        # happen inside it. Calling unlock() as a bare statement does nothing.
        with k.unlock(pw):
            sig = k.sign(payload, detached=True)
    else:
        sig = k.sign(payload, detached=True)

    kid = str(k.fingerprint)
    sig_bytes = bytes(sig.__bytes__())
    sig_asc = str(sig)
    return [
        _Sig({"alg": "OpenPGP", "kid": kid, "sig": sig_bytes, "armored": sig_asc})
    ]

verify_bytes async

verify_bytes(
    payload, signatures, *, require=None, opts=None
)

Verify detached OpenPGP signatures against raw bytes.

PARAMETER DESCRIPTION
payload

Signed data.

TYPE: bytes

signatures

Signatures to validate.

TYPE: Sequence[Signature]

require

Verification requirements such as min_signers.

TYPE: Optional[Mapping[str, object]] DEFAULT: None

opts

Options containing public keys in pubkeys.

TYPE: Optional[Mapping[str, object]] DEFAULT: None

RETURNS DESCRIPTION
bool

True if the signatures satisfy the requirements, False otherwise.

TYPE: bool

RAISES DESCRIPTION
TypeError

If an unsupported public key is supplied in opts.

Source code in swarmauri_signing_pgp/pgp_signer.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
async def verify_bytes(
    self,
    payload: bytes,
    signatures: Sequence[Signature],
    *,
    require: Optional[Mapping[str, object]] = None,
    opts: Optional[Mapping[str, object]] = None,
) -> bool:
    """Verify detached OpenPGP signatures against raw bytes.

    Signatures whose ``alg`` is not ``"OpenPGP"``, or that carry neither a
    binary ``sig`` nor an ``armored`` string, are skipped. Each remaining
    signature is checked against the supplied public keys; the first key
    that verifies it is recorded as a signer.

    Args:
        payload (bytes): Signed data.
        signatures (Sequence[Signature]): Signatures to validate.
        require (Optional[Mapping[str, object]]): Verification requirements such
            as ``min_signers`` (default ``1``). The threshold counts *distinct*
            verifying keys, so repeating one signature does not raise the count.
        opts (Optional[Mapping[str, object]]): Options containing public keys in
            ``pubkeys``. Entries may be ``pgpy.PGPKey`` objects or key blobs
            given as ``str``, ``bytes`` or ``bytearray``.

    Returns:
        bool: ``True`` if the signatures satisfy the requirements, ``False`` otherwise.

    Raises:
        TypeError: If an unsupported public key is supplied in ``opts``.
    """

    _ensure_pgpy()
    min_signers = int(require.get("min_signers", 1)) if require else 1

    pubkeys = []
    if opts and "pubkeys" in opts:
        for entry in opts["pubkeys"]:  # type: ignore[index]
            if isinstance(entry, pgpy.PGPKey):
                pubkeys.append(entry)
            elif isinstance(entry, (str, bytes, bytearray)):
                # from_blob returns a (key, extras) pair; only the key is needed.
                pubkeys.append(pgpy.PGPKey.from_blob(entry)[0])
            else:
                raise TypeError("Unsupported public key in opts['pubkeys'].")

    # Fingerprints of keys that verified at least one signature. A set makes
    # the threshold count distinct signers rather than signature entries.
    verified_signers = set()
    for sig in signatures:
        if sig.get("alg") != "OpenPGP":
            continue
        sig_bytes = sig.get("sig")
        sig_arm = sig.get("armored")
        if isinstance(sig_bytes, (bytes, bytearray)):
            s = pgpy.PGPSignature.from_blob(bytes(sig_bytes))
        elif isinstance(sig_arm, str):
            s = pgpy.PGPSignature.from_blob(sig_arm)
        else:
            continue

        for pk in pubkeys:
            if pk.verify(payload, s):
                verified_signers.add(str(pk.fingerprint))
                break
        if len(verified_signers) >= min_signers:
            return True
    return False

sign_digest async

sign_digest(key, digest, *, alg=None, opts=None)
Source code in swarmauri_signing_pgp/pgp_signer.py
252
253
254
255
256
257
258
259
260
async def sign_digest(
    self,
    key: KeyRef,
    digest: bytes,
    *,
    alg: Optional[Alg] = None,
    opts: Optional[Mapping[str, object]] = None,
) -> Sequence[Signature]:
    """Sign a precomputed digest by delegating to ``sign_bytes``.

    The digest bytes are handed to ``sign_bytes`` unchanged as the payload;
    ``alg`` and ``opts`` are forwarded as given.

    Args:
        key (KeyRef): Reference to the private key used for signing.
        digest (bytes): Digest bytes to sign.
        alg (Optional[Alg]): Requested algorithm, forwarded to ``sign_bytes``.
        opts (Optional[Mapping[str, object]]): Options forwarded to ``sign_bytes``.

    Returns:
        Sequence[Signature]: Whatever ``sign_bytes`` returns for the digest.
    """

    produced = await self.sign_bytes(key, digest, alg=alg, opts=opts)
    return produced

verify_digest async

verify_digest(
    digest, signatures, *, require=None, opts=None
)
Source code in swarmauri_signing_pgp/pgp_signer.py
262
263
264
265
266
267
268
269
270
async def verify_digest(
    self,
    digest: bytes,
    signatures: Sequence[Signature],
    *,
    require: Optional[Mapping[str, object]] = None,
    opts: Optional[Mapping[str, object]] = None,
) -> bool:
    """Verify signatures over a precomputed digest via ``verify_bytes``.

    The digest bytes are handed to ``verify_bytes`` unchanged as the payload;
    ``require`` and ``opts`` are forwarded as given.

    Args:
        digest (bytes): Digest bytes that were signed.
        signatures (Sequence[Signature]): Signatures to validate.
        require (Optional[Mapping[str, object]]): Verification requirements,
            forwarded to ``verify_bytes``.
        opts (Optional[Mapping[str, object]]): Options forwarded to ``verify_bytes``.

    Returns:
        bool: The result of ``verify_bytes`` for the digest.
    """

    verdict = await self.verify_bytes(
        digest,
        signatures,
        require=require,
        opts=opts,
    )
    return verdict

canonicalize_envelope async

canonicalize_envelope(env, *, canon=None, opts=None)

Canonicalize an envelope to bytes.

PARAMETER DESCRIPTION
env

Envelope to canonicalize.

TYPE: Envelope

canon

Canonicalization format, "json" or "cbor".

TYPE: Optional[Canon] DEFAULT: None

opts

Additional options (unused).

TYPE: Optional[Mapping[str, object]] DEFAULT: None

RETURNS DESCRIPTION
bytes

Canonical representation of the envelope.

TYPE: bytes

RAISES DESCRIPTION
ValueError

If an unsupported canonicalization format is provided.

RuntimeError

If CBOR canonicalization is requested without cbor2.

Source code in swarmauri_signing_pgp/pgp_signer.py
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
async def canonicalize_envelope(
    self,
    env: Envelope,
    *,
    canon: Optional[Canon] = None,
    opts: Optional[Mapping[str, object]] = None,
) -> bytes:
    """Serialize an envelope into its canonical byte form.

    ``None`` and ``"json"`` select the JSON helper, ``"cbor"`` selects the
    CBOR helper; any other value is rejected.

    Args:
        env (Envelope): Envelope to canonicalize.
        canon (Optional[Canon]): Canonicalization format, ``"json"`` or ``"cbor"``.
            ``None`` is treated as ``"json"``.
        opts (Optional[Mapping[str, object]]): Additional options (unused).

    Returns:
        bytes: Canonical representation of the envelope.

    Raises:
        ValueError: If an unsupported canonicalization format is provided.
        RuntimeError: May be raised by the CBOR helper when ``cbor2`` is
            unavailable (that helper is defined elsewhere in the module).
    """

    if canon is None or canon == "json":
        encoded = _canon_json(env)  # type: ignore[arg-type]
    elif canon == "cbor":
        encoded = _canon_cbor(env)  # type: ignore[arg-type]
    else:
        raise ValueError(f"Unsupported canon: {canon}")
    return encoded

sign_envelope async

sign_envelope(key, env, *, alg=None, canon=None, opts=None)

Sign a structured envelope after canonicalization.

PARAMETER DESCRIPTION
key

Private key reference.

TYPE: KeyRef

env

Envelope to sign.

TYPE: Envelope

alg

Requested algorithm, defaults to "OpenPGP".

TYPE: Optional[Alg] DEFAULT: None

canon

Canonicalization format.

TYPE: Optional[Canon] DEFAULT: None

opts

Additional options.

TYPE: Optional[Mapping[str, object]] DEFAULT: None

RETURNS DESCRIPTION
Sequence[Signature]

Detached signature over the canonicalized envelope.

Source code in swarmauri_signing_pgp/pgp_signer.py
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
async def sign_envelope(
    self,
    key: KeyRef,
    env: Envelope,
    *,
    alg: Optional[Alg] = None,
    canon: Optional[Canon] = None,
    opts: Optional[Mapping[str, object]] = None,
) -> Sequence[Signature]:
    """Sign a structured envelope after canonicalization.

    The envelope is first turned into bytes with ``canonicalize_envelope``
    and the result is signed with ``sign_bytes``.

    Args:
        key (KeyRef): Private key reference.
        env (Envelope): Envelope to sign.
        alg (Optional[Alg]): Requested algorithm, defaults to ``"OpenPGP"``.
            Forwarded to ``sign_bytes`` so an unsupported value is rejected
            there instead of being silently replaced.
        canon (Optional[Canon]): Canonicalization format.
        opts (Optional[Mapping[str, object]]): Additional options.

    Returns:
        Sequence[Signature]: Detached signature over the canonicalized envelope.

    Raises:
        ValueError: If ``sign_bytes`` rejects the requested algorithm, or the
            canonicalization format is unsupported.
    """

    payload = await self.canonicalize_envelope(env, canon=canon, opts=opts)
    # Pass the caller's ``alg`` through rather than hard-coding "OpenPGP",
    # matching sign_digest and letting sign_bytes validate it.
    return await self.sign_bytes(key, payload, alg=alg, opts=opts)

verify_envelope async

verify_envelope(
    env, signatures, *, canon=None, require=None, opts=None
)

Verify signatures over a structured envelope.

PARAMETER DESCRIPTION
env

Envelope whose signatures are being verified.

TYPE: Envelope

signatures

Signatures to check.

TYPE: Sequence[Signature]

canon

Canonicalization format used.

TYPE: Optional[Canon] DEFAULT: None

require

Verification requirements.

TYPE: Optional[Mapping[str, object]] DEFAULT: None

opts

Additional options.

TYPE: Optional[Mapping[str, object]] DEFAULT: None

RETURNS DESCRIPTION
bool

True if verification succeeds, False otherwise.

TYPE: bool

Source code in swarmauri_signing_pgp/pgp_signer.py
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
async def verify_envelope(
    self,
    env: Envelope,
    signatures: Sequence[Signature],
    *,
    canon: Optional[Canon] = None,
    require: Optional[Mapping[str, object]] = None,
    opts: Optional[Mapping[str, object]] = None,
) -> bool:
    """Check signatures against the canonical form of an envelope.

    The envelope is canonicalized with ``canonicalize_envelope`` and the
    resulting bytes are verified with ``verify_bytes``.

    Args:
        env (Envelope): Envelope whose signatures are being verified.
        signatures (Sequence[Signature]): Signatures to check.
        canon (Optional[Canon]): Canonicalization format used.
        require (Optional[Mapping[str, object]]): Verification requirements,
            forwarded to ``verify_bytes``.
        opts (Optional[Mapping[str, object]]): Additional options, forwarded to
            both the canonicalization and verification steps.

    Returns:
        bool: ``True`` if verification succeeds, ``False`` otherwise.
    """

    canonical_bytes = await self.canonicalize_envelope(env, canon=canon, opts=opts)
    verdict = await self.verify_bytes(
        canonical_bytes,
        signatures,
        require=require,
        opts=opts,
    )
    return verdict