Skip to content

Class swarmauri_transport_udp.udp_transport.UdpTransport

swarmauri_transport_udp.udp_transport.UdpTransport

UdpTransport(bind=None, multicast_groups=())

Bases: TransportBase, UnicastTransportMixin, MulticastTransportMixin, BroadcastTransportMixin, AnycastTransportMixin

Datagram transport built on UDP sockets.

Source code in swarmauri_transport_udp/udp_transport.py
33
34
35
36
37
38
39
40
def __init__(
    self, bind: Optional[str] = None, multicast_groups: Sequence[str] = ()
):  # noqa: B008
    super().__init__(name="UDP")
    self._bind = bind
    self._groups = tuple(multicast_groups)
    self._sock: Optional[socket.socket] = None
    self._loop = asyncio.get_event_loop()

supports

supports()
Source code in swarmauri_transport_udp/udp_transport.py
42
43
44
45
46
47
48
49
50
51
52
def supports(self) -> TransportCapabilities:
    return TransportCapabilities(
        protocols=frozenset({Protocol.UDP}),
        io=IOModel.DATAGRAM,
        casts=frozenset(
            {Cast.UNICAST, Cast.BROADCAST, Cast.MULTICAST, Cast.ANYCAST}
        ),
        features=frozenset(),
        security=SecurityMode.NONE,
        schemes=frozenset({AddressScheme.UDP}),
    )

send async

send(target, data, *, timeout=None)
Source code in swarmauri_transport_udp/udp_transport.py
80
81
82
83
84
85
86
async def send(
    self, target: str, data: bytes, *, timeout: Optional[float] = None
) -> None:
    if not self._sock:
        raise RuntimeError("socket not initialized")
    host, port = target.split(":", 1)
    self._sock.sendto(data, (host, int(port)))

recv async

recv(*, timeout=None)
Source code in swarmauri_transport_udp/udp_transport.py
88
89
90
91
92
async def recv(self, *, timeout: Optional[float] = None) -> bytes:
    if not self._sock:
        raise RuntimeError("socket not initialized")
    fut = self._loop.run_in_executor(None, self._sock.recv, 65536)
    return await asyncio.wait_for(fut, timeout)

broadcast async

broadcast(data, *, timeout=None)
Source code in swarmauri_transport_udp/udp_transport.py
94
95
96
97
98
99
async def broadcast(self, data: bytes, *, timeout: Optional[float] = None) -> None:
    if not self._sock:
        raise RuntimeError("socket not initialized")
    self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    port = self._sock.getsockname()[1]
    self._sock.sendto(data, ("255.255.255.255", port))

multicast async

multicast(group, data, *, timeout=None)
Source code in swarmauri_transport_udp/udp_transport.py
101
102
103
104
105
106
107
108
async def multicast(
    self, group: Sequence[str], data: bytes, *, timeout: Optional[float] = None
) -> None:
    if not self._sock:
        raise RuntimeError("socket not initialized")
    for address in group:
        host, port = address.split(":", 1)
        self._sock.sendto(data, (host, int(port)))

anycast async

anycast(candidates, data, *, timeout=None)
Source code in swarmauri_transport_udp/udp_transport.py
110
111
112
113
114
115
116
117
118
119
async def anycast(
    self, candidates: Sequence[str], data: bytes, *, timeout: Optional[float] = None
) -> str:
    if not self._sock:
        raise RuntimeError("socket not initialized")
    for candidate in candidates:
        host, port = candidate.split(":", 1)
        self._sock.sendto(data, (host, int(port)))
        return candidate
    raise RuntimeError("no candidates provided")