This repository was archived by the owner on Aug 19, 2025. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 124
Expand file tree
/
Copy pathtest_broadcast.py
More file actions
165 lines (127 loc) · 5.76 KB
/
test_broadcast.py
File metadata and controls
165 lines (127 loc) · 5.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
from __future__ import annotations
import asyncio
import typing
import pytest
import redis
from broadcaster import Broadcast, BroadcastBackend, Event
from broadcaster.backends.kafka import KafkaBackend
class CustomBackend(BroadcastBackend):
    """Queue-backed backend used to exercise ``Broadcast(backend=...)``.

    Published events are buffered in an ``asyncio.Queue``. ``next_published``
    hands back only events whose channel is currently in the subscribed set.
    """

    def __init__(self, url: str) -> None:
        # ``url`` is not read by this backend.
        self._subscribed: set[str] = set()

    async def connect(self) -> None:
        """Create the queue that buffers published events."""
        self._published: asyncio.Queue[Event] = asyncio.Queue()

    async def disconnect(self) -> None:
        """Do nothing on disconnect."""
        return None

    async def subscribe(self, channel: str) -> None:
        """Record ``channel`` as subscribed."""
        self._subscribed.add(channel)

    async def unsubscribe(self, channel: str) -> None:
        """Forget ``channel``; raises ``KeyError`` if it was not subscribed."""
        self._subscribed.remove(channel)

    async def publish(self, channel: str, message: typing.Any) -> None:
        """Wrap ``message`` in an ``Event`` and enqueue it."""
        await self._published.put(Event(channel=channel, message=message))

    async def next_published(self) -> Event:
        """Return the next queued event whose channel is subscribed.

        Events for channels outside the subscribed set are consumed and dropped.
        """
        candidate = await self._published.get()
        while candidate.channel not in self._subscribed:
            candidate = await self._published.get()
        return candidate
@pytest.mark.asyncio
async def test_memory():
    """Round-trip one message through the ``memory://`` backend."""
    async with Broadcast("memory://") as bus, bus.subscribe("chatroom") as listener:
        await bus.publish("chatroom", "hello")
        received = await listener.get()
        assert received.channel == "chatroom"
        assert received.message == "hello"
@pytest.mark.asyncio
async def test_redis():
    """Round-trip one message through the ``redis://`` backend on localhost."""
    redis_url = "redis://localhost:6379"
    async with Broadcast(redis_url) as bus, bus.subscribe("chatroom") as listener:
        await bus.publish("chatroom", "hello")
        received = await listener.get()
        assert received.channel == "chatroom"
        assert received.message == "hello"
@pytest.mark.asyncio
async def test_redis_server_disconnect():
    """Check that a closed Redis connection surfaces to the subscriber.

    The message published before the pool is closed must still be delivered;
    the following ``subscriber.get()`` is expected to raise
    ``redis.ConnectionError`` with the message asserted at the end.
    """
    with pytest.raises(redis.ConnectionError) as exc:
        async with Broadcast("redis://localhost:6379") as broadcast:
            async with broadcast.subscribe("chatroom") as subscriber:
                await broadcast.publish("chatroom", "hello")
                # Reach into the backend's private connection and close its pool.
                await broadcast._backend._conn.connection_pool.aclose()  # type: ignore[attr-defined]
                event = await subscriber.get()
                assert event.channel == "chatroom"
                assert event.message == "hello"
                await subscriber.get()
                # Getting here means no error was raised. Unlike a bare
                # ``assert False``, pytest.fail is not stripped under ``python -O``
                # and reports why the test failed.
                pytest.fail("subscriber.get() returned instead of raising redis.ConnectionError")

    assert exc.value.args == ("Connection closed by server.",)
