Skip to content

Class peagen.plugins.publishers.redis_publisher.RedisPublisher

peagen.plugins.publishers.redis_publisher.RedisPublisher

RedisPublisher(
    *,
    uri=None,
    host=None,
    port=None,
    db=None,
    password=None,
    username=None,
)

Sync publisher for Redis Pub/Sub.

Source code in peagen/plugins/publishers/redis_publisher.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def __init__(
    self,
    *,
    uri: Optional[str] = None,
    host: Optional[str] = None,
    port: Optional[int] = None,
    db: Optional[int] = None,
    password: Optional[str] = None,
    username: Optional[str] = None,
) -> None:
    individual_opts = any(
        v is not None for v in (host, port, db, password, username)
    )
    if uri and individual_opts:
        raise ValueError(
            "Cannot specify both `uri` and individual host/port/db/password/username."
        )

    if uri:
        redis_uri = uri
    else:
        if not (host and port is not None and db is not None):
            raise ValueError(
                "When no `uri` is given, `host`, `port`, and `db` are required."
            )

        if username and password:
            auth = f"{quote_plus(username)}:{quote_plus(password)}@"
        elif password:
            auth = f":{quote_plus(password)}@"
        else:
            auth = ""

        redis_uri = f"redis://{auth}{host}:{port}/{db}"

    self._client: redis.Redis = redis.from_url(redis_uri, decode_responses=True)

publish

publish(channel, payload)

Send payload to channel as a JSON message.

Source code in peagen/plugins/publishers/redis_publisher.py
52
53
54
def publish(self, channel: str, payload: Dict[str, Any]) -> None:
    """Send ``payload`` to ``channel`` as a JSON message."""
    self._client.publish(channel, json.dumps(payload))