async def encrypt_for_many(
self,
recipients: Sequence[KeyRef],
pt: bytes,
*,
payload_alg: Optional[Alg] = None,
recipient_alg: Optional[Alg] = None,
mode: Optional[MreMode | str] = None,
aad: Optional[bytes] = None,
shared: Optional[Mapping[str, bytes]] = None,
opts: Optional[Mapping[str, object]] = None,
) -> MultiRecipientEnvelope:
if not recipients:
raise ValueError("encrypt_for_many: 'recipients' must be non-empty.")
mode = MreMode(mode or MreMode.SEALED_CEK_AEAD)
if mode is not MreMode.SEALED_CEK_AEAD:
raise ValueError(f"Unsupported mode for KeyringMreCrypto: {mode}")
recipient_alg = recipient_alg or "KEYRING"
if recipient_alg != "KEYRING":
raise ValueError("KeyringMreCrypto only supports recipient_alg='KEYRING'.")
payload_alg = payload_alg or "AES-256-GCM"
if payload_alg not in VALID_AEAD_ALGS:
raise ValueError(f"Unsupported payload AEAD alg: {payload_alg}")
quorum_k = int((opts or {}).get("quorum_k", len(recipients)))
if not (1 <= quorum_k <= len(recipients)):
raise ValueError("opts['quorum_k'] must satisfy 1 <= k <= len(recipients).")
cek_len = 32
cek = secrets.token_bytes(cek_len)
nonce, ct = _AEAD.encrypt_with_key(cek, pt, alg=payload_alg, aad=aad)
recipient_entries: list[dict[str, Any]] = []
for keyref in recipients:
client, r_context = self._extract_keyring_client(keyref)
header = await client.wrap_cek(
cek, context=self._default_context(shared).update_copy(r_context)
)
rid = self._stable_id(client)
recipient_entries.append({"id": rid, "header": header})
return {
"mode": str(MreMode.SEALED_CEK_AEAD),
"recipient_alg": "KEYRING",
"payload": {
"kind": "aead",
"alg": payload_alg,
"nonce": nonce,
"ct": ct,
"aad": aad,
},
"recipients": recipient_entries,
"shared": dict(shared or {}),
"meta": {"quorum_k": quorum_k, "recipients_m": len(recipient_entries)},
}