Skip to content

Class swarmauri_mre_crypto_keyring.KeyringMreCrypto.KeyringMreCrypto

swarmauri_mre_crypto_keyring.KeyringMreCrypto.KeyringMreCrypto

KeyringMreCrypto(**data)

Bases: MreCryptoBase

MRE provider using multiple independent keyrings/HSMs.

Source code in swarmauri_mre_crypto_keyring/KeyringMreCrypto.py
113
114
115
116
117
118
def __init__(self, **data: Any) -> None:
    """Construct the provider, forwarding ``data`` to the base initializer.

    Args:
        **data: Keyword arguments passed unchanged to ``super().__init__``.

    Raises:
        RuntimeError: When the module-level ``_CRYPTO_OK`` flag is falsy; the
            message asks for the ``cryptography`` package to be installed.
    """
    if _CRYPTO_OK:
        super().__init__(**data)
        return
    raise RuntimeError(
        "KeyringMreCrypto requires 'cryptography' package. Install with: pip install cryptography"
    )

type class-attribute instance-attribute

type = 'KeyringMreCrypto'

supports

supports()
Source code in swarmauri_mre_crypto_keyring/KeyringMreCrypto.py
120
121
122
123
124
125
126
def supports(self) -> Dict[str, Iterable[str | MreMode]]:
    """Describe the algorithms, modes and features this provider accepts.

    Returns:
        A mapping with four keys:

        * ``payload``: the module-level ``VALID_AEAD_ALGS`` collection.
        * ``recipient``: only ``"KEYRING"``.
        * ``modes``: only ``MreMode.SEALED_CEK_AEAD``.
        * ``features``: ``"aad"`` and ``"rewrap_without_reencrypt"``.
    """
    capabilities: Dict[str, Iterable[str | MreMode]] = {}
    capabilities["payload"] = VALID_AEAD_ALGS
    capabilities["recipient"] = ("KEYRING",)
    capabilities["modes"] = (MreMode.SEALED_CEK_AEAD,)
    capabilities["features"] = ("aad", "rewrap_without_reencrypt")
    return capabilities

encrypt_for_many async

