Bases: TransportBase, UnicastTransportMixin, MulticastTransportMixin, BroadcastTransportMixin, AnycastTransportMixin
Datagram transport built on UDP sockets.
Source code in swarmauri_transport_udp/udp_transport.py
def __init__(
    self, bind: Optional[str] = None, multicast_groups: Sequence[str] = ()
):  # noqa: B008
    """Create a UDP transport that does not yet own a socket.

    Args:
        bind: Optional bind address; stored unchanged.
        multicast_groups: Multicast group identifiers; copied into a tuple
            so later changes to the caller's sequence are not observed.
    """
    super().__init__(name="UDP")
    # The socket starts out unset; the I/O methods refuse to run until one
    # has been assigned.
    self._sock: Optional[socket.socket] = None
    self._bind = bind
    frozen_groups = tuple(multicast_groups)
    self._groups = frozen_groups
    # NOTE(review): this captures whichever loop is current at construction
    # time, which may differ from the loop that later runs the coroutines.
    self._loop = asyncio.get_event_loop()
|
supports
Source code in swarmauri_transport_udp/udp_transport.py
42
43
44
45
46
47
48
49
50
51
def supports(self) -> TransportCapabilities:
    """Describe the capabilities advertised by this transport.

    Returns:
        A ``TransportCapabilities`` value declaring UDP as the only protocol
        and address scheme, datagram I/O, the unicast, broadcast, multicast
        and anycast cast modes, an empty feature set, and no security mode.
    """
    cast_modes = frozenset(
        {Cast.UNICAST, Cast.BROADCAST, Cast.MULTICAST, Cast.ANYCAST}
    )
    udp_only_protocols = frozenset({Protocol.UDP})
    udp_only_schemes = frozenset({AddressScheme.UDP})
    return TransportCapabilities(
        protocols=udp_only_protocols,
        io=IOModel.DATAGRAM,
        casts=cast_modes,
        features=frozenset(),
        security=SecurityMode.NONE,
        schemes=udp_only_schemes,
    )
|
send
async
send(target, data, *, timeout=None)
Source code in swarmauri_transport_udp/udp_transport.py
async def send(
    self, target: str, data: bytes, *, timeout: Optional[float] = None
) -> None:
    """Send one datagram to ``target``.

    Args:
        target: Destination written as ``"host:port"``. The port is the text
            after the last colon, so a bracketed IPv6 literal such as
            ``"[::1]:9000"`` is accepted; the brackets are removed before
            the address is handed to the socket.
        data: Payload passed to the socket unchanged.
        timeout: Accepted for interface compatibility; not used here.

    Raises:
        RuntimeError: If no socket has been set on this transport.
        ValueError: If ``target`` contains no colon or the port is not an
            integer.
    """
    if not self._sock:
        raise RuntimeError("socket not initialized")
    # Split on the LAST colon: IPv6 literals contain colons themselves, so
    # splitting on the first one would cut the host apart.
    host, separator, port = target.rpartition(":")
    if not separator:
        raise ValueError(f"target must be 'host:port', got {target!r}")
    if host.startswith("[") and host.endswith("]"):
        host = host[1:-1]
    self._sock.sendto(data, (host, int(port)))
|
recv
async
Source code in swarmauri_transport_udp/udp_transport.py
async def recv(self, *, timeout: Optional[float] = None) -> bytes:
    """Wait for one datagram and return its payload.

    The blocking ``recv`` call is run in the default executor so the event
    loop stays responsive while waiting.

    Args:
        timeout: Seconds to wait for data; ``None`` waits indefinitely.

    Returns:
        The bytes returned by the socket's ``recv(65536)``.

    Raises:
        RuntimeError: If no socket has been set on this transport.
        asyncio.TimeoutError: If nothing arrives within ``timeout``.
    """
    if not self._sock:
        raise RuntimeError("socket not initialized")
    # Use the loop that is actually running this coroutine. A loop captured
    # earlier (e.g. at construction) may be a different one, and awaiting a
    # future that belongs to another loop fails.
    running_loop = asyncio.get_running_loop()
    pending = running_loop.run_in_executor(None, self._sock.recv, 65536)
    # NOTE(review): on timeout only the awaiting side is cancelled; the
    # worker thread presumably stays inside recv() until data arrives or the
    # socket is closed.
    return await asyncio.wait_for(pending, timeout)