@pytest.mark.asyncio
async def test_redis_does_not_log_loop_error_messages_if_subscribing(caplog):
    """Publish and receive over Redis, then expect no captured log messages."""
    async with Broadcast("redis://localhost:6379") as bus, bus.subscribe("chatroom") as listener:
        await bus.publish("chatroom", "hello")
        received = await listener.get()
        assert received.channel == "chatroom"
        assert received.message == "hello"
    # NOTE(review): the log check runs after both context managers have exited;
    # confirm this placement against the upstream file.
    assert caplog.messages == []
@pytest.mark.asyncio
async def test_redis_does_not_log_loop_error_messages_if_not_subscribing(caplog):
    """Publish over Redis without any subscriber, then expect no captured log messages."""
    async with Broadcast("redis://localhost:6379") as bus:
        await bus.publish("chatroom", "hello")
        # Yield to the event loop for a moment so that any error it would
        # report has a chance to be logged before the check below.
        await asyncio.sleep(0.1)
    # NOTE(review): the log check runs after the broadcast context has exited;
    # confirm this placement against the upstream file.
    assert caplog.messages == []
@pytest.mark.asyncio
async def test_redis_stream():
    """Round-trip a message on two channels in turn via ``redis-stream://``."""
    async with Broadcast("redis-stream://localhost:6379") as bus:
        # Each channel gets its own subscription, opened and closed one after
        # the other on the same connection.
        for channel in ("chatroom", "chatroom1"):
            async with bus.subscribe(channel) as listener:
                await bus.publish(channel, "hello")
                received = await listener.get()
                assert received.channel == channel
                assert received.message == "hello"
@pytest.mark.asyncio
async def test_postgres():
    """Round-trip one message through the ``postgres://`` backend on localhost."""
    dsn = "postgres://postgres:postgres@localhost:5432/broadcaster"
    async with Broadcast(dsn) as bus, bus.subscribe("chatroom") as listener:
        await bus.publish("chatroom", "hello")
        received = await listener.get()
        assert received.channel == "chatroom"
        assert received.message == "hello"
@pytest.mark.asyncio
async def test_kafka():
    """Round-trip one message through the ``kafka://`` backend on localhost."""
    kafka_url = "kafka://localhost:9092"
    async with Broadcast(kafka_url) as bus, bus.subscribe("chatroom") as listener:
        await bus.publish("chatroom", "hello")
        received = await listener.get()
        assert received.channel == "chatroom"
        assert received.message == "hello"
@pytest.mark.asyncio
async def test_kafka_multiple_urls():
    """Round-trip one message through a ``KafkaBackend`` built from a list of URLs."""
    urls = ["kafka://localhost:9092", "kafka://localhost:9092"]
    kafka_backend = KafkaBackend(urls=urls)
    async with Broadcast(backend=kafka_backend) as bus, bus.subscribe("chatroom") as listener:
        await bus.publish("chatroom", "hello")
        received = await listener.get()
        assert received.channel == "chatroom"
        assert received.message == "hello"
@pytest.mark.asyncio
async def test_custom():
    """Round-trip one message through a user-supplied ``CustomBackend`` instance."""
    async with Broadcast(backend=CustomBackend("")) as bus, bus.subscribe("chatroom") as listener:
        await bus.publish("chatroom", "hello")
        received = await listener.get()
        assert received.channel == "chatroom"
        assert received.message == "hello"
@pytest.mark.asyncio
async def test_unknown_backend():
    """Building or entering a ``Broadcast`` with an ``unknown://`` URL raises ``ValueError``."""
    expect_rejection = pytest.raises(ValueError, match="Unsupported backend")
    with expect_rejection:
        async with Broadcast(url="unknown://"):
            ...
@pytest.mark.asyncio
async def test_needs_url_or_backend():
    """``Broadcast()`` with neither ``url`` nor ``backend`` raises ``AssertionError``."""
    # ``match`` is a regular expression, so the trailing full stop is escaped
    # to require a literal "." rather than any character.
    with pytest.raises(AssertionError, match=r"Either `url` or `backend` must be provided\."):
        Broadcast()