encrypt_for_many(
    recipients,
    pt,
    *,
    payload_alg=None,
    recipient_alg=None,
    mode=None,
    aad=None,
    shared=None,
    opts=None,
)
Source code in swarmauri_mre_crypto_keyring/KeyringMreCrypto.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
async def encrypt_for_many(
    self,
    recipients: Sequence[KeyRef],
    pt: bytes,
    *,
    payload_alg: Optional[Alg] = None,
    recipient_alg: Optional[Alg] = None,
    mode: Optional[MreMode | str] = None,
    aad: Optional[bytes] = None,
    shared: Optional[Mapping[str, bytes]] = None,
    opts: Optional[Mapping[str, object]] = None,
) -> MultiRecipientEnvelope:
    """Encrypt ``pt`` once and wrap the content key for every recipient.

    A random 32-byte content-encryption key (CEK) is generated, the payload is
    encrypted through ``_AEAD.encrypt_with_key``, and each recipient's keyring
    client is asked, one after another, to wrap that CEK.

    Args:
        recipients: Key references, one per keyring; must be non-empty.
        pt: Plaintext to encrypt.
        payload_alg: Payload AEAD algorithm; defaults to ``"AES-256-GCM"`` and
            must be a member of ``VALID_AEAD_ALGS``.
        recipient_alg: Must be ``"KEYRING"`` (the default when omitted).
        mode: Must resolve to ``MreMode.SEALED_CEK_AEAD`` (the default).
        aad: Additional authenticated data; handed to the AEAD call and also
            stored in the envelope payload.
        shared: Mapping passed to ``_default_context`` for each wrap and
            copied into the envelope under ``"shared"``.
        opts: Optional settings. Only ``quorum_k`` is read here; it defaults
            to the number of recipients.

    Returns:
        The envelope as a dict with the keys ``mode``, ``recipient_alg``,
        ``payload``, ``recipients``, ``shared`` and ``meta``.

    Raises:
        ValueError: If ``recipients`` is empty, the mode, recipient algorithm
            or payload algorithm is unsupported, or ``quorum_k`` falls outside
            ``1..len(recipients)``.
    """
    if not recipients:
        raise ValueError("encrypt_for_many: 'recipients' must be non-empty.")

    # A missing mode falls back to the only mode this provider implements.
    mode = MreMode(mode or MreMode.SEALED_CEK_AEAD)
    if mode is not MreMode.SEALED_CEK_AEAD:
        raise ValueError(f"Unsupported mode for KeyringMreCrypto: {mode}")

    if (recipient_alg or "KEYRING") != "KEYRING":
        raise ValueError("KeyringMreCrypto only supports recipient_alg='KEYRING'.")

    if not payload_alg:
        payload_alg = "AES-256-GCM"
    if payload_alg not in VALID_AEAD_ALGS:
        raise ValueError(f"Unsupported payload AEAD alg: {payload_alg}")

    total = len(recipients)
    options = opts or {}
    quorum_k = int(options.get("quorum_k", total))
    if quorum_k < 1 or quorum_k > total:
        raise ValueError("opts['quorum_k'] must satisfy 1 <= k <= len(recipients).")

    # One fresh CEK protects the payload; only the CEK is wrapped per recipient.
    cek = secrets.token_bytes(32)
    nonce, ct = _AEAD.encrypt_with_key(cek, pt, alg=payload_alg, aad=aad)

    wrapped: list[dict[str, Any]] = []
    for ref in recipients:
        keyring, ref_context = self._extract_keyring_client(ref)
        wrap_context = self._default_context(shared).update_copy(ref_context)
        sealed = await keyring.wrap_cek(cek, context=wrap_context)
        wrapped.append({"id": self._stable_id(keyring), "header": sealed})

    payload_section = {
        "kind": "aead",
        "alg": payload_alg,
        "nonce": nonce,
        "ct": ct,
        "aad": aad,
    }
    return {
        "mode": str(MreMode.SEALED_CEK_AEAD),
        "recipient_alg": "KEYRING",
        "payload": payload_section,
        "recipients": wrapped,
        "shared": dict(shared or {}),
        "meta": {"quorum_k": quorum_k, "recipients_m": len(wrapped)},
    }

open_for_many async

