-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy path_service.py
More file actions
71 lines (58 loc) · 2.75 KB
/
_service.py
File metadata and controls
71 lines (58 loc) · 2.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import logging
from collections.abc import Iterable
from anyio import to_thread
from dependency_injector.wiring import Provide, inject
from common.tracing import trace_function
from common.utils import apply_decorator_to_methods
from ._gateway_interfaces import BookEventGatewayInterface, BookRepositoryInterface
from ._models import BookModel
from ._tasks import book_cpu_intensive_task
from .dto import Book, BookData
from .events import BookCreatedV1, BookCreatedV1Data
@apply_decorator_to_methods(trace_function())
class BookService:
_book_repository: BookRepositoryInterface
_event_gateway: BookEventGatewayInterface
@inject
def __init__(
self,
book_repository: BookRepositoryInterface = Provide[BookRepositoryInterface.__name__],
event_gateway: BookEventGatewayInterface = Provide[BookEventGatewayInterface.__name__],
) -> None:
super().__init__()
self._book_repository = book_repository
self._event_gateway = event_gateway
async def create_book(self, book: BookData) -> Book:
# Example of CPU intensive task ran in a different thread
# Using processes could be better, but it would bring technical complexity
# https://anyio.readthedocs.io/en/3.x/subprocesses.html#running-functions-in-worker-processes
book_data_altered: dict = await to_thread.run_sync(self._some_cpu_intensive_blocking_task, book.model_dump())
book_model = BookModel(**book_data_altered)
book = Book.model_validate(await self._book_repository.save(book_model), from_attributes=True)
# Example of CPU intensive task ran in a dramatiq task. We should not rely on
# dramatiq if we need to wait the operation result.
# The worker could be terminated (e.g. during deployments) and this function
# would time out or raise an error.
book_cpu_intensive_task.send(book_id=str(book.book_id))
await self._event_gateway.emit(
BookCreatedV1.event_factory(data=BookCreatedV1Data.model_validate(book_model, from_attributes=True))
)
return book
async def list_books(self) -> Iterable[Book]:
books = await self._book_repository.find()
return [Book.model_validate(x, from_attributes=True) for x in books]
async def book_created_event_handler(
self,
book_id: int,
) -> None: # pragma: no cover
# This is just an example placeholder, there's nothing to test.
logging.info(
"Processed book crated event`",
extra={
"book_id": book_id,
},
)
def _some_cpu_intensive_blocking_task(self, book: dict) -> dict:
# This is just an example placeholder,
# there's nothing to test.
return book # pragma: no cover