Skip to content

Class swarmauri_transport_quic.quic_transport.QuicTransport

swarmauri_transport_quic.quic_transport.QuicTransport

QuicTransport(**config)

Bases: TransportBase, UnicastTransportMixin, PeerTransportMixin, MultiplexTransportMixin

QUIC-like transport built on asyncio UDP primitives.

Source code in swarmauri_transport_quic/quic_transport.py
73
74
75
76
def __init__(self, **config: object) -> None:
    super().__init__(name="QUIC")
    self._config = dict(config)
    self._reset_state()

supports

supports()
Source code in swarmauri_transport_quic/quic_transport.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def supports(self) -> TransportCapabilities:
    return TransportCapabilities(
        protocols=frozenset({Protocol.QUIC}),
        io=IOModel.STREAM,
        casts=frozenset({Cast.UNICAST}),
        features=frozenset(
            {
                Feature.RELIABLE,
                Feature.ORDERED,
                Feature.ENCRYPTED,
                Feature.AUTHENTICATED,
                Feature.MULTIPLEX,
            }
        ),
        security=SecurityMode.TLS,
        schemes=frozenset({AddressScheme.QUIC}),
    )

accept async

accept()
Source code in swarmauri_transport_quic/quic_transport.py
152
153
154
155
async def accept(self) -> AsyncIterator[Tuple[str, int]]:  # type: ignore[override]
    while True:
        addr = await self._peer_queue.get()
        yield addr

send async

send(target, data, *, timeout=None)
Source code in swarmauri_transport_quic/quic_transport.py
160
161
162
163
164
async def send(
    self, target: str, data: bytes, *, timeout: Optional[float] = None
) -> None:
    addr = self._resolve_target(target)
    await self._send_datagram((0).to_bytes(4, "big") + data, addr, timeout)

recv async

recv(*, timeout=None)
Source code in swarmauri_transport_quic/quic_transport.py
166
167
168
169
async def recv(self, *, timeout: Optional[float] = None) -> bytes:
    if timeout is None:
        return await self._default_queue.get()
    return await asyncio.wait_for(self._default_queue.get(), timeout)

open_channel async

open_channel()
Source code in swarmauri_transport_quic/quic_transport.py
174
175
176
177
178
179
async def open_channel(self) -> ChannelHandle:
    handle = self._next_channel
    self._next_channel += 1
    remote = self._peer_addr or self._default_remote
    self._channels[handle] = _QuicChannel(handle, asyncio.Queue(), remote)
    return handle

close_channel async

close_channel(handle)
Source code in swarmauri_transport_quic/quic_transport.py
181
182
async def close_channel(self, handle: ChannelHandle) -> None:
    self._channels.pop(int(handle), None)

send_on async

send_on(handle, data, *, timeout=None)
Source code in swarmauri_transport_quic/quic_transport.py
184
185
186
187
188
189
190
191
192
193
async def send_on(
    self, handle: ChannelHandle, data: bytes, *, timeout: Optional[float] = None
) -> None:
    channel = self._ensure_channel(handle)
    if channel.remote is None:
        channel.remote = self._peer_addr or self._default_remote
    if channel.remote is None:
        raise RuntimeError("no remote peer associated with channel")
    payload = int(channel.identifier).to_bytes(4, "big") + data
    await self._send_datagram(payload, channel.remote, timeout)

recv_on async

recv_on(handle, *, timeout=None)
Source code in swarmauri_transport_quic/quic_transport.py
195
196
197
198
199
200
201
async def recv_on(
    self, handle: ChannelHandle, *, timeout: Optional[float] = None
) -> bytes:
    channel = self._ensure_channel(handle)
    if timeout is None:
        return await channel.queue.get()
    return await asyncio.wait_for(channel.queue.get(), timeout)