Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ async-nats = "0.46"
bytes = "1.11.1"
futures-util = "0.3.32"
log = "0.4.29"
pyo3 = { version = "0.28", features = ["abi3", "experimental-inspect"] }
mimalloc = "0.1.48"
pyo3 = { version = "0.28", features = ["experimental-inspect"] }
pyo3-async-runtimes = { version = "0.28", features = ["tokio-runtime"] }
pyo3-log = "0.13.3"
serde = { version = "1.0.228", features = ["derive"] }
Expand Down
2 changes: 2 additions & 0 deletions src/exceptions/rust_err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ pub type NatsrpyResult<T> = Result<T, NatsrpyError>;
pub enum NatsrpyError {
#[error(transparent)]
StdIOError(#[from] std::io::Error),
#[error("The lock is poisoned")]
PoisonedLock,
#[error(transparent)]
StdParseIntError(#[from] std::num::ParseIntError),
#[error(transparent)]
Expand Down
11 changes: 3 additions & 8 deletions src/js/consumers/pull/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::sync::Arc;

use futures_util::StreamExt;
use pyo3::{Bound, PyAny, Python};
use tokio::sync::RwLock;

use crate::{
exceptions::rust_err::NatsrpyResult,
Expand All @@ -19,7 +18,7 @@ pub struct PullConsumer {
name: String,
#[pyo3(get)]
stream_name: String,
consumer: Arc<RwLock<NatsPullConsumer>>,
consumer: Arc<NatsPullConsumer>,
}

impl PullConsumer {
Expand All @@ -29,7 +28,7 @@ impl PullConsumer {
Self {
name: info.name.clone(),
stream_name: info.stream_name.clone(),
consumer: Arc::new(RwLock::new(consumer)),
consumer: Arc::new(consumer),
}
}
}
Expand Down Expand Up @@ -60,13 +59,9 @@ impl PullConsumer {
min_ack_pending: Option<usize>,
timeout: Option<TimeValue>,
) -> NatsrpyResult<Bound<'py, PyAny>> {
let ctx = self.consumer.clone();

// Because we borrow cosnumer lock
// later for modifications of fetchbuilder.
let consumer = self.consumer.clone();
#[allow(clippy::significant_drop_tightening)]
natsrpy_future_with_timeout(py, timeout, async move {
let consumer = ctx.read().await;
let mut fetch_builder = consumer.fetch();
if let Some(max_messages) = max_messages {
fetch_builder = fetch_builder.max_messages(max_messages);
Expand Down
17 changes: 7 additions & 10 deletions src/js/consumers/push/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::sync::Arc;

use futures_util::StreamExt;
use pyo3::{Bound, PyAny, PyRef, Python};
use tokio::sync::RwLock;

use crate::{
exceptions::rust_err::{NatsrpyError, NatsrpyResult},
Expand All @@ -20,7 +19,7 @@ pub struct PushConsumer {
name: String,
#[pyo3(get)]
stream_name: String,
consumer: Arc<RwLock<NatsPushConsumer>>,
consumer: Arc<NatsPushConsumer>,
}

impl PushConsumer {
Expand All @@ -30,32 +29,30 @@ impl PushConsumer {
Self {
name: info.name.clone(),
stream_name: info.stream_name.clone(),
consumer: Arc::new(RwLock::new(consumer)),
consumer: Arc::new(consumer),
}
}
}

#[pyo3::pyclass]
pub struct MessagesIterator {
messages: Option<Arc<RwLock<async_nats::jetstream::consumer::push::Messages>>>,
messages: Option<Arc<tokio::sync::Mutex<async_nats::jetstream::consumer::push::Messages>>>,
}

impl From<async_nats::jetstream::consumer::push::Messages> for MessagesIterator {
fn from(value: async_nats::jetstream::consumer::push::Messages) -> Self {
Self {
messages: Some(Arc::new(RwLock::new(value))),
messages: Some(Arc::new(tokio::sync::Mutex::new(value))),
}
}
}

#[pyo3::pymethods]
impl PushConsumer {
pub fn messages<'py>(&self, py: Python<'py>) -> NatsrpyResult<Bound<'py, PyAny>> {
let consumer_guard = self.consumer.clone();
let consumer = self.consumer.clone();
natsrpy_future(py, async move {
Ok(MessagesIterator::from(
consumer_guard.read().await.messages().await?,
))
Ok(MessagesIterator::from(consumer.messages().await?))
})
}

Expand Down Expand Up @@ -87,7 +84,7 @@ impl MessagesIterator {
};
#[allow(clippy::significant_drop_tightening)]
natsrpy_future_with_timeout(py, timeout, async move {
let mut messages = messages_guard.write().await;
let mut messages = messages_guard.lock().await;
let Some(message) = messages.next().await else {
return Err(NatsrpyError::AsyncStopIteration);
};
Expand Down
18 changes: 6 additions & 12 deletions src/js/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::{collections::HashMap, sync::Arc, time::Duration};

use async_nats::{HeaderMap, jetstream::context::traits::Publisher};
use pyo3::{Bound, PyAny, Python};
use tokio::sync::RwLock;

use crate::{
exceptions::rust_err::{NatsrpyError, NatsrpyResult},
Expand Down Expand Up @@ -301,17 +300,18 @@ impl CounterEntry {
#[pyo3::pyclass]
#[allow(dead_code)]
pub struct Counters {
stream: Arc<RwLock<async_nats::jetstream::stream::Stream<async_nats::jetstream::stream::Info>>>,
js: Arc<RwLock<async_nats::jetstream::Context>>,
stream: Arc<async_nats::jetstream::stream::Stream<async_nats::jetstream::stream::Info>>,
js: Arc<async_nats::jetstream::Context>,
}

impl Counters {
#[must_use]
pub fn new(
stream: async_nats::jetstream::stream::Stream<async_nats::jetstream::stream::Info>,
js: Arc<RwLock<async_nats::jetstream::Context>>,
js: Arc<async_nats::jetstream::Context>,
) -> Self {
Self {
stream: Arc::new(RwLock::new(stream)),
stream: Arc::new(stream),
js,
}
}
Expand Down Expand Up @@ -357,8 +357,6 @@ impl Counters {
headers.insert(COUNTER_INCREMENT_HEADER, value.to_string());
natsrpy_future_with_timeout(py, timeout, async move {
let resp = js
.read()
.await
.publish_message(async_nats::jetstream::message::OutboundMessage {
subject: key.into(),
payload: bytes::Bytes::new(),
Expand Down Expand Up @@ -404,11 +402,7 @@ impl Counters {
) -> NatsrpyResult<Bound<'py, PyAny>> {
let stream_guard = self.stream.clone();
natsrpy_future_with_timeout(py, timeout, async move {
let message = stream_guard
.read()
.await
.direct_get_last_for_subject(key)
.await?;
let message = stream_guard.direct_get_last_for_subject(key).await?;
CounterEntry::try_from(message)
})
}
Expand Down
17 changes: 5 additions & 12 deletions src/js/jetstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::sync::Arc;

use async_nats::{Subject, connection::State, jetstream::context::traits::Publisher};
use pyo3::{Bound, PyAny, Python, types::PyDict};
use tokio::sync::RwLock;

use crate::{
exceptions::rust_err::{NatsrpyError, NatsrpyResult},
Expand All @@ -15,15 +14,13 @@ use crate::{

#[pyo3::pyclass]
pub struct JetStream {
ctx: Arc<RwLock<async_nats::jetstream::Context>>,
ctx: Arc<async_nats::jetstream::Context>,
}

impl JetStream {
#[must_use]
pub fn new(ctx: async_nats::jetstream::Context) -> Self {
Self {
ctx: Arc::new(RwLock::new(ctx)),
}
Self { ctx: Arc::new(ctx) }
}
}

Expand Down Expand Up @@ -92,20 +89,16 @@ impl JetStream {
err_on_disconnect: bool,
wait: bool,
) -> NatsrpyResult<Bound<'py, PyAny>> {
let ctx = self.ctx.clone();
let data = payload.into();
let headermap = headers
.map(async_nats::HeaderMap::from_pydict)
.transpose()?;
let client = self.ctx.clone();
natsrpy_future(py, async move {
if err_on_disconnect
&& ctx.read().await.client().connection_state() == State::Disconnected
{
if err_on_disconnect && client.client().connection_state() == State::Disconnected {
return Err(NatsrpyError::Disconnected);
}
let publication = ctx
.read()
.await
let publication = client
.publish_message(async_nats::jetstream::message::OutboundMessage {
subject: Subject::from(subject),
payload: data,
Expand Down
Loading
Loading