Skip to content

Commit 58894dc

Browse files
committed
feat: add project description
1 parent 0b23479 commit 58894dc

1 file changed

Lines changed: 135 additions & 1 deletion

File tree

README.md

Lines changed: 135 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,135 @@
1-
# taskiq_valkey
1+
# TaskIQ-Valkey
2+
3+
Taskiq-valkey is a plugin for taskiq that adds a new broker and result backend based on valkey.
4+
5+
# Installation
6+
7+
To use this project you must have installed core taskiq library:
8+
```bash
9+
pip install taskiq
10+
```
11+
This project can be installed using pip:
12+
```bash
13+
pip install taskiq-valkey
14+
```
15+
16+
# Usage
17+
18+
Let's see the example with the valkey broker and valkey async result:
19+
20+
```python
21+
# broker.py
22+
import asyncio
23+
24+
from taskiq_valkey import ValkeyAsyncResultBackend, ValkeyStreamBroker
25+
26+
result_backend = ValkeyAsyncResultBackend(
27+
valkey_url="valkey://localhost:6379",
28+
)
29+
30+
# Or you can use PubSubBroker if you need broadcasting
31+
# Or ListQueueBroker if you don't want acknowledges
32+
broker = ValkeyStreamBroker(
33+
valkey_url="valkey://localhost:6379",
34+
).with_result_backend(result_backend)
35+
36+
37+
@broker.task
38+
async def best_task_ever() -> None:
39+
"""Solve all problems in the world."""
40+
await asyncio.sleep(5.5)
41+
print("All problems are solved!")
42+
43+
44+
async def main():
45+
task = await best_task_ever.kiq()
46+
print(await task.wait_result())
47+
48+
49+
if __name__ == "__main__":
50+
asyncio.run(main())
51+
```
52+
53+
Launch the workers:
54+
`taskiq worker broker:broker`
55+
Then run the main code:
56+
`python3 broker.py`
57+
58+
59+
## Brokers
60+
61+
This package contains 6 broker implementations. We have two broker types: `PubSub` and `Stream`.
62+
63+
Each of type is implemented for each valkey architecture:
64+
* Single node
65+
* Cluster
66+
* Sentinel
67+
68+
Here's a small breakdown of how they differ from eachother.
69+
70+
71+
### PubSub
72+
73+
By default on old valkey versions PUBSUB was the way of making valkey into a queue.
74+
But using PUBSUB means that all messages delivered to all subscribed consumers.
75+
76+
> [!WARNING]
77+
> This broker doesn't support acknowledgements. If during message processing
78+
> Worker was suddenly killed the message is going to be lost.
79+
80+
### Stream
81+
82+
Stream brokers use valkey [stream type](https://valkey.io/topics/streams-intro/) to store and fetch messages.
83+
84+
> [!TIP]
85+
> This broker **supports** acknowledgements and therefore is fine to use in cases when data durability is
86+
> required.
87+
88+
## ValkeyAsyncResultBackend configuration
89+
90+
ValkeyAsyncResultBackend parameters:
91+
* `valkey_url` - url to valkey.
92+
* `keep_results` - flag to not remove results from Valkey after reading.
93+
* `result_ex_time` - expire time in seconds (by default - not specified)
94+
* `result_px_time` - expire time in milliseconds (by default - not specified)
95+
* Any other keyword arguments are passed to `valkey.asyncio.BlockingConnectionPool`.
96+
Notably, you can use `timeout` to set custom timeout in seconds for reconnects
97+
(or set it to `None` to try reconnects indefinitely).
98+
99+
> [!WARNING]
100+
> **It is highly recommended to use expire time in ValkeyAsyncResultBackend**
101+
> If you want to add expiration, either `result_ex_time` or `result_px_time` must be set.
102+
> ```python
103+
> # First variant
104+
> valkey_async_result = ValkeyAsyncResultBackend(
105+
> valkey_url="valkey://localhost:6379",
106+
> result_ex_time=1000,
107+
> )
108+
>
109+
> # Second variant
110+
> valkey_async_result = ValkeyAsyncResultBackend(
111+
> valkey_url="valkey://localhost:6379",
112+
> result_px_time=1000000,
113+
> )
114+
> ```
115+
116+
117+
## Schedule sources
118+
119+
120+
You can use this package to add dynamic schedule sources. They are used to store
121+
schedules for taskiq scheduler.
122+
123+
The advantage of using schedule sources from this package over default `LabelBased` source is that you can
124+
dynamically add schedules in it.
125+
126+
For now we have only one type of schedules - `ListValkeyScheduleSource`.
127+
128+
### ListValkeyScheduleSource
129+
130+
This source holds values in lists.
131+
132+
* For cron tasks it uses key `{prefix}:cron`.
133+
* For timed schedules it uses key `{prefix}:time:{time}` where `{time}` is actually time where schedules should run.
134+
135+
The main advantage of this approach is that we only fetch tasks we need to run at a given time and do not perform any excesive calls to valkey.

0 commit comments

Comments
 (0)