-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathabc.py
More file actions
46 lines (38 loc) · 1.34 KB
/
abc.py
File metadata and controls
46 lines (38 loc) · 1.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from abc import ABC, abstractmethod
from typing import Any, Dict, Type
from taskiq import AsyncBroker, AsyncTaskiqTask, TaskiqResult
from typing_extensions import ClassVar
class AbstractStep(ABC):
    """
    Base class for all pipeline steps.

    Each subclass is declared with a ``step_name`` class keyword
    and is recorded under that name in ``_known_steps``.
    """

    # Name given to the step when the subclass was declared.
    _step_name: str
    # Maps step names to the step classes declared with them.
    _known_steps: ClassVar[Dict[str, Type["AbstractStep"]]] = {}

    def __init_subclass__(cls, step_name: str, **kwargs: Any) -> None:
        """
        Register a newly declared step class.

        If the name is already present in the registry,
        the existing entry is replaced by this class.

        :param step_name: name to store on the class
            and to use as the registry key.
        :param kwargs: any other class keyword arguments,
            passed through to the parent hook.
        """
        super().__init_subclass__(**kwargs)
        # Keep the name on the class itself.
        cls._step_name = step_name
        # Make the class reachable by its name.
        cls._known_steps[step_name] = cls

    @abstractmethod
    async def act(
        self,
        broker: AsyncBroker,
        step_number: int,
        parent_task_id: str,
        task_id: str,
        pipe_data: bytes,
        result: "TaskiqResult[Any]",
    ) -> AsyncTaskiqTask[Any]:
        """
        Run the action of this pipeline step.

        Implementations that create a task should give it
        the supplied task_id, so clients are able to tell
        which task is being executed at the moment.

        :param broker: current broker.
        :param step_number: number of the current step.
        :param parent_task_id: id of the current task.
        :param task_id: id to assign to the created task.
        :param pipe_data: serialized pipeline; it must be
            put in labels.
        :param result: result of the previous task.
        :return: task object for the performed action.
        """