async def create_csr(
    self,
    key: KeyRef,
    subject: SubjectSpec,
    *,
    san: Optional[AltNameSpec] = None,
    extensions: Optional[CertExtensionSpec] = None,
    sig_alg: Optional[str] = None,
    challenge_password: Optional[str] = None,
    output_der: bool = False,
    opts: Optional[Dict[str, Any]] = None,
) -> CsrBytes:
    """Build and sign a PKCS#10 CSR with the private key held in ``key``.

    Args:
        key: Reference whose ``material`` holds an unencrypted PEM private
            key (bytes, or ``str`` which is UTF-8 encoded before parsing).
        subject: Mapping of subject fields. The keys read are ``CN``, ``C``,
            ``ST``, ``L``, ``O``, ``OU`` and ``emailAddress``; absent keys
            are skipped and the attributes are emitted in that order.
        san: Optional mapping with ``dns``, ``ip``, ``uri`` and ``email``
            lists. IP entries may be strings or ``ipaddress`` objects. The
            SubjectAlternativeName extension (non-critical) is added only
            when at least one entry is present.
        extensions: Optional mapping; only ``basic_constraints`` (with
            ``ca`` and ``path_len``) is read, and it is added as critical.
        sig_alg: Accepted for interface compatibility; not consulted here.
            Ed25519/Ed448 keys sign without a digest, every other key type
            signs with SHA-256.
        challenge_password: If non-empty, added (UTF-8 encoded) as the
            PKCS#9 challengePassword attribute.
        output_der: Return DER when true, PEM otherwise.
        opts: Accepted for interface compatibility; not consulted here.

    Returns:
        The serialized CSR in the requested encoding.

    Raises:
        ValueError: If ``key.material`` is ``None``, or if a textual IP SAN
            entry is not a valid IPv4/IPv6 address. The underlying
            ``cryptography`` calls may raise their own errors for malformed
            keys or invalid attribute values.
    """
    # Function-scope import: keeps this block self-contained without
    # touching the file's import section.
    import ipaddress

    if key.material is None:
        raise ValueError(
            "KeyRef.material with private key PEM is required for CSR creation"
        )

    # load_pem_private_key() only accepts bytes-like data.
    pem = key.material
    if isinstance(pem, str):
        pem = pem.encode("utf-8")
    sk = serialization.load_pem_private_key(pem, password=None)

    # Ordered (subject key, OID) table; the order fixes the RDN sequence.
    subject_fields = (
        ("CN", NameOID.COMMON_NAME),
        ("C", NameOID.COUNTRY_NAME),
        ("ST", NameOID.STATE_OR_PROVINCE_NAME),
        ("L", NameOID.LOCALITY_NAME),
        ("O", NameOID.ORGANIZATION_NAME),
        ("OU", NameOID.ORGANIZATIONAL_UNIT_NAME),
        ("emailAddress", NameOID.EMAIL_ADDRESS),
    )
    name_attrs = [
        x509.NameAttribute(oid, subject[field])
        for field, oid in subject_fields
        if field in subject
    ]
    csr_builder = x509.CertificateSigningRequestBuilder().subject_name(
        x509.Name(name_attrs)
    )

    if san:
        # "or ()" tolerates keys that are present but set to None.
        san_entries = [x509.DNSName(d) for d in san.get("dns") or ()]
        for raw_ip in san.get("ip") or ():
            # x509.IPAddress rejects plain strings; parse textual addresses
            # and pass already-constructed ipaddress objects through.
            if isinstance(raw_ip, str):
                raw_ip = ipaddress.ip_address(raw_ip)
            san_entries.append(x509.IPAddress(raw_ip))
        san_entries.extend(
            x509.UniformResourceIdentifier(u) for u in san.get("uri") or ()
        )
        san_entries.extend(x509.RFC822Name(e) for e in san.get("email") or ())
        if san_entries:
            csr_builder = csr_builder.add_extension(
                x509.SubjectAlternativeName(san_entries), critical=False
            )

    if challenge_password:
        # PKCS#9 challengePassword (1.2.840.113549.1.9.7), spelled out as an
        # ObjectIdentifier so it does not depend on a module-level alias.
        csr_builder = csr_builder.add_attribute(
            x509.ObjectIdentifier("1.2.840.113549.1.9.7"),
            challenge_password.encode(),
        )

    if extensions and "basic_constraints" in extensions:
        bc = extensions["basic_constraints"]
        csr_builder = csr_builder.add_extension(
            x509.BasicConstraints(
                ca=bc.get("ca", False), path_length=bc.get("path_len")
            ),
            critical=True,
        )

    # EdDSA keys hash internally and must be signed with algorithm=None.
    sig = (
        None
        if isinstance(sk, (ed25519.Ed25519PrivateKey, ed448.Ed448PrivateKey))
        else hashes.SHA256()
    )
    csr = csr_builder.sign(private_key=sk, algorithm=sig)
    return csr.public_bytes(
        serialization.Encoding.DER if output_der else serialization.Encoding.PEM
    )