open_for_many(my_identities, env, *, aad=None, opts=None)
Source code in swarmauri_mre_crypto_keyring/KeyringMreCrypto.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
async def open_for_many(
    self,
    my_identities: Sequence[KeyRef],
    env: MultiRecipientEnvelope,
    *,
    aad: Optional[bytes] = None,
    opts: Optional[Mapping[str, object]] = None,
) -> bytes:
    """Decrypt an envelope once enough unwraps agree on the same content key.

    Each identity's keyring client tries to unwrap a recipient header. The
    header whose id matches the client's stable id is tried alone when present;
    otherwise every header in the envelope is tried. Each successful unwrap
    adds one to a tally keyed by the recovered key; as soon as a key's tally
    reaches ``meta.quorum_k`` the payload is decrypted with it.

    Args:
        my_identities: Key references to try; must be non-empty.
        env: The envelope to open.
        aad: Optional additional authenticated data. When given it must equal
            the ``aad`` stored in the envelope payload.
        opts: Accepted for interface compatibility; not read by this method.

    Returns:
        The decrypted payload bytes.

    Raises:
        ValueError: If ``my_identities`` is empty, ``meta.quorum_k`` is below
            1, or the supplied ``aad`` differs from the stored one.
        PermissionError: If no recovered key reaches the quorum.
    """
    if not my_identities:
        raise ValueError("open_for_many: 'my_identities' must be non-empty.")

    self._assert_env_shape(env)
    env_meta = env.get("meta", {}) or {}
    quorum_k = int(env_meta.get("quorum_k", 1))
    entries = env["recipients"]
    if quorum_k < 1:
        raise ValueError("Envelope meta.quorum_k must be >= 1.")

    # Later entries overwrite earlier ones that carry the same id.
    headers_by_id: Dict[str, bytes] = {}
    for entry in entries:
        entry_id = entry["id"]
        headers_by_id[entry_id] = entry["header"]

    shared: Mapping[str, bytes] = env.get("shared") or {}
    payload = env["payload"]
    alg, nonce, ct = payload["alg"], payload["nonce"], payload["ct"]
    stored_aad = payload.get("aad", None)
    if aad is None:
        effective_aad = stored_aad
    else:
        effective_aad = aad
    if effective_aad != stored_aad:
        raise ValueError("AAD mismatch for AEAD mode.")

    # NOTE(review): the tally grows per successful unwrap, so the same identity
    # supplied twice counts twice toward the quorum - confirm this is intended.
    tally: Dict[bytes, int] = {}
    for keyref in my_identities:
        keyring, id_context = self._extract_keyring_client(keyref)
        own_id = self._stable_id(keyring)
        if own_id in headers_by_id:
            candidates: Sequence[str] = [own_id]
        else:
            candidates = list(headers_by_id)

        for candidate in candidates:
            header = headers_by_id[candidate]
            # NOTE(review): this handler also covers the payload decryption, so
            # a failed decrypt is skipped like a failed unwrap and only shows up
            # as the final PermissionError - confirm this is deliberate.
            try:
                cek = await keyring.unwrap_cek(
                    header,
                    context=self._default_context(shared).update_copy(id_context),
                )
                votes = tally.get(cek, 0) + 1
                tally[cek] = votes
                if votes >= quorum_k:
                    return _AEAD.decrypt_with_key(
                        cek,
                        nonce,
                        ct,
                        alg=alg,
                        aad=effective_aad,
                    )
            except Exception:
                continue

    raise PermissionError(f"Unable to satisfy CEK quorum (required {quorum_k}).")

open_for async

open_for(my_identity, env, *, aad=None, opts=None)
Source code in swarmauri_mre_crypto_keyring/KeyringMreCrypto.py
245
246
247
248
249
250
251
252
253
async def open_for(
    self,
    my_identity: KeyRef,
    env: MultiRecipientEnvelope,
    *,
    aad: Optional[bytes] = None,
    opts: Optional[Mapping[str, object]] = None,
) -> bytes:
    """Open ``env`` with a single identity.

    Convenience wrapper: places ``my_identity`` in a one-element list and
    delegates to ``open_for_many`` with the same ``aad`` and ``opts``.

    Returns:
        Whatever ``open_for_many`` returns for that single identity.
    """
    identities = [my_identity]
    plaintext = await self.open_for_many(identities, env, aad=aad, opts=opts)
    return plaintext

rewrap async

