Skip to content

Class swarmauri_certs_csronly.CsrOnlyService.CsrOnlyService

swarmauri_certs_csronly.CsrOnlyService.CsrOnlyService

Bases: CertServiceBase

Service that only builds PKCS#10 CSRs.

Implements CSR generation as defined in RFC 2986 and leverages X.509 naming and extension rules from RFC 5280. This service does not issue or verify certificates.

type class-attribute instance-attribute

type = 'CsrOnlyService'

supports

supports()
Source code in swarmauri_certs_csronly/CsrOnlyService.py
30
31
32
33
34
35
def supports(self) -> Mapping[str, Iterable[str]]:
    """Describe the capabilities advertised by this service.

    Returns:
        A mapping with three entries: ``key_algs`` (key algorithm labels),
        ``sig_algs`` (signature algorithm labels) and ``features`` (only
        ``"csr"``).
    """
    key_algorithms = ("RSA-2048", "RSA-3072", "RSA-4096", "EC-P256", "Ed25519")
    signature_algorithms = ("RSA-PSS-SHA256", "ECDSA-P256-SHA256", "Ed25519")
    capabilities = dict(
        key_algs=key_algorithms,
        sig_algs=signature_algorithms,
        features=("csr",),
    )
    return capabilities

create_csr async

create_csr(
    key,
    subject,
    *,
    san=None,
    extensions=None,
    sig_alg=None,
    challenge_password=None,
    output_der=False,
    opts=None,
)

Build a PKCS#10 CSR from a KeyRef containing a private key PEM.

Source code in swarmauri_certs_csronly/CsrOnlyService.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
async def create_csr(
    self,
    key: KeyRef,
    subject: SubjectSpec,
    *,
    san: Optional[AltNameSpec] = None,
    extensions: Optional[CertExtensionSpec] = None,
    sig_alg: Optional[str] = None,
    challenge_password: Optional[str] = None,
    output_der: bool = False,
    opts: Optional[Dict[str, Any]] = None,
) -> CsrBytes:
    """Build a PKCS#10 CSR from a KeyRef containing a private key PEM.

    Args:
        key: Key reference whose ``material`` holds an unencrypted private
            key in PEM form (it is loaded with ``password=None``).
        subject: Mapping of subject fields. The recognised keys are ``CN``,
            ``C``, ``ST``, ``L``, ``O``, ``OU`` and ``emailAddress``; other
            keys are ignored.
        san: Optional mapping of subject alternative names with the list
            entries ``dns``, ``ip``, ``uri`` and ``email``. IP entries may
            be given as strings or as ``ipaddress`` objects.
        extensions: Optional mapping of extensions. Only
            ``basic_constraints`` (with ``ca`` and ``path_len``) is read,
            and it is added as a critical extension.
        sig_alg: Accepted for interface compatibility; not consulted by
            this implementation.
        challenge_password: Optional PKCS#9 challenge password, added as a
            CSR attribute when non-empty.
        output_der: Return DER when true, PEM otherwise.
        opts: Accepted for interface compatibility; not consulted by this
            implementation.

    Returns:
        The serialized CSR, PEM- or DER-encoded according to ``output_der``.

    Raises:
        ValueError: If ``key.material`` is ``None``, or if a string IP
            entry in ``san`` is not a valid IPv4/IPv6 address.
    """
    # Local import keeps this block self-contained; ``ipaddress`` is stdlib.
    import ipaddress

    if key.material is None:
        raise ValueError(
            "KeyRef.material with private key PEM is required for CSR creation"
        )

    sk = serialization.load_pem_private_key(key.material, password=None)

    # Ordered so the resulting distinguished name keeps a stable attribute
    # sequence regardless of the ordering of keys in ``subject``.
    subject_fields = (
        ("CN", NameOID.COMMON_NAME),
        ("C", NameOID.COUNTRY_NAME),
        ("ST", NameOID.STATE_OR_PROVINCE_NAME),
        ("L", NameOID.LOCALITY_NAME),
        ("O", NameOID.ORGANIZATION_NAME),
        ("OU", NameOID.ORGANIZATIONAL_UNIT_NAME),
        ("emailAddress", NameOID.EMAIL_ADDRESS),
    )
    name_attrs = [
        x509.NameAttribute(oid, subject[field])
        for field, oid in subject_fields
        if field in subject
    ]

    csr_builder = x509.CertificateSigningRequestBuilder().subject_name(
        x509.Name(name_attrs)
    )

    if san:
        # ``or ()`` tolerates keys that are present but set to None.
        san_entries = [x509.DNSName(d) for d in san.get("dns") or ()]
        for addr in san.get("ip") or ():
            # x509.IPAddress only accepts ipaddress objects; a plain string
            # would raise TypeError, so textual addresses are parsed first.
            if isinstance(addr, str):
                addr = ipaddress.ip_address(addr)
            san_entries.append(x509.IPAddress(addr))
        san_entries.extend(
            x509.UniformResourceIdentifier(u) for u in san.get("uri") or ()
        )
        san_entries.extend(x509.RFC822Name(e) for e in san.get("email") or ())
        if san_entries:
            csr_builder = csr_builder.add_extension(
                x509.SubjectAlternativeName(san_entries), critical=False
            )

    if challenge_password:
        csr_builder = csr_builder.add_attribute(
            x509.OID_PKCS9_CHALLENGE_PASSWORD, challenge_password.encode()
        )

    if extensions and "basic_constraints" in extensions:
        bc = extensions["basic_constraints"]
        csr_builder = csr_builder.add_extension(
            x509.BasicConstraints(
                ca=bc.get("ca", False), path_length=bc.get("path_len")
            ),
            critical=True,
        )

    # Ed25519/Ed448 keys are signed with no separate hash algorithm (None);
    # every other key type is signed with SHA-256.
    sig = (
        None
        if isinstance(sk, (ed25519.Ed25519PrivateKey, ed448.Ed448PrivateKey))
        else hashes.SHA256()
    )
    csr = csr_builder.sign(private_key=sk, algorithm=sig)

    return csr.public_bytes(
        serialization.Encoding.DER if output_der else serialization.Encoding.PEM
    )

create_self_signed async

create_self_signed(*a, **kw)
Source code in swarmauri_certs_csronly/CsrOnlyService.py
125
126
async def create_self_signed(self, *a, **kw):  # pragma: no cover - not implemented
    """Always raise ``NotImplementedError``; all arguments are ignored."""
    reason = "CsrOnlyService does not create certificates"
    raise NotImplementedError(reason)

sign_cert async

sign_cert(*a, **kw)
Source code in swarmauri_certs_csronly/CsrOnlyService.py
128
129
async def sign_cert(self, *a, **kw):  # pragma: no cover - not implemented
    raise NotImplementedError("CsrOnlyService does not sign certificates")

verify_cert async

verify_cert(*a, **kw)
Source code in swarmauri_certs_csronly/CsrOnlyService.py
131
132
async def verify_cert(self, *a, **kw):  # pragma: no cover - not implemented
    """Always raise ``NotImplementedError``; all arguments are ignored."""
    reason = "CsrOnlyService does not verify certificates"
    raise NotImplementedError(reason)

parse_cert async

parse_cert(*a, **kw)
Source code in swarmauri_certs_csronly/CsrOnlyService.py
134
135
async def parse_cert(self, *a, **kw):  # pragma: no cover - not implemented
    """Always raise ``NotImplementedError``; all arguments are ignored."""
    reason = "CsrOnlyService does not parse certificates"
    raise NotImplementedError(reason)