diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index af4d1569d0c..68b0612ec1f 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -4654,34 +4654,20 @@ fn test_claim_to_closed_channel_blocks_claimed_event() { } #[test] -#[cfg(all(feature = "std", not(target_os = "windows")))] fn test_single_channel_multiple_mpp() { use crate::util::config::UserConfig; - use std::sync::atomic::{AtomicBool, Ordering}; - // Test what happens when we attempt to claim an MPP with many parts that came to us through - // the same channel with a synchronous persistence interface which has very high latency. - // - // Previously, if a `revoke_and_ack` came in while we were still running in - // `ChannelManager::claim_payment` we'd end up hanging waiting to apply a - // `ChannelMonitorUpdate` until after it completed. See the commit which introduced this test - // for more info. + // Test that when an MPP payment has many parts arriving on the same channel, all of them are + // claimed in a single commitment update rather than requiring a round-trip per claim. 
let chanmon_cfgs = create_chanmon_cfgs(9); let node_cfgs = create_node_cfgs(9, &chanmon_cfgs); let mut config = test_default_channel_config(); - // Set the percentage to the default value at the time this test was written config.channel_handshake_config.announced_channel_max_inbound_htlc_value_in_flight_percentage = 10; let configs: [Option<UserConfig>; 9] = core::array::from_fn(|_| Some(config.clone())); let node_chanmgrs = create_node_chanmgrs(9, &node_cfgs, &configs); let mut nodes = create_network(9, &node_cfgs, &node_chanmgrs); - let node_b_id = nodes[1].node.get_our_node_id(); - let node_c_id = nodes[2].node.get_our_node_id(); - let node_d_id = nodes[3].node.get_our_node_id(); - let node_e_id = nodes[4].node.get_our_node_id(); - let node_f_id = nodes[5].node.get_our_node_id(); - let node_g_id = nodes[6].node.get_our_node_id(); let node_h_id = nodes[7].node.get_our_node_id(); let node_i_id = nodes[8].node.get_our_node_id(); @@ -4691,28 +4677,7 @@ fn test_single_channel_multiple_mpp() { // 7 // 8 // - // We can in theory reproduce this issue with fewer channels/HTLCs, but getting this test - // robust is rather challenging. We rely on having the main test thread wait on locks held in - // the background `claim_funds` thread and unlocking when the `claim_funds` thread completes a - // single `ChannelMonitorUpdate`. - // This thread calls `get_and_clear_pending_msg_events()` and `handle_revoke_and_ack()`, both - // of which require `ChannelManager` locks, but we have to make sure this thread gets a chance - // to be blocked on the mutexes before we let the background thread wake `claim_funds` so that - // the mutex can switch to this main thread. - // This relies on our locks being fair, but also on our threads getting runtime during the test - // run, which can be pretty competitive. 
Thus we do a dumb dance to be as conservative as - // possible - we have a background thread which completes a `ChannelMonitorUpdate` (by sending - // into the `write_blocker` mpsc) but it doesn't run until a mpsc channel sends from this main - // thread to the background thread, and then we let it sleep a while before we send the - // `ChannelMonitorUpdate` unblocker. - // Further, we give ourselves two chances each time, needing 4 HTLCs just to unlock our two - // `ChannelManager` calls. We then need a few remaining HTLCs to actually trigger the bug, so - // we use 6 HTLCs. - // Finaly, we do not run this test on Winblowz because it, somehow, in 2025, does not implement - // actual preemptive multitasking and thinks that cooperative multitasking somehow is - // acceptable in the 21st century, let alone a quarter of the way into it. - const MAX_THREAD_INIT_TIME: std::time::Duration = std::time::Duration::from_secs(1); - + // All six parts converge on the same channel (7->8) create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 100_000, 0); create_announced_chan_between_nodes_with_value(&nodes, 0, 2, 100_000, 0); create_announced_chan_between_nodes_with_value(&nodes, 0, 3, 100_000, 0); @@ -4728,7 +4693,7 @@ fn test_single_channel_multiple_mpp() { create_announced_chan_between_nodes_with_value(&nodes, 6, 7, 100_000, 0); create_announced_chan_between_nodes_with_value(&nodes, 7, 8, 1_000_000, 0); - let (mut route, payment_hash, payment_preimage, payment_secret) = + let (route, payment_hash, payment_preimage, payment_secret) = get_route_and_payment_hash!(&nodes[0], nodes[8], 50_000_000); send_along_route_with_secret( @@ -4747,177 +4712,74 @@ fn test_single_channel_multiple_mpp() { payment_secret, ); - let (do_a_write, blocker) = std::sync::mpsc::sync_channel(0); - *nodes[8].chain_monitor.write_blocker.lock().unwrap() = Some(blocker); - - // Until we have std::thread::scoped we have to unsafe { turn off the borrow checker }. 
- // We do this by casting a pointer to a `TestChannelManager` to a pointer to a - // `TestChannelManager` with different (in this case 'static) lifetime. - // This is even suggested in the second example at - // https://doc.rust-lang.org/std/mem/fn.transmute.html#examples - let claim_node: &'static TestChannelManager<'static, 'static> = - unsafe { std::mem::transmute(nodes[8].node as &TestChannelManager) }; - let thrd = std::thread::spawn(move || { - // Initiate the claim in a background thread as it will immediately block waiting on the - // `write_blocker` we set above. - claim_node.claim_funds(payment_preimage); - }); - - // First unlock one monitor so that we have a pending - // `update_fulfill_htlc`/`commitment_signed` pair to pass to our counterparty. - do_a_write.send(()).unwrap(); - - let event_node: &'static TestChannelManager<'static, 'static> = - unsafe { std::mem::transmute(nodes[8].node as &TestChannelManager) }; - let thrd_event = std::thread::spawn(move || { - let mut have_event = false; - while !have_event { - let mut events = event_node.get_and_clear_pending_events(); - assert!(events.len() == 1 || events.len() == 0); - if events.len() == 1 { - if let Event::PaymentClaimed { .. } = events[0] { - } else { - panic!("Unexpected event {events:?}"); - } - have_event = true; - } - if !have_event { - std::thread::yield_now(); - } - } - }); - - // Then fetch the `update_fulfill_htlc`/`commitment_signed`. Note that the - // `get_and_clear_pending_msg_events` will immediately hang trying to take a peer lock which - // `claim_funds` is holding. Thus, we release a second write after a small sleep in the - // background to give `claim_funds` a chance to step forward, unblocking - // `get_and_clear_pending_msg_events`. 
- let do_a_write_background = do_a_write.clone(); - let block_thrd2 = AtomicBool::new(true); - let block_thrd2_read: &'static AtomicBool = unsafe { std::mem::transmute(&block_thrd2) }; - let thrd2 = std::thread::spawn(move || { - while block_thrd2_read.load(Ordering::Acquire) { - std::thread::yield_now(); - } - std::thread::sleep(MAX_THREAD_INIT_TIME); - do_a_write_background.send(()).unwrap(); - std::thread::sleep(MAX_THREAD_INIT_TIME); - do_a_write_background.send(()).unwrap(); - }); - block_thrd2.store(false, Ordering::Release); - let mut first_updates = get_htlc_update_msgs(&nodes[8], &node_h_id); - - // Thread 2 could unblock first, or it could get blocked waiting on us to process a - // `PaymentClaimed` event. Either way, wait until both have finished. - thrd2.join().unwrap(); - thrd_event.join().unwrap(); - - // Disconnect node 6 from all its peers so it doesn't bother to fail the HTLCs back - nodes[7].node.peer_disconnected(node_b_id); - nodes[7].node.peer_disconnected(node_c_id); - nodes[7].node.peer_disconnected(node_d_id); - nodes[7].node.peer_disconnected(node_e_id); - nodes[7].node.peer_disconnected(node_f_id); - nodes[7].node.peer_disconnected(node_g_id); - - let first_update_fulfill = first_updates.update_fulfill_htlcs.remove(0); - nodes[7].node.handle_update_fulfill_htlc(node_i_id, first_update_fulfill); - check_added_monitors(&nodes[7], 1); - expect_payment_forwarded!(nodes[7], nodes[1], nodes[8], Some(1000), false, false); - nodes[7].node.handle_commitment_signed_batch_test(node_i_id, &first_updates.commitment_signed); - check_added_monitors(&nodes[7], 1); - let (raa, cs) = get_revoke_commit_msgs(&nodes[7], &node_i_id); - - // Now, handle the `revoke_and_ack` from node 5. Note that `claim_funds` is still blocked on - // our peer lock, so we have to release a write to let it process. - // After this call completes, the channel previously would be locked up and should not be able - // to make further progress. 
- let do_a_write_background = do_a_write.clone(); - let block_thrd3 = AtomicBool::new(true); - let block_thrd3_read: &'static AtomicBool = unsafe { std::mem::transmute(&block_thrd3) }; - let thrd3 = std::thread::spawn(move || { - while block_thrd3_read.load(Ordering::Acquire) { - std::thread::yield_now(); - } - std::thread::sleep(MAX_THREAD_INIT_TIME); - do_a_write_background.send(()).unwrap(); - std::thread::sleep(MAX_THREAD_INIT_TIME); - do_a_write_background.send(()).unwrap(); - }); - block_thrd3.store(false, Ordering::Release); - nodes[8].node.handle_revoke_and_ack(node_h_id, &raa); - thrd3.join().unwrap(); - assert!(!thrd.is_finished()); - - let thrd4 = std::thread::spawn(move || { - do_a_write.send(()).unwrap(); - do_a_write.send(()).unwrap(); - }); - - thrd4.join().unwrap(); - thrd.join().unwrap(); - - // At the end, we should have 7 ChannelMonitorUpdates - 6 for HTLC claims, and one for the - // above `revoke_and_ack`. - check_added_monitors(&nodes[8], 7); - - // Now drive everything to the end, at least as far as node 7 is concerned... - *nodes[8].chain_monitor.write_blocker.lock().unwrap() = None; - nodes[8].node.handle_commitment_signed_batch_test(node_h_id, &cs); + // All six parts are on the same channel, so claiming should produce a single batched + // ChannelMonitorUpdate containing all 6 preimages and one commitment. 
+ nodes[8].node.claim_funds(payment_preimage); + expect_payment_claimed!(nodes[8], payment_hash, 50_000_000); check_added_monitors(&nodes[8], 1); - let (mut updates, raa) = get_updates_and_revoke(&nodes[8], &node_h_id); - - nodes[7].node.handle_update_fulfill_htlc(node_i_id, updates.update_fulfill_htlcs.remove(0)); - expect_payment_forwarded!(nodes[7], nodes[2], nodes[8], Some(1000), false, false); - nodes[7].node.handle_update_fulfill_htlc(node_i_id, updates.update_fulfill_htlcs.remove(0)); - expect_payment_forwarded!(nodes[7], nodes[3], nodes[8], Some(1000), false, false); - let mut next_source = 4; - if let Some(update) = updates.update_fulfill_htlcs.get(0) { - nodes[7].node.handle_update_fulfill_htlc(node_i_id, update.clone()); - expect_payment_forwarded!(nodes[7], nodes[4], nodes[8], Some(1000), false, false); - next_source += 1; - } - - nodes[7].node.handle_commitment_signed_batch_test(node_i_id, &updates.commitment_signed); - nodes[7].node.handle_revoke_and_ack(node_i_id, &raa); - if updates.update_fulfill_htlcs.get(0).is_some() { - check_added_monitors(&nodes[7], 5); - } else { - check_added_monitors(&nodes[7], 4); + let mut first_updates = get_htlc_update_msgs(&nodes[8], &node_h_id); + assert_eq!(first_updates.update_fulfill_htlcs.len(), 6); + + // Disconnect node 7 from intermediate nodes so it doesn't bother forwarding back. + nodes[7].node.peer_disconnected(nodes[1].node.get_our_node_id()); + nodes[7].node.peer_disconnected(nodes[2].node.get_our_node_id()); + nodes[7].node.peer_disconnected(nodes[3].node.get_our_node_id()); + nodes[7].node.peer_disconnected(nodes[4].node.get_our_node_id()); + nodes[7].node.peer_disconnected(nodes[5].node.get_our_node_id()); + nodes[7].node.peer_disconnected(nodes[6].node.get_our_node_id()); + + // Deliver all 6 fulfills to node 7 before handling the commitment_signed. 
+ // Each handle_update_fulfill_htlc triggers claim_funds_internal on node 7's upstream + // channels (which are disconnected), generating a preimage monitor update + PaymentForwarded. + for fulfill in first_updates.update_fulfill_htlcs.drain(..) { + nodes[7].node.handle_update_fulfill_htlc(node_i_id, fulfill); + check_added_monitors(&nodes[7], 1); } - - let (raa, cs) = get_revoke_commit_msgs(&nodes[7], &node_i_id); - - nodes[8].node.handle_revoke_and_ack(node_h_id, &raa); - nodes[8].node.handle_commitment_signed_batch_test(node_h_id, &cs); - check_added_monitors(&nodes[8], 2); - - let (mut updates, raa) = get_updates_and_revoke(&nodes[8], &node_h_id); - - nodes[7].node.handle_update_fulfill_htlc(node_i_id, updates.update_fulfill_htlcs.remove(0)); - expect_payment_forwarded!(nodes[7], nodes[next_source], nodes[8], Some(1000), false, false); - next_source += 1; - nodes[7].node.handle_update_fulfill_htlc(node_i_id, updates.update_fulfill_htlcs.remove(0)); - expect_payment_forwarded!(nodes[7], nodes[next_source], nodes[8], Some(1000), false, false); - next_source += 1; - if let Some(update) = updates.update_fulfill_htlcs.get(0) { - nodes[7].node.handle_update_fulfill_htlc(node_i_id, update.clone()); - expect_payment_forwarded!(nodes[7], nodes[next_source], nodes[8], Some(1000), false, false); + let events = nodes[7].node.get_and_clear_pending_events(); + assert_eq!(events.len(), 6); + let mut seen_prev_node_ids = std::collections::HashSet::new(); + for event in events { + match event { + Event::PaymentForwarded { + prev_htlcs, + next_htlcs, + total_fee_earned_msat, + claim_from_onchain_tx, + .. 
+ } => { + assert_eq!(total_fee_earned_msat, Some(1000)); + assert!(!claim_from_onchain_tx); + assert_eq!(prev_htlcs.len(), 1); + assert_eq!(next_htlcs.len(), 1); + let prev_node_id = prev_htlcs[0].node_id.unwrap(); + let next_node_id = next_htlcs[0].node_id.unwrap(); + assert_eq!(next_node_id, node_i_id); + // Each forward should come from a unique intermediate node (1-6) + assert!( + seen_prev_node_ids.insert(prev_node_id), + "Duplicate prev_node_id in PaymentForwarded events" + ); + }, + _ => panic!("Unexpected event {:?}", event), + } } - - nodes[7].node.handle_commitment_signed_batch_test(node_i_id, &updates.commitment_signed); - nodes[7].node.handle_revoke_and_ack(node_i_id, &raa); - if updates.update_fulfill_htlcs.get(0).is_some() { - check_added_monitors(&nodes[7], 5); - } else { - check_added_monitors(&nodes[7], 4); + // Verify all 6 intermediate nodes were seen + for i in 1..=6 { + assert!( + seen_prev_node_ids.contains(&nodes[i].node.get_our_node_id()), + "Missing PaymentForwarded for node {}", + i + ); } - + nodes[7].node.handle_commitment_signed_batch_test(node_i_id, &first_updates.commitment_signed); + check_added_monitors(&nodes[7], 1); let (raa, cs) = get_revoke_commit_msgs(&nodes[7], &node_i_id); + nodes[8].node.handle_revoke_and_ack(node_h_id, &raa); + check_added_monitors(&nodes[8], 1); nodes[8].node.handle_commitment_signed_batch_test(node_h_id, &cs); - check_added_monitors(&nodes[8], 2); + check_added_monitors(&nodes[8], 1); let raa = get_event_msg!(nodes[8], MessageSendEvent::SendRevokeAndACK, node_h_id); nodes[7].node.handle_revoke_and_ack(node_i_id, &raa); diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 55d4a84eb91..b92a6623579 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -1165,6 +1165,21 @@ pub enum UpdateFulfillCommitFetch { DuplicateClaim {}, } +/// The return type of get_update_fulfill_htlcs_and_commit. 
+pub(crate) enum UpdateFulfillsCommitFetch { + /// At least one HTLC fulfill was new and a monitor update was generated. + NewClaims { + /// The ChannelMonitorUpdate containing all payment preimages (and possibly the + /// commitment update). + monitor_update: ChannelMonitorUpdate, + /// The value of each successfully claimed HTLC, in msat, in the same order as the input. + /// `None` entries indicate duplicate claims. + htlc_value_msat: Vec<Option<u64>>, + }, + /// All HTLC fulfills were duplicates. + AllDuplicateClaims {}, +} + /// Error returned when processing an invalid interactive-tx message from our counterparty. pub(super) struct InteractiveTxMsgError { /// The underlying error. @@ -7703,6 +7718,105 @@ where } } + /// Batch version of [`Self::get_update_fulfill_htlc_and_commit`] that fulfills multiple HTLCs + /// in a single commitment update, reducing round-trips when multiple MPP parts arrive on the + /// same channel. + pub fn get_update_fulfill_htlcs_and_commit<L: Deref>( + &mut self, htlcs: &[(u64, Option<PaymentClaimDetails>, Option<AttributionData>)], + payment_preimage: PaymentPreimage, logger: &L, + ) -> UpdateFulfillsCommitFetch + where + L::Target: Logger, + { + let release_cs_monitor = self.context.blocked_monitor_updates.is_empty(); + let mut all_duplicate = true; + let mut htlc_value_msat = Vec::with_capacity(htlcs.len()); + let mut combined_monitor_update = ChannelMonitorUpdate { + update_id: 0, + updates: Vec::new(), + channel_id: Some(self.context.channel_id()), + }; + // Track whether any claim was not blocked (i.e. the HTLC was immediately fulfilled rather + // than placed in the holding cell). 
+ let mut any_not_blocked = false; + + for (htlc_id, payment_info, attribution_data) in htlcs { + match self.get_update_fulfill_htlc( + *htlc_id, + payment_preimage, + payment_info.clone(), + attribution_data.clone(), + logger, + ) { + UpdateFulfillFetch::NewClaim { + monitor_update, + htlc_value_msat: value, + update_blocked, + } => { + if combined_monitor_update.update_id == 0 { + combined_monitor_update.update_id = monitor_update.update_id; + } + combined_monitor_update.updates.extend(monitor_update.updates); + htlc_value_msat.push(Some(value)); + all_duplicate = false; + if !update_blocked { + any_not_blocked = true; + } + }, + UpdateFulfillFetch::DuplicateClaim {} => { + htlc_value_msat.push(None); + }, + } + } + + if all_duplicate { + return UpdateFulfillsCommitFetch::AllDuplicateClaims {}; + } + + if release_cs_monitor && any_not_blocked { + let mut additional_update = self.build_commitment_no_status_check(logger); + // The N calls to get_update_fulfill_htlc and build_commitment_no_status_check + // each bumped latest_monitor_update_id, but we merged everything into a single + // ChannelMonitorUpdate with the first ID, so reset to that ID. + self.context.latest_monitor_update_id = combined_monitor_update.update_id; + combined_monitor_update.updates.append(&mut additional_update.updates); + } else { + let blocked_upd = self.context.blocked_monitor_updates.first(); + let new_mon_id = blocked_upd + .map(|upd| upd.update.update_id) + .unwrap_or(combined_monitor_update.update_id); + combined_monitor_update.update_id = new_mon_id; + for held_update in self.context.blocked_monitor_updates.iter_mut() { + held_update.update.update_id += 1; + } + + // Reset latest_monitor_update_id before building a new commitment so its ID is consecutive. 
+ self.context.latest_monitor_update_id = self + .context + .blocked_monitor_updates + .last() + .map(|upd| upd.update.update_id) + .unwrap_or(combined_monitor_update.update_id); + + if any_not_blocked { + debug_assert!(false, "If there is a pending blocked monitor we should have MonitorUpdateInProgress set"); + let update = self.build_commitment_no_status_check(logger); + self.context.blocked_monitor_updates.push(PendingChannelMonitorUpdate { update }); + } + } + + self.monitor_updating_paused( + false, + any_not_blocked, + false, + Vec::new(), + Vec::new(), + Vec::new(), + logger, + ); + UpdateFulfillsCommitFetch::NewClaims { + monitor_update: combined_monitor_update, + htlc_value_msat, + } + } + /// Returns `Err` (always with [`ChannelError::Ignore`]) if the HTLC could not be failed (e.g. /// if it was already resolved). Otherwise returns `Ok`. pub fn queue_fail_htlc( diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 7f6d6535e58..b4840491da5 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -62,7 +62,7 @@ use crate::ln::channel::{ self, hold_time_since, Channel, ChannelError, ChannelUpdateStatus, DisconnectResult, FundedChannel, FundingTxSigned, InboundV1Channel, InteractiveTxMsgError, OutboundHop, OutboundV1Channel, PendingV2Channel, ReconnectionMsg, ShutdownResult, SpliceFundingFailed, - StfuResponse, UpdateFulfillCommitFetch, WithChannelContext, + StfuResponse, UpdateFulfillCommitFetch, UpdateFulfillsCommitFetch, WithChannelContext, }; use crate::ln::channel_state::ChannelDetails; use crate::ln::funding::{FundingContribution, FundingTemplate}; @@ -9420,58 +9420,82 @@ impl< None }; let payment_info = Some(PaymentClaimDetails { mpp_parts, claiming_payment }); + + // Group sources by (counterparty_node_id, channel_id) so that multiple MPP + // parts on the same channel can be batched into a single commitment update. 
+ let mut grouped_sources: Vec<(PublicKey, ChannelId, Vec)> = Vec::new(); for htlc in sources { - let this_mpp_claim = - pending_mpp_claim_ptr_opt.as_ref().map(|pending_mpp_claim| { - let counterparty_id = htlc.prev_hop.counterparty_node_id; - let counterparty_id = counterparty_id - .expect("Prior to upgrading to LDK 0.1, all pending HTLCs forwarded by LDK 0.0.123 or before must be resolved. It appears at least one claimable payment was not resolved. Please downgrade to LDK 0.0.125 and resolve the HTLC by claiming the payment prior to upgrading."); + let counterparty_id = htlc.prev_hop.counterparty_node_id.expect("Prior to upgrading to LDK 0.1, all pending HTLCs forwarded by LDK 0.0.123 or before must be resolved. It appears at least one claimable payment was not resolved. Please downgrade to LDK 0.0.125 and resolve the HTLC by claiming the payment prior to upgrading."); + let chan_id = htlc.prev_hop.channel_id; + if let Some(group) = grouped_sources + .iter_mut() + .find(|(cp, cid, _)| *cp == counterparty_id && *cid == chan_id) + { + group.2.push(htlc); + } else { + grouped_sources.push((counterparty_id, chan_id, vec![htlc])); + } + } + + for (_, _, group) in grouped_sources { + if group.len() == 1 { + // Single HTLC on this channel, use existing path. + let htlc = group.into_iter().next().unwrap(); + let this_mpp_claim = pending_mpp_claim_ptr_opt.as_ref().map(|pending_mpp_claim| { + let counterparty_id = htlc.prev_hop.counterparty_node_id.expect("Prior to upgrading to LDK 0.1, all pending HTLCs forwarded by LDK 0.0.123 or before must be resolved. It appears at least one claimable payment was not resolved. 
Please downgrade to LDK 0.0.125 and resolve the HTLC by claiming the payment prior to upgrading."); let claim_ptr = PendingMPPClaimPointer(Arc::clone(pending_mpp_claim)); (counterparty_id, htlc.prev_hop.channel_id, claim_ptr) }); - let raa_blocker = pending_mpp_claim_ptr_opt.as_ref().map(|pending_claim| { - RAAMonitorUpdateBlockingAction::ClaimedMPPPayment { - pending_claim: PendingMPPClaimPointer(Arc::clone(pending_claim)), - } - }); + let raa_blocker = pending_mpp_claim_ptr_opt.as_ref().map(|pending_claim| { + RAAMonitorUpdateBlockingAction::ClaimedMPPPayment { + pending_claim: PendingMPPClaimPointer(Arc::clone(pending_claim)), + } + }); - // Create new attribution data as the final hop. Always report a zero hold time, because reporting a - // non-zero value will not make a difference in the penalty that may be applied by the sender. If there - // is a phantom hop, we need to double-process. - let attribution_data = - if let Some(phantom_secret) = htlc.prev_hop.phantom_shared_secret { - let attribution_data = - process_fulfill_attribution_data(None, &phantom_secret, 0); - Some(attribution_data) - } else { - None - }; + let attribution_data = + if let Some(phantom_secret) = htlc.prev_hop.phantom_shared_secret { + let attribution_data = + process_fulfill_attribution_data(None, &phantom_secret, 0); + Some(attribution_data) + } else { + None + }; - let attribution_data = process_fulfill_attribution_data( - attribution_data, - &htlc.prev_hop.incoming_packet_shared_secret, - 0, - ); + let attribution_data = process_fulfill_attribution_data( + attribution_data, + &htlc.prev_hop.incoming_packet_shared_secret, + 0, + ); - self.claim_funds_from_hop( - htlc.prev_hop, - payment_preimage, - payment_info.clone(), - Some(attribution_data), - |_, definitely_duplicate| { - debug_assert!( - !definitely_duplicate, - "We shouldn't claim duplicatively from a payment" - ); - ( - Some(MonitorUpdateCompletionAction::PaymentClaimed { - payment_hash, - pending_mpp_claim: this_mpp_claim, - 
}), - raa_blocker, - ) - }, - ); + self.claim_funds_from_hop( + htlc.prev_hop, + payment_preimage, + payment_info.clone(), + Some(attribution_data), + |_, definitely_duplicate| { + debug_assert!( + !definitely_duplicate, + "We shouldn't claim duplicatively from a payment" + ); + ( + Some(MonitorUpdateCompletionAction::PaymentClaimed { + payment_hash, + pending_mpp_claim: this_mpp_claim, + }), + raa_blocker, + ) + }, + ); + } else { + // Multiple HTLCs on the same channel, batch into a single commitment. + self.claim_batch_funds_from_channel( + group, + payment_preimage, + payment_hash, + payment_info.clone(), + &pending_mpp_claim_ptr_opt, + ); + } } } else { for htlc in sources { @@ -9917,6 +9941,221 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ } } + /// Batch-claims multiple HTLCs from the same channel in a single commitment update. + /// + /// This is used when an MPP payment has multiple parts arriving on the same channel, allowing + /// all of them to be fulfilled in one `commitment_signed` message rather than requiring a + /// round-trip (RAA) between each claim. + fn claim_batch_funds_from_channel( + &self, htlcs: Vec, payment_preimage: PaymentPreimage, + payment_hash: PaymentHash, payment_info: Option, + pending_mpp_claim_ptr_opt: &Option>>, + ) { + debug_assert!(htlcs.len() > 1); + + // If we haven't yet run background events assume we're still deserializing and shouldn't + // actually pass `ChannelMonitorUpdate`s to users yet. Instead, queue them up as + // `BackgroundEvent`s. + let during_init = !self.background_events_processed_since_startup.load(Ordering::Acquire); + + let per_peer_state = self.per_peer_state.read().unwrap(); + + let counterparty_node_id = htlcs[0].prev_hop.counterparty_node_id.expect("Prior to upgrading to LDK 0.1, all pending HTLCs forwarded by LDK 0.0.123 or before must be resolved. It appears at least one claimable payment was not resolved. 
Please downgrade to LDK 0.0.125 and resolve the HTLC by claiming the payment prior to upgrading."); + let chan_id = htlcs[0].prev_hop.channel_id; + + // Build the per-HTLC attribution data and fulfill parameters. + let fulfill_params: Vec<_> = htlcs + .iter() + .enumerate() + .map(|(i, htlc)| { + let attribution_data = + if let Some(phantom_secret) = htlc.prev_hop.phantom_shared_secret { + Some(process_fulfill_attribution_data(None, &phantom_secret, 0)) + } else { + None + }; + + let attribution_data = process_fulfill_attribution_data( + attribution_data, + &htlc.prev_hop.incoming_packet_shared_secret, + 0, + ); + + // Only the first HTLC carries payment info, the ChannelMonitor deduplicates by + // payment hash so one preimage step with payment_info is sufficient. + let info = if i == 0 { payment_info.clone() } else { None }; + + (htlc.prev_hop.htlc_id, info, Some(attribution_data)) + }) + .collect(); + + let mut peer_state_lock = per_peer_state.get(&counterparty_node_id).map(|peer_mutex| peer_mutex.lock().unwrap()).expect("Prior to upgrading to LDK 0.1, all pending HTLCs forwarded by LDK 0.0.123 or before must be resolved. It appears at least one claimable payment was not resolved. 
Please downgrade to LDK 0.0.125 and resolve the HTLC by claiming the payment prior to upgrading."); + + { + let peer_state = &mut *peer_state_lock; + if let hash_map::Entry::Occupied(mut chan_entry) = + peer_state.channel_by_id.entry(chan_id) + { + if let Some(chan) = chan_entry.get_mut().as_funded_mut() { + let logger = WithChannelContext::from(&self.logger, &chan.context, None); + let fulfill_res = chan.get_update_fulfill_htlcs_and_commit( + &fulfill_params, + payment_preimage, + &&logger, + ); + + let this_mpp_claim = pending_mpp_claim_ptr_opt.as_ref().map(|pending_claim| { + ( + counterparty_node_id, + chan_id, + PendingMPPClaimPointer(Arc::clone(pending_claim)), + ) + }); + let raa_blocker = pending_mpp_claim_ptr_opt.as_ref().map(|pending_claim| { + RAAMonitorUpdateBlockingAction::ClaimedMPPPayment { + pending_claim: PendingMPPClaimPointer(Arc::clone(pending_claim)), + } + }); + + match fulfill_res { + UpdateFulfillsCommitFetch::NewClaims { + monitor_update, + htlc_value_msat, + } => { + // Register a single completion action and RAA blocker for the + // combined ChannelMonitorUpdate covering all claims in this batch. 
+ peer_state + .monitor_update_blocked_actions + .entry(chan_id) + .or_insert(Vec::new()) + .push(MonitorUpdateCompletionAction::PaymentClaimed { + payment_hash, + pending_mpp_claim: this_mpp_claim, + }); + let funding_txo = htlcs[0].prev_hop.outpoint; + if let Some(blocker) = raa_blocker { + peer_state + .actions_blocking_raa_monitor_updates + .entry(chan_id) + .or_insert_with(Vec::new) + .push(blocker); + } + if let Some(data) = self.handle_new_monitor_update( + &mut peer_state.in_flight_monitor_updates, + &mut peer_state.monitor_update_blocked_actions, + &mut peer_state.pending_msg_events, + peer_state.is_connected, + chan, + funding_txo, + monitor_update, + ) { + mem::drop(peer_state_lock); + mem::drop(per_peer_state); + self.handle_post_monitor_update_chan_resume(data); + } + let _ = htlc_value_msat; + }, + UpdateFulfillsCommitFetch::AllDuplicateClaims {} => { + // This is a startup replay — the `ChannelMonitorUpdate` containing the + // preimages was already persisted before shutdown, but the PaymentClaimed + // event may not have been generated yet. Mirrors the `DuplicateClaim` + // handling in claim_mpp_part. 
+ if let Some(raa_blocker) = raa_blocker { + let actions = &mut peer_state.actions_blocking_raa_monitor_updates; + let actions_list = actions.entry(chan_id).or_insert_with(Vec::new); + if !actions_list.contains(&raa_blocker) { + debug_assert!(during_init); + actions_list.push(raa_blocker); + } + } + + let action = MonitorUpdateCompletionAction::PaymentClaimed { + payment_hash, + pending_mpp_claim: this_mpp_claim, + }; + + let in_flight_mons = peer_state.in_flight_monitor_updates.get(&chan_id); + if in_flight_mons.map(|(_, mons)| !mons.is_empty()).unwrap_or(false) { + peer_state + .monitor_update_blocked_actions + .entry(chan_id) + .or_insert_with(Vec::new) + .push(action); + return; + } + + mem::drop(peer_state_lock); + mem::drop(per_peer_state); + + debug_assert!( + during_init, + "Duplicate batch claims should only occur during startup replay" + ); + self.handle_monitor_update_completion_actions([action]); + }, + } + } + return; + } + } + + // Channel is closed, fall back to per-HTLC claiming against the closed channel monitor. + // We register exactly one PaymentClaimed action and one RAA blocker for the entire batch + // (on the first HTLC only), matching the single ChannelMonitorUpdate semantics. This + // avoids N redundant handle_monitor_update_release calls when the blockers all share the + // same PendingMPPClaimPointer. 
+ mem::drop(peer_state_lock); + mem::drop(per_peer_state); + let mut this_mpp_claim = pending_mpp_claim_ptr_opt.as_ref().map(|pending_mpp_claim| { + let claim_ptr = PendingMPPClaimPointer(Arc::clone(pending_mpp_claim)); + (counterparty_node_id, chan_id, claim_ptr) + }); + let mut raa_blocker = pending_mpp_claim_ptr_opt.as_ref().map(|pending_claim| { + RAAMonitorUpdateBlockingAction::ClaimedMPPPayment { + pending_claim: PendingMPPClaimPointer(Arc::clone(pending_claim)), + } + }); + for htlc in htlcs { + let attribution_data = if let Some(phantom_secret) = htlc.prev_hop.phantom_shared_secret + { + Some(process_fulfill_attribution_data(None, &phantom_secret, 0)) + } else { + None + }; + + let attribution_data = process_fulfill_attribution_data( + attribution_data, + &htlc.prev_hop.incoming_packet_shared_secret, + 0, + ); + + // Only the first HTLC carries the completion action and RAA blocker. + let first_mpp_claim = this_mpp_claim.take(); + let first_raa_blocker = raa_blocker.take(); + self.claim_funds_from_hop( + htlc.prev_hop, + payment_preimage, + payment_info.clone(), + Some(attribution_data), + |_, definitely_duplicate| { + debug_assert!( + !definitely_duplicate, + "We shouldn't claim duplicatively from a payment" + ); + ( + first_mpp_claim.map(|claim| { + MonitorUpdateCompletionAction::PaymentClaimed { + payment_hash, + pending_mpp_claim: Some(claim), + } + }), + first_raa_blocker, + ) + }, + ); + } + } + fn finalize_claims(&self, sources: Vec<(HTLCSource, Option)>) { // Decode attribution data to hold times. let hold_times = sources.into_iter().filter_map(|(source, attribution_data)| { @@ -20708,39 +20947,26 @@ mod tests { assert_eq!(events.len(), 1); pass_along_path(&nodes[0], &[&nodes[1]], 200_000, our_payment_hash, Some(payment_secret), events.drain(..).next().unwrap(), true, None); - // Claim the full MPP payment. 
Note that we can't use a test utility like - // claim_funds_along_route because the ordering of the messages causes the second half of the - // payment to be put in the holding cell, which confuses the test utilities. So we exchange the - // lightning messages manually. + // Claim the full MPP payment. Both parts are on the same channel, so they should be + // batched into a single commitment update. nodes[1].node.claim_funds(payment_preimage); expect_payment_claimed!(nodes[1], our_payment_hash, 200_000); - check_added_monitors(&nodes[1], 2); + check_added_monitors(&nodes[1], 1); - let mut bs_1st_updates = get_htlc_update_msgs(&nodes[1], &nodes[0].node.get_our_node_id()); - nodes[0].node.handle_update_fulfill_htlc(nodes[1].node.get_our_node_id(), bs_1st_updates.update_fulfill_htlcs.remove(0)); + let mut bs_updates = get_htlc_update_msgs(&nodes[1], &nodes[0].node.get_our_node_id()); + assert_eq!(bs_updates.update_fulfill_htlcs.len(), 2); + nodes[0].node.handle_update_fulfill_htlc(nodes[1].node.get_our_node_id(), bs_updates.update_fulfill_htlcs.remove(0)); expect_payment_sent(&nodes[0], payment_preimage, None, false, false); - nodes[0].node.handle_commitment_signed_batch_test(nodes[1].node.get_our_node_id(), &bs_1st_updates.commitment_signed); - check_added_monitors(&nodes[0], 1); - let (as_first_raa, as_first_cs) = get_revoke_commit_msgs(&nodes[0], &nodes[1].node.get_our_node_id()); - nodes[1].node.handle_revoke_and_ack(nodes[0].node.get_our_node_id(), &as_first_raa); - check_added_monitors(&nodes[1], 1); - let mut bs_2nd_updates = get_htlc_update_msgs(&nodes[1], &nodes[0].node.get_our_node_id()); - nodes[1].node.handle_commitment_signed_batch_test(nodes[0].node.get_our_node_id(), &as_first_cs); - check_added_monitors(&nodes[1], 1); - let bs_first_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, nodes[0].node.get_our_node_id()); - nodes[0].node.handle_update_fulfill_htlc(nodes[1].node.get_our_node_id(), 
bs_2nd_updates.update_fulfill_htlcs.remove(0)); - nodes[0].node.handle_commitment_signed_batch_test(nodes[1].node.get_our_node_id(), &bs_2nd_updates.commitment_signed); - check_added_monitors(&nodes[0], 1); - let as_second_raa = get_event_msg!(nodes[0], MessageSendEvent::SendRevokeAndACK, nodes[1].node.get_our_node_id()); - nodes[0].node.handle_revoke_and_ack(nodes[1].node.get_our_node_id(), &bs_first_raa); - let as_second_updates = get_htlc_update_msgs(&nodes[0], &nodes[1].node.get_our_node_id()); + nodes[0].node.handle_update_fulfill_htlc(nodes[1].node.get_our_node_id(), bs_updates.update_fulfill_htlcs.remove(0)); + nodes[0].node.handle_commitment_signed_batch_test(nodes[1].node.get_our_node_id(), &bs_updates.commitment_signed); check_added_monitors(&nodes[0], 1); - nodes[1].node.handle_revoke_and_ack(nodes[0].node.get_our_node_id(), &as_second_raa); + let (as_raa, as_cs) = get_revoke_commit_msgs(&nodes[0], &nodes[1].node.get_our_node_id()); + nodes[1].node.handle_revoke_and_ack(nodes[0].node.get_our_node_id(), &as_raa); check_added_monitors(&nodes[1], 1); - nodes[1].node.handle_commitment_signed_batch_test(nodes[0].node.get_our_node_id(), &as_second_updates.commitment_signed); + nodes[1].node.handle_commitment_signed_batch_test(nodes[0].node.get_our_node_id(), &as_cs); check_added_monitors(&nodes[1], 1); - let bs_third_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, nodes[0].node.get_our_node_id()); - nodes[0].node.handle_revoke_and_ack(nodes[1].node.get_our_node_id(), &bs_third_raa); + let bs_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, nodes[0].node.get_our_node_id()); + nodes[0].node.handle_revoke_and_ack(nodes[1].node.get_our_node_id(), &bs_raa); check_added_monitors(&nodes[0], 1); // Note that successful MPP payments will generate a single PaymentSent event upon the first diff --git a/lightning/src/ln/payment_tests.rs b/lightning/src/ln/payment_tests.rs index 807d1a1af39..fa95537d76e 100644 --- 
a/lightning/src/ln/payment_tests.rs +++ b/lightning/src/ln/payment_tests.rs @@ -3054,55 +3054,38 @@ fn auto_retry_partial_failure() { expect_htlc_failure_conditions(nodes[1].node.get_and_clear_pending_events(), &[]); nodes[1].node.process_pending_htlc_forwards(); expect_payment_claimable!(nodes[1], payment_hash, payment_secret, amt_msat); + // All 3 parts arrived on the same channel (chan_1), so claim_funds batches them + // into a single commitment update with all 3 update_fulfill_htlc messages. nodes[1].node.claim_funds(payment_preimage); expect_payment_claimed!(nodes[1], payment_hash, amt_msat); + check_added_monitors(&nodes[1], 1); let mut bs_claim = get_htlc_update_msgs(&nodes[1], &node_a_id); - assert_eq!(bs_claim.update_fulfill_htlcs.len(), 1); + assert_eq!(bs_claim.update_fulfill_htlcs.len(), 3); nodes[0].node.handle_update_fulfill_htlc(node_b_id, bs_claim.update_fulfill_htlcs.remove(0)); expect_payment_sent(&nodes[0], payment_preimage, None, false, false); + nodes[0].node.handle_update_fulfill_htlc(node_b_id, bs_claim.update_fulfill_htlcs.remove(0)); + nodes[0].node.handle_update_fulfill_htlc(node_b_id, bs_claim.update_fulfill_htlcs.remove(0)); nodes[0].node.handle_commitment_signed_batch_test(node_b_id, &bs_claim.commitment_signed); check_added_monitors(&nodes[0], 1); let (as_third_raa, as_third_cs) = get_revoke_commit_msgs(&nodes[0], &node_b_id); nodes[1].node.handle_revoke_and_ack(node_a_id, &as_third_raa); - check_added_monitors(&nodes[1], 4); - let mut bs_2nd_claim = get_htlc_update_msgs(&nodes[1], &node_a_id); - - nodes[1].node.handle_commitment_signed_batch_test(node_a_id, &as_third_cs); - check_added_monitors(&nodes[1], 1); - let bs_third_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, node_a_id); - - nodes[0].node.handle_revoke_and_ack(node_b_id, &bs_third_raa); - check_added_monitors(&nodes[0], 1); - expect_payment_path_successful!(nodes[0]); - - let bs_second_fulfill_a = bs_2nd_claim.update_fulfill_htlcs.remove(0); - let
bs_second_fulfill_b = bs_2nd_claim.update_fulfill_htlcs.remove(0); - nodes[0].node.handle_update_fulfill_htlc(node_b_id, bs_second_fulfill_a); - nodes[0].node.handle_update_fulfill_htlc(node_b_id, bs_second_fulfill_b); - nodes[0].node.handle_commitment_signed_batch_test(node_b_id, &bs_2nd_claim.commitment_signed); - check_added_monitors(&nodes[0], 1); - let (as_fourth_raa, as_fourth_cs) = get_revoke_commit_msgs(&nodes[0], &node_b_id); - - nodes[1].node.handle_revoke_and_ack(node_a_id, &as_fourth_raa); check_added_monitors(&nodes[1], 1); - nodes[1].node.handle_commitment_signed_batch_test(node_a_id, &as_fourth_cs); + nodes[1].node.handle_commitment_signed_batch_test(node_a_id, &as_third_cs); check_added_monitors(&nodes[1], 1); - let bs_second_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, node_a_id); + let bs_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, node_a_id); - nodes[0].node.handle_revoke_and_ack(node_b_id, &bs_second_raa); + nodes[0].node.handle_revoke_and_ack(node_b_id, &bs_raa); check_added_monitors(&nodes[0], 1); let events = nodes[0].node.get_and_clear_pending_events(); - assert_eq!(events.len(), 2); - if let Event::PaymentPathSuccessful { .. } = events[0] { - } else { - panic!(); - } - if let Event::PaymentPathSuccessful { .. } = events[1] { - } else { - panic!(); + assert_eq!(events.len(), 3); + for event in &events { + if let Event::PaymentPathSuccessful { .. } = event { + } else { + panic!("Unexpected event {:?}", event); + } } }