Skip to content

Class peagen.plugins.queues.in_memory_queue.InMemoryQueue

peagen.plugins.queues.in_memory_queue.InMemoryQueue

InMemoryQueue(maxsize=0, **_)

In-memory stand-in for Redis with minimal async APIs.

Source code in peagen/plugins/queues/in_memory_queue.py
11
12
13
14
15
16
17
18
19
def __init__(self, maxsize: int = 0, **_: object) -> None:
    self.lists: dict[str, list[str]] = defaultdict(list)
    self.sets: dict[str, set[str]] = defaultdict(set)
    self.hashes: dict[str, dict[str, Any]] = defaultdict(dict)
    self.expiry: dict[str, float] = {}
    self.pubsub: dict[str, list[str]] = defaultdict(list)
    self._loop = asyncio.get_event_loop()
    self._cond = asyncio.Condition()
    self.maxsize = maxsize

lists instance-attribute

lists = defaultdict(list)

sets instance-attribute

sets = defaultdict(set)

hashes instance-attribute

hashes = defaultdict(dict)

expiry instance-attribute

expiry = {}

pubsub instance-attribute

pubsub = defaultdict(list)

maxsize instance-attribute

maxsize = maxsize

get_client

get_client()
Source code in peagen/plugins/queues/in_memory_queue.py
21
22
def get_client(self) -> "InMemoryQueue":
    return self

sadd async

sadd(key, member)
Source code in peagen/plugins/queues/in_memory_queue.py
32
33
async def sadd(self, key: str, member: str) -> None:
    self.sets[key].add(member)

smembers async

smembers(key)
Source code in peagen/plugins/queues/in_memory_queue.py
35
36
async def smembers(self, key: str) -> list[str]:
    return list(self.sets.get(key, set()))

rpush async

rpush(key, value)
Source code in peagen/plugins/queues/in_memory_queue.py
39
40
41
42
async def rpush(self, key: str, value: str) -> None:
    self.lists[key].append(value)
    async with self._cond:
        self._cond.notify_all()

lrange async

lrange(key, start, end)
Source code in peagen/plugins/queues/in_memory_queue.py
44
45
46
47
48
async def lrange(self, key: str, start: int, end: int) -> list[str]:
    lst = self.lists.get(key, [])
    if end == -1:
        end = len(lst) - 1
    return lst[start : end + 1]

blpop async

blpop(keys, timeout)
Source code in peagen/plugins/queues/in_memory_queue.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
async def blpop(self, keys: list[str], timeout: float) -> tuple[str, str] | None:
    end_time = self._loop.time() + timeout
    while True:
        await self._cleanup()
        for k in keys:
            lst = self.lists.get(k)
            if lst:
                value = lst.pop(0)
                return k, value
        remaining = end_time - self._loop.time()
        if remaining <= 0:
            return None
        async with self._cond:
            try:
                await asyncio.wait_for(self._cond.wait(), remaining)
            except asyncio.TimeoutError:
                return None

brpop async

brpop(keys, timeout)
Source code in peagen/plugins/queues/in_memory_queue.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
async def brpop(self, keys: list[str], timeout: float) -> tuple[str, str] | None:
    end_time = self._loop.time() + timeout
    while True:
        await self._cleanup()
        for k in keys:
            lst = self.lists.get(k)
            if lst:
                value = lst.pop()
                return k, value
        remaining = end_time - self._loop.time()
        if remaining <= 0:
            return None
        async with self._cond:
            try:
                await asyncio.wait_for(self._cond.wait(), remaining)
            except asyncio.TimeoutError:
                return None

get async

get(key)
Source code in peagen/plugins/queues/in_memory_queue.py
87
88
async def get(self, key: str) -> None:
    return self.client.get(key)

set async

set(key, mapping)
Source code in peagen/plugins/queues/in_memory_queue.py
90
91
async def set(self, key: str, mapping: dict[str, Any]) -> None:
    self.hashes[key] = mapping

hset async

hset(key, mapping)
Source code in peagen/plugins/queues/in_memory_queue.py
93
94
async def hset(self, key: str, mapping: dict[str, Any]) -> None:
    self.hashes[key].update(mapping)

hgetall async

hgetall(key)
Source code in peagen/plugins/queues/in_memory_queue.py
96
97
98
async def hgetall(self, key: str) -> dict[str, Any]:
    await self._cleanup()
    return dict(self.hashes.get(key, {}))

hget async

hget(key, field)
Source code in peagen/plugins/queues/in_memory_queue.py
100
101
102
async def hget(self, key: str, field: str) -> str | None:
    await self._cleanup()
    return self.hashes.get(key, {}).get(field)

expire async

expire(key, ttl)
Source code in peagen/plugins/queues/in_memory_queue.py
104
105
async def expire(self, key: str, ttl: int) -> None:
    self.expiry[key] = self._loop.time() + ttl

exists async

exists(key)
Source code in peagen/plugins/queues/in_memory_queue.py
107
108
109
async def exists(self, key: str) -> bool:
    await self._cleanup()
    return key in self.hashes

keys async

keys(pattern)
Source code in peagen/plugins/queues/in_memory_queue.py
111
112
113
114
115
116
async def keys(self, pattern: str) -> list[str]:
    await self._cleanup()
    if pattern.endswith("*"):
        prefix = pattern[:-1]
        return [k for k in self.hashes if k.startswith(prefix)]
    return [k for k in self.hashes if k == pattern]

publish async

publish(channel, message)
Source code in peagen/plugins/queues/in_memory_queue.py
118
119
async def publish(self, channel: str, message: str) -> None:
    self.pubsub[channel].append(message)