Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 177 additions & 0 deletions lightning-liquidity/src/lsps1/peer_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use super::msgs::{
use crate::lsps0::ser::{LSPSDateTime, LSPSRequestId};
use crate::prelude::HashMap;

use core::time::Duration;

use lightning::util::hash_tables::new_hash_map;
use lightning::{impl_writeable_tlv_based, impl_writeable_tlv_based_enum};

Expand Down Expand Up @@ -397,6 +399,36 @@ impl PeerState {
});
}

/// Removes all terminal orders from state that are at least `max_age` old.
///
/// Terminal orders are those in the [`ChannelOrderState::CompletedAndChannelOpened`] or
/// [`ChannelOrderState::FailedAndRefunded`] state. `max_age` is measured from the order's
/// `created_at` timestamp. Pass [`Duration::ZERO`] to prune all terminal orders regardless
/// of age, which is useful to immediately free per-peer quota when a client is blocked by
/// the request limit due to accumulated `FailedAndRefunded` entries.
///
/// Returns the number of orders removed.
pub(super) fn prune_terminal_orders(&mut self, now: &LSPSDateTime, max_age: Duration) -> usize {
let mut pruned = 0usize;
self.outbound_channels_by_order_id.retain(|_order_id, order| {
let is_terminal = matches!(
order.state,
ChannelOrderState::CompletedAndChannelOpened { .. }
| ChannelOrderState::FailedAndRefunded { .. }
);
if is_terminal && now.duration_since(&order.created_at) >= max_age {
pruned += 1;
false
} else {
true
}
});
if pruned > 0 {
self.needs_persist |= true;
}
pruned
}

fn pending_requests_and_unpaid_orders(&self) -> usize {
let pending_requests = self.pending_requests.len();
// We exclude paid and completed orders.
Expand Down Expand Up @@ -778,4 +810,149 @@ mod tests {
// Available in CompletedAndChannelOpened
assert_eq!(state.channel_info(), Some(&channel_info));
}

fn create_test_order_params() -> LSPS1OrderParams {
LSPS1OrderParams {
lsp_balance_sat: 100_000,
client_balance_sat: 0,
required_channel_confirmations: 0,
funding_confirms_within_blocks: 6,
channel_expiry_blocks: 144,
token: None,
announce_channel: false,
}
}

#[test]
fn test_prune_terminal_orders_completed() {
let mut peer_state = PeerState::default();
let order_id = LSPS1OrderId("order1".to_string());
peer_state.new_order(
order_id.clone(),
create_test_order_params(),
LSPSDateTime::from_str("2024-01-01T00:00:00Z").unwrap(),
create_test_payment_info_bolt11_only(),
);
peer_state.order_payment_received(&order_id, PaymentMethod::Bolt11).unwrap();
peer_state.order_channel_opened(&order_id, create_test_channel_info()).unwrap();

// max_age=0 prunes all terminal orders regardless of age.
let now = LSPSDateTime::from_str("2024-01-01T01:00:00Z").unwrap();
assert_eq!(peer_state.prune_terminal_orders(&now, Duration::ZERO), 1);
assert!(peer_state.get_order(&order_id).is_err());
}

#[test]
fn test_prune_terminal_orders_failed_and_refunded() {
let mut peer_state = PeerState::default();
let order_id = LSPS1OrderId("order2".to_string());
// Non-expired invoice: verify we do not require invoice expiry before pruning.
peer_state.new_order(
order_id.clone(),
create_test_order_params(),
LSPSDateTime::from_str("2024-01-01T00:00:00Z").unwrap(),
create_test_payment_info_bolt11_only(),
);
peer_state.order_failed_and_refunded(&order_id).unwrap();

let now = LSPSDateTime::from_str("2024-01-01T01:00:00Z").unwrap();
assert_eq!(peer_state.prune_terminal_orders(&now, Duration::ZERO), 1);
assert!(peer_state.get_order(&order_id).is_err());
}

#[test]
fn test_prune_terminal_orders_age_filter() {
let mut peer_state = PeerState::default();

// Old order (2 hours before now) — must be pruned when max_age = 1 hour.
let old_id = LSPS1OrderId("old".to_string());
peer_state.new_order(
old_id.clone(),
create_test_order_params(),
LSPSDateTime::from_str("2024-01-01T00:00:00Z").unwrap(),
create_test_payment_info_bolt11_only(),
);
peer_state.order_failed_and_refunded(&old_id).unwrap();

// Recent order (10 minutes before now) — must NOT be pruned when max_age = 1 hour.
let recent_id = LSPS1OrderId("recent".to_string());
peer_state.new_order(
recent_id.clone(),
create_test_order_params(),
LSPSDateTime::from_str("2024-01-01T01:50:00Z").unwrap(),
create_test_payment_info_bolt11_only(),
);
peer_state.order_failed_and_refunded(&recent_id).unwrap();

let now = LSPSDateTime::from_str("2024-01-01T02:00:00Z").unwrap();
let pruned = peer_state.prune_terminal_orders(&now, Duration::from_secs(3600));
assert_eq!(pruned, 1);
assert!(peer_state.get_order(&old_id).is_err());
assert!(peer_state.get_order(&recent_id).is_ok());
}

#[test]
fn test_prune_terminal_orders_non_terminal_skipped() {
let mut peer_state = PeerState::default();

// ExpectingPayment is not a terminal state.
let expecting_id = LSPS1OrderId("expecting".to_string());
peer_state.new_order(
expecting_id.clone(),
create_test_order_params(),
LSPSDateTime::from_str("2024-01-01T00:00:00Z").unwrap(),
create_test_payment_info_bolt11_only(),
);

// OrderPaid is not a terminal state.
let paid_id = LSPS1OrderId("paid".to_string());
peer_state.new_order(
paid_id.clone(),
create_test_order_params(),
LSPSDateTime::from_str("2024-01-01T00:00:00Z").unwrap(),
create_test_payment_info_bolt11_only(),
);
peer_state.order_payment_received(&paid_id, PaymentMethod::Bolt11).unwrap();

let now = LSPSDateTime::from_str("2024-01-01T02:00:00Z").unwrap();
assert_eq!(peer_state.prune_terminal_orders(&now, Duration::ZERO), 0);
assert!(peer_state.get_order(&expecting_id).is_ok());
assert!(peer_state.get_order(&paid_id).is_ok());
}

#[test]
fn test_prune_terminal_orders_frees_quota() {
let mut peer_state = PeerState::default();

// Fill up to the limit with FailedAndRefunded orders.
for i in 0..MAX_PENDING_REQUESTS_PER_PEER {
let order_id = LSPS1OrderId(format!("order{}", i));
peer_state.new_order(
order_id.clone(),
create_test_order_params(),
LSPSDateTime::from_str("2024-01-01T00:00:00Z").unwrap(),
create_test_payment_info_bolt11_only(),
);
peer_state.order_failed_and_refunded(&order_id).unwrap();
}

// Registering another request must fail: quota is exhausted.
let dummy_request = LSPS1Request::GetInfo(Default::default());
assert!(matches!(
peer_state.register_request(LSPSRequestId("r0".to_string()), dummy_request.clone()),
Err(PeerStateError::TooManyPendingRequests)
));

// Prune all failed orders with max_age=0.
let now = LSPSDateTime::from_str("2024-01-01T01:00:00Z").unwrap();
assert_eq!(
peer_state.prune_terminal_orders(&now, Duration::ZERO),
MAX_PENDING_REQUESTS_PER_PEER
);

// Now registering a new request must succeed.
assert!(peer_state
.register_request(LSPSRequestId("r1".to_string()), dummy_request)
.is_ok());
}
}
66 changes: 66 additions & 0 deletions lightning-liquidity/src/lsps1/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use core::ops::Deref;
use core::pin::pin;
use core::sync::atomic::{AtomicUsize, Ordering};
use core::task;
use core::time::Duration;