|
broadcast
async
broadcast(data, *, timeout=None)
Source code in swarmauri_transport_udp/udp_transport.py
async def broadcast(self, data: bytes, *, timeout: Optional[float] = None) -> None:
    """Send ``data`` to the limited broadcast address.

    The datagram is addressed to ``255.255.255.255`` on the port number that
    the socket itself reports via ``getsockname()``.

    Args:
        data: Payload passed to the socket unchanged.
        timeout: Accepted for interface compatibility; not used here.

    Raises:
        RuntimeError: If no socket has been set on this transport.
    """
    sock = self._sock
    if not sock:
        raise RuntimeError("socket not initialized")
    # SO_BROADCAST is switched on before every send rather than once up
    # front, so this works regardless of how the socket was configured.
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    local_port = sock.getsockname()[1]
    destination = ("255.255.255.255", local_port)
    sock.sendto(data, destination)
|
multicast
async
multicast(group, data, *, timeout=None)
Source code in swarmauri_transport_udp/udp_transport.py
101
102
103
104
105
106
107
async def multicast(
    self, group: Sequence[str], data: bytes, *, timeout: Optional[float] = None
) -> None:
    """Send ``data`` to every address in ``group``.

    All addresses are parsed before the first datagram is sent, so a
    malformed entry raises without any partial delivery.

    Args:
        group: Destinations written as ``"host:port"``. A single bare string
            is treated as one address rather than iterated character by
            character. The port is the text after the last colon, so
            bracketed IPv6 literals such as ``"[ff02::1]:5000"`` work.
        data: Payload passed to the socket unchanged for each destination.
        timeout: Accepted for interface compatibility; not used here.

    Raises:
        RuntimeError: If no socket has been set on this transport.
        ValueError: If an address contains no colon or its port is not an
            integer.
    """
    if not self._sock:
        raise RuntimeError("socket not initialized")
    addresses = (group,) if isinstance(group, str) else group
    destinations = []
    for address in addresses:
        # Split on the LAST colon: IPv6 literals contain colons themselves.
        host, separator, port = address.rpartition(":")
        if not separator:
            raise ValueError(f"address must be 'host:port', got {address!r}")
        if host.startswith("[") and host.endswith("]"):
            host = host[1:-1]
        destinations.append((host, int(port)))
    for destination in destinations:
        self._sock.sendto(data, destination)
|
anycast
async
anycast(candidates, data, *, timeout=None)
Source code in swarmauri_transport_udp/udp_transport.py
110
111
112
113
114
115
116
117
118
async def anycast(
    self, candidates: Sequence[str], data: bytes, *, timeout: Optional[float] = None
) -> str:
    """Send ``data`` to the first candidate that accepts it.

    Candidates are tried in order. If the socket raises ``OSError`` for one
    of them, the next candidate is tried instead of giving up.

    Args:
        candidates: Destinations written as ``"host:port"``. The port is the
            text after the last colon, so bracketed IPv6 literals such as
            ``"[::1]:9000"`` work.
        data: Payload passed to the socket unchanged.
        timeout: Accepted for interface compatibility; not used here.

    Returns:
        The candidate string the datagram was sent to.

    Raises:
        RuntimeError: If no socket has been set, or ``candidates`` is empty.
        ValueError: If a candidate contains no colon or its port is not an
            integer.
        OSError: The error from the last attempt when every send failed.
    """
    if not self._sock:
        raise RuntimeError("socket not initialized")
    last_error: Optional[OSError] = None
    for candidate in candidates:
        # Split on the LAST colon: IPv6 literals contain colons themselves.
        host, separator, port = candidate.rpartition(":")
        if not separator:
            raise ValueError(f"candidate must be 'host:port', got {candidate!r}")
        if host.startswith("[") and host.endswith("]"):
            host = host[1:-1]
        destination = (host, int(port))
        try:
            self._sock.sendto(data, destination)
        except OSError as exc:
            # This candidate is unusable right now; fall through to the next.
            last_error = exc
            continue
        return candidate
    if last_error is not None:
        raise last_error
    raise RuntimeError("no candidates provided")
|