Bases: TransportBase, UnicastTransportMixin, PeerTransportMixin
TLS-secured unicast transport built on top of asyncio streams.
Source code in swarmauri_transport_tls_unicast/tls_unicast.py
def __init__(self, ssl_ctx: ssl.SSLContext, sni: Optional[str] = None):
    """Create a TLS transport that is neither listening nor connected yet.

    Args:
        ssl_ctx: SSL context stored for later use by this transport.
        sni: Optional server name stored alongside the context.
    """
    super().__init__(name="TLS")

    # TLS configuration handed in by the caller.
    self._sni = sni
    self._ssl_ctx = ssl_ctx

    # Stream/server handles start out empty; they are populated elsewhere.
    self._writer: asyncio.StreamWriter | None = None
    self._reader: asyncio.StreamReader | None = None
    self._server: asyncio.AbstractServer | None = None
|
supports
Source code in swarmauri_transport_tls_unicast/tls_unicast.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def supports(self) -> TransportCapabilities:
    """Report the capabilities of this transport for the configured SSL context.

    Returns:
        A ``TransportCapabilities`` describing a TLS unicast stream that is
        reliable, ordered, encrypted and authenticated. If the SSL context's
        ``verify_mode`` equals ``ssl.CERT_REQUIRED`` the security mode is
        ``SecurityMode.MTLS`` and ``Feature.MUTUAL_AUTH`` is included;
        otherwise the security mode is ``SecurityMode.TLS``.
    """
    # NOTE(review): a client-only context commonly uses CERT_REQUIRED just to
    # verify the server; confirm that reporting MTLS is intended in that case.
    peer_cert_required = self._ssl_ctx.verify_mode == ssl.CERT_REQUIRED
    mode = SecurityMode.MTLS if peer_cert_required else SecurityMode.TLS

    advertised = frozenset(
        {
            Feature.RELIABLE,
            Feature.ORDERED,
            Feature.ENCRYPTED,
            Feature.AUTHENTICATED,
        }
    )
    if mode is SecurityMode.MTLS:
        advertised = advertised | {Feature.MUTUAL_AUTH}

    return TransportCapabilities(
        protocols=frozenset({Protocol.TLS}),
        io=IOModel.STREAM,
        casts=frozenset({Cast.UNICAST}),
        features=advertised,
        security=mode,
        schemes=frozenset({AddressScheme.TLS}),
    )
|
accept
async
Source code in swarmauri_transport_tls_unicast/tls_unicast.py
async def accept(self):
    """Wait indefinitely while the server is running.

    This coroutine never returns a value: once a server exists it sleeps in
    one-hour intervals forever.

    Raises:
        RuntimeError: If no server has been started (``self._server`` is falsy).
    """
    if self._server:
        # Park here in hour-long sleeps; the loop has no exit condition.
        while True:
            await asyncio.sleep(3600)
    raise RuntimeError("server not started")
|
send
async
send(target, data, *, timeout=None)
Source code in swarmauri_transport_tls_unicast/tls_unicast.py
async def send(
    self, target: str, data: bytes, *, timeout: Optional[float] = None
) -> None:
    """Write ``data`` to the connected stream and wait for it to drain.

    Args:
        target: Destination address; not consulted here, the bytes always go
            to the already-established writer.
        data: Payload to write.
        timeout: Maximum seconds to wait for the drain, or ``None`` for no
            limit.

    Raises:
        RuntimeError: If there is no writer (``self._writer`` is falsy).
    """
    writer = self._writer
    if not writer:
        raise RuntimeError("not connected")

    writer.write(data)
    # Only the drain is bounded by the timeout; write() itself just buffers.
    pending_drain = writer.drain()
    await asyncio.wait_for(pending_drain, timeout)
|
recv
async
Source code in swarmauri_transport_tls_unicast/tls_unicast.py
async def recv(self, *, timeout: Optional[float] = None) -> bytes:
    """Read up to 65536 bytes from the connected stream.

    Args:
        timeout: Maximum seconds to wait for the read, or ``None`` for no
            limit.

    Returns:
        Whatever the reader's ``read(65536)`` call yields.

    Raises:
        RuntimeError: If there is no reader (``self._reader`` is falsy).
    """
    reader = self._reader
    if not reader:
        raise RuntimeError("not connected")

    chunk = await asyncio.wait_for(reader.read(65536), timeout)
    return chunk
|