In-memory stand-in for Redis with minimal async APIs.
Source code in peagen/plugins/queues/in_memory_queue.py
11
12
13
14
15
16
17
18
def __init__(self, maxsize: int = 0, **_: object) -> None:
    """Create an empty in-memory store.

    Args:
        maxsize: Stored on the instance as ``maxsize``; nothing in this
            constructor enforces it.
        **_: Extra keyword arguments are accepted and ignored.
    """
    self.maxsize = maxsize

    # One container per emulated data type; defaultdicts mean a missing
    # key materialises as an empty container on first indexed access.
    self.lists: dict[str, list[str]] = defaultdict(list)
    self.sets: dict[str, set[str]] = defaultdict(set)
    self.hashes: dict[str, dict[str, Any]] = defaultdict(dict)
    self.pubsub: dict[str, list[str]] = defaultdict(list)

    # key -> absolute deadline on the event loop's clock.
    self.expiry: dict[str, float] = {}

    # NOTE(review): asyncio.get_event_loop() is deprecated when no loop is
    # running; confirm this is always constructed with a current loop.
    self._loop = asyncio.get_event_loop()
    # Condition used to wake blocked pop calls when data is pushed.
    self._cond = asyncio.Condition()
|
lists
instance-attribute
lists = defaultdict(list)
hashes
instance-attribute
hashes = defaultdict(dict)
expiry
instance-attribute
pubsub
instance-attribute
pubsub = defaultdict(list)
maxsize
instance-attribute
get_client
Source code in peagen/plugins/queues/in_memory_queue.py
def get_client(self) -> "InMemoryQueue":
    """Return this object itself, which acts as its own client."""
    client = self
    return client
|
sadd
async
Source code in peagen/plugins/queues/in_memory_queue.py
async def sadd(self, key: str, member: str) -> None:
    """Add ``member`` to the set stored under ``key``.

    Indexing ``self.sets`` creates the set when the key is new, as
    ``self.sets`` is a ``defaultdict(set)``.
    """
    members = self.sets[key]
    members.add(member)
|
smembers
async
Source code in peagen/plugins/queues/in_memory_queue.py
async def smembers(self, key: str) -> list[str]:
    """Return the members of the set at ``key`` as a new list.

    A missing key yields an empty list and is not created.
    """
    members = self.sets.get(key, set())
    return [*members]
|
rpush
async
Source code in peagen/plugins/queues/in_memory_queue.py
async def rpush(self, key: str, value: str) -> None:
    """Append ``value`` to the tail of the list at ``key`` and wake waiters.

    The list is created on first use. ``maxsize`` is not consulted here.
    """
    queue = self.lists[key]
    queue.append(value)

    # Notify only after the item is stored, so a woken waiter can see it.
    waiters = self._cond
    async with waiters:
        waiters.notify_all()
|
lrange
async
Source code in peagen/plugins/queues/in_memory_queue.py
async def lrange(self, key: str, start: int, end: int) -> list[str]:
    """Return items ``start`` through ``end`` (inclusive) of the list at ``key``.

    ``end == -1`` means "through the last element". A missing key yields an
    empty list. The result is a slice, i.e. a new list.
    """
    items = self.lists.get(key, [])
    # Convert the inclusive end into an exclusive slice stop. -1 needs
    # special handling because -1 + 1 == 0 would produce an empty slice.
    stop = len(items) if end == -1 else end + 1
    return items[start:stop]
