Bases: TransportBase
, UnicastTransportMixin
, PeerTransportMixin
, MultiplexTransportMixin
QUIC-like transport built on asyncio UDP primitives.
Source code in swarmauri_transport_quic/quic_transport.py
| def __init__(self, **config: object) -> None:
super().__init__(name="QUIC")
self._config = dict(config)
self._reset_state()
|
supports
Source code in swarmauri_transport_quic/quic_transport.py
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106 | def supports(self) -> TransportCapabilities:
return TransportCapabilities(
protocols=frozenset({Protocol.QUIC}),
io=IOModel.STREAM,
casts=frozenset({Cast.UNICAST}),
features=frozenset(
{
Feature.RELIABLE,
Feature.ORDERED,
Feature.ENCRYPTED,
Feature.AUTHENTICATED,
Feature.MULTIPLEX,
}
),
security=SecurityMode.TLS,
schemes=frozenset({AddressScheme.QUIC}),
)
|
accept
async
Source code in swarmauri_transport_quic/quic_transport.py
| async def accept(self) -> AsyncIterator[Tuple[str, int]]: # type: ignore[override]
while True:
addr = await self._peer_queue.get()
yield addr
|
send
async
send(target, data, *, timeout=None)
Source code in swarmauri_transport_quic/quic_transport.py
| async def send(
self, target: str, data: bytes, *, timeout: Optional[float] = None
) -> None:
addr = self._resolve_target(target)
await self._send_datagram((0).to_bytes(4, "big") + data, addr, timeout)
|
recv
async
Source code in swarmauri_transport_quic/quic_transport.py
| async def recv(self, *, timeout: Optional[float] = None) -> bytes:
if timeout is None:
return await self._default_queue.get()
return await asyncio.wait_for(self._default_queue.get(), timeout)
|
open_channel
async
Source code in swarmauri_transport_quic/quic_transport.py
| async def open_channel(self) -> ChannelHandle:
handle = self._next_channel
self._next_channel += 1
remote = self._peer_addr or self._default_remote
self._channels[handle] = _QuicChannel(handle, asyncio.Queue(), remote)
return handle
|
close_channel
async
Source code in swarmauri_transport_quic/quic_transport.py
| async def close_channel(self, handle: ChannelHandle) -> None:
self._channels.pop(int(handle), None)
|
send_on
async
send_on(handle, data, *, timeout=None)
Source code in swarmauri_transport_quic/quic_transport.py
184
185
186
187
188
189
190
191
192
193 | async def send_on(
self, handle: ChannelHandle, data: bytes, *, timeout: Optional[float] = None
) -> None:
channel = self._ensure_channel(handle)
if channel.remote is None:
channel.remote = self._peer_addr or self._default_remote
if channel.remote is None:
raise RuntimeError("no remote peer associated with channel")
payload = int(channel.identifier).to_bytes(4, "big") + data
await self._send_datagram(payload, channel.remote, timeout)
|
recv_on
async
recv_on(handle, *, timeout=None)
Source code in swarmauri_transport_quic/quic_transport.py
195
196
197
198
199
200
201 | async def recv_on(
self, handle: ChannelHandle, *, timeout: Optional[float] = None
) -> bytes:
channel = self._ensure_channel(handle)
if timeout is None:
return await channel.queue.get()
return await asyncio.wait_for(channel.queue.get(), timeout)
|