use super::event::LSPS1ServiceEvent;
use super::msgs::{
Expand Down Expand Up @@ -752,6 +753,52 @@ where
Ok(())
}

/// Prunes terminal orders for a peer that are at least `max_age` old, freeing memory and
/// per-peer quota.
///
/// Terminal orders are those in the [`LSPS1OrderState::Completed`] or
/// [`LSPS1OrderState::Failed`] state. `max_age` is measured from each order's `created_at`
/// timestamp. Pass [`Duration::ZERO`] to prune all terminal orders regardless of age,
/// which is useful to immediately free per-peer quota when a client is blocked by the
/// per-peer request limit due to accumulated failed orders.
///
/// Returns the number of orders removed, or an [`APIError::APIMisuseError`] if no state
/// exists for the given counterparty.
pub async fn prune_orders(
&self, counterparty_node_id: PublicKey, max_age: Duration,
) -> Result<usize, APIError> {
let now =
LSPSDateTime::new_from_duration_since_epoch(self.time_provider.duration_since_epoch());
let pruned;
{
let outer_state_lock = self.per_peer_state.read().unwrap();
let inner_state_lock =
outer_state_lock.get(&counterparty_node_id).ok_or_else(|| {
APIError::APIMisuseError {
err: format!(
"No existing state with counterparty {}",
counterparty_node_id
),
}
})?;
let mut peer_state = inner_state_lock.lock().unwrap();
pruned = peer_state.prune_terminal_orders(&now, max_age);
}

if pruned > 0 {
self.persist_peer_state(counterparty_node_id).await.map_err(|e| {
APIError::APIMisuseError {
err: format!(
"Failed to persist peer state for {}: {}",
counterparty_node_id, e
),
}
})?;
}

Ok(pruned)
}

fn generate_order_id(&self) -> LSPS1OrderId {
let bytes = self.entropy_source.get_secure_random_bytes();
LSPS1OrderId(utils::hex_str(&bytes[0..16]))
Expand Down Expand Up @@ -930,6 +977,25 @@ where
},
}
}

/// Prunes terminal orders for a peer that are at least `max_age` old.
///
/// Wraps [`LSPS1ServiceHandler::prune_orders`].
pub fn prune_orders(
&self, counterparty_node_id: PublicKey, max_age: Duration,
) -> Result<usize, APIError> {
let mut fut = pin!(self.inner.prune_orders(counterparty_node_id, max_age));

let mut waker = dummy_waker();
let mut ctx = task::Context::from_waker(&mut waker);
match fut.as_mut().poll(&mut ctx) {
task::Poll::Ready(result) => result,
task::Poll::Pending => {
// In a sync context, we can't wait for the future to complete.
unreachable!("Should not be pending in a sync context");
},
}
}
}

fn check_range(min: u64, max: u64, value: u64) -> bool {
Expand Down
Loading