Skip to content

Class swarmauri_transport_tls_unicast.tls_unicast.TlsUnicastTransport

swarmauri_transport_tls_unicast.tls_unicast.TlsUnicastTransport

TlsUnicastTransport(ssl_ctx, sni=None)

Bases: TransportBase, UnicastTransportMixin, PeerTransportMixin

TLS-secured unicast transport built on top of asyncio streams.

Source code in swarmauri_transport_tls_unicast/tls_unicast.py
26
27
28
29
30
31
32
def __init__(self, ssl_ctx: ssl.SSLContext, sni: Optional[str] = None):
    """Set up an idle TLS transport around a caller-supplied SSL context.

    Args:
        ssl_ctx: SSL context kept on the instance as ``_ssl_ctx``.
        sni: Optional string kept on the instance as ``_sni``. Presumably
            the server name used for TLS SNI when connecting; its use is
            outside this block -- TODO confirm.
    """
    # Register with the base transport under the fixed name "TLS".
    super().__init__(name="TLS")

    # Caller-supplied TLS configuration.
    self._ssl_ctx = ssl_ctx
    self._sni = sni

    # No server and no stream pair exist yet; send/recv/accept treat an
    # unset value here as "not connected" / "server not started".
    self._server: Optional[asyncio.AbstractServer] = None
    self._reader: Optional[asyncio.StreamReader] = None
    self._writer: Optional[asyncio.StreamWriter] = None

supports

supports()
Source code in swarmauri_transport_tls_unicast/tls_unicast.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def supports(self) -> TransportCapabilities:
    """Describe what this transport offers, based on the stored SSL context.

    The security mode is ``SecurityMode.MTLS`` when the context's
    ``verify_mode`` equals ``ssl.CERT_REQUIRED`` and ``SecurityMode.TLS``
    otherwise. ``Feature.MUTUAL_AUTH`` is advertised only in the MTLS case;
    the other four features are always advertised.

    NOTE(review): ``ssl.CERT_REQUIRED`` is also the default for client-side
    contexts built with ``ssl.create_default_context()``, so such a context
    is reported as MTLS here -- confirm this is intended.

    Returns:
        A ``TransportCapabilities`` for TLS, stream I/O, unicast.
    """
    peer_cert_required = self._ssl_ctx.verify_mode == ssl.CERT_REQUIRED
    mode = SecurityMode.MTLS if peer_cert_required else SecurityMode.TLS

    # Features that hold regardless of the verification mode.
    advertised = {
        Feature.RELIABLE,
        Feature.ORDERED,
        Feature.ENCRYPTED,
        Feature.AUTHENTICATED,
    }
    if peer_cert_required:
        advertised.add(Feature.MUTUAL_AUTH)

    return TransportCapabilities(
        protocols=frozenset({Protocol.TLS}),
        io=IOModel.STREAM,
        casts=frozenset({Cast.UNICAST}),
        features=frozenset(advertised),
        security=mode,
        schemes=frozenset({AddressScheme.TLS}),
    )

accept async

accept()
Source code in swarmauri_transport_tls_unicast/tls_unicast.py
85
86
87
88
89
async def accept(self):
    """Park the caller indefinitely once a server is present.

    This coroutine never returns a value: after the guard passes it sleeps
    in one-hour slices in an endless loop, so it only ends if the awaiting
    task is cancelled or the sleep raises.

    Raises:
        RuntimeError: If ``_server`` is unset ("server not started").
    """
    server = self._server
    if not server:
        raise RuntimeError("server not started")

    nap_seconds = 3600
    while True:
        await asyncio.sleep(nap_seconds)

send async

send(target, data, *, timeout=None)
Source code in swarmauri_transport_tls_unicast/tls_unicast.py
91
92
93
94
95
96
97
async def send(
    self, target: str, data: bytes, *, timeout: Optional[float] = None
) -> None:
    """Write ``data`` to the connected stream and wait for it to drain.

    Args:
        target: Accepted but not read by this method; the bytes always go
            to the instance's current writer.
        data: Payload handed to the writer unchanged.
        timeout: Seconds to wait for the drain to finish; ``None`` waits
            without a limit.

    Raises:
        RuntimeError: If ``_writer`` is unset ("not connected").
        asyncio.TimeoutError: If the drain does not finish within
            ``timeout``.
    """
    writer = self._writer
    if not writer:
        raise RuntimeError("not connected")

    writer.write(data)
    # Only the drain is bounded by the timeout; write() itself just buffers.
    flushed = writer.drain()
    await asyncio.wait_for(flushed, timeout)

recv async

recv(*, timeout=None)
Source code in swarmauri_transport_tls_unicast/tls_unicast.py
 99
100
101
102
async def recv(self, *, timeout: Optional[float] = None) -> bytes:
    """Read one chunk of at most 65536 bytes from the connected stream.

    Args:
        timeout: Seconds to wait for the read; ``None`` waits without a
            limit.

    Returns:
        Whatever the reader's single ``read`` call yields.

    Raises:
        RuntimeError: If ``_reader`` is unset ("not connected").
        asyncio.TimeoutError: If the read does not finish within
            ``timeout``.
    """
    reader = self._reader
    if not reader:
        raise RuntimeError("not connected")

    max_chunk = 65536
    pending_read = reader.read(max_chunk)
    return await asyncio.wait_for(pending_read, timeout)