-
Notifications
You must be signed in to change notification settings - Fork 373
Expand file tree
/
Copy pathtest_redis.py
More file actions
90 lines (69 loc) · 3.01 KB
/
test_redis.py
File metadata and controls
90 lines (69 loc) · 3.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import time
from testcontainers.redis import RedisContainer, AsyncRedisContainer
import pytest
import redis
def test_docker_run_redis():
    """Publish a message on a Redis container and read it back via pub/sub.

    Starts a default ``RedisContainer``, subscribes to the ``"test"`` channel,
    publishes ``"new_msg"`` and checks that the payload arrives unchanged.
    """
    config = RedisContainer()
    # Named ``container`` so the imported ``redis`` module is not shadowed.
    with config as container:
        client = container.get_client()
        p = client.pubsub()
        p.subscribe("test")
        client.publish("test", "new_msg")
        msg = wait_for_message(p)
        # wait_for_message returns None on timeout; fail with a clear message
        # instead of a TypeError from the membership test below.
        assert msg is not None, "no pub/sub message received before the timeout"
        assert "data" in msg
        # The client is created without decode_responses, so the payload is
        # compared as bytes. (The previous ``assert b"new_msg", msg["data"]``
        # was always true: it asserted a non-empty literal.)
        assert msg["data"] == b"new_msg"
def test_docker_run_redis_with_password():
    """Round-trip a key through a Redis container started with a password."""
    with RedisContainer(password="mypass") as container:
        # decode_responses=True so the value read back compares equal to a str.
        client = container.get_client(decode_responses=True)
        client.set("hello", "world")
        stored = client.get("hello")
        assert stored == "world"
def test_docker_run_start_fails(monkeypatch: pytest.MonkeyPatch):
    """Starting the container on a non-Redis image raises a connection error."""
    # Patch the retry configuration so the failing start-up check ends quickly.
    fast_fail_settings = (
        ("testcontainers.core.config.testcontainers_config.max_tries", 0.3),
        ("testcontainers.core.config.testcontainers_config.sleep_time", 0.02),
    )
    for target, value in fast_fail_settings:
        monkeypatch.setattr(target, value)

    # A bogus image makes the start-up check fail.
    broken = RedisContainer(image="hello-world")
    with pytest.raises(redis.exceptions.ConnectionError, match="Could not connect"):
        broken.start()
# The usefixtures mark must be applied as a decorator; written as a bare
# expression statement it has no effect on the test below.
@pytest.mark.usefixtures("anyio_backend")
@pytest.mark.parametrize("anyio_backend", ["asyncio"])
async def test_key_set_in_async_redis(anyio_backend):
    """Set an integer key through the async client and read it back."""
    with AsyncRedisContainer() as container:
        async_redis_client: redis.Redis = await container.get_async_client(decode_responses=True)
        key = "key"
        expected_value = 1
        await async_redis_client.set(key, expected_value)
        actual_value = await async_redis_client.get(key)
        # The value read back is converted to int before comparing with the
        # integer that was written.
        assert int(actual_value) == expected_value
# The usefixtures mark must be applied as a decorator; written as a bare
# expression statement it has no effect on the test below.
@pytest.mark.usefixtures("anyio_backend")
@pytest.mark.parametrize("anyio_backend", ["asyncio"])
@pytest.mark.skip(reason="Need to sort out async pub/sub")
async def test_docker_run_async_redis(anyio_backend):
    """Publish and receive a pub/sub message through the async client.

    Currently skipped (see the skip reason on the decorator).
    """
    config = AsyncRedisContainer()
    with config as container:
        client: redis.Redis = await container.get_async_client(decode_responses=True)
        p = await client.pubsub()
        await p.subscribe("test")
        await client.publish("test", "new_msg")
        # NOTE(review): wait_for_message calls pubsub.get_message() without
        # awaiting it; with an async pubsub object this presumably yields a
        # coroutine rather than a message - confirm before un-skipping.
        msg = wait_for_message(p)
        assert msg is not None, "no pub/sub message received before the timeout"
        assert "data" in msg
        # The previous ``assert b"new_msg", msg["data"]`` was always true.
        # The client is created with decode_responses=True, so the payload is
        # presumably a str rather than bytes - confirm when un-skipping.
        assert msg["data"] == "new_msg"
# The usefixtures mark must be applied as a decorator; written as a bare
# expression statement it has no effect on the test below.
@pytest.mark.usefixtures("anyio_backend")
@pytest.mark.parametrize("anyio_backend", ["asyncio"])
async def test_docker_run_async_redis_with_password(anyio_backend):
    """Round-trip a key through the async client of a password-protected container."""
    config = AsyncRedisContainer(password="mypass")
    with config as container:
        # decode_responses=True so the value read back compares equal to a str.
        client: redis.Redis = await container.get_async_client(decode_responses=True)
        await client.set("hello", "world")
        assert await client.get("hello") == "world"
def wait_for_message(pubsub, timeout=1, ignore_subscribe_messages=True):
    """Poll ``pubsub`` until a message arrives or ``timeout`` seconds elapse.

    Args:
        pubsub: Object exposing ``get_message(ignore_subscribe_messages=...)``
            that returns a message, or ``None`` when nothing is pending.
        timeout: Maximum time to keep polling, in seconds. The source is
            polled at least once even when this is zero or negative.
        ignore_subscribe_messages: Passed through unchanged to
            ``pubsub.get_message``.

    Returns:
        The first non-``None`` message, or ``None`` if the deadline passes
        without one.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while True:
        message = pubsub.get_message(ignore_subscribe_messages=ignore_subscribe_messages)
        if message is not None:
            return message
        if time.monotonic() >= deadline:
            return None
        # Short pause between polls to avoid a busy loop.
        time.sleep(0.01)