Bases: TransportBase
, UnicastTransportMixin
, PeerTransportMixin
Unix domain socket transport supporting unicast messaging.
Source code in swarmauri_transport_uds_unicast/uds_unicast.py
| def __init__(self, path: str):
super().__init__(name=f"UDS:{path}")
self._path = path
self._server: asyncio.AbstractServer | None = None
self._reader: asyncio.StreamReader | None = None
self._writer: asyncio.StreamWriter | None = None
|
supports
Source code in swarmauri_transport_uds_unicast/uds_unicast.py
33
34
35
36
37
38
39
40
41 | def supports(self) -> TransportCapabilities:
return TransportCapabilities(
protocols=frozenset({Protocol.UDS}),
io=IOModel.STREAM,
casts=frozenset({Cast.UNICAST}),
features=frozenset({Feature.RELIABLE, Feature.ORDERED, Feature.LOCAL_ONLY}),
security=SecurityMode.NONE,
schemes=frozenset({AddressScheme.UDS}),
)
|
accept
async
Source code in swarmauri_transport_uds_unicast/uds_unicast.py
| async def accept(self):
if not self._server:
raise RuntimeError("server not started")
while True:
await asyncio.sleep(3600)
|
send
async
send(target, data, *, timeout=None)
Source code in swarmauri_transport_uds_unicast/uds_unicast.py
| async def send(
self, target: str, data: bytes, *, timeout: Optional[float] = None
) -> None:
if not self._writer:
raise RuntimeError("not connected")
self._writer.write(data)
await asyncio.wait_for(self._writer.drain(), timeout)
|
recv
async
Source code in swarmauri_transport_uds_unicast/uds_unicast.py
| async def recv(self, *, timeout: Optional[float] = None) -> bytes:
if not self._reader:
raise RuntimeError("not connected")
return await asyncio.wait_for(self._reader.read(65536), timeout)
|