|
blpop
async
Source code in peagen/plugins/queues/in_memory_queue.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66 | async def blpop(self, keys: list[str], timeout: float) -> tuple[str, str] | None:
end_time = self._loop.time() + timeout
while True:
await self._cleanup()
for k in keys:
lst = self.lists.get(k)
if lst:
value = lst.pop(0)
return k, value
remaining = end_time - self._loop.time()
if remaining <= 0:
return None
async with self._cond:
try:
await asyncio.wait_for(self._cond.wait(), remaining)
except asyncio.TimeoutError:
return None
|
brpop
async
Source code in peagen/plugins/queues/in_memory_queue.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84 | async def brpop(self, keys: list[str], timeout: float) -> tuple[str, str] | None:
end_time = self._loop.time() + timeout
while True:
await self._cleanup()
for k in keys:
lst = self.lists.get(k)
if lst:
value = lst.pop()
return k, value
remaining = end_time - self._loop.time()
if remaining <= 0:
return None
async with self._cond:
try:
await asyncio.wait_for(self._cond.wait(), remaining)
except asyncio.TimeoutError:
return None
|
get
async
Source code in peagen/plugins/queues/in_memory_queue.py
| async def get(self, key: str) -> None:
return self.client.get(key)
|
set
async
Source code in peagen/plugins/queues/in_memory_queue.py
async def set(self, key: str, mapping: dict[str, Any]) -> None:
    """Store ``mapping`` under ``key``, replacing any existing value.

    The mapping object itself is stored; no copy is made.
    """
    store = self.hashes
    store[key] = mapping
|
hset
async
Source code in peagen/plugins/queues/in_memory_queue.py
async def hset(self, key: str, mapping: dict[str, Any]) -> None:
    """Merge ``mapping`` into the hash stored at ``key``.

    Existing fields not named in ``mapping`` are kept. The hash is created
    when the key is new, as ``self.hashes`` is a ``defaultdict(dict)``.
    """
    fields = self.hashes[key]
    fields.update(mapping)
|
hgetall
async
Source code in peagen/plugins/queues/in_memory_queue.py
async def hgetall(self, key: str) -> dict[str, Any]:
    """Return a shallow copy of the hash stored at ``key``.

    ``self._cleanup()`` (defined outside this excerpt) runs first. A
    missing key yields an empty dict and is not created.
    """
    await self._cleanup()
    if key not in self.hashes:
        return {}
    return dict(self.hashes[key])
|
hget
async
Source code in peagen/plugins/queues/in_memory_queue.py
| async def hget(self, key: str, field: str) -> str | None:
await self._cleanup()
return self.hashes.get(key, {}).get(field)
|
expire
async
Source code in peagen/plugins/queues/in_memory_queue.py
async def expire(self, key: str, ttl: int) -> None:
    """Record that ``key`` expires ``ttl`` seconds from now.

    The deadline is an absolute timestamp on ``self._loop``'s clock, stored
    in ``self.expiry``. Nothing is removed here.
    """
    deadline = self._loop.time() + ttl
    self.expiry[key] = deadline
|
exists
async
Source code in peagen/plugins/queues/in_memory_queue.py
async def exists(self, key: str) -> bool:
    """Report whether ``key`` is present among the stored hashes.

    ``self._cleanup()`` (defined outside this excerpt) runs first. Only
    ``self.hashes`` is consulted; a key that lives solely in ``self.lists``
    or ``self.sets`` reports ``False``.
    """
    await self._cleanup()
    found = key in self.hashes
    return found
|
keys
async
Source code in peagen/plugins/queues/in_memory_queue.py
async def keys(self, pattern: str) -> list[str]:
    """List hash keys matching ``pattern``.

    ``self._cleanup()`` (defined outside this excerpt) runs first. Only two
    pattern forms are recognised: a trailing ``*`` means prefix match on
    everything before it; anything else is an exact-match lookup. Only
    ``self.hashes`` is searched.
    """
    await self._cleanup()

    if pattern.endswith("*"):
        stem = pattern[:-1]

        def matches(name: str) -> bool:
            return name.startswith(stem)

    else:

        def matches(name: str) -> bool:
            return name == pattern

    return [name for name in self.hashes if matches(name)]
|
publish
async
publish(channel, message)
Source code in peagen/plugins/queues/in_memory_queue.py
async def publish(self, channel: str, message: str) -> None:
    """Append ``message`` to the backlog kept for ``channel``.

    Messages accumulate in ``self.pubsub[channel]`` in publish order; the
    backlog is created when the channel is new.
    """
    backlog = self.pubsub[channel]
    backlog.append(message)
|