rewrap(
    env,
    *,
    add=None,
    remove=None,
    recipient_alg=None,
    opts=None,
)
Source code in swarmauri_mre_crypto_keyring/KeyringMreCrypto.py
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
async def rewrap(
    self,
    env: MultiRecipientEnvelope,
    *,
    add: Optional[Sequence[KeyRef]] = None,
    remove: Optional[Sequence[RecipientId]] = None,
    recipient_alg: Optional[Alg] = None,
    opts: Optional[Mapping[str, object]] = None,
) -> MultiRecipientEnvelope:
    """Add or remove recipients, optionally rotating the payload key.

    ``env`` is updated in place and returned. Nothing is written to ``env``
    until every wrap has succeeded, so a failure part-way through leaves the
    envelope exactly as it was passed in.

    Without rotation, removal only drops the matching recipient entries; the
    payload and its content key (CEK) are left as they are.

    With ``opts['rotate_payload_on_revoke']`` set, the payload is decrypted,
    re-encrypted under a fresh CEK of the same length, and
    ``meta['rotated_at']`` is incremented. Retained recipients whose stable id
    matches one of ``opts['identities']`` get their header re-wrapped under
    the new CEK. Retained recipients that cannot be matched keep their
    previous header, which still wraps the old CEK and therefore cannot open
    the rotated payload; pass their key references in ``add`` to issue fresh
    headers for them.

    Args:
        env: Envelope to modify.
        add: Key references to wrap the (possibly rotated) CEK for.
        remove: Recipient ids whose entries are dropped.
        recipient_alg: Must be ``None`` or ``"KEYRING"``.
        opts: Optional settings. ``rotate_payload_on_revoke`` enables
            rotation; ``identities`` (a non-empty list or tuple of key
            references) is required for removal or rotation. The mapping is
            also passed to ``_recover_cek_if_present`` when neither applies.

    Returns:
        The same ``env`` object, updated.

    Raises:
        ValueError: If ``recipient_alg`` is unsupported, or removal/rotation
            is requested without usable ``opts['identities']``.
    """
    self._assert_env_shape(env)
    if recipient_alg not in (None, "KEYRING"):
        raise ValueError("KeyringMreCrypto only supports recipient_alg='KEYRING'.")

    add = add or []
    remove = remove or []
    options = opts or {}
    shared: Mapping[str, bytes] = env.get("shared") or {}

    rotate_on_revoke = bool(options.get("rotate_payload_on_revoke", False))

    identities = None
    if remove or rotate_on_revoke:
        identities = options.get("identities", None)
        if not isinstance(identities, (list, tuple)) or not identities:
            raise ValueError(
                "rewrap with removal/rotation requires opts['identities']: Sequence[KeyRef] able to meet quorum."
            )
        cek = await self._recover_cek_for_env(env, identities, shared)
    else:
        # NOTE(review): if this helper can return no key, the wraps for `add`
        # below receive that value as-is - confirm against the helper.
        cek = await self._recover_cek_if_present(env, shared, opts)

    if remove:
        # Build the lookup set once rather than once per recipient.
        removed_ids = set(remove)
        kept = [r for r in env["recipients"] if r["id"] not in removed_ids]
    else:
        kept = list(env["recipients"])

    new_nonce = new_ct = new_meta = None
    if rotate_on_revoke:
        payload = env["payload"]
        payload_alg = payload["alg"]
        payload_aad = payload.get("aad")
        pt = _AEAD.decrypt_with_key(
            cek,
            payload["nonce"],
            payload["ct"],
            alg=payload_alg,
            aad=payload_aad,
        )
        cek = secrets.token_bytes(len(cek))
        new_nonce, new_ct = _AEAD.encrypt_with_key(
            cek, pt, alg=payload_alg, aad=payload_aad
        )
        new_meta = dict(env.get("meta") or {})
        new_meta["rotated_at"] = new_meta.get("rotated_at", 0) + 1

        # Existing headers wrap the old CEK. Re-wrap the new one for every
        # retained recipient whose keyring is reachable through the supplied
        # identities; the rest keep their (now stale) header.
        reachable: dict[str, tuple[Any, Any]] = {}
        for keyref in identities:
            client, id_context = self._extract_keyring_client(keyref)
            reachable.setdefault(self._stable_id(client), (client, id_context))

        refreshed: list[dict[str, Any]] = []
        for entry in kept:
            match = reachable.get(entry["id"])
            if match is None:
                refreshed.append(entry)
                continue
            client, id_context = match
            header = await client.wrap_cek(
                cek, context=self._default_context(shared).update_copy(id_context)
            )
            # Build a new entry so the caller's envelope stays untouched until commit.
            refreshed.append({**entry, "header": header})
        kept = refreshed

    for keyref in add:
        client, a_context = self._extract_keyring_client(keyref)
        header = await client.wrap_cek(
            cek, context=self._default_context(shared).update_copy(a_context)
        )
        kept.append({"id": self._stable_id(client), "header": header})

    # Commit: every fallible step has completed, so apply all changes together.
    if rotate_on_revoke:
        env["payload"]["nonce"] = new_nonce
        env["payload"]["ct"] = new_ct
        env["meta"] = new_meta
    env["recipients"] = kept
    return env