diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 318d34d..93cc9dc 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -14,17 +14,25 @@ jobs: - black - ruff - mypy + - stubtest runs-on: ubuntu-latest steps: - uses: actions/checkout@v6 - name: Set up Python uses: actions/setup-python@v4 with: - python-version: "3.11" + python-version: "3.x" + - uses: actions-rs/toolchain@v1 + with: + toolchain: stable + components: clippy + override: true + - name: Install uv + uses: astral-sh/setup-uv@v7 - name: Run lint check uses: pre-commit/action@v3.0.0 with: - extra_args: -a ${{ matrix.cmd }} + extra_args: -a -v ${{ matrix.cmd }} fmt: runs-on: ubuntu-latest steps: @@ -52,33 +60,6 @@ jobs: with: token: ${{secrets.GITHUB_TOKEN}} deny: warnings - stubtest: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v6 - - uses: actions-rs/toolchain@v1 - with: - toolchain: stable - components: clippy - override: true - - uses: actions/setup-python@v6 - with: - python-version: 3.x - - name: Install uv - uses: astral-sh/setup-uv@v7 - - id: setup-venv - name: Setup virtualenv - run: python -m venv .venv - - name: Build lib - uses: PyO3/maturin-action@v1 - with: - command: dev --uv - sccache: true - - name: Run stubtest - run: | - set -e - source .venv/bin/activate - stubtest --ignore-disjoint-bases natsrpy pytest: runs-on: ubuntu-latest steps: diff --git a/.gitignore b/.gitignore index 32ce7fc..37199ed 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ /target +delete-me-* # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e532eb9..1f4d6c5 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -18,7 +18,7 @@ repos: name: python mypy always_run: true pass_filenames: false - args: ["python"] + args: ["python", "examples"] - repo: https://github.com/astral-sh/ruff-pre-commit rev: v0.15.7 hooks: diff --git a/Cargo.toml b/Cargo.toml index e16e391..60dec9a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ async-nats = "0.46" bytes = "1.11.1" futures-util = "0.3.32" log = "0.4.29" -pyo3 = { version = "0.28", features = ["experimental-inspect"] } +pyo3 = { version = "0.28", features = ["abi3", "experimental-inspect"] } pyo3-async-runtimes = { version = "0.28", features = ["tokio-runtime"] } pyo3-log = "0.13.3" serde = { version = "1.0.228", features = ["derive"] } diff --git a/examples/callback_subscriptions.py b/examples/callback_subscriptions.py new file mode 100644 index 0000000..8a2bf3a --- /dev/null +++ b/examples/callback_subscriptions.py @@ -0,0 +1,36 @@ +import asyncio + +from natsrpy import Message, Nats + + +async def main() -> None: + """Main function to run the example.""" + nats = Nats(["nats://localhost:4222"]) + await nats.startup() + + async def callback(message: Message) -> None: + print(f"[FROM_CALLBACK] {message.payload!r}") # noqa: T201 + + # For callback subscriptions you can use detatch method. + # + # This method does the same as __enter__, however since + # it's a callback-based subscription, context managers + # are ususally not needed. + # + # But please save the reference somewhere, since python garbage + # collector might collect your detatched subscription and + # stop receiving any new messages. + cb_sub = await nats.subscribe("cb-subj", callback=callback).detatch() + await cb_sub.unsubscribe(limit=1) + + nats.publish("cb-subj", "message for callback") + + # Waiting for subscriber to read all the messages. + await cb_sub.wait() + + # Don't forget to call shutdown. + await nats.shutdown() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/consumers.py b/examples/consumers.py index f15a114..57df1c3 100644 --- a/examples/consumers.py +++ b/examples/consumers.py @@ -44,13 +44,13 @@ async def main() -> None: # We use messages() to get async iterator which we # use to get messages for push_consumer. async for push_message in await push_consumer.messages(): - print(f"[FROM_PUSH] {push_message.payload}") # noqa: T201 + print(f"[FROM_PUSH] {push_message.payload!r}") # noqa: T201 await push_message.ack() break # Pull consumers have to request batches of messages. for pull_message in await pull_consumer.fetch(max_messages=10): - print(f"[FROM_PULL] {pull_message.payload}") # noqa: T201 + print(f"[FROM_PULL] {pull_message.payload!r}") # noqa: T201 await pull_message.ack() # Cleanup diff --git a/examples/kv.py b/examples/kv.py index 860c6fd..a512a0f 100644 --- a/examples/kv.py +++ b/examples/kv.py @@ -30,8 +30,6 @@ async def main() -> None: await kv.delete("test-key") - # Alternatively you can - # use await watcher.next() async for event in watcher: print("[EVENT]", event) # noqa: T201 break diff --git a/examples/request_reply.py b/examples/request_reply.py index f90b6a4..4ed8f7c 100644 --- a/examples/request_reply.py +++ b/examples/request_reply.py @@ -12,24 +12,22 @@ async def main() -> None: # Here we create responder, that will be # answering to our requests. async def responder(message: Message) -> None: - print(f"[REQUEST]: {message.payload}, headers={message.headers}") # noqa: T201 + print(f"[REQUEST]: {message.payload!r}, headers={message.headers}") # noqa: T201 if message.reply: await nats.publish( message.reply, - f"reply to {message.payload}", + f"reply to {message.payload!r}", headers=message.headers, ) # Start responder using callback-based subsciption. - sub = await nats.subscribe(subj, callback=responder) - # Send 3 concurrent requests. - responses = await asyncio.gather( - nats.request(subj, "request1"), - nats.request(subj, "request2", headers={"header": "value"}), - nats.request(subj, "request3", inbox="test-inbox"), - ) - # Disconnect resonder. - await sub.drain() + async with nats.subscribe(subj, callback=responder): + # Send 3 concurrent requests. + responses = await asyncio.gather( + nats.request(subj, "request1"), + nats.request(subj, "request2", headers={"header": "value"}), + nats.request(subj, "request3", inbox="test-inbox"), + ) # Iterate over replies. for resp in responses: diff --git a/examples/simple_publish.py b/examples/simple_publish.py index 3ae5005..f79a42e 100644 --- a/examples/simple_publish.py +++ b/examples/simple_publish.py @@ -11,21 +11,20 @@ async def main() -> None: # Here we initiate subscription. # We do it before sending messages, # in order to catch them once we will start reading. - subscription = await nats.subscribe("hello") - - # Publish accepts str | bytes | bytearray | memoryview - await nats.publish("hello", "str world") - await nats.publish("hello", b"bytes world") - await nats.publish("hello", bytearray(b"bytearray world")) - await nats.publish("hello", "headers", headers={"one": "two"}) - await nats.publish("hello", "headers", headers={"one": ["two", "three"]}) - - # Calling this method will unsubscribe us, - # after `n` delivered messages. - # or immediately if `n` is not provided. - subscription.unsubscribe(limit=5) - async for message in subscription: - print(message) # noqa: T201 + async with nats.subscribe("hello") as subscription: + # Publish accepts str | bytes | bytearray | memoryview + await nats.publish("hello", "str world") + await nats.publish("hello", b"bytes world") + await nats.publish("hello", bytearray(b"bytearray world")) + await nats.publish("hello", "headers", headers={"one": "two"}) + await nats.publish("hello", "headers", headers={"one": ["two", "three"]}) + + # Calling this method will unsubscribe us, + # after `n` delivered messages. + # or immediately if `n` is not provided. + subscription.unsubscribe(limit=5) + async for message in subscription: + print(message) # noqa: T201 # Don't forget to call shutdown. await nats.shutdown() diff --git a/examples/subscriptions.py b/examples/subscriptions.py index f01dbb3..b1c34a6 100644 --- a/examples/subscriptions.py +++ b/examples/subscriptions.py @@ -11,42 +11,39 @@ async def main() -> None: cb_lock = asyncio.Event() async def callback(message: Message) -> None: - print(f"[FROM_CALLBACK] {message.payload}") # noqa: T201 + print(f"[FROM_CALLBACK] {message.payload!r}") # noqa: T201 cb_lock.set() - # When subscribing you can set callback. - # In that case CallbackSubscription is returned. - # This type of subscription cannot be iterated. - cb_sub = await nats.subscribe("cb-subj", callback=callback) - - # When callback is not set, you get a subscription - # that should be used along with `async for` - # loop, or alternatively you can call - # `await iter_sub.next()` to get a single message. - iter_sub = await nats.subscribe("iter-subj") - - # Subscriptions with queue argument create - # subscription with a queue group to distribute - # messages along all subscribers. - queue_sub = await nats.subscribe("queue-subj", queue="example-queue") - - await nats.publish("cb-subj", "message for callback") - await nats.publish("iter-subj", "message for iterator") - await nats.publish("queue-subj", "message for queue sub") - - # We can unsubscribe after a particular amount of messages. - await iter_sub.unsubscribe(limit=1) - await cb_sub.unsubscribe(limit=1) - await queue_sub.unsubscribe(limit=1) - - async for message in iter_sub: - print(f"[FROM_ITERATOR] {message.payload}") # noqa: T201 - - async for message in queue_sub: - print(f"[FROM_QUEUED] {message.payload}") # noqa: T201 - - # Making sure that the message in callback is received. - await cb_lock.wait() + async with ( + # When subscribing you can set callback. + # In that case CallbackSubscription is returned. + # This type of subscription cannot be iterated. + nats.subscribe("cb-subj", callback=callback) as cb_sub, + # When callback is not set, you get a subscription + # that should be used along with `async for` + nats.subscribe("iter-subj") as iter_sub, + # Subscriptions with queue argument create + # subscription with a queue group to distribute + # messages along all subscribers. + nats.subscribe("queue-subj", queue="example-queue") as queue_sub, + ): + await nats.publish("cb-subj", "message for callback") + await nats.publish("iter-subj", "message for iterator") + await nats.publish("queue-subj", "message for queue sub") + + # We can unsubscribe after a particular amount of messages. + await iter_sub.unsubscribe(limit=1) + await cb_sub.unsubscribe(limit=1) + await queue_sub.unsubscribe(limit=1) + + async for message in iter_sub: + print(f"[FROM_ITERATOR] {message.payload!r}") # noqa: T201 + + async for message in queue_sub: + print(f"[FROM_QUEUED] {message.payload!r}") # noqa: T201 + + # Making sure that the message in callback is received. + await cb_lock.wait() # Don't forget to call shutdown. await nats.shutdown() diff --git a/pyproject.toml b/pyproject.toml index ab5f10d..a052598 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,6 +23,16 @@ dynamic = ["version"] name = "Pavel Kirilin" email = "s3riussan@gmail.com" +[project.entry-points.opentelemetry_instrumentor] +natsrpy = "natsrpy.instrumentation:NatsrpyInstrumentor" + +[project.optional-dependencies] +opentelemetry = [ + "opentelemetry-api (>=1.38.0,<2.0.0)", + "opentelemetry-instrumentation (>=0.59b0,<1)", + "opentelemetry-semantic-conventions (>=0.59b0,<1)", +] + [dependency-groups] dev = [ "anyio>=4,<5", @@ -38,9 +48,6 @@ dev = [ requires = ["maturin>=1.12,<2.0"] build-backend = "maturin" -[tool.uv] -package = false - [tool.maturin] bindings = "pyo3" features = ["pyo3/extension-module"] @@ -55,11 +62,18 @@ packages = ["natsrpy"] pretty = true implicit_reexport = true allow_untyped_decorators = true +namespace_packages = true warn_return_any = false [tool.pytest] anyio_mode = "auto" +[tool.coverage.run] +omit = [ + "python/tests/**/*", + "python/natsrpy/instrumentation/**/*", +] + [tool.ruff] target-version = "py310" exclude = [".venv/"] @@ -128,3 +142,6 @@ ignore-decorators = ["typing.overload"] [tool.ruff.lint.pylint] allow-magic-value-types = ["int", "str", "float"] + +[tool.uv] +package = false diff --git a/python/natsrpy/_natsrpy_rs/__init__.pyi b/python/natsrpy/_natsrpy_rs/__init__.pyi index 1d01c29..4e18482 100644 --- a/python/natsrpy/_natsrpy_rs/__init__.pyi +++ b/python/natsrpy/_natsrpy_rs/__init__.pyi @@ -1,12 +1,15 @@ from asyncio import Future from collections.abc import Awaitable, Callable from datetime import timedelta -from typing import Any, final, overload +from types import TracebackType +from typing import Any, Generic, TypeVar, final, overload from typing_extensions import Self from . import exceptions, js +_T = TypeVar("_T") + @final class Message: """ @@ -33,27 +36,12 @@ class Message: description: str | None length: int + def __len__(self) -> int: ... + @final class IteratorSubscription: - """Async iterator subscription for receiving NATS messages. - - Returned by :meth:`Nats.subscribe` when no callback is provided. - Messages can be received using ``async for`` or by calling :meth:`next` - directly. - """ - - def __aiter__(self) -> IteratorSubscription: ... + def __aiter__(self) -> Self: ... def __anext__(self) -> Future[Message]: ... - def next(self, timeout: float | timedelta | None = None) -> Future[Message]: - """Receive the next message from the subscription. - - :param timeout: maximum time to wait for a message in seconds - or as a timedelta, defaults to None (wait indefinitely). - :return: the next message. - :raises StopAsyncIteration: when the subscription is drained or - unsubscribed. - """ - def unsubscribe(self, limit: int | None = None) -> Future[None]: """Unsubscribe from the subject. @@ -88,6 +76,39 @@ class CallbackSubscription: Unsubscribes and flushes any remaining messages before closing. """ + def wait(self) -> Future[None]: + """ + Wait for all messages to be read. + + This method blocks until the subscription + is dropped. Either by the server or + by the client (using unsubscribe). + """ + +@final +class SubscriptionCtxManager(Generic[_T]): + def detatch(self) -> Future[_T]: + """ + Detatch from the context. + + This might be a useful utility for callback + functions. It removes context manager + and lets subscription live on its own. + + But please be aware, that when used + with iterable subscriptions, receved messages + will not be automatically traced using our + built-in instrumentation. + """ + + def __aenter__(self) -> Future[_T]: ... + async def __aexit__( + self, + _exc_type: type[BaseException] | None = None, + _exc_val: BaseException | None = None, + _exc_tb: TracebackType | None = None, + ) -> Future[None]: ... + @final class Nats: """NATS client. @@ -96,6 +117,22 @@ class Nats: access over a connection to one or more NATS servers. """ + @property + def addr(self) -> list[str]: ... + @property + def user_and_pass(self) -> tuple[str, str]: ... + @property + def nkey(self) -> str | None: ... + @property + def token(self) -> str | None: ... + @property + def custom_inbox_prefix(self) -> str | None: ... + @property + def read_buffer_capacity(self) -> int: ... + @property + def sender_capacity(self) -> int: ... + @property + def max_reconnects(self) -> int | None: ... def __new__( cls, /, @@ -207,14 +244,14 @@ class Nats: subject: str, callback: Callable[[Message], Awaitable[None]], queue: str | None = None, - ) -> Future[CallbackSubscription]: ... + ) -> SubscriptionCtxManager[CallbackSubscription]: ... @overload def subscribe( self, subject: str, callback: None = None, queue: str | None = None, - ) -> Future[IteratorSubscription]: ... + ) -> SubscriptionCtxManager[IteratorSubscription]: ... def jetstream( self, *, diff --git a/python/natsrpy/_natsrpy_rs/js/__init__.pyi b/python/natsrpy/_natsrpy_rs/js/__init__.pyi index b921153..33fb782 100644 --- a/python/natsrpy/_natsrpy_rs/js/__init__.pyi +++ b/python/natsrpy/_natsrpy_rs/js/__init__.pyi @@ -159,6 +159,10 @@ class JetStreamMessage: def token(self) -> str | None: """Authentication token, if applicable.""" + @property + def length(self) -> int: + """Message's payload length.""" + def ack(self, double: bool = False) -> Future[None]: """Acknowledge that a message was handled. @@ -208,3 +212,5 @@ class JetStreamMessage: :param double: whether to wait for server response, defaults to False. """ + + def __len__(self) -> int: ... diff --git a/python/natsrpy/_natsrpy_rs/js/consumers.pyi b/python/natsrpy/_natsrpy_rs/js/consumers.pyi index ee2aa78..9e1c895 100644 --- a/python/natsrpy/_natsrpy_rs/js/consumers.pyi +++ b/python/natsrpy/_natsrpy_rs/js/consumers.pyi @@ -282,16 +282,6 @@ class MessagesIterator: def __aiter__(self) -> Self: ... def __anext__(self) -> Future[JetStreamMessage]: ... - def next( - self, - timeout: float | timedelta | None = None, - ) -> Future[JetStreamMessage]: - """Receive the next message from the consumer. - - :param timeout: maximum time to wait in seconds or as a timedelta, - defaults to None (wait indefinitely). - :return: the next JetStream message. - """ @final class PushConsumer: diff --git a/python/natsrpy/_natsrpy_rs/js/kv.pyi b/python/natsrpy/_natsrpy_rs/js/kv.pyi index 874ea96..0c38e2b 100644 --- a/python/natsrpy/_natsrpy_rs/js/kv.pyi +++ b/python/natsrpy/_natsrpy_rs/js/kv.pyi @@ -90,13 +90,6 @@ class KVEntryIterator: def __aiter__(self) -> Self: ... def __anext__(self) -> Future[KVEntry]: ... - def next(self, timeout: float | timedelta | None = None) -> Future[KVEntry]: - """Receive the next key-value entry. - - :param timeout: maximum time to wait in seconds or as a timedelta, - defaults to None (wait indefinitely). - :return: the next entry. - """ @final class KeysIterator: @@ -104,13 +97,6 @@ class KeysIterator: def __aiter__(self) -> Self: ... def __anext__(self) -> Future[str]: ... - def next(self, timeout: float | timedelta | None = None) -> Future[str]: - """Receive the next key. - - :param timeout: maximum time to wait in seconds or as a timedelta, - defaults to None (wait indefinitely). - :return: the next key name. - """ @final class KVConfig: diff --git a/python/natsrpy/_natsrpy_rs/js/managers.pyi b/python/natsrpy/_natsrpy_rs/js/managers.pyi index dd0ba40..11f553b 100644 --- a/python/natsrpy/_natsrpy_rs/js/managers.pyi +++ b/python/natsrpy/_natsrpy_rs/js/managers.pyi @@ -39,18 +39,6 @@ class ConsumersIterator: def __aiter__(self) -> Self: ... def __anext__(self) -> Future[PullConsumer | PushConsumer]: ... - def next( - self, - timeout: float | timedelta | None = None, - ) -> Future[PullConsumer | PushConsumer]: - """Receive the next consumer from the stream. - - :param timeout: maximum time to wait for a message in seconds - or as a timedelta, defaults to None (wait indefinitely). - :return: the next consumer. - :raises StopAsyncIteration: when the subscription is drained or - unsubscribed. - """ @final class ConsumersNamesIterator: @@ -63,15 +51,6 @@ class ConsumersNamesIterator: def __aiter__(self) -> Self: ... def __anext__(self) -> Future[str]: ... - def next(self, timeout: float | timedelta | None = None) -> Future[str]: - """Receive the next consumer name from the stream. - - :param timeout: maximum time to wait for a message in seconds - or as a timedelta, defaults to None (wait indefinitely). - :return: the next consumer name. - :raises StopAsyncIteration: when the subscription is drained or - unsubscribed. - """ @final class StreamsManager: diff --git a/python/natsrpy/_natsrpy_rs/js/object_store.pyi b/python/natsrpy/_natsrpy_rs/js/object_store.pyi index 6498ce3..72f75e3 100644 --- a/python/natsrpy/_natsrpy_rs/js/object_store.pyi +++ b/python/natsrpy/_natsrpy_rs/js/object_store.pyi @@ -102,13 +102,6 @@ class ObjectInfoIterator: def __aiter__(self) -> Self: ... def __anext__(self) -> Future[ObjectInfo]: ... - def next(self, timeout: float | timedelta | None = None) -> Future[ObjectInfo]: - """Receive the next object info entry. - - :param timeout: maximum time to wait in seconds or as a timedelta, - defaults to None (wait indefinitely). - :return: the next object info. - """ @final class ObjectStore: diff --git a/python/natsrpy/instrumentation/__init__.py b/python/natsrpy/instrumentation/__init__.py new file mode 100644 index 0000000..c7d5929 --- /dev/null +++ b/python/natsrpy/instrumentation/__init__.py @@ -0,0 +1,79 @@ +""" +Instrument `natsrpy`_ to trace messages. + +.. natsrpy: https://pypi.org/project/natsrpy/ + +Usage +----- + +* Run instrumented task + +.. code:: python + + import asyncio + + from natsrpy import Nats + from natsrpy.instrumentation import NatsrpyInstrumentor + + NatsrpyInstrumentor().instrument() + + + async def main() -> None: + nats = Nats() + await nats.startup() + await nats.publish("test", b"test") + await nats.shutdown() + + + if __name__ == "__main__": + asyncio.run(main()) + +API +--- +""" + +import logging +from collections.abc import Collection +from importlib import metadata +from typing import Any + +from .nats_core import NatsCoreInstrumentator + +try: + import opentelemetry # noqa: F401 +except ImportError as exc: + raise ImportError( + "Cannot instrument. Please install 'natsrpy[opentelemetry]'.", + ) from exc + +from opentelemetry import trace +from opentelemetry.instrumentation.instrumentor import ( + BaseInstrumentor, +) + +_INSTRUMENTATION_MODULE_NAME = "opentelemetry.instrumentation.natsrpy" + +logger = logging.getLogger("natsrpy.opentelemetry") + + +class NatsrpyInstrumentor(BaseInstrumentor): # type: ignore + """OpenTelemetry instrumentor for Natsrpy.""" + + def __init__(self) -> None: + super().__init__() + + def instrumentation_dependencies(self) -> Collection[str]: + """This function tells which library this instrumentor instruments.""" + return ("natsrpy >= 0.0.0",) + + def _instrument(self, **kwargs: Any) -> None: + tracer_provider = kwargs.get("tracer_provider") + tracer = trace.get_tracer( + _INSTRUMENTATION_MODULE_NAME, + metadata.version("natsrpy"), + tracer_provider, + ) + NatsCoreInstrumentator(tracer).instrument() + + def _uninstrument(self, **kwargs: Any) -> None: + NatsCoreInstrumentator.uninstrument() diff --git a/python/natsrpy/instrumentation/nats_core.py b/python/natsrpy/instrumentation/nats_core.py new file mode 100644 index 0000000..7bdc039 --- /dev/null +++ b/python/natsrpy/instrumentation/nats_core.py @@ -0,0 +1,196 @@ +import contextlib +import sys +from collections.abc import AsyncGenerator, Awaitable, Callable +from functools import wraps +from types import TracebackType +from typing import Any + +from opentelemetry import context, propagate, trace +from opentelemetry.instrumentation.utils import is_instrumentation_enabled, unwrap +from opentelemetry.trace import SpanKind, Tracer +from typing_extensions import Self +from wrapt import ObjectProxy, wrap_function_wrapper + +from natsrpy import IteratorSubscription, Message, Nats + +from .span_builder import SpanAction, SpanBuilder + + +class IterableSubscriptionProxy(ObjectProxy): # type: ignore + """Proxy for iterable subscriptions.""" + + def __init__(self, wrapped: IteratorSubscription, tracer: Tracer) -> None: + super().__init__(wrapped) + # Proxy-local attrs should have _self_ prefix + self._self_tracer = tracer + self._self_cancel_ctx: tuple[Any, Any] | None = None + + def __aiter__(self) -> Self: + return self + + def __cancel_ctx__( + self, + typ: type[BaseException] | None = None, + exc: BaseException | None = None, + tb: TracebackType | None = None, + ) -> None: + if self._self_cancel_ctx is None: + return + token, span_ctx = self._self_cancel_ctx + span_ctx.__exit__(typ, exc, tb) + context.detach(token) + self._self_cancel_ctx = None + + async def __anext__(self) -> Any: + # We cancel previous context if any. + self.__cancel_ctx__() + # Getting next message + next_msg: Message = await anext(self.__wrapped__) + if not is_instrumentation_enabled(): + return next_msg + new_ctx = propagate.extract(next_msg.headers) + token = context.attach(new_ctx) + span = ( + SpanBuilder(self._self_tracer, SpanKind.CONSUMER, SpanAction.RECEIVE) + .with_message(next_msg) + .build() + ) + span_ctx = trace.use_span(span, end_on_exit=True) + span_ctx.__enter__() + self._self_cancel_ctx = (token, span_ctx) + + return next_msg + + +class NatsCoreInstrumentator: + """Instrument core nats methods.""" + + def __init__( + self, + tracer: Tracer, + capture_headers: bool = False, + capture_body: bool = False, + ) -> None: + self.tracer = tracer + self.capture_headers = capture_headers + self.capture_body = capture_body + + def instrument(self) -> None: + """Setup otel instrumentation for core Nats.""" + self._instrument_publish() + self._instrument_subscriptions() + + @staticmethod + def uninstrument() -> None: + """Remove instrumentations from core Nats.""" + unwrap(Nats, "publish") + + def _instrument_publish(self) -> None: + def _wrapped_publish( + wrapper: Callable[..., Any], + subject: str, + payload: bytes | str | bytearray | memoryview, + *, + headers: dict[str, Any] | None = None, + **kwargs: dict[str, Any], + ) -> Any: + if not is_instrumentation_enabled(): + return wrapper( + subject, + payload, + headers=headers, + **kwargs, + ) + span = ( + SpanBuilder(self.tracer, SpanKind.PRODUCER, SpanAction.PUBLISH) + .with_subject(subject) + .with_payload(payload) + .build() + ) + headers = headers or {} + with trace.use_span(span, end_on_exit=True): + propagate.inject(headers) + return wrapper( + subject, + payload, + headers=headers, + **kwargs, + ) + + def _publish_decorator( + wrapper: Any, + _: Nats, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + return _wrapped_publish(wrapper, *args, **kwargs) + + wrap_function_wrapper("natsrpy._natsrpy_rs", "Nats.publish", _publish_decorator) + + def _instrument_subscriptions(self) -> None: # noqa: C901 + """Create instrumentation for.""" + + def callback_wrapper( + cb: Callable[[Message], Awaitable[None]], + ) -> Callable[[Message], Awaitable[None]]: + """ + Custom decorator around callback functions. + + Generated callback creates span on message + receive and ends span when callback function finishes. + """ + + @wraps(cb) + async def _fixed_cb(message: Message) -> None: + """Fixed callback function.""" + if not is_instrumentation_enabled(): + await cb(message) + ctx = propagate.extract(message.headers) + token = context.attach(ctx) + span = ( + SpanBuilder(self.tracer, SpanKind.CONSUMER, SpanAction.RECEIVE) + .with_message(message) + .build() + ) + try: + with trace.use_span(span, end_on_exit=True): + await cb(message) + finally: + context.detach(token) + + return _fixed_cb + + def process_args( + subject: str, + callback: Callable[[Message], Awaitable[None]] | None = None, + queue: str | None = None, + ) -> tuple[Any, ...]: + """Function to wrap callback when it's passed as subscribe argument.""" + if callback: + callback = callback_wrapper(callback) + return (subject, callback, queue) + + @contextlib.asynccontextmanager + async def wrapper( + wrapper: Any, + _: Nats, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> AsyncGenerator[Any, None]: + + async with wrapper(*process_args(*args, **kwargs)) as original_sub: + if isinstance(original_sub, IteratorSubscription): + ret = IterableSubscriptionProxy(original_sub, self.tracer) + else: + ret = original_sub + try: + yield ret + except BaseException: + if isinstance(ret, IterableSubscriptionProxy): + ret.__cancel_ctx__(*sys.exc_info()) + raise + finally: + if isinstance(ret, IterableSubscriptionProxy): + ret.__cancel_ctx__() + + wrap_function_wrapper("natsrpy._natsrpy_rs", "Nats.subscribe", wrapper) diff --git a/python/natsrpy/instrumentation/span_builder.py b/python/natsrpy/instrumentation/span_builder.py new file mode 100644 index 0000000..baf69c2 --- /dev/null +++ b/python/natsrpy/instrumentation/span_builder.py @@ -0,0 +1,69 @@ +import enum +from typing import Any + +from opentelemetry.semconv._incubating.attributes.messaging_attributes import ( + MESSAGING_DESTINATION_NAME, + MESSAGING_MESSAGE_BODY_SIZE, + MESSAGING_MESSAGE_ID, + MESSAGING_SYSTEM, +) +from opentelemetry.trace import Span, SpanKind, Tracer +from typing_extensions import Self + +from natsrpy import Message +from natsrpy.js import JetStreamMessage + +DEFAULT_ATTRS = {MESSAGING_SYSTEM: "nats"} + + +@enum.unique +class SpanAction(enum.Enum): + """Available span actions.""" + + PUBLISH = "publish" + RECEIVE = "receive" + + +class SpanBuilder: + """Helper class for span creation.""" + + def __init__(self, tracer: Tracer, kind: SpanKind, action: SpanAction) -> None: + self.tracer = tracer + self.attributes: dict[str, Any] = DEFAULT_ATTRS.copy() + self.kind = kind + self.action = action + + def with_subject(self, subject: str) -> Self: + """Set message subject.""" + self.attributes[MESSAGING_DESTINATION_NAME] = subject + return self + + def with_payload(self, payload: Any) -> Self: + """Set payload-related attributes.""" + self.attributes[MESSAGING_MESSAGE_BODY_SIZE] = len(payload) + return self + + def with_message_id(self, message_id: int) -> Self: + """Set message id.""" + self.attributes[MESSAGING_MESSAGE_ID] = message_id + return self + + def with_message(self, msg: Message) -> Self: + """Add message-related attributes.""" + return self.with_subject(msg.subject).with_payload(msg.payload) + + def with_js_message(self, msg: JetStreamMessage) -> Self: + """Add message-related attributes in JS context.""" + return ( + self.with_subject(msg.subject) + .with_payload(msg.payload) + .with_message_id(msg.stream_sequence) + ) + + def build(self) -> Span: + """Build resulting span.""" + return self.tracer.start_span( + self.action.value.lower(), + kind=self.kind, + attributes=self.attributes, + ) diff --git a/python/tests/test_callback_subscription.py b/python/tests/test_callback_subscription.py index 276fea6..b945348 100644 --- a/python/tests/test_callback_subscription.py +++ b/python/tests/test_callback_subscription.py @@ -1,18 +1,17 @@ import asyncio import uuid -from natsrpy import CallbackSubscription, Nats +from natsrpy import CallbackSubscription, Message, Nats async def test_callback_unsubscribe(nats: Nats) -> None: subj = uuid.uuid4().hex - async def callback(msg: object) -> None: - pass + async def callback(_: Message) -> None: ... - sub = await nats.subscribe(subject=subj, callback=callback) - assert isinstance(sub, CallbackSubscription) - await sub.unsubscribe() + async with nats.subscribe(subject=subj, callback=callback) as sub: + assert isinstance(sub, CallbackSubscription) + await sub.unsubscribe() async def test_callback_unsubscribe_with_limit(nats: Nats) -> None: @@ -20,18 +19,18 @@ async def test_callback_unsubscribe_with_limit(nats: Nats) -> None: received: list[bytes] = [] event = asyncio.Event() - async def callback(msg: object) -> None: - received.append(msg.payload) # type: ignore[attr-defined] + async def callback(msg: Message) -> None: + received.append(msg.payload) if len(received) >= 2: event.set() - sub = await nats.subscribe(subject=subj, callback=callback) - assert isinstance(sub, CallbackSubscription) - await sub.unsubscribe(limit=2) - await nats.publish(subj, b"msg-1") - await nats.publish(subj, b"msg-2") - await asyncio.wait_for(event.wait(), timeout=5.0) - assert set(received) == {b"msg-1", b"msg-2"} + async with nats.subscribe(subject=subj, callback=callback) as sub: + assert isinstance(sub, CallbackSubscription) + await sub.unsubscribe(limit=2) + await nats.publish(subj, b"msg-1") + await nats.publish(subj, b"msg-2") + await asyncio.wait_for(event.wait(), timeout=5.0) + assert set(received) == {b"msg-1", b"msg-2"} async def test_callback_drain(nats_url: str) -> None: @@ -39,10 +38,26 @@ async def test_callback_drain(nats_url: str) -> None: await client.startup() subj = uuid.uuid4().hex - async def callback(msg: object) -> None: - pass + async def callback(_: object) -> None: ... - sub = await client.subscribe(subject=subj, callback=callback) - assert isinstance(sub, CallbackSubscription) - await sub.drain() + async with client.subscribe(subject=subj, callback=callback) as sub: + assert isinstance(sub, CallbackSubscription) + await sub.drain() await client.shutdown() + + +async def test_callback_wait_method(nats: Nats) -> None: + subj = uuid.uuid4().hex + received: list[bytes] = [] + limit = 10 + + async def callback(msg: Message) -> None: + received.append(msg.payload) + + async with nats.subscribe(subject=subj, callback=callback) as sub: + assert isinstance(sub, CallbackSubscription) + await sub.unsubscribe(limit=limit) + for _ in range(limit): + await nats.publish(subj, b"msg-1") + await asyncio.wait_for(sub.wait(), timeout=5.0) + assert len(received) == limit diff --git a/python/tests/test_consumers.py b/python/tests/test_consumers.py index 3d6d78f..1be1c36 100644 --- a/python/tests/test_consumers.py +++ b/python/tests/test_consumers.py @@ -1,3 +1,4 @@ +import asyncio import uuid from natsrpy.js import ( @@ -264,7 +265,7 @@ async def test_push_consumer_messages(js: JetStream) -> None: consumer = await stream.consumers.create(consumer_config) msgs_iter = await consumer.messages() for message in messages: - nats_msg = await msgs_iter.next(timeout=0.5) + nats_msg = await asyncio.wait_for(anext(msgs_iter), timeout=0.5) assert message == nats_msg.payload finally: await js.streams.delete(stream_name) diff --git a/python/tests/test_kv.py b/python/tests/test_kv.py index 55d7f35..7b02b52 100644 --- a/python/tests/test_kv.py +++ b/python/tests/test_kv.py @@ -1,3 +1,4 @@ +import asyncio import uuid from datetime import datetime, timedelta @@ -519,7 +520,7 @@ async def test_kv_keys_iterator_next_with_timeout(js: JetStream) -> None: try: await kv.put("k1", b"v1") keys_iter = await kv.keys() - key = await keys_iter.next(timeout=5.0) + key = await asyncio.wait_for(anext(keys_iter), timeout=0.5) assert isinstance(key, str) assert key == "k1" finally: @@ -533,7 +534,7 @@ async def test_kv_keys_iterator_next_timeout_timedelta(js: JetStream) -> None: try: await kv.put("k1", b"v1") keys_iter = await kv.keys() - key = await keys_iter.next(timeout=timedelta(seconds=5)) + key = await asyncio.wait_for(anext(keys_iter), timeout=0.5) assert isinstance(key, str) assert key == "k1" finally: @@ -579,7 +580,7 @@ async def test_kv_history_iterator_next_with_timeout(js: JetStream) -> None: try: await kv.put("key1", b"value") history_iter = await kv.history("key1") - entry = await history_iter.next(timeout=5.0) + entry = await asyncio.wait_for(anext(history_iter), timeout=0.5) assert isinstance(entry, KVEntry) assert bytes(entry.value) == b"value" finally: @@ -593,7 +594,7 @@ async def test_kv_history_iterator_next_with_timedelta(js: JetStream) -> None: try: await kv.put("key1", b"value") history_iter = await kv.history("key1") - entry = await history_iter.next(timeout=timedelta(seconds=5)) + entry = await asyncio.wait_for(anext(history_iter), timeout=0.5) assert isinstance(entry, KVEntry) assert bytes(entry.value) == b"value" finally: @@ -615,7 +616,7 @@ async def test_kv_watch_all(js: JetStream) -> None: await kv.put("w1", b"val1") - entry = await watcher.next(timeout=5.0) + entry = await asyncio.wait_for(anext(watcher), timeout=0.5) assert isinstance(entry, KVEntry) assert entry.key == "w1" assert bytes(entry.value) == b"val1" @@ -634,7 +635,7 @@ async def test_kv_watch_all_from_revision(js: JetStream) -> None: watcher = await kv.watch_all(from_revision=rev1) assert isinstance(watcher, KVEntryIterator) - entry1 = await watcher.next(timeout=5.0) + entry1 = await asyncio.wait_for(anext(watcher), timeout=0.5) assert isinstance(entry1, KVEntry) finally: await js.kv.delete(bucket) @@ -647,7 +648,7 @@ async def test_kv_watch_all_timeout(js: JetStream) -> None: try: watcher = await kv.watch_all() with pytest.raises(TimeoutError): - await watcher.next(timeout=0.1) + await asyncio.wait_for(anext(watcher), timeout=0.1) finally: await js.kv.delete(bucket) @@ -667,7 +668,7 @@ async def test_kv_watch(js: JetStream) -> None: await kv.put("watched-key", b"watch-val") - entry = await watcher.next(timeout=5.0) + entry = await asyncio.wait_for(anext(watcher), timeout=0.5) assert isinstance(entry, KVEntry) assert entry.key == "watched-key" assert bytes(entry.value) == b"watch-val" @@ -686,7 +687,7 @@ async def test_kv_watch_from_revision(js: JetStream) -> None: watcher = await kv.watch("wkey", from_revision=rev1) assert isinstance(watcher, KVEntryIterator) - entry = await watcher.next(timeout=5.0) + entry = await asyncio.wait_for(anext(watcher), timeout=5.0) assert isinstance(entry, KVEntry) finally: await js.kv.delete(bucket) @@ -699,7 +700,7 @@ async def test_kv_watch_timeout(js: JetStream) -> None: try: watcher = await kv.watch("nonexistent") with pytest.raises(TimeoutError): - await watcher.next(timeout=0.1) + await asyncio.wait_for(anext(watcher), timeout=0.1) finally: await js.kv.delete(bucket) @@ -722,12 +723,12 @@ async def test_kv_watch_with_history(js: JetStream) -> None: watcher = await kv.watch_with_history("hkey") assert isinstance(watcher, KVEntryIterator) - entry = await watcher.next(timeout=5.0) + entry = await asyncio.wait_for(anext(watcher), timeout=5.0) assert bytes(entry.value) == b"h2" # Further puts are also received await kv.put("hkey", b"h3") - entry_new = await watcher.next(timeout=5.0) + entry_new = await asyncio.wait_for(anext(watcher), timeout=5.0) assert bytes(entry_new.value) == b"h3" finally: await js.kv.delete(bucket) @@ -748,7 +749,7 @@ async def test_kv_watch_many(js: JetStream) -> None: await kv.put("mk1", b"val1") - entry = await watcher.next(timeout=5.0) + entry = await asyncio.wait_for(anext(watcher), timeout=5.0) assert isinstance(entry, KVEntry) assert entry.key == "mk1" assert bytes(entry.value) == b"val1" @@ -767,8 +768,8 @@ async def test_kv_watch_many_with_history(js: JetStream) -> None: watcher = await kv.watch_many_with_history(["mk1", "mk2"]) assert isinstance(watcher, KVEntryIterator) - entry1 = await watcher.next(timeout=5.0) - entry2 = await watcher.next(timeout=5.0) + entry1 = await asyncio.wait_for(anext(watcher), timeout=5.0) + entry2 = await asyncio.wait_for(anext(watcher), timeout=5.0) assert isinstance(entry1, KVEntry) assert isinstance(entry2, KVEntry) collected_keys = {entry1.key, entry2.key} diff --git a/python/tests/test_message.py b/python/tests/test_message.py index cf1b245..b151940 100644 --- a/python/tests/test_message.py +++ b/python/tests/test_message.py @@ -5,89 +5,89 @@ async def test_message_subject(nats: Nats) -> None: subj = uuid.uuid4().hex - sub = await nats.subscribe(subject=subj) - await nats.publish(subj, b"subject-test") - msg = await anext(sub) + async with nats.subscribe(subject=subj) as sub: + await nats.publish(subj, b"subject-test") + msg = await anext(sub) assert msg.subject == subj async def test_message_payload(nats: Nats) -> None: subj = uuid.uuid4().hex payload = b"payload-test-data" - sub = await nats.subscribe(subject=subj) - await nats.publish(subj, payload) - msg = await anext(sub) + async with nats.subscribe(subject=subj) as sub: + await nats.publish(subj, payload) + msg = await anext(sub) assert msg.payload == payload assert isinstance(msg.payload, bytes) async def test_message_headers_empty(nats: Nats) -> None: subj = uuid.uuid4().hex - sub = await nats.subscribe(subject=subj) - await nats.publish(subj, b"no-headers") - msg = await anext(sub) + async with nats.subscribe(subject=subj) as sub: + await nats.publish(subj, b"no-headers") + msg = await anext(sub) assert msg.headers == {} async def test_message_headers_string(nats: Nats) -> None: subj = uuid.uuid4().hex headers = {"content-type": "application/json", "x-id": "12345"} - sub = await nats.subscribe(subject=subj) - await nats.publish(subj, b"with-headers", headers=headers) - msg = await anext(sub) + async with nats.subscribe(subject=subj) as sub: + await nats.publish(subj, b"with-headers", headers=headers) + msg = await anext(sub) assert msg.headers == headers async def test_message_headers_multi_value(nats: Nats) -> None: subj = uuid.uuid4().hex headers = {"x-values": ["a", "b", "c"]} - sub = await nats.subscribe(subject=subj) - await nats.publish(subj, b"multi-headers", headers=headers) - msg = await anext(sub) + async with nats.subscribe(subject=subj) as sub: + await nats.publish(subj, b"multi-headers", headers=headers) + msg = await anext(sub) assert msg.headers == headers async def test_message_reply_present(nats: Nats) -> None: subj = uuid.uuid4().hex reply_to = uuid.uuid4().hex - sub = await nats.subscribe(subject=subj) - await nats.publish(subj, b"reply-test", reply=reply_to) - msg = await anext(sub) + async with nats.subscribe(subject=subj) as sub: + await nats.publish(subj, b"reply-test", reply=reply_to) + msg = await anext(sub) assert msg.reply == reply_to async def test_message_reply_absent(nats: Nats) -> None: subj = uuid.uuid4().hex - sub = await nats.subscribe(subject=subj) - await nats.publish(subj, b"no-reply") - msg = await anext(sub) + async with nats.subscribe(subject=subj) as sub: + await nats.publish(subj, b"no-reply") + msg = await anext(sub) assert msg.reply is None async def test_message_length(nats: Nats) -> None: subj = uuid.uuid4().hex payload = b"length-check" - sub = await nats.subscribe(subject=subj) - await nats.publish(subj, payload) - msg = await anext(sub) + async with nats.subscribe(subject=subj) as sub: + await nats.publish(subj, payload) + msg = await anext(sub) # length is the total message length (includes subject + overhead), not just payload assert msg.length >= len(payload) async def test_message_length_empty(nats: Nats) -> None: subj = uuid.uuid4().hex - sub = await nats.subscribe(subject=subj) - await nats.publish(subj, b"") - msg = await anext(sub) + async with nats.subscribe(subject=subj) as sub: + await nats.publish(subj, b"") + msg = await anext(sub) # Even with empty payload, length includes overhead assert msg.length >= 0 async def test_message_repr(nats: Nats) -> None: subj = uuid.uuid4().hex - sub = await nats.subscribe(subject=subj) - await nats.publish(subj, b"repr-test") - msg = await anext(sub) + async with nats.subscribe(subject=subj) as sub: + await nats.publish(subj, b"repr-test") + msg = await anext(sub) r = repr(msg) assert isinstance(r, str) assert len(r) > 0 @@ -96,8 +96,8 @@ async def test_message_repr(nats: Nats) -> None: async def test_message_large_payload(nats: Nats) -> None: subj = uuid.uuid4().hex payload = b"x" * 65536 - sub = await nats.subscribe(subject=subj) - await nats.publish(subj, payload) - msg = await anext(sub) + async with nats.subscribe(subject=subj) as sub: + await nats.publish(subj, payload) + msg = await anext(sub) assert msg.payload == payload assert msg.length >= 65536 diff --git a/python/tests/test_nats_client.py b/python/tests/test_nats_client.py index 7b14460..47bacc4 100644 --- a/python/tests/test_nats_client.py +++ b/python/tests/test_nats_client.py @@ -19,18 +19,16 @@ async def test_nats_custom_addrs() -> None: async def test_nats_flush(nats: Nats) -> None: subj = uuid.uuid4().hex payload = b"flush-test" - sub = await nats.subscribe(subject=subj) - await nats.publish(subj, payload) - await nats.flush() - message = await anext(sub) + async with nats.subscribe(subject=subj) as sub: + await nats.publish(subj, payload) + await nats.flush() + message = await anext(sub) assert message.payload == payload async def test_nats_drain(nats_url: str) -> None: client = Nats(addrs=[nats_url]) await client.startup() - subj = uuid.uuid4().hex - await client.subscribe(subject=subj) await client.drain() @@ -39,9 +37,9 @@ async def test_nats_startup_shutdown_cycle(nats_url: str) -> None: await client.startup() subj = uuid.uuid4().hex payload = b"cycle-test" - sub = await client.subscribe(subject=subj) - await client.publish(subj, payload) - message = await anext(sub) + async with client.subscribe(subject=subj) as sub: + await client.publish(subj, payload) + message = await anext(sub) assert message.payload == payload await client.shutdown() @@ -54,9 +52,9 @@ async def test_nats_multiple_connections(nats_url: str) -> None: subj = uuid.uuid4().hex payload = b"cross-client" - sub = await client2.subscribe(subject=subj) - await client1.publish(subj, payload) - message = await anext(sub) + async with client2.subscribe(subject=subj) as sub: + await client1.publish(subj, payload) + message = await anext(sub) assert message.payload == payload await client1.shutdown() @@ -66,26 +64,26 @@ async def test_nats_multiple_connections(nats_url: str) -> None: async def test_nats_publish_str_payload(nats: Nats) -> None: subj = uuid.uuid4().hex payload_str = "hello-string" - sub = await nats.subscribe(subject=subj) - await nats.publish(subj, payload_str) + async with nats.subscribe(subject=subj) as sub: + await nats.publish(subj, payload_str) message = await anext(sub) assert message.payload == payload_str.encode() async def test_nats_publish_empty_payload(nats: Nats) -> None: subj = uuid.uuid4().hex - sub = await nats.subscribe(subject=subj) - await nats.publish(subj, b"") - message = await anext(sub) + async with nats.subscribe(subject=subj) as sub: + await nats.publish(subj, b"") + message = await anext(sub) assert message.payload == b"" async def test_nats_publish_with_reply(nats: Nats) -> None: subj = uuid.uuid4().hex reply_subj = uuid.uuid4().hex - sub = await nats.subscribe(subject=subj) - await nats.publish(subj, b"with-reply", reply=reply_subj) - message = await anext(sub) + async with nats.subscribe(subject=subj) as sub: + await nats.publish(subj, b"with-reply", reply=reply_subj) + message = await anext(sub) assert message.payload == b"with-reply" assert message.reply == reply_subj diff --git a/python/tests/test_object_store.py b/python/tests/test_object_store.py index fb9cbcb..e32d2f7 100644 --- a/python/tests/test_object_store.py +++ b/python/tests/test_object_store.py @@ -1,3 +1,4 @@ +import asyncio import io import tempfile import uuid @@ -335,7 +336,7 @@ async def test_object_store_list_iterator_next(js: JetStream) -> None: await store.put("next-obj", b"data") iterator = await store.list() - info = await iterator.next(timeout=5.0) + info = await asyncio.wait_for(anext(iterator), timeout=5.0) assert isinstance(info, ObjectInfo) assert info.name == "next-obj" finally: @@ -352,7 +353,7 @@ async def test_object_store_watch(js: JetStream) -> None: await store.put("watch-obj", b"watch-data") - info = await watcher.next(timeout=5.0) + info = await asyncio.wait_for(anext(watcher), timeout=5.0) assert isinstance(info, ObjectInfo) assert info.name == "watch-obj" finally: @@ -368,8 +369,8 @@ async def test_object_store_watch_with_history(js: JetStream) -> None: await store.put("hist-obj-2", b"data2") watcher = await store.watch(with_history=True) - info1 = await watcher.next(timeout=5.0) - info2 = await watcher.next(timeout=5.0) + info1 = await asyncio.wait_for(anext(watcher), timeout=5.0) + info2 = await asyncio.wait_for(anext(watcher), timeout=5.0) assert {info1.name, info2.name} == {"hist-obj-1", "hist-obj-2"} finally: await js.object_store.delete(bucket) @@ -386,7 +387,7 @@ async def test_object_store_watch_without_history(js: JetStream) -> None: await store.put("new-obj", b"new-data") - info = await watcher.next(timeout=5.0) + info = await asyncio.wait_for(anext(watcher), timeout=5.0) assert info.name == "new-obj" finally: await js.object_store.delete(bucket) @@ -688,12 +689,12 @@ async def test_object_store_watch_delete_event(js: JetStream) -> None: watcher = await store.watch() await store.put("del-watch-obj", b"data") - info_put = await watcher.next(timeout=5.0) + info_put = await asyncio.wait_for(anext(watcher), timeout=5.0) assert info_put.name == "del-watch-obj" assert info_put.deleted is False await store.delete("del-watch-obj") - info_del = await watcher.next(timeout=5.0) + info_del = await asyncio.wait_for(anext(watcher), timeout=5.0) assert info_del.name == "del-watch-obj" assert info_del.deleted is True finally: @@ -707,19 +708,7 @@ async def test_object_store_watch_timeout(js: JetStream) -> None: try: watcher = await store.watch() with pytest.raises(TimeoutError): - await watcher.next(timeout=0.1) - finally: - await js.object_store.delete(bucket) - - -async def test_object_store_watch_timeout_timedelta(js: JetStream) -> None: - bucket = f"test-os-watchtdelta-{uuid.uuid4().hex[:8]}" - config = ObjectStoreConfig(bucket=bucket) - store = await js.object_store.create(config) - try: - watcher = await store.watch() - with pytest.raises(TimeoutError): - await watcher.next(timeout=timedelta(milliseconds=100)) + await asyncio.wait_for(anext(watcher), timeout=0.1) finally: await js.object_store.delete(bucket) @@ -828,7 +817,7 @@ async def test_object_store_watch_multiple_events(js: JetStream) -> None: names = set() for _ in range(3): - info = await watcher.next(timeout=5.0) + info = await asyncio.wait_for(anext(watcher), timeout=5.0) names.add(info.name) assert names == {"ev-1", "ev-2", "ev-3"} finally: diff --git a/python/tests/test_publication_and_extras.py b/python/tests/test_publication_and_extras.py index e16e18a..ad16ec6 100644 --- a/python/tests/test_publication_and_extras.py +++ b/python/tests/test_publication_and_extras.py @@ -1,3 +1,4 @@ +import asyncio import uuid from natsrpy import Nats @@ -37,9 +38,9 @@ async def test_publication_value_none(js: JetStream) -> None: async def test_subscribe_with_queue_group(nats: Nats) -> None: subj = uuid.uuid4().hex queue = f"queue-{uuid.uuid4().hex[:8]}" - sub = await nats.subscribe(subject=subj, queue=queue) - await nats.publish(subj, b"queue-msg") - msg = await sub.next(timeout=5.0) + async with nats.subscribe(subject=subj, queue=queue) as sub: + await nats.publish(subj, b"queue-msg") + msg = await asyncio.wait_for(anext(sub), timeout=5.0) assert msg.payload == b"queue-msg" await sub.unsubscribe() @@ -47,9 +48,9 @@ async def test_subscribe_with_queue_group(nats: Nats) -> None: async def test_nats_publish_bytearray(nats: Nats) -> None: subj = uuid.uuid4().hex payload = bytearray(b"bytearray-payload") - sub = await nats.subscribe(subject=subj) - await nats.publish(subj, payload) - msg = await anext(sub) + async with nats.subscribe(subject=subj) as sub: + await nats.publish(subj, payload) + msg = await anext(sub) assert msg.payload == bytes(payload) @@ -57,7 +58,7 @@ async def test_nats_publish_memoryview(nats: Nats) -> None: subj = uuid.uuid4().hex data = b"memoryview-payload" payload = memoryview(data) - sub = await nats.subscribe(subject=subj) - await nats.publish(subj, payload) - msg = await anext(sub) + async with nats.subscribe(subject=subj) as sub: + await nats.publish(subj, payload) + msg = await anext(sub) assert msg.payload == data diff --git a/python/tests/test_publish.py b/python/tests/test_publish.py index 7454fb1..d58df80 100644 --- a/python/tests/test_publish.py +++ b/python/tests/test_publish.py @@ -8,9 +8,9 @@ async def test_publish_simple(nats: Nats) -> None: subj = uuid.uuid4().hex payload = uuid.uuid4().hex.encode() - sub = await nats.subscribe(subject=subj) - await nats.publish(subj, payload) - message = await anext(sub) + async with nats.subscribe(subject=subj) as sub: + await nats.publish(subj, payload) + message = await anext(sub) assert message.payload == payload @@ -24,8 +24,8 @@ async def test_publish_simple(nats: Nats) -> None: async def test_publis_headers(nats: Nats, headers: dict[str, Any]) -> None: subj = uuid.uuid4().hex payload = uuid.uuid4().hex.encode() - sub = await nats.subscribe(subject=subj) - await nats.publish(subj, payload, headers=headers) - message = await anext(sub) + async with nats.subscribe(subject=subj) as sub: + await nats.publish(subj, payload, headers=headers) + message = await anext(sub) assert message.payload == payload assert message.headers == headers diff --git a/python/tests/test_request_reply.py b/python/tests/test_request_reply.py index e247420..c99e8e6 100644 --- a/python/tests/test_request_reply.py +++ b/python/tests/test_request_reply.py @@ -10,11 +10,11 @@ async def test_request_sends_with_reply(nats: Nats) -> None: received_msgs: list[Message] = [] async def responder() -> None: - sub = await nats.subscribe(subject=subj) - msg = await anext(sub) - received_msgs.append(msg) - if msg.reply: - await nats.publish(msg.reply, b"reply-data") + async with nats.subscribe(subject=subj) as sub: + msg = await anext(sub) + received_msgs.append(msg) + if msg.reply: + await nats.publish(msg.reply, b"reply-data") task = asyncio.create_task(responder()) await asyncio.sleep(0.1) @@ -34,11 +34,11 @@ async def test_request_with_headers(nats: Nats) -> None: received_msgs: list[Message] = [] async def responder() -> None: - sub = await nats.subscribe(subject=subj) - msg = await anext(sub) - received_msgs.append(msg) - if msg.reply: - await nats.publish(msg.reply, b"reply") + async with nats.subscribe(subject=subj) as sub: + msg = await anext(sub) + received_msgs.append(msg) + if msg.reply: + await nats.publish(msg.reply, b"reply") task = asyncio.create_task(responder()) await asyncio.sleep(0.1) @@ -55,11 +55,11 @@ async def test_request_none_payload(nats: Nats) -> None: received_msgs: list[Message] = [] async def responder() -> None: - sub = await nats.subscribe(subject=subj) - msg = await anext(sub) - received_msgs.append(msg) - if msg.reply: - await nats.publish(msg.reply, b"reply") + async with nats.subscribe(subject=subj) as sub: + msg = await anext(sub) + received_msgs.append(msg) + if msg.reply: + await nats.publish(msg.reply, b"reply") task = asyncio.create_task(responder()) await asyncio.sleep(0.1) diff --git a/python/tests/test_subscriptions.py b/python/tests/test_subscriptions.py index 01f4b1f..7aa37bd 100644 --- a/python/tests/test_subscriptions.py +++ b/python/tests/test_subscriptions.py @@ -1,13 +1,14 @@ import asyncio import uuid -from natsrpy import CallbackSubscription, IteratorSubscription, Nats +import pytest +from natsrpy import CallbackSubscription, IteratorSubscription, Message, Nats async def test_subscribe_returns_iterator(nats: Nats) -> None: subj = uuid.uuid4().hex - sub = await nats.subscribe(subject=subj) - assert isinstance(sub, IteratorSubscription) + async with nats.subscribe(subject=subj) as sub: + assert isinstance(sub, IteratorSubscription) async def test_subscribe_with_callback(nats: Nats) -> None: @@ -15,64 +16,66 @@ async def test_subscribe_with_callback(nats: Nats) -> None: received: list[bytes] = [] event = asyncio.Event() - async def callback(msg: object) -> None: - received.append(msg.payload) # type: ignore[attr-defined] + async def callback(msg: Message) -> None: + received.append(msg.payload) event.set() - sub = await nats.subscribe(subject=subj, callback=callback) - assert isinstance(sub, CallbackSubscription) - await nats.publish(subj, b"callback-test") - await asyncio.wait_for(event.wait(), timeout=5.0) + async with nats.subscribe(subject=subj, callback=callback) as sub: + assert isinstance(sub, CallbackSubscription) + await nats.publish(subj, b"callback-test") + await asyncio.wait_for(event.wait(), timeout=5.0) assert received == [b"callback-test"] async def test_iterator_next_with_timeout(nats: Nats) -> None: subj = uuid.uuid4().hex - sub = await nats.subscribe(subject=subj) - await nats.publish(subj, b"timeout-test") - message = await sub.next(timeout=5.0) + async with nats.subscribe(subject=subj) as sub: + await nats.publish(subj, b"timeout-test") + message = await asyncio.wait_for(anext(sub), timeout=5.0) assert message.payload == b"timeout-test" async def test_iterator_aiter_protocol(nats: Nats) -> None: subj = uuid.uuid4().hex - sub = await nats.subscribe(subject=subj) - payloads = [f"msg-{i}".encode() for i in range(3)] - for p in payloads: - await nats.publish(subj, p) - - received = [] - async for msg in sub: - received.append(msg.payload) - if len(received) == 3: - break + async with nats.subscribe(subject=subj) as sub: + payloads = [f"msg-{i}".encode() for i in range(3)] + for p in payloads: + await nats.publish(subj, p) + + received = [] + async for msg in sub: + received.append(msg.payload) + if len(received) == 3: + break assert received == payloads async def test_iterator_unsubscribe(nats: Nats) -> None: subj = uuid.uuid4().hex - sub = await nats.subscribe(subject=subj) - await sub.unsubscribe() + async with nats.subscribe(subject=subj) as sub: + await sub.unsubscribe() async def test_iterator_unsubscribe_with_limit(nats: Nats) -> None: subj = uuid.uuid4().hex - sub = await nats.subscribe(subject=subj) - await sub.unsubscribe(limit=2) - await nats.publish(subj, b"msg-1") - await nats.publish(subj, b"msg-2") - msg1 = await sub.next(timeout=5.0) - msg2 = await sub.next(timeout=5.0) - assert msg1.payload == b"msg-1" - assert msg2.payload == b"msg-2" + async with nats.subscribe(subject=subj) as sub: + await sub.unsubscribe(limit=2) + await nats.publish(subj, b"msg-1") + await nats.publish(subj, b"msg-2") + msg1 = await asyncio.wait_for(anext(sub), timeout=5.0) + msg2 = await asyncio.wait_for(anext(sub), timeout=5.0) + assert msg1.payload == b"msg-1" + assert msg2.payload == b"msg-2" + with pytest.raises(StopAsyncIteration): + await asyncio.wait_for(anext(sub), timeout=5.0) async def test_iterator_drain(nats_url: str) -> None: client = Nats(addrs=[nats_url]) await client.startup() subj = uuid.uuid4().hex - sub = await client.subscribe(subject=subj) - await sub.drain() + async with client.subscribe(subject=subj) as sub: + await sub.drain() await client.shutdown() @@ -80,41 +83,43 @@ async def test_callback_receives_message(nats: Nats) -> None: subj = uuid.uuid4().hex event = asyncio.Event() - async def callback(msg: object) -> None: + async def callback(_: Message) -> None: event.set() - sub = await nats.subscribe(subject=subj, callback=callback) - assert isinstance(sub, CallbackSubscription) - await nats.publish(subj, b"trigger") - await asyncio.wait_for(event.wait(), timeout=5.0) + async with nats.subscribe(subject=subj, callback=callback) as sub: + assert isinstance(sub, CallbackSubscription) + await nats.publish(subj, b"trigger") + await asyncio.wait_for(event.wait(), timeout=5.0) async def test_multiple_subscribers(nats: Nats) -> None: subj = uuid.uuid4().hex - sub1 = await nats.subscribe(subject=subj) - sub2 = await nats.subscribe(subject=subj) - await nats.publish(subj, b"multi-sub") - msg1 = await anext(sub1) - msg2 = await anext(sub2) - assert msg1.payload == b"multi-sub" - assert msg2.payload == b"multi-sub" + async with ( + nats.subscribe(subject=subj) as sub1, + nats.subscribe(subject=subj) as sub2, + ): + await nats.publish(subj, b"multi-sub") + msg1 = await anext(sub1) + msg2 = await anext(sub2) + assert msg1.payload == b"multi-sub" + assert msg2.payload == b"multi-sub" async def test_wildcard_subscription(nats: Nats) -> None: prefix = uuid.uuid4().hex - sub = await nats.subscribe(subject=f"{prefix}.*") - await nats.publish(f"{prefix}.one", b"wildcard-1") - await nats.publish(f"{prefix}.two", b"wildcard-2") - msg1 = await anext(sub) - msg2 = await anext(sub) - assert msg1.payload == b"wildcard-1" - assert msg2.payload == b"wildcard-2" + async with nats.subscribe(subject=f"{prefix}.*") as sub: + await nats.publish(f"{prefix}.one", b"wildcard-1") + await nats.publish(f"{prefix}.two", b"wildcard-2") + msg1 = await anext(sub) + msg2 = await anext(sub) + assert msg1.payload == b"wildcard-1" + assert msg2.payload == b"wildcard-2" async def test_fullwild_subscription(nats: Nats) -> None: prefix = uuid.uuid4().hex - sub = await nats.subscribe(subject=f"{prefix}.>") - await nats.publish(f"{prefix}.a.b.c", b"full-wild") - msg = await anext(sub) - assert msg.payload == b"full-wild" - assert msg.subject == f"{prefix}.a.b.c" + async with nats.subscribe(subject=f"{prefix}.>") as sub: + await nats.publish(f"{prefix}.a.b.c", b"full-wild") + msg = await anext(sub) + assert msg.payload == b"full-wild" + assert msg.subject == f"{prefix}.a.b.c" diff --git a/scripts/stubtest.py b/scripts/stubtest.py index 748ca91..f583724 100644 --- a/scripts/stubtest.py +++ b/scripts/stubtest.py @@ -8,7 +8,11 @@ def main(): subprocess.run(["maturin", "dev", "--uv"], cwd=ROOT_DIR, check=True) - os.execvpe("stubtest", ["--ignore-disjoint-bases", "natsrpy"], env=os.environ) + os.execvpe( + "stubtest", + ["--ignore-disjoint-bases", "--ignore-missing-stub", "natsrpy._natsrpy_rs"], + env=os.environ, + ) if __name__ == "__main__": diff --git a/src/js/consumers/push/consumer.rs b/src/js/consumers/push/consumer.rs index 150fd6e..862a491 100644 --- a/src/js/consumers/push/consumer.rs +++ b/src/js/consumers/push/consumer.rs @@ -6,7 +6,7 @@ use pyo3::{Bound, PyAny, PyRef, Python}; use crate::{ exceptions::rust_err::{NatsrpyError, NatsrpyResult}, js::pymod::JetStreamMessage, - utils::{futures::natsrpy_future_with_timeout, natsrpy_future, py_types::TimeValue}, + utils::natsrpy_future, }; type NatsPushConsumer = @@ -73,17 +73,12 @@ impl MessagesIterator { slf } - #[pyo3(signature=(timeout=None))] - pub fn next<'py>( - &self, - py: Python<'py>, - timeout: Option, - ) -> NatsrpyResult> { + pub fn __anext__<'py>(&self, py: Python<'py>) -> NatsrpyResult> { let Some(messages_guard) = self.messages.clone() else { unreachable!("Message is always Some in runtime.") }; #[allow(clippy::significant_drop_tightening)] - natsrpy_future_with_timeout(py, timeout, async move { + natsrpy_future(py, async move { let mut messages = messages_guard.lock().await; let Some(message) = messages.next().await else { return Err(NatsrpyError::AsyncStopIteration); @@ -93,10 +88,6 @@ impl MessagesIterator { JetStreamMessage::try_from(message) }) } - - pub fn __anext__<'py>(&self, py: Python<'py>) -> NatsrpyResult> { - self.next(py, None) - } } impl Drop for MessagesIterator { diff --git a/src/js/kv.rs b/src/js/kv.rs index 07500ef..ac4e671 100644 --- a/src/js/kv.rs +++ b/src/js/kv.rs @@ -3,7 +3,6 @@ use std::{sync::Arc, time::Duration}; use crate::{ js::{self, stream::StreamInfo}, utils::{ - futures::natsrpy_future_with_timeout, py_types::{SendableValue, TimeValue, ToPyDate}, streamer::Streamer, }, @@ -485,14 +484,9 @@ impl KeysIterator { slf } - #[pyo3(signature=(timeout=None))] - pub fn next<'py>( - &self, - py: Python<'py>, - timeout: Option, - ) -> NatsrpyResult> { + pub fn __anext__<'py>(&self, py: Python<'py>) -> NatsrpyResult> { let ctx = self.streamer.clone(); - natsrpy_future_with_timeout(py, timeout, async move { + natsrpy_future(py, async move { let value = ctx.lock().await.next().await; match value { Some(entry) => Ok(entry?), @@ -500,10 +494,6 @@ impl KeysIterator { } }) } - - pub fn __anext__<'py>(&self, py: Python<'py>) -> NatsrpyResult> { - self.next(py, None) - } } #[pyo3::pyclass] @@ -537,14 +527,9 @@ impl KVEntryIterator { slf } - #[pyo3(signature=(timeout=None))] - pub fn next<'py>( - &self, - py: Python<'py>, - timeout: Option, - ) -> NatsrpyResult> { + pub fn __anext__<'py>(&self, py: Python<'py>) -> NatsrpyResult> { let ctx = self.streamer.clone(); - natsrpy_future_with_timeout(py, timeout, async move { + natsrpy_future(py, async move { let value = ctx.lock().await.next().await; match value { Some(entry) => KVEntry::try_from(entry?), @@ -552,10 +537,6 @@ impl KVEntryIterator { } }) } - - pub fn __anext__<'py>(&self, py: Python<'py>) -> NatsrpyResult> { - self.next(py, None) - } } #[pyo3::pymodule(submodule, name = "kv")] diff --git a/src/js/managers/consumers.rs b/src/js/managers/consumers.rs index 334318a..f5b16c2 100644 --- a/src/js/managers/consumers.rs +++ b/src/js/managers/consumers.rs @@ -7,10 +7,7 @@ use tokio::sync::Mutex; use crate::{ exceptions::rust_err::{NatsrpyError, NatsrpyResult}, js::consumers::{self, pull::PullConsumer, push::PushConsumer}, - utils::{ - futures::natsrpy_future_with_timeout, natsrpy_future, py_types::TimeValue, - streamer::Streamer, - }, + utils::{natsrpy_future, py_types::TimeValue, streamer::Streamer}, }; #[pyo3::pyclass] @@ -51,14 +48,9 @@ impl ConsumersNamesIterator { slf } - #[pyo3(signature=(timeout=None))] - pub fn next<'py>( - &self, - py: Python<'py>, - timeout: Option, - ) -> NatsrpyResult> { + pub fn __anext__<'py>(&self, py: Python<'py>) -> NatsrpyResult> { let ctx = self.streamer.clone(); - natsrpy_future_with_timeout(py, timeout, async move { + natsrpy_future(py, async move { let value = ctx.lock().await.next().await; match value { Some(name) => Ok(name?), @@ -66,10 +58,6 @@ impl ConsumersNamesIterator { } }) } - - pub fn __anext__<'py>(&self, py: Python<'py>) -> NatsrpyResult> { - self.next(py, None) - } } impl ConsumersIterator { @@ -97,15 +85,10 @@ impl ConsumersIterator { slf } - #[pyo3(signature=(timeout=None))] - pub fn next<'py>( - &self, - py: Python<'py>, - timeout: Option, - ) -> NatsrpyResult> { + pub fn __anext__<'py>(&self, py: Python<'py>) -> NatsrpyResult> { let ctx = self.streamer.clone(); let stream = self.stream.clone(); - natsrpy_future_with_timeout(py, timeout, async move { + natsrpy_future(py, async move { let value = ctx.lock().await.next().await; match value { Some(info) => { @@ -132,10 +115,6 @@ impl ConsumersIterator { } }) } - - pub fn __anext__<'py>(&self, py: Python<'py>) -> NatsrpyResult> { - self.next(py, None) - } } #[pyo3::pyclass] diff --git a/src/js/message.rs b/src/js/message.rs index deb4fc6..b1c6d87 100644 --- a/src/js/message.rs +++ b/src/js/message.rs @@ -92,6 +92,11 @@ impl JetStreamMessage { } #[getter] #[must_use] + pub const fn length(&self) -> usize { + self.message.length + } + #[getter] + #[must_use] pub const fn reply(&self) -> &Option { &self.message.reply } @@ -198,4 +203,8 @@ impl JetStreamMessage { pub fn __repr__(&self) -> String { self.message.__repr__() } + #[must_use] + pub const fn __len__(&self) -> usize { + self.message.length + } } diff --git a/src/js/object_store.rs b/src/js/object_store.rs index 7ef3986..2dbdd1a 100644 --- a/src/js/object_store.rs +++ b/src/js/object_store.rs @@ -15,7 +15,6 @@ use crate::{ exceptions::rust_err::{NatsrpyError, NatsrpyResult}, js::stream::{Placement, StorageType}, utils::{ - futures::natsrpy_future_with_timeout, headers::NatsrpyHeadermapExt, natsrpy_future, py_types::{SendableValue, TimeValue, ToPyDate}, @@ -428,14 +427,9 @@ impl ObjectInfoIterator { slf } - #[pyo3(signature=(timeout=None))] - pub fn next<'py>( - &self, - py: Python<'py>, - timeout: Option, - ) -> NatsrpyResult> { + pub fn __anext__<'py>(&self, py: Python<'py>) -> NatsrpyResult> { let ctx = self.streamer.clone(); - natsrpy_future_with_timeout(py, timeout, async move { + natsrpy_future(py, async move { let value = ctx.lock().await.next().await; match value { Some(info) => ObjectInfo::try_from(info?), @@ -443,10 +437,6 @@ impl ObjectInfoIterator { } }) } - - pub fn __anext__<'py>(&self, py: Python<'py>) -> NatsrpyResult> { - self.next(py, None) - } } #[pyo3::pymodule(submodule, name = "object_store")] diff --git a/src/lib.rs b/src/lib.rs index 545d032..b83e669 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,7 +39,10 @@ pub mod _natsrpy_rs { #[pymodule_export] use super::nats_cls::NatsCls; #[pymodule_export] - use super::subscriptions::{callback::CallbackSubscription, iterator::IteratorSubscription}; + use super::subscriptions::{ + callback::CallbackSubscription, ctx_manager::SubscriptionCtxManager, + iterator::IteratorSubscription, + }; #[pymodule_export] use super::exceptions::py_err::pymod as exceptions; diff --git a/src/message.rs b/src/message.rs index 0a0e534..8259e63 100644 --- a/src/message.rs +++ b/src/message.rs @@ -84,4 +84,8 @@ impl Message { len = self.length, ) } + #[must_use] + pub const fn __len__(&self) -> usize { + self.length + } } diff --git a/src/nats_cls.rs b/src/nats_cls.rs index a96a431..424f0e2 100644 --- a/src/nats_cls.rs +++ b/src/nats_cls.rs @@ -1,10 +1,10 @@ use async_nats::{Subject, client::traits::Publisher, message::OutboundMessage}; -use pyo3::{Bound, IntoPyObjectExt, Py, PyAny, Python, types::PyDict}; +use pyo3::{Bound, Py, PyAny, Python, types::PyDict}; use std::sync::{Arc, RwLock}; use crate::{ exceptions::rust_err::{NatsrpyError, NatsrpyResult}, - subscriptions::{callback::CallbackSubscription, iterator::IteratorSubscription}, + subscriptions::ctx_manager::SubscriptionCtxManager, utils::{ futures::natsrpy_future_with_timeout, headers::NatsrpyHeadermapExt, @@ -13,16 +13,24 @@ use crate::{ }, }; -#[pyo3::pyclass(name = "Nats")] +#[pyo3::pyclass(name = "Nats", dict, weakref)] pub struct NatsCls { nats_session: Arc>>, + #[pyo3(get)] addr: Vec, + #[pyo3(get)] user_and_pass: Option<(String, String)>, + #[pyo3(get)] nkey: Option, + #[pyo3(get)] token: Option, + #[pyo3(get)] custom_inbox_prefix: Option, + #[pyo3(get)] read_buffer_capacity: u16, + #[pyo3(get)] sender_capacity: usize, + #[pyo3(get)] max_reconnects: Option, connection_timeout: TimeValue, request_timeout: Option, @@ -208,29 +216,17 @@ impl NatsCls { } #[pyo3(signature=(subject, callback=None, queue=None))] - pub fn subscribe<'py>( + pub fn subscribe( &self, - py: Python<'py>, subject: String, callback: Option>, queue: Option, - ) -> NatsrpyResult> { + ) -> NatsrpyResult { log::debug!("Subscribing to '{subject}'"); let client = self.get_client()?; - natsrpy_future(py, async move { - let subscriber = if let Some(queue) = queue { - client.queue_subscribe(subject, queue).await? - } else { - client.subscribe(subject).await? - }; - if let Some(cb) = callback { - let sub = CallbackSubscription::new(subscriber, cb)?; - Ok(Python::attach(|gil| sub.into_py_any(gil))?) - } else { - let sub = IteratorSubscription::new(subscriber); - Ok(Python::attach(|gil| sub.into_py_any(gil))?) - } - }) + Ok(SubscriptionCtxManager::new( + client, subject, callback, queue, + )) } #[pyo3(signature = ( diff --git a/src/subscriptions/callback.rs b/src/subscriptions/callback.rs index 84af5b3..6291e22 100644 --- a/src/subscriptions/callback.rs +++ b/src/subscriptions/callback.rs @@ -3,7 +3,10 @@ use std::sync::Arc; use futures_util::StreamExt; use pyo3::{Bound, Py, PyAny, Python}; -use crate::{exceptions::rust_err::NatsrpyResult, utils::natsrpy_future}; +use crate::{ + exceptions::rust_err::NatsrpyResult, + utils::{async_event::AsyncEvent, natsrpy_future}, +}; enum UnsubscribeCommand { Unsubscribe, @@ -11,10 +14,12 @@ enum UnsubscribeCommand { Drain, } -#[pyo3::pyclass] +#[pyo3::pyclass(from_py_object)] +#[derive(Clone)] pub struct CallbackSubscription { unsub_sender: Option>, reading_task: tokio::task::AbortHandle, + end_notify: AsyncEvent, } async fn process_message(message: async_nats::message::Message, py_callback: Arc>) { @@ -31,7 +36,7 @@ async fn process_message(message: async_nats::message::Message, py_callback: Arc Ok(()) }; if let Err(err) = task().await { - log::error!("Cannot process message {message:?}. Error: {err}"); + log::error!("Cannot process message {message:#?}. Error: {err}"); } } @@ -40,6 +45,7 @@ async fn start_py_sub( py_callback: Arc>, locals: pyo3_async_runtimes::TaskLocals, mut unsub_receiver: tokio::sync::mpsc::Receiver, + end_event: AsyncEvent, ) { loop { tokio::select! { @@ -59,21 +65,19 @@ async fn start_py_sub( match cmd { Some(UnsubscribeCommand::Unsubscribe) => { sub.unsubscribe().await.ok(); - break; } Some(UnsubscribeCommand::UnsubscribeAfter(limit)) => { sub.unsubscribe_after(limit).await.ok(); - // Don't break — continue receiving up to `limit` messages. } Some(UnsubscribeCommand::Drain) => { sub.drain().await.ok(); - break; } None => break, } } } } + end_event.set(); } impl CallbackSubscription { @@ -81,14 +85,16 @@ impl CallbackSubscription { let (unsub_tx, unsub_rx) = tokio::sync::mpsc::channel(1); let task_locals = Python::attach(pyo3_async_runtimes::tokio::get_current_locals)?; let callback = Arc::new(callback); + let event = AsyncEvent::default(); let task_handle = tokio::task::spawn(pyo3_async_runtimes::tokio::scope( task_locals.clone(), - start_py_sub(sub, callback, task_locals, unsub_rx), + start_py_sub(sub, callback, task_locals, unsub_rx, event.clone()), )) .abort_handle(); Ok(Self { unsub_sender: Some(unsub_tx), reading_task: task_handle, + end_notify: event, }) } } @@ -130,6 +136,14 @@ impl CallbackSubscription { Ok(()) }) } + + pub fn wait<'py>(&self, py: Python<'py>) -> NatsrpyResult> { + let event = self.end_notify.clone(); + natsrpy_future(py, async move { + event.wait().await; + Ok(()) + }) + } } impl Drop for CallbackSubscription { diff --git a/src/subscriptions/ctx_manager.rs b/src/subscriptions/ctx_manager.rs new file mode 100644 index 0000000..f45be6c --- /dev/null +++ b/src/subscriptions/ctx_manager.rs @@ -0,0 +1,103 @@ +use std::sync::Arc; + +use pyo3::{Bound, IntoPyObjectExt, Py, PyAny, Python}; + +use crate::{ + exceptions::rust_err::NatsrpyResult, + subscriptions::{callback::CallbackSubscription, iterator::IteratorSubscription}, + utils::natsrpy_future, +}; + +#[pyo3::pyclass] +pub struct SubscriptionCtxManager { + client: async_nats::Client, + subject: String, + callback: Option>, + queue: Option, + current_sub: Arc>>>, +} + +impl SubscriptionCtxManager { + #[must_use] + pub fn new( + client: async_nats::Client, + subject: String, + callback: Option>, + queue: Option, + ) -> Self { + Self { + client, + subject, + callback, + queue, + current_sub: Arc::new(tokio::sync::Mutex::new(None)), + } + } +} + +#[pyo3::pymethods] +impl SubscriptionCtxManager { + pub fn __aenter__<'py>(&self, py: Python<'py>) -> NatsrpyResult> { + let current_sub = self.current_sub.clone(); + let client = self.client.clone(); + let callback = self.callback.as_ref().map(|val| val.clone_ref(py)); + let subject = self.subject.clone(); + let queue = self.queue.clone(); + #[allow(clippy::significant_drop_tightening)] + natsrpy_future(py, async move { + let mut current_sub = current_sub.lock().await; + if current_sub.is_some() { + return Err(crate::exceptions::rust_err::NatsrpyError::SessionError( + String::from( + "Subscription context manager is already active. Cannot enter a new context until the current one is exited.", + ), + )); + } + let subscriber = if let Some(queue) = queue { + client.queue_subscribe(subject, queue).await? + } else { + client.subscribe(subject).await? + }; + if let Some(cb) = callback { + let sub = CallbackSubscription::new(subscriber, cb)?; + Python::attach(|gil| -> NatsrpyResult<_> { + let sub = sub.into_bound_py_any(gil)?; + *current_sub = Some(sub.clone().unbind()); + Ok(sub.unbind()) + }) + } else { + let sub = IteratorSubscription::new(subscriber); + Python::attach(|gil| -> NatsrpyResult<_> { + let sub = sub.into_bound_py_any(gil)?; + *current_sub = Some(sub.clone().unbind()); + Ok(sub.unbind()) + }) + } + }) + } + + pub fn detatch<'py>(&self, py: Python<'py>) -> NatsrpyResult> { + self.__aenter__(py) + } + + #[pyo3(signature=( + _exc_type=None, + _exc_val=None, + _exc_tb=None, + ))] + pub fn __aexit__<'py>( + &self, + py: Python<'py>, + _exc_type: Option>, + _exc_val: Option>, + _exc_tb: Option>, + ) -> NatsrpyResult> { + let current_sub = self.current_sub.clone(); + #[allow(clippy::significant_drop_tightening)] + natsrpy_future(py, async move { + let mut current_sub = current_sub.lock().await; + *current_sub = None; + Ok(()) + }) + } +} diff --git a/src/subscriptions/iterator.rs b/src/subscriptions/iterator.rs index 101fc71..8a2896f 100644 --- a/src/subscriptions/iterator.rs +++ b/src/subscriptions/iterator.rs @@ -4,9 +4,7 @@ use futures_util::StreamExt; use pyo3::{Bound, PyAny, PyRef, Python}; use crate::exceptions::rust_err::{NatsrpyError, NatsrpyResult}; -use crate::utils::futures::natsrpy_future_with_timeout; use crate::utils::natsrpy_future; -use crate::utils::py_types::TimeValue; enum UnsubscribeCommand { Unsubscribe, @@ -55,7 +53,8 @@ async fn sub_forwarder( } } -#[pyo3::pyclass] +#[pyo3::pyclass(from_py_object)] +#[derive(Clone)] pub struct IteratorSubscription { msg_rx: Arc>>, unsub_tx: Option>, @@ -83,14 +82,9 @@ impl IteratorSubscription { slf } - #[pyo3(signature=(timeout=None))] - pub fn next<'py>( - &self, - py: Python<'py>, - timeout: Option, - ) -> NatsrpyResult> { + pub fn __anext__<'py>(&self, py: Python<'py>) -> NatsrpyResult> { let msg_rx = self.msg_rx.clone(); - natsrpy_future_with_timeout(py, timeout, async move { + natsrpy_future(py, async move { let mut rx = msg_rx.lock().await; rx.recv().await.map_or_else( || Err(NatsrpyError::AsyncStopIteration), @@ -99,10 +93,6 @@ impl IteratorSubscription { }) } - pub fn __anext__<'py>(&self, py: Python<'py>) -> NatsrpyResult> { - self.next(py, None) - } - #[pyo3(signature=(limit=None))] pub fn unsubscribe<'py>( &self, diff --git a/src/subscriptions/mod.rs b/src/subscriptions/mod.rs index 5c11901..aebf881 100644 --- a/src/subscriptions/mod.rs +++ b/src/subscriptions/mod.rs @@ -1,2 +1,3 @@ pub mod callback; +pub mod ctx_manager; pub mod iterator; diff --git a/src/utils/async_event.rs b/src/utils/async_event.rs new file mode 100644 index 0000000..60d0448 --- /dev/null +++ b/src/utils/async_event.rs @@ -0,0 +1,53 @@ +use std::sync::{ + Arc, + atomic::{AtomicBool, Ordering}, +}; + +/// Async event. +/// +/// This small sync primitive +/// has the same semantics as asyncio.Event from python. +/// +/// It's used to wait for an event to occur, and all +/// the following wait calls will succeed until the event is cleared again. +#[derive(Debug, Clone, Default)] +pub struct AsyncEvent { + is_set: Arc, + notify: Arc, +} + +impl AsyncEvent { + #[must_use] + pub fn new() -> Self { + Self::default() + } + + /// Wait for the event to be set. + /// If the event is already set, this will return immediately. + pub async fn wait(&self) { + loop { + if self.is_set.load(Ordering::Acquire) { + return; + } + self.notify.notified().await; + } + } + + /// Set the event, allowing all waiting tasks to proceed. + pub fn set(&self) { + self.is_set.store(true, Ordering::Release); + self.notify.notify_waiters(); + } + + /// Check if the event is currently set. + #[must_use] + pub fn is_set(&self) -> bool { + self.is_set.load(Ordering::Acquire) + } + + /// Clear the event, causing future wait + /// calls to block until the event is set again. + pub fn clear(&self) { + self.is_set.store(false, Ordering::Release); + } +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 6f4d837..f107de1 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,3 +1,4 @@ +pub mod async_event; pub mod futures; pub mod headers; pub mod py; diff --git a/uv.lock b/uv.lock index 6223767..071b115 100644 --- a/uv.lock +++ b/uv.lock @@ -209,6 +209,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/0e/61/66938bbb5fc52dbdf84594873d5b51fb1f7c7794e9c0f5bd885f30bc507b/idna-3.11-py3-none-any.whl", hash = "sha256:771a87f49d9defaf64091e6e6fe9c18d4833f140bd19464795bc32d966ca37ea", size = 71008, upload-time = "2025-10-12T14:55:18.883Z" }, ] +[[package]] +name = "importlib-metadata" +version = "8.7.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "zipp" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/f3/49/3b30cad09e7771a4982d9975a8cbf64f00d4a1ececb53297f1d9a7be1b10/importlib_metadata-8.7.1.tar.gz", hash = "sha256:49fef1ae6440c182052f407c8d34a68f72efc36db9ca90dc0113398f2fdde8bb", size = 57107, upload-time = "2025-12-21T10:00:19.278Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/fa/5e/f8e9a1d23b9c20a551a8a02ea3637b4642e22c2626e3a13a9a29cdea99eb/importlib_metadata-8.7.1-py3-none-any.whl", hash = "sha256:5a1f80bf1daa489495071efbb095d75a634cf28a8bc299581244063b53176151", size = 27865, upload-time = "2025-12-21T10:00:18.329Z" }, +] + [[package]] name = "iniconfig" version = "2.3.0" @@ -403,6 +415,13 @@ dependencies = [ { name = "typing-extensions" }, ] +[package.optional-dependencies] +opentelemetry = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-instrumentation" }, + { name = "opentelemetry-semantic-conventions" }, +] + [package.dev-dependencies] dev = [ { name = "anyio" }, @@ -417,8 +436,12 @@ dev = [ [package.metadata] requires-dist = [ { name = "maturin-import-hook", specifier = ">=0.3.0" }, + { name = "opentelemetry-api", marker = "extra == 'opentelemetry'", specifier = ">=1.38.0,<2.0.0" }, + { name = "opentelemetry-instrumentation", marker = "extra == 'opentelemetry'", specifier = ">=0.59b0,<1" }, + { name = "opentelemetry-semantic-conventions", marker = "extra == 'opentelemetry'", specifier = ">=0.59b0,<1" }, { name = "typing-extensions", specifier = ">=4.14.0" }, ] +provides-extras = ["opentelemetry"] [package.metadata.requires-dev] dev = [ @@ -440,6 +463,47 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/88/b2/d0896bdcdc8d28a7fc5717c305f1a861c26e18c05047949fb371034d98bd/nodeenv-1.10.0-py2.py3-none-any.whl", hash = "sha256:5bb13e3eed2923615535339b3c620e76779af4cb4c6a90deccc9e36b274d3827", size = 23438, upload-time = "2025-12-20T14:08:52.782Z" }, ] +[[package]] +name = "opentelemetry-api" +version = "1.40.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "importlib-metadata" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/2c/1d/4049a9e8698361cc1a1aa03a6c59e4fa4c71e0c0f94a30f988a6876a2ae6/opentelemetry_api-1.40.0.tar.gz", hash = "sha256:159be641c0b04d11e9ecd576906462773eb97ae1b657730f0ecf64d32071569f", size = 70851, upload-time = "2026-03-04T14:17:21.555Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5f/bf/93795954016c522008da367da292adceed71cca6ee1717e1d64c83089099/opentelemetry_api-1.40.0-py3-none-any.whl", hash = "sha256:82dd69331ae74b06f6a874704be0cfaa49a1650e1537d4a813b86ecef7d0ecf9", size = 68676, upload-time = "2026-03-04T14:17:01.24Z" }, +] + +[[package]] +name = "opentelemetry-instrumentation" +version = "0.61b0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-semantic-conventions" }, + { name = "packaging" }, + { name = "wrapt" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/da/37/6bf8e66bfcee5d3c6515b79cb2ee9ad05fe573c20f7ceb288d0e7eeec28c/opentelemetry_instrumentation-0.61b0.tar.gz", hash = "sha256:cb21b48db738c9de196eba6b805b4ff9de3b7f187e4bbf9a466fa170514f1fc7", size = 32606, upload-time = "2026-03-04T14:20:16.825Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d8/3e/f6f10f178b6316de67f0dfdbbb699a24fbe8917cf1743c1595fb9dcdd461/opentelemetry_instrumentation-0.61b0-py3-none-any.whl", hash = "sha256:92a93a280e69788e8f88391247cc530fd81f16f2b011979d4d6398f805cfbc63", size = 33448, upload-time = "2026-03-04T14:19:02.447Z" }, +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.61b0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/6d/c0/4ae7973f3c2cfd2b6e321f1675626f0dab0a97027cc7a297474c9c8f3d04/opentelemetry_semantic_conventions-0.61b0.tar.gz", hash = "sha256:072f65473c5d7c6dc0355b27d6c9d1a679d63b6d4b4b16a9773062cb7e31192a", size = 145755, upload-time = "2026-03-04T14:17:32.664Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b2/37/cc6a55e448deaa9b27377d087da8615a3416d8ad523d5960b78dbeadd02a/opentelemetry_semantic_conventions-0.61b0-py3-none-any.whl", hash = "sha256:fa530a96be229795f8cef353739b618148b0fe2b4b3f005e60e262926c4d38e2", size = 231621, upload-time = "2026-03-04T14:17:19.33Z" }, +] + [[package]] name = "packaging" version = "26.0" @@ -701,3 +765,81 @@ sdist = { url = "https://files.pythonhosted.org/packages/aa/92/58199fe10049f9703 wheels = [ { url = "https://files.pythonhosted.org/packages/c6/59/7d02447a55b2e55755011a647479041bc92a82e143f96a8195cb33bd0a1c/virtualenv-21.2.0-py3-none-any.whl", hash = "sha256:1bd755b504931164a5a496d217c014d098426cddc79363ad66ac78125f9d908f", size = 5825084, upload-time = "2026-03-09T17:24:35.378Z" }, ] + +[[package]] +name = "wrapt" +version = "1.17.3" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/95/8f/aeb76c5b46e273670962298c23e7ddde79916cb74db802131d49a85e4b7d/wrapt-1.17.3.tar.gz", hash = "sha256:f66eb08feaa410fe4eebd17f2a2c8e2e46d3476e9f8c783daa8e09e0faa666d0", size = 55547, upload-time = "2025-08-12T05:53:21.714Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/3f/23/bb82321b86411eb51e5a5db3fb8f8032fd30bd7c2d74bfe936136b2fa1d6/wrapt-1.17.3-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:88bbae4d40d5a46142e70d58bf664a89b6b4befaea7b2ecc14e03cedb8e06c04", size = 53482, upload-time = "2025-08-12T05:51:44.467Z" }, + { url = "https://files.pythonhosted.org/packages/45/69/f3c47642b79485a30a59c63f6d739ed779fb4cc8323205d047d741d55220/wrapt-1.17.3-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:e6b13af258d6a9ad602d57d889f83b9d5543acd471eee12eb51f5b01f8eb1bc2", size = 38676, upload-time = "2025-08-12T05:51:32.636Z" }, + { url = "https://files.pythonhosted.org/packages/d1/71/e7e7f5670c1eafd9e990438e69d8fb46fa91a50785332e06b560c869454f/wrapt-1.17.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:fd341868a4b6714a5962c1af0bd44f7c404ef78720c7de4892901e540417111c", size = 38957, upload-time = "2025-08-12T05:51:54.655Z" }, + { url = "https://files.pythonhosted.org/packages/de/17/9f8f86755c191d6779d7ddead1a53c7a8aa18bccb7cea8e7e72dfa6a8a09/wrapt-1.17.3-cp310-cp310-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:f9b2601381be482f70e5d1051a5965c25fb3625455a2bf520b5a077b22afb775", size = 81975, upload-time = "2025-08-12T05:52:30.109Z" }, + { url = "https://files.pythonhosted.org/packages/f2/15/dd576273491f9f43dd09fce517f6c2ce6eb4fe21681726068db0d0467096/wrapt-1.17.3-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:343e44b2a8e60e06a7e0d29c1671a0d9951f59174f3709962b5143f60a2a98bd", size = 83149, upload-time = "2025-08-12T05:52:09.316Z" }, + { url = "https://files.pythonhosted.org/packages/0c/c4/5eb4ce0d4814521fee7aa806264bf7a114e748ad05110441cd5b8a5c744b/wrapt-1.17.3-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:33486899acd2d7d3066156b03465b949da3fd41a5da6e394ec49d271baefcf05", size = 82209, upload-time = "2025-08-12T05:52:10.331Z" }, + { url = "https://files.pythonhosted.org/packages/31/4b/819e9e0eb5c8dc86f60dfc42aa4e2c0d6c3db8732bce93cc752e604bb5f5/wrapt-1.17.3-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:e6f40a8aa5a92f150bdb3e1c44b7e98fb7113955b2e5394122fa5532fec4b418", size = 81551, upload-time = "2025-08-12T05:52:31.137Z" }, + { url = "https://files.pythonhosted.org/packages/f8/83/ed6baf89ba3a56694700139698cf703aac9f0f9eb03dab92f57551bd5385/wrapt-1.17.3-cp310-cp310-win32.whl", hash = "sha256:a36692b8491d30a8c75f1dfee65bef119d6f39ea84ee04d9f9311f83c5ad9390", size = 36464, upload-time = "2025-08-12T05:53:01.204Z" }, + { url = "https://files.pythonhosted.org/packages/2f/90/ee61d36862340ad7e9d15a02529df6b948676b9a5829fd5e16640156627d/wrapt-1.17.3-cp310-cp310-win_amd64.whl", hash = "sha256:afd964fd43b10c12213574db492cb8f73b2f0826c8df07a68288f8f19af2ebe6", size = 38748, upload-time = "2025-08-12T05:53:00.209Z" }, + { url = "https://files.pythonhosted.org/packages/bd/c3/cefe0bd330d389c9983ced15d326f45373f4073c9f4a8c2f99b50bfea329/wrapt-1.17.3-cp310-cp310-win_arm64.whl", hash = "sha256:af338aa93554be859173c39c85243970dc6a289fa907402289eeae7543e1ae18", size = 36810, upload-time = "2025-08-12T05:52:51.906Z" }, + { url = "https://files.pythonhosted.org/packages/52/db/00e2a219213856074a213503fdac0511203dceefff26e1daa15250cc01a0/wrapt-1.17.3-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:273a736c4645e63ac582c60a56b0acb529ef07f78e08dc6bfadf6a46b19c0da7", size = 53482, upload-time = "2025-08-12T05:51:45.79Z" }, + { url = "https://files.pythonhosted.org/packages/5e/30/ca3c4a5eba478408572096fe9ce36e6e915994dd26a4e9e98b4f729c06d9/wrapt-1.17.3-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:5531d911795e3f935a9c23eb1c8c03c211661a5060aab167065896bbf62a5f85", size = 38674, upload-time = "2025-08-12T05:51:34.629Z" }, + { url = "https://files.pythonhosted.org/packages/31/25/3e8cc2c46b5329c5957cec959cb76a10718e1a513309c31399a4dad07eb3/wrapt-1.17.3-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:0610b46293c59a3adbae3dee552b648b984176f8562ee0dba099a56cfbe4df1f", size = 38959, upload-time = "2025-08-12T05:51:56.074Z" }, + { url = "https://files.pythonhosted.org/packages/5d/8f/a32a99fc03e4b37e31b57cb9cefc65050ea08147a8ce12f288616b05ef54/wrapt-1.17.3-cp311-cp311-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:b32888aad8b6e68f83a8fdccbf3165f5469702a7544472bdf41f582970ed3311", size = 82376, upload-time = "2025-08-12T05:52:32.134Z" }, + { url = "https://files.pythonhosted.org/packages/31/57/4930cb8d9d70d59c27ee1332a318c20291749b4fba31f113c2f8ac49a72e/wrapt-1.17.3-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:8cccf4f81371f257440c88faed6b74f1053eef90807b77e31ca057b2db74edb1", size = 83604, upload-time = "2025-08-12T05:52:11.663Z" }, + { url = "https://files.pythonhosted.org/packages/a8/f3/1afd48de81d63dd66e01b263a6fbb86e1b5053b419b9b33d13e1f6d0f7d0/wrapt-1.17.3-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:d8a210b158a34164de8bb68b0e7780041a903d7b00c87e906fb69928bf7890d5", size = 82782, upload-time = "2025-08-12T05:52:12.626Z" }, + { url = "https://files.pythonhosted.org/packages/1e/d7/4ad5327612173b144998232f98a85bb24b60c352afb73bc48e3e0d2bdc4e/wrapt-1.17.3-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:79573c24a46ce11aab457b472efd8d125e5a51da2d1d24387666cd85f54c05b2", size = 82076, upload-time = "2025-08-12T05:52:33.168Z" }, + { url = "https://files.pythonhosted.org/packages/bb/59/e0adfc831674a65694f18ea6dc821f9fcb9ec82c2ce7e3d73a88ba2e8718/wrapt-1.17.3-cp311-cp311-win32.whl", hash = "sha256:c31eebe420a9a5d2887b13000b043ff6ca27c452a9a22fa71f35f118e8d4bf89", size = 36457, upload-time = "2025-08-12T05:53:03.936Z" }, + { url = "https://files.pythonhosted.org/packages/83/88/16b7231ba49861b6f75fc309b11012ede4d6b0a9c90969d9e0db8d991aeb/wrapt-1.17.3-cp311-cp311-win_amd64.whl", hash = "sha256:0b1831115c97f0663cb77aa27d381237e73ad4f721391a9bfb2fe8bc25fa6e77", size = 38745, upload-time = "2025-08-12T05:53:02.885Z" }, + { url = "https://files.pythonhosted.org/packages/9a/1e/c4d4f3398ec073012c51d1c8d87f715f56765444e1a4b11e5180577b7e6e/wrapt-1.17.3-cp311-cp311-win_arm64.whl", hash = "sha256:5a7b3c1ee8265eb4c8f1b7d29943f195c00673f5ab60c192eba2d4a7eae5f46a", size = 36806, upload-time = "2025-08-12T05:52:53.368Z" }, + { url = "https://files.pythonhosted.org/packages/9f/41/cad1aba93e752f1f9268c77270da3c469883d56e2798e7df6240dcb2287b/wrapt-1.17.3-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:ab232e7fdb44cdfbf55fc3afa31bcdb0d8980b9b95c38b6405df2acb672af0e0", size = 53998, upload-time = "2025-08-12T05:51:47.138Z" }, + { url = "https://files.pythonhosted.org/packages/60/f8/096a7cc13097a1869fe44efe68dace40d2a16ecb853141394047f0780b96/wrapt-1.17.3-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:9baa544e6acc91130e926e8c802a17f3b16fbea0fd441b5a60f5cf2cc5c3deba", size = 39020, upload-time = "2025-08-12T05:51:35.906Z" }, + { url = "https://files.pythonhosted.org/packages/33/df/bdf864b8997aab4febb96a9ae5c124f700a5abd9b5e13d2a3214ec4be705/wrapt-1.17.3-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:6b538e31eca1a7ea4605e44f81a48aa24c4632a277431a6ed3f328835901f4fd", size = 39098, upload-time = "2025-08-12T05:51:57.474Z" }, + { url = "https://files.pythonhosted.org/packages/9f/81/5d931d78d0eb732b95dc3ddaeeb71c8bb572fb01356e9133916cd729ecdd/wrapt-1.17.3-cp312-cp312-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:042ec3bb8f319c147b1301f2393bc19dba6e176b7da446853406d041c36c7828", size = 88036, upload-time = "2025-08-12T05:52:34.784Z" }, + { url = "https://files.pythonhosted.org/packages/ca/38/2e1785df03b3d72d34fc6252d91d9d12dc27a5c89caef3335a1bbb8908ca/wrapt-1.17.3-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:3af60380ba0b7b5aeb329bc4e402acd25bd877e98b3727b0135cb5c2efdaefe9", size = 88156, upload-time = "2025-08-12T05:52:13.599Z" }, + { url = "https://files.pythonhosted.org/packages/b3/8b/48cdb60fe0603e34e05cffda0b2a4adab81fd43718e11111a4b0100fd7c1/wrapt-1.17.3-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:0b02e424deef65c9f7326d8c19220a2c9040c51dc165cddb732f16198c168396", size = 87102, upload-time = "2025-08-12T05:52:14.56Z" }, + { url = "https://files.pythonhosted.org/packages/3c/51/d81abca783b58f40a154f1b2c56db1d2d9e0d04fa2d4224e357529f57a57/wrapt-1.17.3-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:74afa28374a3c3a11b3b5e5fca0ae03bef8450d6aa3ab3a1e2c30e3a75d023dc", size = 87732, upload-time = "2025-08-12T05:52:36.165Z" }, + { url = "https://files.pythonhosted.org/packages/9e/b1/43b286ca1392a006d5336412d41663eeef1ad57485f3e52c767376ba7e5a/wrapt-1.17.3-cp312-cp312-win32.whl", hash = "sha256:4da9f45279fff3543c371d5ababc57a0384f70be244de7759c85a7f989cb4ebe", size = 36705, upload-time = "2025-08-12T05:53:07.123Z" }, + { url = "https://files.pythonhosted.org/packages/28/de/49493f962bd3c586ab4b88066e967aa2e0703d6ef2c43aa28cb83bf7b507/wrapt-1.17.3-cp312-cp312-win_amd64.whl", hash = "sha256:e71d5c6ebac14875668a1e90baf2ea0ef5b7ac7918355850c0908ae82bcb297c", size = 38877, upload-time = "2025-08-12T05:53:05.436Z" }, + { url = "https://files.pythonhosted.org/packages/f1/48/0f7102fe9cb1e8a5a77f80d4f0956d62d97034bbe88d33e94699f99d181d/wrapt-1.17.3-cp312-cp312-win_arm64.whl", hash = "sha256:604d076c55e2fdd4c1c03d06dc1a31b95130010517b5019db15365ec4a405fc6", size = 36885, upload-time = "2025-08-12T05:52:54.367Z" }, + { url = "https://files.pythonhosted.org/packages/fc/f6/759ece88472157acb55fc195e5b116e06730f1b651b5b314c66291729193/wrapt-1.17.3-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:a47681378a0439215912ef542c45a783484d4dd82bac412b71e59cf9c0e1cea0", size = 54003, upload-time = "2025-08-12T05:51:48.627Z" }, + { url = "https://files.pythonhosted.org/packages/4f/a9/49940b9dc6d47027dc850c116d79b4155f15c08547d04db0f07121499347/wrapt-1.17.3-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:54a30837587c6ee3cd1a4d1c2ec5d24e77984d44e2f34547e2323ddb4e22eb77", size = 39025, upload-time = "2025-08-12T05:51:37.156Z" }, + { url = "https://files.pythonhosted.org/packages/45/35/6a08de0f2c96dcdd7fe464d7420ddb9a7655a6561150e5fc4da9356aeaab/wrapt-1.17.3-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:16ecf15d6af39246fe33e507105d67e4b81d8f8d2c6598ff7e3ca1b8a37213f7", size = 39108, upload-time = "2025-08-12T05:51:58.425Z" }, + { url = "https://files.pythonhosted.org/packages/0c/37/6faf15cfa41bf1f3dba80cd3f5ccc6622dfccb660ab26ed79f0178c7497f/wrapt-1.17.3-cp313-cp313-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:6fd1ad24dc235e4ab88cda009e19bf347aabb975e44fd5c2fb22a3f6e4141277", size = 88072, upload-time = "2025-08-12T05:52:37.53Z" }, + { url = "https://files.pythonhosted.org/packages/78/f2/efe19ada4a38e4e15b6dff39c3e3f3f73f5decf901f66e6f72fe79623a06/wrapt-1.17.3-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:0ed61b7c2d49cee3c027372df5809a59d60cf1b6c2f81ee980a091f3afed6a2d", size = 88214, upload-time = "2025-08-12T05:52:15.886Z" }, + { url = "https://files.pythonhosted.org/packages/40/90/ca86701e9de1622b16e09689fc24b76f69b06bb0150990f6f4e8b0eeb576/wrapt-1.17.3-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:423ed5420ad5f5529db9ce89eac09c8a2f97da18eb1c870237e84c5a5c2d60aa", size = 87105, upload-time = "2025-08-12T05:52:17.914Z" }, + { url = "https://files.pythonhosted.org/packages/fd/e0/d10bd257c9a3e15cbf5523025252cc14d77468e8ed644aafb2d6f54cb95d/wrapt-1.17.3-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:e01375f275f010fcbf7f643b4279896d04e571889b8a5b3f848423d91bf07050", size = 87766, upload-time = "2025-08-12T05:52:39.243Z" }, + { url = "https://files.pythonhosted.org/packages/e8/cf/7d848740203c7b4b27eb55dbfede11aca974a51c3d894f6cc4b865f42f58/wrapt-1.17.3-cp313-cp313-win32.whl", hash = "sha256:53e5e39ff71b3fc484df8a522c933ea2b7cdd0d5d15ae82e5b23fde87d44cbd8", size = 36711, upload-time = "2025-08-12T05:53:10.074Z" }, + { url = "https://files.pythonhosted.org/packages/57/54/35a84d0a4d23ea675994104e667ceff49227ce473ba6a59ba2c84f250b74/wrapt-1.17.3-cp313-cp313-win_amd64.whl", hash = "sha256:1f0b2f40cf341ee8cc1a97d51ff50dddb9fcc73241b9143ec74b30fc4f44f6cb", size = 38885, upload-time = "2025-08-12T05:53:08.695Z" }, + { url = "https://files.pythonhosted.org/packages/01/77/66e54407c59d7b02a3c4e0af3783168fff8e5d61def52cda8728439d86bc/wrapt-1.17.3-cp313-cp313-win_arm64.whl", hash = "sha256:7425ac3c54430f5fc5e7b6f41d41e704db073309acfc09305816bc6a0b26bb16", size = 36896, upload-time = "2025-08-12T05:52:55.34Z" }, + { url = "https://files.pythonhosted.org/packages/02/a2/cd864b2a14f20d14f4c496fab97802001560f9f41554eef6df201cd7f76c/wrapt-1.17.3-cp314-cp314-macosx_10_13_universal2.whl", hash = "sha256:cf30f6e3c077c8e6a9a7809c94551203c8843e74ba0c960f4a98cd80d4665d39", size = 54132, upload-time = "2025-08-12T05:51:49.864Z" }, + { url = "https://files.pythonhosted.org/packages/d5/46/d011725b0c89e853dc44cceb738a307cde5d240d023d6d40a82d1b4e1182/wrapt-1.17.3-cp314-cp314-macosx_10_13_x86_64.whl", hash = "sha256:e228514a06843cae89621384cfe3a80418f3c04aadf8a3b14e46a7be704e4235", size = 39091, upload-time = "2025-08-12T05:51:38.935Z" }, + { url = "https://files.pythonhosted.org/packages/2e/9e/3ad852d77c35aae7ddebdbc3b6d35ec8013af7d7dddad0ad911f3d891dae/wrapt-1.17.3-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:5ea5eb3c0c071862997d6f3e02af1d055f381b1d25b286b9d6644b79db77657c", size = 39172, upload-time = "2025-08-12T05:51:59.365Z" }, + { url = "https://files.pythonhosted.org/packages/c3/f7/c983d2762bcce2326c317c26a6a1e7016f7eb039c27cdf5c4e30f4160f31/wrapt-1.17.3-cp314-cp314-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:281262213373b6d5e4bb4353bc36d1ba4084e6d6b5d242863721ef2bf2c2930b", size = 87163, upload-time = "2025-08-12T05:52:40.965Z" }, + { url = "https://files.pythonhosted.org/packages/e4/0f/f673f75d489c7f22d17fe0193e84b41540d962f75fce579cf6873167c29b/wrapt-1.17.3-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:dc4a8d2b25efb6681ecacad42fca8859f88092d8732b170de6a5dddd80a1c8fa", size = 87963, upload-time = "2025-08-12T05:52:20.326Z" }, + { url = "https://files.pythonhosted.org/packages/df/61/515ad6caca68995da2fac7a6af97faab8f78ebe3bf4f761e1b77efbc47b5/wrapt-1.17.3-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:373342dd05b1d07d752cecbec0c41817231f29f3a89aa8b8843f7b95992ed0c7", size = 86945, upload-time = "2025-08-12T05:52:21.581Z" }, + { url = "https://files.pythonhosted.org/packages/d3/bd/4e70162ce398462a467bc09e768bee112f1412e563620adc353de9055d33/wrapt-1.17.3-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:d40770d7c0fd5cbed9d84b2c3f2e156431a12c9a37dc6284060fb4bec0b7ffd4", size = 86857, upload-time = "2025-08-12T05:52:43.043Z" }, + { url = "https://files.pythonhosted.org/packages/2b/b8/da8560695e9284810b8d3df8a19396a6e40e7518059584a1a394a2b35e0a/wrapt-1.17.3-cp314-cp314-win32.whl", hash = "sha256:fbd3c8319de8e1dc79d346929cd71d523622da527cca14e0c1d257e31c2b8b10", size = 37178, upload-time = "2025-08-12T05:53:12.605Z" }, + { url = "https://files.pythonhosted.org/packages/db/c8/b71eeb192c440d67a5a0449aaee2310a1a1e8eca41676046f99ed2487e9f/wrapt-1.17.3-cp314-cp314-win_amd64.whl", hash = "sha256:e1a4120ae5705f673727d3253de3ed0e016f7cd78dc463db1b31e2463e1f3cf6", size = 39310, upload-time = "2025-08-12T05:53:11.106Z" }, + { url = "https://files.pythonhosted.org/packages/45/20/2cda20fd4865fa40f86f6c46ed37a2a8356a7a2fde0773269311f2af56c7/wrapt-1.17.3-cp314-cp314-win_arm64.whl", hash = "sha256:507553480670cab08a800b9463bdb881b2edeed77dc677b0a5915e6106e91a58", size = 37266, upload-time = "2025-08-12T05:52:56.531Z" }, + { url = "https://files.pythonhosted.org/packages/77/ed/dd5cf21aec36c80443c6f900449260b80e2a65cf963668eaef3b9accce36/wrapt-1.17.3-cp314-cp314t-macosx_10_13_universal2.whl", hash = "sha256:ed7c635ae45cfbc1a7371f708727bf74690daedc49b4dba310590ca0bd28aa8a", size = 56544, upload-time = "2025-08-12T05:51:51.109Z" }, + { url = "https://files.pythonhosted.org/packages/8d/96/450c651cc753877ad100c7949ab4d2e2ecc4d97157e00fa8f45df682456a/wrapt-1.17.3-cp314-cp314t-macosx_10_13_x86_64.whl", hash = "sha256:249f88ed15503f6492a71f01442abddd73856a0032ae860de6d75ca62eed8067", size = 40283, upload-time = "2025-08-12T05:51:39.912Z" }, + { url = "https://files.pythonhosted.org/packages/d1/86/2fcad95994d9b572db57632acb6f900695a648c3e063f2cd344b3f5c5a37/wrapt-1.17.3-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:5a03a38adec8066d5a37bea22f2ba6bbf39fcdefbe2d91419ab864c3fb515454", size = 40366, upload-time = "2025-08-12T05:52:00.693Z" }, + { url = "https://files.pythonhosted.org/packages/64/0e/f4472f2fdde2d4617975144311f8800ef73677a159be7fe61fa50997d6c0/wrapt-1.17.3-cp314-cp314t-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:5d4478d72eb61c36e5b446e375bbc49ed002430d17cdec3cecb36993398e1a9e", size = 108571, upload-time = "2025-08-12T05:52:44.521Z" }, + { url = "https://files.pythonhosted.org/packages/cc/01/9b85a99996b0a97c8a17484684f206cbb6ba73c1ce6890ac668bcf3838fb/wrapt-1.17.3-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:223db574bb38637e8230eb14b185565023ab624474df94d2af18f1cdb625216f", size = 113094, upload-time = "2025-08-12T05:52:22.618Z" }, + { url = "https://files.pythonhosted.org/packages/25/02/78926c1efddcc7b3aa0bc3d6b33a822f7d898059f7cd9ace8c8318e559ef/wrapt-1.17.3-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:e405adefb53a435f01efa7ccdec012c016b5a1d3f35459990afc39b6be4d5056", size = 110659, upload-time = "2025-08-12T05:52:24.057Z" }, + { url = "https://files.pythonhosted.org/packages/dc/ee/c414501ad518ac3e6fe184753632fe5e5ecacdcf0effc23f31c1e4f7bfcf/wrapt-1.17.3-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:88547535b787a6c9ce4086917b6e1d291aa8ed914fdd3a838b3539dc95c12804", size = 106946, upload-time = "2025-08-12T05:52:45.976Z" }, + { url = "https://files.pythonhosted.org/packages/be/44/a1bd64b723d13bb151d6cc91b986146a1952385e0392a78567e12149c7b4/wrapt-1.17.3-cp314-cp314t-win32.whl", hash = "sha256:41b1d2bc74c2cac6f9074df52b2efbef2b30bdfe5f40cb78f8ca22963bc62977", size = 38717, upload-time = "2025-08-12T05:53:15.214Z" }, + { url = "https://files.pythonhosted.org/packages/79/d9/7cfd5a312760ac4dd8bf0184a6ee9e43c33e47f3dadc303032ce012b8fa3/wrapt-1.17.3-cp314-cp314t-win_amd64.whl", hash = "sha256:73d496de46cd2cdbdbcce4ae4bcdb4afb6a11234a1df9c085249d55166b95116", size = 41334, upload-time = "2025-08-12T05:53:14.178Z" }, + { url = "https://files.pythonhosted.org/packages/46/78/10ad9781128ed2f99dbc474f43283b13fea8ba58723e98844367531c18e9/wrapt-1.17.3-cp314-cp314t-win_arm64.whl", hash = "sha256:f38e60678850c42461d4202739f9bf1e3a737c7ad283638251e79cc49effb6b6", size = 38471, upload-time = "2025-08-12T05:52:57.784Z" }, + { url = "https://files.pythonhosted.org/packages/1f/f6/a933bd70f98e9cf3e08167fc5cd7aaaca49147e48411c0bd5ae701bb2194/wrapt-1.17.3-py3-none-any.whl", hash = "sha256:7171ae35d2c33d326ac19dd8facb1e82e5fd04ef8c6c0e394d7af55a55051c22", size = 23591, upload-time = "2025-08-12T05:53:20.674Z" }, +] + +[[package]] +name = "zipp" +version = "3.23.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/e3/02/0f2892c661036d50ede074e376733dca2ae7c6eb617489437771209d4180/zipp-3.23.0.tar.gz", hash = "sha256:a07157588a12518c9d4034df3fbbee09c814741a33ff63c05fa29d26a2404166", size = 25547, upload-time = "2025-06-08T17:06:39.4Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/2e/54/647ade08bf0db230bfea292f893923872fd20be6ac6f53b2b936ba839d75/zipp-3.23.0-py3-none-any.whl", hash = "sha256:071652d6115ed432f5ce1d34c336c0adfd6a884660d1e9712a256d3d3bd4b14e", size = 10276, upload-time = "2025-06-08T17:06:38.034Z" }, +]