Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 36 additions & 202 deletions lightning/src/ln/chanmon_update_fail_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4654,34 +4654,20 @@ fn test_claim_to_closed_channel_blocks_claimed_event() {
}

#[test]
#[cfg(all(feature = "std", not(target_os = "windows")))]
fn test_single_channel_multiple_mpp() {
use crate::util::config::UserConfig;
use std::sync::atomic::{AtomicBool, Ordering};

// Test what happens when we attempt to claim an MPP with many parts that came to us through
// the same channel with a synchronous persistence interface which has very high latency.
//
// Previously, if a `revoke_and_ack` came in while we were still running in
// `ChannelManager::claim_payment` we'd end up hanging waiting to apply a
// `ChannelMonitorUpdate` until after it completed. See the commit which introduced this test
// for more info.
// Test that when an MPP payment has many parts arriving on the same channel, all of them are
// claimed in a single commitment update rather than requiring a round-trip per claim.
let chanmon_cfgs = create_chanmon_cfgs(9);
let node_cfgs = create_node_cfgs(9, &chanmon_cfgs);
let mut config = test_default_channel_config();
// Set the percentage to the default value at the time this test was written
config.channel_handshake_config.announced_channel_max_inbound_htlc_value_in_flight_percentage =
10;
let configs: [Option<UserConfig>; 9] = core::array::from_fn(|_| Some(config.clone()));
let node_chanmgrs = create_node_chanmgrs(9, &node_cfgs, &configs);
let mut nodes = create_network(9, &node_cfgs, &node_chanmgrs);

let node_b_id = nodes[1].node.get_our_node_id();
let node_c_id = nodes[2].node.get_our_node_id();
let node_d_id = nodes[3].node.get_our_node_id();
let node_e_id = nodes[4].node.get_our_node_id();
let node_f_id = nodes[5].node.get_our_node_id();
let node_g_id = nodes[6].node.get_our_node_id();
let node_h_id = nodes[7].node.get_our_node_id();
let node_i_id = nodes[8].node.get_our_node_id();

Expand All @@ -4691,28 +4677,7 @@ fn test_single_channel_multiple_mpp() {
// 7
// 8
//
// We can in theory reproduce this issue with fewer channels/HTLCs, but getting this test
// robust is rather challenging. We rely on having the main test thread wait on locks held in
// the background `claim_funds` thread and unlocking when the `claim_funds` thread completes a
// single `ChannelMonitorUpdate`.
// This thread calls `get_and_clear_pending_msg_events()` and `handle_revoke_and_ack()`, both
// of which require `ChannelManager` locks, but we have to make sure this thread gets a chance
// to be blocked on the mutexes before we let the background thread wake `claim_funds` so that
// the mutex can switch to this main thread.
// This relies on our locks being fair, but also on our threads getting runtime during the test
// run, which can be pretty competitive. Thus we do a dumb dance to be as conservative as
// possible - we have a background thread which completes a `ChannelMonitorUpdate` (by sending
// into the `write_blocker` mpsc) but it doesn't run until a mpsc channel sends from this main
// thread to the background thread, and then we let it sleep a while before we send the
// `ChannelMonitorUpdate` unblocker.
// Further, we give ourselves two chances each time, needing 4 HTLCs just to unlock our two
// `ChannelManager` calls. We then need a few remaining HTLCs to actually trigger the bug, so
// we use 6 HTLCs.
// Finally, we do not run this test on Winblowz because it, somehow, in 2025, does not implement
// actual preemptive multitasking and thinks that cooperative multitasking somehow is
// acceptable in the 21st century, let alone a quarter of the way into it.
const MAX_THREAD_INIT_TIME: std::time::Duration = std::time::Duration::from_secs(1);

// All six parts converge on the same channel (7->8)
create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 100_000, 0);
create_announced_chan_between_nodes_with_value(&nodes, 0, 2, 100_000, 0);
create_announced_chan_between_nodes_with_value(&nodes, 0, 3, 100_000, 0);
Expand All @@ -4728,7 +4693,7 @@ fn test_single_channel_multiple_mpp() {
create_announced_chan_between_nodes_with_value(&nodes, 6, 7, 100_000, 0);
create_announced_chan_between_nodes_with_value(&nodes, 7, 8, 1_000_000, 0);

let (mut route, payment_hash, payment_preimage, payment_secret) =
let (route, payment_hash, payment_preimage, payment_secret) =
get_route_and_payment_hash!(&nodes[0], nodes[8], 50_000_000);

send_along_route_with_secret(
Expand All @@ -4747,177 +4712,46 @@ fn test_single_channel_multiple_mpp() {
payment_secret,
);

let (do_a_write, blocker) = std::sync::mpsc::sync_channel(0);
*nodes[8].chain_monitor.write_blocker.lock().unwrap() = Some(blocker);

// Until we have std::thread::scoped we have to unsafe { turn off the borrow checker }.
// We do this by casting a pointer to a `TestChannelManager` to a pointer to a
// `TestChannelManager` with different (in this case 'static) lifetime.
// This is even suggested in the second example at
// https://doc.rust-lang.org/std/mem/fn.transmute.html#examples
let claim_node: &'static TestChannelManager<'static, 'static> =
unsafe { std::mem::transmute(nodes[8].node as &TestChannelManager) };
let thrd = std::thread::spawn(move || {
// Initiate the claim in a background thread as it will immediately block waiting on the
// `write_blocker` we set above.
claim_node.claim_funds(payment_preimage);
});

// First unlock one monitor so that we have a pending
// `update_fulfill_htlc`/`commitment_signed` pair to pass to our counterparty.
do_a_write.send(()).unwrap();

let event_node: &'static TestChannelManager<'static, 'static> =
unsafe { std::mem::transmute(nodes[8].node as &TestChannelManager) };
let thrd_event = std::thread::spawn(move || {
let mut have_event = false;
while !have_event {
let mut events = event_node.get_and_clear_pending_events();
assert!(events.len() == 1 || events.len() == 0);
if events.len() == 1 {
if let Event::PaymentClaimed { .. } = events[0] {
} else {
panic!("Unexpected event {events:?}");
}
have_event = true;
}
if !have_event {
std::thread::yield_now();
}
}
});

// Then fetch the `update_fulfill_htlc`/`commitment_signed`. Note that the
// `get_and_clear_pending_msg_events` will immediately hang trying to take a peer lock which
// `claim_funds` is holding. Thus, we release a second write after a small sleep in the
// background to give `claim_funds` a chance to step forward, unblocking
// `get_and_clear_pending_msg_events`.
let do_a_write_background = do_a_write.clone();
let block_thrd2 = AtomicBool::new(true);
let block_thrd2_read: &'static AtomicBool = unsafe { std::mem::transmute(&block_thrd2) };
let thrd2 = std::thread::spawn(move || {
while block_thrd2_read.load(Ordering::Acquire) {
std::thread::yield_now();
}
std::thread::sleep(MAX_THREAD_INIT_TIME);
do_a_write_background.send(()).unwrap();
std::thread::sleep(MAX_THREAD_INIT_TIME);
do_a_write_background.send(()).unwrap();
});
block_thrd2.store(false, Ordering::Release);
let mut first_updates = get_htlc_update_msgs(&nodes[8], &node_h_id);

// Thread 2 could unblock first, or it could get blocked waiting on us to process a
// `PaymentClaimed` event. Either way, wait until both have finished.
thrd2.join().unwrap();
thrd_event.join().unwrap();

// Disconnect node 7 from its upstream peers (nodes 1-6) so it doesn't bother to fail the HTLCs back
nodes[7].node.peer_disconnected(node_b_id);
nodes[7].node.peer_disconnected(node_c_id);
nodes[7].node.peer_disconnected(node_d_id);
nodes[7].node.peer_disconnected(node_e_id);
nodes[7].node.peer_disconnected(node_f_id);
nodes[7].node.peer_disconnected(node_g_id);

let first_update_fulfill = first_updates.update_fulfill_htlcs.remove(0);
nodes[7].node.handle_update_fulfill_htlc(node_i_id, first_update_fulfill);
check_added_monitors(&nodes[7], 1);
expect_payment_forwarded!(nodes[7], nodes[1], nodes[8], Some(1000), false, false);
nodes[7].node.handle_commitment_signed_batch_test(node_i_id, &first_updates.commitment_signed);
check_added_monitors(&nodes[7], 1);
let (raa, cs) = get_revoke_commit_msgs(&nodes[7], &node_i_id);

// Now, handle the `revoke_and_ack` from node 7. Note that `claim_funds` is still blocked on
// our peer lock, so we have to release a write to let it process.
// After this call completes, the channel previously would be locked up and should not be able
// to make further progress.
let do_a_write_background = do_a_write.clone();
let block_thrd3 = AtomicBool::new(true);
let block_thrd3_read: &'static AtomicBool = unsafe { std::mem::transmute(&block_thrd3) };
let thrd3 = std::thread::spawn(move || {
while block_thrd3_read.load(Ordering::Acquire) {
std::thread::yield_now();
}
std::thread::sleep(MAX_THREAD_INIT_TIME);
do_a_write_background.send(()).unwrap();
std::thread::sleep(MAX_THREAD_INIT_TIME);
do_a_write_background.send(()).unwrap();
});
block_thrd3.store(false, Ordering::Release);
nodes[8].node.handle_revoke_and_ack(node_h_id, &raa);
thrd3.join().unwrap();
assert!(!thrd.is_finished());

let thrd4 = std::thread::spawn(move || {
do_a_write.send(()).unwrap();
do_a_write.send(()).unwrap();
});

thrd4.join().unwrap();
thrd.join().unwrap();

// At the end, we should have 7 ChannelMonitorUpdates - 6 for HTLC claims, and one for the
// above `revoke_and_ack`.
check_added_monitors(&nodes[8], 7);

// Now drive everything to the end, at least as far as node 7 is concerned...
*nodes[8].chain_monitor.write_blocker.lock().unwrap() = None;
nodes[8].node.handle_commitment_signed_batch_test(node_h_id, &cs);
// All six parts are on the same channel, so claiming should produce a single batched
// ChannelMonitorUpdate containing all 6 preimages and one commitment.
nodes[8].node.claim_funds(payment_preimage);
expect_payment_claimed!(nodes[8], payment_hash, 50_000_000);
check_added_monitors(&nodes[8], 1);

let (mut updates, raa) = get_updates_and_revoke(&nodes[8], &node_h_id);

nodes[7].node.handle_update_fulfill_htlc(node_i_id, updates.update_fulfill_htlcs.remove(0));
expect_payment_forwarded!(nodes[7], nodes[2], nodes[8], Some(1000), false, false);
nodes[7].node.handle_update_fulfill_htlc(node_i_id, updates.update_fulfill_htlcs.remove(0));
expect_payment_forwarded!(nodes[7], nodes[3], nodes[8], Some(1000), false, false);
let mut next_source = 4;
if let Some(update) = updates.update_fulfill_htlcs.get(0) {
nodes[7].node.handle_update_fulfill_htlc(node_i_id, update.clone());
expect_payment_forwarded!(nodes[7], nodes[4], nodes[8], Some(1000), false, false);
next_source += 1;
let mut first_updates = get_htlc_update_msgs(&nodes[8], &node_h_id);
assert_eq!(first_updates.update_fulfill_htlcs.len(), 6);

// Disconnect node 7 from intermediate nodes so it doesn't bother forwarding back.
nodes[7].node.peer_disconnected(nodes[1].node.get_our_node_id());
nodes[7].node.peer_disconnected(nodes[2].node.get_our_node_id());
nodes[7].node.peer_disconnected(nodes[3].node.get_our_node_id());
nodes[7].node.peer_disconnected(nodes[4].node.get_our_node_id());
nodes[7].node.peer_disconnected(nodes[5].node.get_our_node_id());
Comment on lines +4726 to +4729
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After delivering all 6 update_fulfill_htlc messages, the test asserts 6 PaymentForwarded events but doesn't validate their contents (e.g. which upstream channel each forward corresponds to, or fee amounts). The old test checked expect_payment_forwarded! with specific source nodes and fee values (Some(1000)). Consider adding at least a fee check to ensure the batch claim correctly reports forwarding fees.

nodes[7].node.peer_disconnected(nodes[6].node.get_our_node_id());

// Deliver all 6 fulfills to node 7 before handling the commitment_signed.
// Each handle_update_fulfill_htlc triggers claim_funds_internal on node 7's upstream
// channels (which are disconnected), generating a preimage monitor update + PaymentForwarded.
for fulfill in first_updates.update_fulfill_htlcs.drain(..) {
nodes[7].node.handle_update_fulfill_htlc(node_i_id, fulfill);
check_added_monitors(&nodes[7], 1);
}

nodes[7].node.handle_commitment_signed_batch_test(node_i_id, &updates.commitment_signed);
nodes[7].node.handle_revoke_and_ack(node_i_id, &raa);
if updates.update_fulfill_htlcs.get(0).is_some() {
check_added_monitors(&nodes[7], 5);
} else {
check_added_monitors(&nodes[7], 4);
let events = nodes[7].node.get_and_clear_pending_events();
assert_eq!(events.len(), 6);
for event in events {
match event {
Event::PaymentForwarded { .. } => {},
_ => panic!("Unexpected event {:?}", event),
}
}

nodes[7].node.handle_commitment_signed_batch_test(node_i_id, &first_updates.commitment_signed);
check_added_monitors(&nodes[7], 1);
let (raa, cs) = get_revoke_commit_msgs(&nodes[7], &node_i_id);

nodes[8].node.handle_revoke_and_ack(node_h_id, &raa);
check_added_monitors(&nodes[8], 1);
nodes[8].node.handle_commitment_signed_batch_test(node_h_id, &cs);
check_added_monitors(&nodes[8], 2);

let (mut updates, raa) = get_updates_and_revoke(&nodes[8], &node_h_id);

nodes[7].node.handle_update_fulfill_htlc(node_i_id, updates.update_fulfill_htlcs.remove(0));
expect_payment_forwarded!(nodes[7], nodes[next_source], nodes[8], Some(1000), false, false);
next_source += 1;
nodes[7].node.handle_update_fulfill_htlc(node_i_id, updates.update_fulfill_htlcs.remove(0));
expect_payment_forwarded!(nodes[7], nodes[next_source], nodes[8], Some(1000), false, false);
next_source += 1;
if let Some(update) = updates.update_fulfill_htlcs.get(0) {
nodes[7].node.handle_update_fulfill_htlc(node_i_id, update.clone());
expect_payment_forwarded!(nodes[7], nodes[next_source], nodes[8], Some(1000), false, false);
}

nodes[7].node.handle_commitment_signed_batch_test(node_i_id, &updates.commitment_signed);
nodes[7].node.handle_revoke_and_ack(node_i_id, &raa);
if updates.update_fulfill_htlcs.get(0).is_some() {
check_added_monitors(&nodes[7], 5);
} else {
check_added_monitors(&nodes[7], 4);
}

let (raa, cs) = get_revoke_commit_msgs(&nodes[7], &node_i_id);
nodes[8].node.handle_revoke_and_ack(node_h_id, &raa);
nodes[8].node.handle_commitment_signed_batch_test(node_h_id, &cs);
check_added_monitors(&nodes[8], 2);
check_added_monitors(&nodes[8], 1);

let raa = get_event_msg!(nodes[8], MessageSendEvent::SendRevokeAndACK, node_h_id);
nodes[7].node.handle_revoke_and_ack(node_i_id, &raa);
Expand Down
Loading
Loading