From a90e9fedf91bac484d22181653d74011338c1aeb Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Tue, 17 Mar 2026 12:12:32 -0400 Subject: [PATCH 1/9] Split up some tests that have many variants Helps when debugging to know which variants failed. --- lightning/src/ln/monitor_tests.rs | 60 +++++++++++++++++++++++++++++-- lightning/src/ln/payment_tests.rs | 26 +++++++++++++- 2 files changed, 83 insertions(+), 3 deletions(-) diff --git a/lightning/src/ln/monitor_tests.rs b/lightning/src/ln/monitor_tests.rs index f52f093917b..d156f874703 100644 --- a/lightning/src/ln/monitor_tests.rs +++ b/lightning/src/ln/monitor_tests.rs @@ -3803,27 +3803,83 @@ fn do_test_lost_timeout_monitor_events(confirm_tx: CommitmentType, dust_htlcs: b } #[test] -fn test_lost_timeout_monitor_events() { +fn test_lost_timeout_monitor_events_a() { do_test_lost_timeout_monitor_events(CommitmentType::RevokedCounterparty, false, false); +} +#[test] +fn test_lost_timeout_monitor_events_b() { do_test_lost_timeout_monitor_events(CommitmentType::RevokedCounterparty, true, false); +} +#[test] +fn test_lost_timeout_monitor_events_c() { do_test_lost_timeout_monitor_events(CommitmentType::PreviousCounterparty, false, false); +} +#[test] +fn test_lost_timeout_monitor_events_d() { do_test_lost_timeout_monitor_events(CommitmentType::PreviousCounterparty, true, false); +} +#[test] +fn test_lost_timeout_monitor_events_e() { do_test_lost_timeout_monitor_events(CommitmentType::LatestCounterparty, false, false); +} +#[test] +fn test_lost_timeout_monitor_events_f() { do_test_lost_timeout_monitor_events(CommitmentType::LatestCounterparty, true, false); +} +#[test] +fn test_lost_timeout_monitor_events_g() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithoutLastHTLC, false, false); +} +#[test] +fn test_lost_timeout_monitor_events_h() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithoutLastHTLC, true, false); +} +#[test] +fn test_lost_timeout_monitor_events_i() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithLastHTLC, false, false); +} +#[test] +fn test_lost_timeout_monitor_events_j() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithLastHTLC, true, false); - +} +#[test] +fn test_lost_timeout_monitor_events_k() { do_test_lost_timeout_monitor_events(CommitmentType::RevokedCounterparty, false, true); +} +#[test] +fn test_lost_timeout_monitor_events_l() { do_test_lost_timeout_monitor_events(CommitmentType::RevokedCounterparty, true, true); +} +#[test] +fn test_lost_timeout_monitor_events_m() { do_test_lost_timeout_monitor_events(CommitmentType::PreviousCounterparty, false, true); +} +#[test] +fn test_lost_timeout_monitor_events_n() { do_test_lost_timeout_monitor_events(CommitmentType::PreviousCounterparty, true, true); +} +#[test] +fn test_lost_timeout_monitor_events_o() { do_test_lost_timeout_monitor_events(CommitmentType::LatestCounterparty, false, true); +} +#[test] +fn test_lost_timeout_monitor_events_p() { do_test_lost_timeout_monitor_events(CommitmentType::LatestCounterparty, true, true); +} +#[test] +fn test_lost_timeout_monitor_events_q() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithoutLastHTLC, false, true); +} +#[test] +fn test_lost_timeout_monitor_events_r() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithoutLastHTLC, true, true); +} +#[test] +fn test_lost_timeout_monitor_events_s() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithLastHTLC, false, true); +} +#[test] +fn test_lost_timeout_monitor_events_t() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithLastHTLC, true, true); } diff --git a/lightning/src/ln/payment_tests.rs b/lightning/src/ln/payment_tests.rs index 807d1a1af39..a196e5135c2 100644 --- a/lightning/src/ln/payment_tests.rs +++ b/lightning/src/ln/payment_tests.rs @@ -1430,15 +1430,39 @@ fn do_test_dup_htlc_onchain_doesnt_fail_on_reload( } #[test] -fn test_dup_htlc_onchain_doesnt_fail_on_reload() { +fn test_dup_htlc_onchain_doesnt_fail_on_reload_a() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, true, true, true); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_b() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, true, true, false); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_c() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, true, false, false); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_d() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, false, true, true); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_e() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, false, true, false); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_f() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, false, false, false); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_g() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(false, false, true, true); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_h() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(false, false, true, false); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_i() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(false, false, false, false); } From 8f127036af7eb2ecfe94d94668abf7ad265e2cd7 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Thu, 19 Mar 2026 16:08:46 -0400 Subject: [PATCH 2/9] Remove unnecessary pending_monitor_events clone --- lightning/src/chain/channelmonitor.rs | 31 +++++++++++++-------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 810de80da95..7a3c3315775 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -68,8 +68,8 @@ use crate::util::byte_utils; use crate::util::logger::{Logger, WithContext}; use crate::util::persist::MonitorName; use crate::util::ser::{ - MaybeReadable, Readable, ReadableArgs, RequiredWrapper, UpgradableRequired, Writeable, Writer, - U48, + Iterable, MaybeReadable, Readable, ReadableArgs, RequiredWrapper, UpgradableRequired, + Writeable, Writer, U48, }; #[allow(unused_imports)] @@ -1719,24 +1719,23 @@ pub(crate) fn write_chanmon_internal( channel_monitor.lockdown_from_offchain.write(writer)?; channel_monitor.holder_tx_signed.write(writer)?; - // If we have a `HolderForceClosedWithInfo` event, we need to write the `HolderForceClosed` for backwards compatibility. - let pending_monitor_events = - match channel_monitor.pending_monitor_events.iter().find(|ev| match ev { - MonitorEvent::HolderForceClosedWithInfo { .. } => true, - _ => false, - }) { - Some(MonitorEvent::HolderForceClosedWithInfo { outpoint, .. }) => { - let mut pending_monitor_events = channel_monitor.pending_monitor_events.clone(); - pending_monitor_events.push(MonitorEvent::HolderForceClosed(*outpoint)); - pending_monitor_events - }, - _ => channel_monitor.pending_monitor_events.clone(), - }; + // If we have a `HolderForceClosedWithInfo` event, we need to write the `HolderForceClosed` + // for backwards compatibility. + let holder_force_closed_compat = channel_monitor.pending_monitor_events.iter().find_map(|ev| { + if let MonitorEvent::HolderForceClosedWithInfo { outpoint, .. } = ev { + Some(MonitorEvent::HolderForceClosed(*outpoint)) + } else { + None + } + }); + let pending_monitor_events = Iterable( + channel_monitor.pending_monitor_events.iter().chain(holder_force_closed_compat.as_ref()), + ); write_tlv_fields!(writer, { (1, channel_monitor.funding_spend_confirmed, option), (3, channel_monitor.htlcs_resolved_on_chain, required_vec), - (5, pending_monitor_events, required_vec), + (5, pending_monitor_events, required), // Equivalent to required_vec because Iterable also writes as WithoutLength (7, channel_monitor.funding_spend_seen, required), (9, channel_monitor.counterparty_node_id, required), (11, channel_monitor.confirmed_commitment_tx_counterparty_output, option), From f76f16627662a600641541e07d45fc87303ac9cd Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Tue, 17 Mar 2026 15:26:43 -0400 Subject: [PATCH 3/9] Add persistent_monitor_events flag to monitors/manager Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. As a first step towards this, here we persist a flag in the ChannelManager and ChannelMonitors indicating whether this new feature is enabled. It will be used in upcoming commits to maintain compatibility and create an upgrade/downgrade path between LDK versions. --- lightning/src/chain/channelmonitor.rs | 20 +++++++++++++++++++ lightning/src/ln/channelmanager.rs | 28 +++++++++++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 7a3c3315775..18351f9c97d 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -1281,6 +1281,12 @@ pub(crate) struct ChannelMonitorImpl { // block/transaction-connected events and *not* during block/transaction-disconnected events, // we further MUST NOT generate events during block/transaction-disconnection. pending_monitor_events: Vec, + /// When set, monitor events are retained until explicitly acked rather than cleared on read. + /// + /// Allows the ChannelManager to reconstruct pending HTLC state by replaying monitor events on + /// startup, and make the monitor responsible for both off- and on-chain payment resolution. Will + /// be always set once support for this feature is complete. + persistent_events_enabled: bool, pub(super) pending_events: Vec, pub(super) is_processing_pending_events: bool, @@ -1732,8 +1738,12 @@ pub(crate) fn write_chanmon_internal( channel_monitor.pending_monitor_events.iter().chain(holder_force_closed_compat.as_ref()), ); + // Only write `persistent_events_enabled` if it's set to true, as it's an even TLV. + let persistent_events_enabled = channel_monitor.persistent_events_enabled.then_some(()); + write_tlv_fields!(writer, { (1, channel_monitor.funding_spend_confirmed, option), + (2, persistent_events_enabled, option), (3, channel_monitor.htlcs_resolved_on_chain, required_vec), (5, pending_monitor_events, required), // Equivalent to required_vec because Iterable also writes as WithoutLength (7, channel_monitor.funding_spend_seen, required), @@ -1938,6 +1948,7 @@ impl ChannelMonitor { payment_preimages: new_hash_map(), pending_monitor_events: Vec::new(), + persistent_events_enabled: false, pending_events: Vec::new(), is_processing_pending_events: false, @@ -6695,8 +6706,10 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP let mut is_manual_broadcast = RequiredWrapper(None); let mut funding_seen_onchain = RequiredWrapper(None); let mut best_block_previous_blocks = None; + let mut persistent_events_enabled: Option<()> = None; read_tlv_fields!(reader, { (1, funding_spend_confirmed, option), + (2, persistent_events_enabled, option), (3, htlcs_resolved_on_chain, optional_vec), (5, pending_monitor_events, optional_vec), (7, funding_spend_seen, option), @@ -6723,6 +6736,12 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP best_block.previous_blocks = previous_blocks; } + #[cfg(not(any(feature = "_test_utils", test)))] + if persistent_events_enabled.is_some() { + // This feature isn't supported yet so error if the writer expected it to be. + return Err(DecodeError::InvalidValue) + } + // Note that `payment_preimages_with_info` was added (and is always written) in LDK 0.1, so // we can use it to determine if this monitor was last written by LDK 0.1 or later. let written_by_0_1_or_later = payment_preimages_with_info.is_some(); @@ -6871,6 +6890,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP payment_preimages, pending_monitor_events: pending_monitor_events.unwrap(), + persistent_events_enabled: persistent_events_enabled.is_some(), pending_events, is_processing_pending_events: false, diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 2c97e4adaa1..6327b22ce2b 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -2928,6 +2928,15 @@ pub struct ChannelManager< /// [`ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee`] estimate. last_days_feerates: Mutex>, + /// When set, monitors will repeatedly provide an event back to the `ChannelManager` on restart + /// until the event is explicitly acknowledged as processed. + /// + /// Allows us to reconstruct pending HTLC state by replaying monitor events on startup, rather + /// than from complexly polling and reconciling Channel{Monitor} APIs, as well as move the + /// responsibility of off-chain payment resolution from the Channel to the monitor. Will be + /// always set once support is complete. + persistent_monitor_events: bool, + #[cfg(test)] pub(super) entropy_source: ES, #[cfg(not(test))] @@ -3658,6 +3667,8 @@ impl< signer_provider, logger, + + persistent_monitor_events: false, } } @@ -18102,6 +18113,9 @@ impl< } } + // Only write `persistent_events_enabled` if it's set to true, as it's an even TLV. + let persistent_monitor_events = self.persistent_monitor_events.then_some(()); + write_tlv_fields!(writer, { (1, pending_outbound_payments_no_retry, required), (2, pending_intercepted_htlcs, option), @@ -18114,6 +18128,7 @@ impl< (9, htlc_purposes, required_vec), (10, legacy_in_flight_monitor_updates, option), (11, self.probing_cookie_secret, required), + (12, persistent_monitor_events, option), (13, htlc_onion_fields, optional_vec), (14, decode_update_add_htlcs_opt, option), (15, self.inbound_payment_id_secret, required), @@ -18213,6 +18228,7 @@ pub(super) struct ChannelManagerData { forward_htlcs_legacy: HashMap>, pending_intercepted_htlcs_legacy: HashMap, decode_update_add_htlcs_legacy: HashMap>, + persistent_monitor_events: bool, // The `ChannelManager` version that was written. version: u8, } @@ -18399,6 +18415,7 @@ impl<'a, ES: EntropySource, SP: SignerProvider, L: Logger> let mut peer_storage_dir: Option)>> = None; let mut async_receive_offer_cache: AsyncReceiveOfferCache = AsyncReceiveOfferCache::new(); let mut best_block_previous_blocks = None; + let mut persistent_monitor_events: Option<()> = None; read_tlv_fields!(reader, { (1, pending_outbound_payments_no_retry, option), (2, pending_intercepted_htlcs_legacy, option), @@ -18411,6 +18428,7 @@ impl<'a, ES: EntropySource, SP: SignerProvider, L: Logger> (9, claimable_htlc_purposes, optional_vec), (10, legacy_in_flight_monitor_updates, option), (11, probing_cookie_secret, option), + (12, persistent_monitor_events, option), (13, amountless_claimable_htlc_onion_fields, optional_vec), (14, decode_update_add_htlcs_legacy, option), (15, inbound_payment_id_secret, option), @@ -18420,6 +18438,12 @@ impl<'a, ES: EntropySource, SP: SignerProvider, L: Logger> (23, best_block_previous_blocks, option), }); + #[cfg(not(any(feature = "_test_utils", test)))] + if persistent_monitor_events.is_some() { + // This feature isn't supported yet so error if the writer expected it to be. + return Err(DecodeError::InvalidValue); + } + // Merge legacy pending_outbound_payments fields into a single HashMap. // Priority: pending_outbound_payments (TLV 3) > pending_outbound_payments_no_retry (TLV 1) // > pending_outbound_payments_compat (non-TLV legacy) @@ -18539,6 +18563,7 @@ impl<'a, ES: EntropySource, SP: SignerProvider, L: Logger> peer_storage_dir: peer_storage_dir.unwrap_or_default(), async_receive_offer_cache, version, + persistent_monitor_events: persistent_monitor_events.is_some(), }) } } @@ -18839,6 +18864,7 @@ impl< mut in_flight_monitor_updates, peer_storage_dir, async_receive_offer_cache, + persistent_monitor_events, version: _version, } = data; @@ -20100,6 +20126,8 @@ impl< logger: args.logger, config: RwLock::new(args.config), + + persistent_monitor_events, }; let mut processed_claims: HashSet> = new_hash_set(); From da3632b29f17e409e7a2128ee90e80caf618bcbc Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Wed, 18 Mar 2026 15:08:54 -0400 Subject: [PATCH 4/9] Add helper to push monitor events Cleans up the next commit --- lightning/src/chain/chainmonitor.rs | 53 +++++++-------------------- lightning/src/chain/channelmonitor.rs | 26 +++++++++---- 2 files changed, 32 insertions(+), 47 deletions(-) diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index ca01e95c054..cc56bfaebc8 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -366,9 +366,6 @@ pub struct ChainMonitor< fee_estimator: F, persister: P, _entropy_source: ES, - /// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly - /// from the user and not from a [`ChannelMonitor`]. - pending_monitor_events: Mutex, PublicKey)>>, /// The best block height seen, used as a proxy for the passage of time. highest_chain_height: AtomicUsize, @@ -436,7 +433,6 @@ where logger, fee_estimator: feeest, _entropy_source, - pending_monitor_events: Mutex::new(Vec::new()), highest_chain_height: AtomicUsize::new(0), event_notifier: Arc::clone(&event_notifier), persister: AsyncPersister { persister, event_notifier }, @@ -657,7 +653,6 @@ where fee_estimator: feeest, persister, _entropy_source, - pending_monitor_events: Mutex::new(Vec::new()), highest_chain_height: AtomicUsize::new(0), event_notifier: Arc::new(Notifier::new()), pending_send_only_events: Mutex::new(Vec::new()), @@ -802,16 +797,11 @@ where return Ok(()); } let funding_txo = monitor_data.monitor.get_funding_txo(); - self.pending_monitor_events.lock().unwrap().push(( + monitor_data.monitor.push_monitor_event(MonitorEvent::Completed { funding_txo, channel_id, - vec![MonitorEvent::Completed { - funding_txo, - channel_id, - monitor_update_id: monitor_data.monitor.get_latest_update_id(), - }], - monitor_data.monitor.get_counterparty_node_id(), - )); + monitor_update_id: monitor_data.monitor.get_latest_update_id(), + }); self.event_notifier.notify(); Ok(()) @@ -824,14 +814,11 @@ where pub fn force_channel_monitor_updated(&self, channel_id: ChannelId, monitor_update_id: u64) { let monitors = self.monitors.read().unwrap(); let monitor = &monitors.get(&channel_id).unwrap().monitor; - let counterparty_node_id = monitor.get_counterparty_node_id(); - let funding_txo = monitor.get_funding_txo(); - self.pending_monitor_events.lock().unwrap().push(( - funding_txo, + monitor.push_monitor_event(MonitorEvent::Completed { + funding_txo: monitor.get_funding_txo(), channel_id, - vec![MonitorEvent::Completed { funding_txo, channel_id, monitor_update_id }], - counterparty_node_id, - )); + monitor_update_id, + }); self.event_notifier.notify(); } @@ -1266,21 +1253,13 @@ where // The channel is post-close (funding spend seen, lockdown, or // holder tx signed). Return InProgress so ChannelManager freezes // the channel until the force-close MonitorEvents are processed. - // Push a Completed event into pending_monitor_events so it gets - // picked up after the per-monitor events in the next - // release_pending_monitor_events call. - let funding_txo = monitor.get_funding_txo(); - let channel_id = monitor.channel_id(); - self.pending_monitor_events.lock().unwrap().push(( - funding_txo, - channel_id, - vec![MonitorEvent::Completed { - funding_txo, - channel_id, - monitor_update_id: monitor.get_latest_update_id(), - }], - monitor.get_counterparty_node_id(), - )); + // Push a Completed event into the monitor so it gets picked up + // in the next release_pending_monitor_events call. + monitor.push_monitor_event(MonitorEvent::Completed { + funding_txo: monitor.get_funding_txo(), + channel_id: monitor.channel_id(), + monitor_update_id: monitor.get_latest_update_id(), + }); log_debug!( logger, "Deferring completion of ChannelMonitorUpdate id {:?} (channel is post-close)", @@ -1665,10 +1644,6 @@ where )); } } - // Drain pending_monitor_events (which includes deferred post-close - // completions) after per-monitor events so that force-close - // MonitorEvents are processed by ChannelManager first. - pending_monitor_events.extend(self.pending_monitor_events.lock().unwrap().split_off(0)); pending_monitor_events } } diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 18351f9c97d..6e1a82ab8ba 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -183,6 +183,10 @@ impl Readable for ChannelMonitorUpdate { } } +fn push_monitor_event(pending_monitor_events: &mut Vec, event: MonitorEvent) { + pending_monitor_events.push(event); +} + /// An event to be processed by the ChannelManager. #[derive(Clone, PartialEq, Eq)] pub enum MonitorEvent { @@ -226,8 +230,6 @@ pub enum MonitorEvent { }, } impl_writeable_tlv_based_enum_upgradable_legacy!(MonitorEvent, - // Note that Completed is currently never serialized to disk as it is generated only in - // ChainMonitor. (0, Completed) => { (0, funding_txo, required), (2, monitor_update_id, required), @@ -2166,6 +2168,10 @@ impl ChannelMonitor { self.inner.lock().unwrap().get_and_clear_pending_monitor_events() } + pub(super) fn push_monitor_event(&self, event: MonitorEvent) { + self.inner.lock().unwrap().push_monitor_event(event); + } + /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity. /// /// For channels featuring anchor outputs, this method will also process [`BumpTransaction`] @@ -3891,7 +3897,7 @@ impl ChannelMonitorImpl { outpoint: funding_outpoint, channel_id: self.channel_id, }; - self.pending_monitor_events.push(event); + push_monitor_event(&mut self.pending_monitor_events, event); } // Although we aren't signing the transaction directly here, the transaction will be signed @@ -4552,6 +4558,10 @@ impl ChannelMonitorImpl { &self.outputs_to_watch } + fn push_monitor_event(&mut self, event: MonitorEvent) { + push_monitor_event(&mut self.pending_monitor_events, event); + } + fn get_and_clear_pending_monitor_events(&mut self) -> Vec { let mut ret = Vec::new(); mem::swap(&mut ret, &mut self.pending_monitor_events); @@ -5611,7 +5621,7 @@ impl ChannelMonitorImpl { ); log_info!(logger, "Channel closed by funding output spend in txid {txid}"); if !self.funding_spend_seen { - self.pending_monitor_events.push(MonitorEvent::CommitmentTxConfirmed(())); + self.push_monitor_event(MonitorEvent::CommitmentTxConfirmed(())); } self.funding_spend_seen = true; @@ -5786,7 +5796,7 @@ impl ChannelMonitorImpl { log_debug!(logger, "HTLC {} failure update in {} has got enough confirmations to be passed upstream", &payment_hash, entry.txid); - self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { + self.push_monitor_event(MonitorEvent::HTLCEvent(HTLCUpdate { payment_hash, payment_preimage: None, source, @@ -5896,7 +5906,7 @@ impl ChannelMonitorImpl { log_error!(logger, "Failing back HTLC {} upstream to preserve the \ channel as the forward HTLC hasn't resolved and our backward HTLC \ expires soon at {}", log_bytes!(htlc.payment_hash.0), inbound_htlc_expiry); - self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { + push_monitor_event(&mut self.pending_monitor_events, MonitorEvent::HTLCEvent(HTLCUpdate { source: source.clone(), payment_preimage: None, payment_hash: htlc.payment_hash, @@ -6313,7 +6323,7 @@ impl ChannelMonitorImpl { }, }); self.counterparty_fulfilled_htlcs.insert(SentHTLCId::from_source(&source), payment_preimage); - self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { + push_monitor_event(&mut self.pending_monitor_events, MonitorEvent::HTLCEvent(HTLCUpdate { source, payment_preimage: Some(payment_preimage), payment_hash, @@ -6337,7 +6347,7 @@ impl ChannelMonitorImpl { }, }); self.counterparty_fulfilled_htlcs.insert(SentHTLCId::from_source(&source), payment_preimage); - self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { + push_monitor_event(&mut self.pending_monitor_events, MonitorEvent::HTLCEvent(HTLCUpdate { source, payment_preimage: Some(payment_preimage), payment_hash, From 2e84b26867b42b33a8b9c3d578efde629aa3d09f Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Wed, 18 Mar 2026 17:18:26 -0400 Subject: [PATCH 5/9] Rename pending_monitor_events to _legacy This field will be deprecated in upcoming commits when we start persisting MonitorEvent ids alongside the MonitorEvents. --- lightning/src/chain/channelmonitor.rs | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 6e1a82ab8ba..611f83e9dd7 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -1736,7 +1736,7 @@ pub(crate) fn write_chanmon_internal( None } }); - let pending_monitor_events = Iterable( + let pending_monitor_events_legacy = Iterable( channel_monitor.pending_monitor_events.iter().chain(holder_force_closed_compat.as_ref()), ); @@ -1747,7 +1747,7 @@ pub(crate) fn write_chanmon_internal( (1, channel_monitor.funding_spend_confirmed, option), (2, persistent_events_enabled, option), (3, channel_monitor.htlcs_resolved_on_chain, required_vec), - (5, pending_monitor_events, required), // Equivalent to required_vec because Iterable also writes as WithoutLength + (5, pending_monitor_events_legacy, required), // Equivalent to required_vec because Iterable also writes as WithoutLength (7, channel_monitor.funding_spend_seen, required), (9, channel_monitor.counterparty_node_id, required), (11, channel_monitor.confirmed_commitment_tx_counterparty_output, option), @@ -6645,16 +6645,16 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP } } - let pending_monitor_events_len: u64 = Readable::read(reader)?; - let mut pending_monitor_events = Some( - Vec::with_capacity(cmp::min(pending_monitor_events_len as usize, MAX_ALLOC_SIZE / (32 + 8*3)))); - for _ in 0..pending_monitor_events_len { + let pending_monitor_events_legacy_len: u64 = Readable::read(reader)?; + let mut pending_monitor_events_legacy = Some( + Vec::with_capacity(cmp::min(pending_monitor_events_legacy_len as usize, MAX_ALLOC_SIZE / (32 + 8*3)))); + for _ in 0..pending_monitor_events_legacy_len { let ev = match ::read(reader)? { 0 => MonitorEvent::HTLCEvent(Readable::read(reader)?), 1 => MonitorEvent::HolderForceClosed(outpoint), _ => return Err(DecodeError::InvalidValue) }; - pending_monitor_events.as_mut().unwrap().push(ev); + pending_monitor_events_legacy.as_mut().unwrap().push(ev); } let pending_events_len: u64 = Readable::read(reader)?; @@ -6721,7 +6721,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP (1, funding_spend_confirmed, option), (2, persistent_events_enabled, option), (3, htlcs_resolved_on_chain, optional_vec), - (5, pending_monitor_events, optional_vec), + (5, pending_monitor_events_legacy, optional_vec), (7, funding_spend_seen, option), (9, counterparty_node_id, option), (11, confirmed_commitment_tx_counterparty_output, option), @@ -6774,11 +6774,11 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP // `HolderForceClosedWithInfo` replaced `HolderForceClosed` in v0.0.122. If we have both // events, we can remove the `HolderForceClosed` event and just keep the `HolderForceClosedWithInfo`. - if let Some(ref mut pending_monitor_events) = pending_monitor_events { - if pending_monitor_events.iter().any(|e| matches!(e, MonitorEvent::HolderForceClosed(_))) && - pending_monitor_events.iter().any(|e| matches!(e, MonitorEvent::HolderForceClosedWithInfo { .. })) + if let Some(ref mut evs) = pending_monitor_events_legacy { + if evs.iter().any(|e| matches!(e, MonitorEvent::HolderForceClosed(_))) && + evs.iter().any(|e| matches!(e, MonitorEvent::HolderForceClosedWithInfo { .. })) { - pending_monitor_events.retain(|e| !matches!(e, MonitorEvent::HolderForceClosed(_))); + evs.retain(|e| !matches!(e, MonitorEvent::HolderForceClosed(_))); } } @@ -6899,7 +6899,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP current_holder_commitment_number, payment_preimages, - pending_monitor_events: pending_monitor_events.unwrap(), + pending_monitor_events: pending_monitor_events_legacy.unwrap(), persistent_events_enabled: persistent_events_enabled.is_some(), pending_events, is_processing_pending_events: false, From fd90e0c065108b81575ffa2d92804fd48643843d Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Mon, 16 Mar 2026 13:37:38 -0400 Subject: [PATCH 6/9] Add chain::Watch ack_monitor_event API Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. Here we add an as-yet-unused API to chain::Watch to allow the ChannelManager to tell the a ChannelMonitor that a MonitorEvent has been irrevocably processed and can be deleted. --- fuzz/src/chanmon_consistency.rs | 5 +++++ lightning/src/chain/chainmonitor.rs | 24 ++++++++++++++++++++++++ lightning/src/chain/channelmonitor.rs | 6 ++++++ lightning/src/chain/mod.rs | 14 ++++++++++++++ lightning/src/util/test_utils.rs | 6 +++++- 5 files changed, 54 insertions(+), 1 deletion(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index f20f93c789c..97e1422b542 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -41,6 +41,7 @@ use lightning::chain; use lightning::chain::chaininterface::{ BroadcasterInterface, ConfirmationTarget, FeeEstimator, TransactionType, }; +use lightning::chain::chainmonitor::MonitorEventSource; use lightning::chain::channelmonitor::{ChannelMonitor, MonitorEvent}; use lightning::chain::transaction::OutPoint; use lightning::chain::{ @@ -367,6 +368,10 @@ impl chain::Watch for TestChainMonitor { ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { return self.chain_monitor.release_pending_monitor_events(); } + + fn ack_monitor_event(&self, source: MonitorEventSource) { + self.chain_monitor.ack_monitor_event(source); + } } struct KeyProvider { diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index cc56bfaebc8..7b965bea047 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -66,6 +66,21 @@ use core::iter::Cycle; use core::ops::Deref; use core::sync::atomic::{AtomicUsize, Ordering}; +/// Identifies the source of a [`MonitorEvent`] for acknowledgment via +/// [`chain::Watch::ack_monitor_event`] once the event has been processed. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct MonitorEventSource { + /// The event ID assigned by the [`ChannelMonitor`]. + pub event_id: u64, + /// The channel from which the [`MonitorEvent`] originated. + pub channel_id: ChannelId, +} + +impl_writeable_tlv_based!(MonitorEventSource, { + (1, event_id, required), + (3, channel_id, required), +}); + /// A pending operation queued for later execution when `ChainMonitor` is in deferred mode. enum PendingMonitorOp { /// A new monitor to insert and persist. @@ -1646,6 +1661,15 @@ where } pending_monitor_events } + + fn ack_monitor_event(&self, source: MonitorEventSource) { + let monitors = self.monitors.read().unwrap(); + if let Some(monitor_state) = monitors.get(&source.channel_id) { + monitor_state.monitor.ack_monitor_event(source.event_id); + } else { + debug_assert!(false, "Ack'd monitor events should always have a corresponding monitor"); + } + } } impl< diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 611f83e9dd7..eb2ea746642 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -2172,6 +2172,12 @@ impl ChannelMonitor { self.inner.lock().unwrap().push_monitor_event(event); } + /// Removes a [`MonitorEvent`] by its event ID, acknowledging that it has been processed. + /// Generally called by [`chain::Watch::ack_monitor_event`]. + pub fn ack_monitor_event(&self, _event_id: u64) { + // TODO: once events have ids, remove the corresponding event here + } + /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity. /// /// For channels featuring anchor outputs, this method will also process [`BumpTransaction`] diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index 9692558cf7c..59400cc8da9 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -18,6 +18,7 @@ use bitcoin::network::Network; use bitcoin::script::{Script, ScriptBuf}; use bitcoin::secp256k1::PublicKey; +use crate::chain::chainmonitor::MonitorEventSource; use crate::chain::channelmonitor::{ ChannelMonitor, ChannelMonitorUpdate, MonitorEvent, ANTI_REORG_DELAY, }; @@ -425,6 +426,15 @@ pub trait Watch { fn release_pending_monitor_events( &self, ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)>; + + /// Acknowledges and removes a [`MonitorEvent`] previously returned by + /// [`Watch::release_pending_monitor_events`] by its event ID. + /// + /// Once acknowledged, the event will no longer be returned by future calls to + /// [`Watch::release_pending_monitor_events`] and will not be replayed on restart. + /// + /// Events may be acknowledged in any order. + fn ack_monitor_event(&self, source: MonitorEventSource); } impl + ?Sized, W: Deref> @@ -447,6 +457,10 @@ impl + ?Sized, W: Der ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { self.deref().release_pending_monitor_events() } + + fn ack_monitor_event(&self, source: MonitorEventSource) { + self.deref().ack_monitor_event(source) + } } /// The `Filter` trait defines behavior for indicating chain activity of interest pertaining to diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 57f9ba6b22f..c31a378be1a 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -15,7 +15,7 @@ use crate::chain::chaininterface; #[cfg(any(test, feature = "_externalize_tests"))] use crate::chain::chaininterface::FEERATE_FLOOR_SATS_PER_KW; use crate::chain::chaininterface::{ConfirmationTarget, TransactionType}; -use crate::chain::chainmonitor::{ChainMonitor, Persist}; +use crate::chain::chainmonitor::{ChainMonitor, MonitorEventSource, Persist}; use crate::chain::channelmonitor::{ ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, MonitorEvent, }; @@ -734,6 +734,10 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { } return self.chain_monitor.release_pending_monitor_events(); } + + fn ack_monitor_event(&self, source: MonitorEventSource) { + self.chain_monitor.ack_monitor_event(source); + } } #[cfg(any(test, feature = "_externalize_tests"))] From cb4a804bb7b4defd419f909e9fcf6978968e24da Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Wed, 18 Mar 2026 17:24:04 -0400 Subject: [PATCH 7/9] Add monitor event ids Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. To allow the ChannelManager to ack specific monitor events once they are resolved in upcoming commits, here we give each MonitorEvent a corresponding unique id. It's implemented in such a way that we can delete legacy monitor event code in the future when the new persistent monitor events flag is enabled by default. --- fuzz/src/chanmon_consistency.rs | 2 +- lightning/src/chain/chainmonitor.rs | 2 +- lightning/src/chain/channelmonitor.rs | 200 +++++++++++++----- lightning/src/chain/mod.rs | 8 +- lightning/src/ln/chanmon_update_fail_tests.rs | 6 +- lightning/src/ln/channelmanager.rs | 2 +- lightning/src/util/test_utils.rs | 6 +- 7 files changed, 161 insertions(+), 65 deletions(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 97e1422b542..04d92901f32 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -365,7 +365,7 @@ impl chain::Watch for TestChainMonitor { fn release_pending_monitor_events( &self, - ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { + ) -> Vec<(OutPoint, ChannelId, Vec<(u64, MonitorEvent)>, PublicKey)> { return self.chain_monitor.release_pending_monitor_events(); } diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 7b965bea047..79ada7c70e6 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -1639,7 +1639,7 @@ where fn release_pending_monitor_events( &self, - ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { + ) -> Vec<(OutPoint, ChannelId, Vec<(u64, MonitorEvent)>, PublicKey)> { for (channel_id, update_id) in self.persister.get_and_clear_completed_updates() { let _ = self.channel_monitor_updated(channel_id, update_id); } diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index eb2ea746642..e007260ed60 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -183,8 +183,13 @@ impl Readable for ChannelMonitorUpdate { } } -fn push_monitor_event(pending_monitor_events: &mut Vec, event: MonitorEvent) { - pending_monitor_events.push(event); +fn push_monitor_event( + pending_monitor_events: &mut Vec<(u64, MonitorEvent)>, event: MonitorEvent, + next_monitor_event_id: &mut u64, +) { + let id = *next_monitor_event_id; + *next_monitor_event_id += 1; + pending_monitor_events.push((id, event)); } /// An event to be processed by the ChannelManager. @@ -1282,13 +1287,14 @@ pub(crate) struct ChannelMonitorImpl { // Note that because the `event_lock` in `ChainMonitor` is only taken in // block/transaction-connected events and *not* during block/transaction-disconnected events, // we further MUST NOT generate events during block/transaction-disconnection. - pending_monitor_events: Vec, + pending_monitor_events: Vec<(u64, MonitorEvent)>, /// When set, monitor events are retained until explicitly acked rather than cleared on read. /// /// Allows the ChannelManager to reconstruct pending HTLC state by replaying monitor events on /// startup, and make the monitor responsible for both off- and on-chain payment resolution. Will /// be always set once support for this feature is complete. persistent_events_enabled: bool, + next_monitor_event_id: u64, pub(super) pending_events: Vec, pub(super) is_processing_pending_events: bool, @@ -1669,32 +1675,38 @@ pub(crate) fn write_chanmon_internal( writer.write_all(&payment_preimage.0[..])?; } - writer.write_all( - &(channel_monitor - .pending_monitor_events - .iter() - .filter(|ev| match ev { - MonitorEvent::HTLCEvent(_) => true, - MonitorEvent::HolderForceClosed(_) => true, - MonitorEvent::HolderForceClosedWithInfo { .. } => true, - _ => false, - }) - .count() as u64) - .to_be_bytes(), - )?; - for event in channel_monitor.pending_monitor_events.iter() { - match event { - MonitorEvent::HTLCEvent(upd) => { - 0u8.write(writer)?; - upd.write(writer)?; - }, - MonitorEvent::HolderForceClosed(_) => 1u8.write(writer)?, - // `HolderForceClosedWithInfo` replaced `HolderForceClosed` in v0.0.122. To keep - // backwards compatibility, we write a `HolderForceClosed` event along with the - // `HolderForceClosedWithInfo` event. This is deduplicated in the reader. - MonitorEvent::HolderForceClosedWithInfo { .. } => 1u8.write(writer)?, - _ => {}, // Covered in the TLV writes below + if !channel_monitor.persistent_events_enabled { + writer.write_all( + &(channel_monitor + .pending_monitor_events + .iter() + .filter(|(_, ev)| match ev { + MonitorEvent::HTLCEvent(_) => true, + MonitorEvent::HolderForceClosed(_) => true, + MonitorEvent::HolderForceClosedWithInfo { .. } => true, + _ => false, + }) + .count() as u64) + .to_be_bytes(), + )?; + for (_, event) in channel_monitor.pending_monitor_events.iter() { + match event { + MonitorEvent::HTLCEvent(upd) => { + 0u8.write(writer)?; + upd.write(writer)?; + }, + MonitorEvent::HolderForceClosed(_) => 1u8.write(writer)?, + // `HolderForceClosedWithInfo` replaced `HolderForceClosed` in v0.0.122. To keep + // backwards compatibility, we write a `HolderForceClosed` event along with the + // `HolderForceClosedWithInfo` event. This is deduplicated in the reader. + MonitorEvent::HolderForceClosedWithInfo { .. } => 1u8.write(writer)?, + _ => {}, // Covered in the TLV writes below + } } + } else { + // If `persistent_events_enabled` is set, we'll write the events with their event ids in the + // TLV section below. + writer.write_all(&(0u64).to_be_bytes())?; } writer.write_all(&(channel_monitor.pending_events.len() as u64).to_be_bytes())?; @@ -1729,25 +1741,40 @@ pub(crate) fn write_chanmon_internal( // If we have a `HolderForceClosedWithInfo` event, we need to write the `HolderForceClosed` // for backwards compatibility. - let holder_force_closed_compat = channel_monitor.pending_monitor_events.iter().find_map(|ev| { - if let MonitorEvent::HolderForceClosedWithInfo { outpoint, .. } = ev { - Some(MonitorEvent::HolderForceClosed(*outpoint)) - } else { - None - } - }); - let pending_monitor_events_legacy = Iterable( - channel_monitor.pending_monitor_events.iter().chain(holder_force_closed_compat.as_ref()), - ); + let holder_force_closed_compat = + channel_monitor.pending_monitor_events.iter().find_map(|(_, ev)| { + if let MonitorEvent::HolderForceClosedWithInfo { outpoint, .. } = ev { + Some(MonitorEvent::HolderForceClosed(*outpoint)) + } else { + None + } + }); + let pending_monitor_events_legacy = if !channel_monitor.persistent_events_enabled { + Some(Iterable( + channel_monitor + .pending_monitor_events + .iter() + .map(|(_, ev)| ev) + .chain(holder_force_closed_compat.as_ref()), + )) + } else { + None + }; // Only write `persistent_events_enabled` if it's set to true, as it's an even TLV. let persistent_events_enabled = channel_monitor.persistent_events_enabled.then_some(()); + let pending_mon_evs_with_ids = if persistent_events_enabled.is_some() { + Some(Iterable(channel_monitor.pending_monitor_events.iter())) + } else { + None + }; write_tlv_fields!(writer, { (1, channel_monitor.funding_spend_confirmed, option), (2, persistent_events_enabled, option), (3, channel_monitor.htlcs_resolved_on_chain, required_vec), - (5, pending_monitor_events_legacy, required), // Equivalent to required_vec because Iterable also writes as WithoutLength + (4, pending_mon_evs_with_ids, option), + (5, pending_monitor_events_legacy, option), // Equivalent to optional_vec because Iterable also writes as WithoutLength (7, channel_monitor.funding_spend_seen, required), (9, channel_monitor.counterparty_node_id, required), (11, channel_monitor.confirmed_commitment_tx_counterparty_output, option), @@ -1767,6 +1794,7 @@ pub(crate) fn write_chanmon_internal( (35, channel_monitor.is_manual_broadcast, required), (37, channel_monitor.funding_seen_onchain, required), (39, channel_monitor.best_block.previous_blocks, required), + (41, channel_monitor.next_monitor_event_id, required), }); Ok(()) @@ -1951,6 +1979,7 @@ impl ChannelMonitor { payment_preimages: new_hash_map(), pending_monitor_events: Vec::new(), persistent_events_enabled: false, + next_monitor_event_id: 0, pending_events: Vec::new(), is_processing_pending_events: false, @@ -2164,7 +2193,7 @@ impl ChannelMonitor { /// Get the list of HTLCs who's status has been updated on chain. This should be called by /// ChannelManager via [`chain::Watch::release_pending_monitor_events`]. - pub fn get_and_clear_pending_monitor_events(&self) -> Vec { + pub fn get_and_clear_pending_monitor_events(&self) -> Vec<(u64, MonitorEvent)> { self.inner.lock().unwrap().get_and_clear_pending_monitor_events() } @@ -2178,6 +2207,20 @@ impl ChannelMonitor { // TODO: once events have ids, remove the corresponding event here } + /// Copies [`MonitorEvent`] state from `other` into `self`. + /// Used in tests to align transient runtime state before equality comparison after a + /// serialization round-trip. + #[cfg(any(test, feature = "_test_utils"))] + pub fn copy_monitor_event_state(&self, other: &ChannelMonitor) { + let (pending, next_id) = { + let other_inner = other.inner.lock().unwrap(); + (other_inner.pending_monitor_events.clone(), other_inner.next_monitor_event_id) + }; + let mut self_inner = self.inner.lock().unwrap(); + self_inner.pending_monitor_events = pending; + self_inner.next_monitor_event_id = next_id; + } + /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity. /// /// For channels featuring anchor outputs, this method will also process [`BumpTransaction`] @@ -3903,7 +3946,7 @@ impl ChannelMonitorImpl { outpoint: funding_outpoint, channel_id: self.channel_id, }; - push_monitor_event(&mut self.pending_monitor_events, event); + push_monitor_event(&mut self.pending_monitor_events, event, &mut self.next_monitor_event_id); } // Although we aren't signing the transaction directly here, the transaction will be signed @@ -4494,12 +4537,16 @@ impl ChannelMonitorImpl { "Failing HTLC from late counterparty commitment update immediately \ (funding spend already confirmed)" ); - self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { - payment_hash, - payment_preimage: None, - source: source.clone(), - htlc_value_satoshis, - })); + push_monitor_event( + &mut self.pending_monitor_events, + MonitorEvent::HTLCEvent(HTLCUpdate { + payment_hash, + payment_preimage: None, + source: source.clone(), + htlc_value_satoshis, + }), + &mut self.next_monitor_event_id, + ); self.htlcs_resolved_on_chain.push(IrrevocablyResolvedHTLC { commitment_tx_output_idx: None, resolving_txid: Some(confirmed_txid), @@ -4565,10 +4612,14 @@ impl ChannelMonitorImpl { } fn push_monitor_event(&mut self, event: MonitorEvent) { - push_monitor_event(&mut self.pending_monitor_events, event); + push_monitor_event( + &mut self.pending_monitor_events, + event, + &mut self.next_monitor_event_id, + ); } - fn get_and_clear_pending_monitor_events(&mut self) -> Vec { + fn get_and_clear_pending_monitor_events(&mut self) -> Vec<(u64, MonitorEvent)> { let mut ret = Vec::new(); mem::swap(&mut ret, &mut self.pending_monitor_events); ret @@ -5899,7 +5950,7 @@ impl ChannelMonitorImpl { continue; } let duplicate_event = self.pending_monitor_events.iter().any( - |update| if let &MonitorEvent::HTLCEvent(ref upd) = update { + |(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == *source } else { false }); if duplicate_event { @@ -5917,7 +5968,7 @@ impl ChannelMonitorImpl { payment_preimage: None, payment_hash: htlc.payment_hash, htlc_value_satoshis: Some(htlc.amount_msat / 1000), - })); + }), &mut self.next_monitor_event_id); } } } @@ -6316,7 +6367,7 @@ impl ChannelMonitorImpl { if let Some((source, payment_hash, amount_msat)) = payment_data { if accepted_preimage_claim { if !self.pending_monitor_events.iter().any( - |update| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == source } else { false }) { + |(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == source } else { false }) { self.onchain_events_awaiting_threshold_conf.push(OnchainEventEntry { txid: tx.compute_txid(), height, @@ -6334,11 +6385,11 @@ impl ChannelMonitorImpl { payment_preimage: Some(payment_preimage), payment_hash, htlc_value_satoshis: Some(amount_msat / 1000), - })); + }), &mut self.next_monitor_event_id); } } else if offered_preimage_claim { if !self.pending_monitor_events.iter().any( - |update| if let &MonitorEvent::HTLCEvent(ref upd) = update { + |(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == source } else { false }) { self.onchain_events_awaiting_threshold_conf.push(OnchainEventEntry { @@ -6358,7 +6409,7 @@ impl ChannelMonitorImpl { payment_preimage: Some(payment_preimage), payment_hash, htlc_value_satoshis: Some(amount_msat / 1000), - })); + }), &mut self.next_monitor_event_id); } } else { self.onchain_events_awaiting_threshold_conf.retain(|ref entry| { @@ -6723,10 +6774,13 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP let mut funding_seen_onchain = RequiredWrapper(None); let mut best_block_previous_blocks = None; let mut persistent_events_enabled: Option<()> = None; + let mut next_monitor_event_id: Option = None; + let mut pending_mon_evs_with_ids: Option> = None; read_tlv_fields!(reader, { (1, funding_spend_confirmed, option), (2, persistent_events_enabled, option), (3, htlcs_resolved_on_chain, optional_vec), + (4, pending_mon_evs_with_ids, optional_vec), (5, pending_monitor_events_legacy, optional_vec), (7, funding_spend_seen, option), (9, counterparty_node_id, option), @@ -6747,6 +6801,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP (35, is_manual_broadcast, (default_value, false)), (37, funding_seen_onchain, (default_value, true)), (39, best_block_previous_blocks, option), // Added and always set in 0.3 + (41, next_monitor_event_id, option), }); if let Some(previous_blocks) = best_block_previous_blocks { best_block.previous_blocks = previous_blocks; @@ -6788,6 +6843,22 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP } } + // If persistent events are enabled, use the events with their persisted IDs from TLV 4. + // Otherwise, use the legacy events from TLV 5 and assign sequential IDs. + let (next_monitor_event_id, pending_monitor_events): (u64, Vec<(u64, MonitorEvent)>) = + if persistent_events_enabled.is_some() { + let evs = pending_mon_evs_with_ids.unwrap_or_default() + .into_iter().map(|ev| (ev.0, ev.1)).collect(); + (next_monitor_event_id.unwrap_or(0), evs) + } else if let Some(events) = pending_monitor_events_legacy { + let next_id = next_monitor_event_id.unwrap_or(events.len() as u64); + let evs = events.into_iter().enumerate() + .map(|(i, ev)| (i as u64, ev)).collect(); + (next_id, evs) + } else { + (next_monitor_event_id.unwrap_or(0), Vec::new()) + }; + let channel_parameters = channel_parameters.unwrap_or_else(|| { onchain_tx_handler.channel_parameters().clone() }); @@ -6905,8 +6976,9 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP current_holder_commitment_number, payment_preimages, - pending_monitor_events: pending_monitor_events_legacy.unwrap(), + pending_monitor_events, persistent_events_enabled: persistent_events_enabled.is_some(), + next_monitor_event_id, pending_events, is_processing_pending_events: false, @@ -6955,6 +7027,22 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP } } +/// Deserialization wrapper for reading a `(u64, MonitorEvent)`. +/// Necessary because we can't deserialize a (Readable, MaybeReadable) tuple due to trait +/// conflicts. +struct ReadableIdMonitorEvent(u64, MonitorEvent); + +impl MaybeReadable for ReadableIdMonitorEvent { + fn read(reader: &mut R) -> Result, DecodeError> { + let id: u64 = Readable::read(reader)?; + let event_opt: Option = MaybeReadable::read(reader)?; + match event_opt { + Some(ev) => Ok(Some(ReadableIdMonitorEvent(id, ev))), + None => Ok(None), + } + } +} + #[cfg(test)] pub(super) fn dummy_monitor( channel_id: ChannelId, wrap_signer: impl FnOnce(crate::sign::InMemorySigner) -> S, diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index 59400cc8da9..b7ff2cb917c 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -417,6 +417,10 @@ pub trait Watch { /// Returns any monitor events since the last call. Subsequent calls must only return new /// events. /// + /// Each event comes with a corresponding id. Once the event is processed, call + /// [`Watch::ack_monitor_event`] with the corresponding id and channel id. Unacknowledged events + /// will be re-provided by this method after startup. + /// /// Note that after any block- or transaction-connection calls to a [`ChannelMonitor`], no /// further events may be returned here until the [`ChannelMonitor`] has been fully persisted /// to disk. @@ -425,7 +429,7 @@ pub trait Watch { /// [`MonitorEvent::Completed`] here, see [`ChannelMonitorUpdateStatus::InProgress`]. fn release_pending_monitor_events( &self, - ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)>; + ) -> Vec<(OutPoint, ChannelId, Vec<(u64, MonitorEvent)>, PublicKey)>; /// Acknowledges and removes a [`MonitorEvent`] previously returned by /// [`Watch::release_pending_monitor_events`] by its event ID. @@ -454,7 +458,7 @@ impl + ?Sized, W: Der fn release_pending_monitor_events( &self, - ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { + ) -> Vec<(OutPoint, ChannelId, Vec<(u64, MonitorEvent)>, PublicKey)> { self.deref().release_pending_monitor_events() } diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index af4d1569d0c..4d1451e3ee6 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -5021,7 +5021,7 @@ fn native_async_persist() { let completed_persist = async_chain_monitor.release_pending_monitor_events(); assert_eq!(completed_persist.len(), 1); assert_eq!(completed_persist[0].2.len(), 1); - assert!(matches!(completed_persist[0].2[0], MonitorEvent::Completed { .. })); + assert!(matches!(completed_persist[0].2[0].1, MonitorEvent::Completed { .. })); // Now test two async `ChannelMonitorUpdate`s in flight at once, completing them in-order but // separately. @@ -5069,7 +5069,7 @@ fn native_async_persist() { let completed_persist = async_chain_monitor.release_pending_monitor_events(); assert_eq!(completed_persist.len(), 1); assert_eq!(completed_persist[0].2.len(), 1); - assert!(matches!(completed_persist[0].2[0], MonitorEvent::Completed { .. })); + assert!(matches!(completed_persist[0].2[0].1, MonitorEvent::Completed { .. })); // Finally, test two async `ChanelMonitorUpdate`s in flight at once, completing them // out-of-order and ensuring that no `MonitorEvent::Completed` is generated until they are both @@ -5115,7 +5115,7 @@ fn native_async_persist() { let completed_persist = async_chain_monitor.release_pending_monitor_events(); assert_eq!(completed_persist.len(), 1); assert_eq!(completed_persist[0].2.len(), 1); - if let MonitorEvent::Completed { monitor_update_id, .. } = &completed_persist[0].2[0] { + if let (_, MonitorEvent::Completed { monitor_update_id, .. }) = &completed_persist[0].2[0] { assert_eq!(*monitor_update_id, 4); } else { panic!(); diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 6327b22ce2b..0384fed65b9 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -13522,7 +13522,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ for (funding_outpoint, channel_id, mut monitor_events, counterparty_node_id) in pending_monitor_events.drain(..) { - for monitor_event in monitor_events.drain(..) { + for (_event_id, monitor_event) in monitor_events.drain(..) { match monitor_event { MonitorEvent::HTLCEvent(htlc_update) => { let logger = WithContext::from( diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index c31a378be1a..0961f31cfe6 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -649,6 +649,7 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { ) .unwrap() .1; + new_monitor.copy_monitor_event_state(&monitor); assert!(new_monitor == monitor); self.latest_monitor_update_id .lock() @@ -710,6 +711,9 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { // it so it doesn't leak into the rest of the test. let failed_back = monitor.inner.lock().unwrap().failed_back_htlc_ids.clone(); new_monitor.inner.lock().unwrap().failed_back_htlc_ids = failed_back; + // The deserialized monitor will reset the monitor event state, so copy it from the live + // monitor before comparing. + new_monitor.copy_monitor_event_state(&monitor); if let Some(chan_id) = self.expect_monitor_round_trip_fail.lock().unwrap().take() { assert_eq!(chan_id, channel_id); assert!(new_monitor != *monitor); @@ -723,7 +727,7 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { fn release_pending_monitor_events( &self, - ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { + ) -> Vec<(OutPoint, ChannelId, Vec<(u64, MonitorEvent)>, PublicKey)> { // Auto-flush pending operations so that the ChannelManager can pick up monitor // completion events. When not in deferred mode the queue is empty so this only // costs a lock acquisition. It ensures standard test helpers (route_payment, etc.) From 491f582855f9059b802bc91d3814eb9da5e6d900 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Mon, 16 Mar 2026 13:45:21 -0400 Subject: [PATCH 8/9] Ack monitor events immediately Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. Here for the purposes of merging initial support for persistent monitor events, we ack each immediately after it is received/handled by the ChannelManager, which is equivalent to the behavior we had prior to monitor events becoming persistent. In upcoming work, we'll want to have much more special handling for HTLCUpdate monitor events in particular -- e.g. for outbound payment claim events, we should only ACK the monitor event when the PaymentSent event is processed, until that point we want it to keep being provided back to us on startup. All the other monitor events are trivial to ACK, since they don't need to be re-processed on startup. --- lightning/src/ln/channelmanager.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 0384fed65b9..89ebb41ce71 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -42,6 +42,7 @@ use crate::chain::chaininterface::{ BroadcasterInterface, ConfirmationTarget, FeeEstimator, LowerBoundedFeeEstimator, TransactionType, }; +use crate::chain::chainmonitor::MonitorEventSource; use crate::chain::channelmonitor::{ ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, MonitorEvent, WithChannelMonitor, ANTI_REORG_DELAY, CLTV_CLAIM_BUFFER, HTLC_FAIL_BACK_BUFFER, @@ -13522,7 +13523,8 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ for (funding_outpoint, channel_id, mut monitor_events, counterparty_node_id) in pending_monitor_events.drain(..) { - for (_event_id, monitor_event) in monitor_events.drain(..) { + for (event_id, monitor_event) in monitor_events.drain(..) { + let monitor_event_source = MonitorEventSource { event_id, channel_id }; match monitor_event { MonitorEvent::HTLCEvent(htlc_update) => { let logger = WithContext::from( @@ -13572,6 +13574,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ completion_update, ); } + self.chain_monitor.ack_monitor_event(monitor_event_source); }, MonitorEvent::HolderForceClosed(_) | MonitorEvent::HolderForceClosedWithInfo { .. } => { @@ -13605,6 +13608,9 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ failed_channels.push((Err(e), counterparty_node_id)); } } + // Channel close monitor events do not need to be replayed on startup because we + // already check the monitors to see if the channel is closed. + self.chain_monitor.ack_monitor_event(monitor_event_source); }, MonitorEvent::CommitmentTxConfirmed(_) => { let per_peer_state = self.per_peer_state.read().unwrap(); @@ -13626,6 +13632,9 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ failed_channels.push((Err(e), counterparty_node_id)); } } + // Channel close monitor events do not need to be replayed on startup because we + // already check the monitors to see if the channel is closed. + self.chain_monitor.ack_monitor_event(monitor_event_source); }, MonitorEvent::Completed { channel_id, monitor_update_id, .. } => { self.channel_monitor_updated( @@ -13633,6 +13642,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ Some(monitor_update_id), &counterparty_node_id, ); + self.chain_monitor.ack_monitor_event(monitor_event_source); }, } } From 76c142d868cebd59b128c51256367441747153ca Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Tue, 17 Mar 2026 15:31:39 -0400 Subject: [PATCH 9/9] Support persistent monitor events Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. Here we complete work that was built on recent prior commits and actually start re-providing monitor events on startup if they went un-acked during runtime. This isn't actually supported in prod yet, so this new code will run randomly in tests, to ensure we still support the old paths. --- lightning/src/chain/channelmonitor.rs | 61 +++++++++++++++++++++------ lightning/src/ln/channelmanager.rs | 27 +++++++++++- lightning/src/ln/monitor_tests.rs | 3 ++ lightning/src/util/config.rs | 8 ++++ 4 files changed, 86 insertions(+), 13 deletions(-) diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index e007260ed60..a85cae4b90c 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -1288,6 +1288,11 @@ pub(crate) struct ChannelMonitorImpl { // block/transaction-connected events and *not* during block/transaction-disconnected events, // we further MUST NOT generate events during block/transaction-disconnection. pending_monitor_events: Vec<(u64, MonitorEvent)>, + // `MonitorEvent`s that have been provided to the `ChannelManager` via + // [`ChannelMonitor::get_and_clear_pending_monitor_events`] and are awaiting + // [`ChannelMonitor::ack_monitor_event`] for removal. If an event in this queue is not ack'd, it + // will be re-provided to the `ChannelManager` on startup. + provided_monitor_events: Vec<(u64, MonitorEvent)>, /// When set, monitor events are retained until explicitly acked rather than cleared on read. /// /// Allows the ChannelManager to reconstruct pending HTLC state by replaying monitor events on @@ -1764,7 +1769,12 @@ pub(crate) fn write_chanmon_internal( // Only write `persistent_events_enabled` if it's set to true, as it's an even TLV. let persistent_events_enabled = channel_monitor.persistent_events_enabled.then_some(()); let pending_mon_evs_with_ids = if persistent_events_enabled.is_some() { - Some(Iterable(channel_monitor.pending_monitor_events.iter())) + Some(Iterable( + channel_monitor + .provided_monitor_events + .iter() + .chain(channel_monitor.pending_monitor_events.iter()), + )) } else { None }; @@ -1978,6 +1988,7 @@ impl ChannelMonitor { payment_preimages: new_hash_map(), pending_monitor_events: Vec::new(), + provided_monitor_events: Vec::new(), persistent_events_enabled: false, next_monitor_event_id: 0, pending_events: Vec::new(), @@ -2203,8 +2214,15 @@ impl ChannelMonitor { /// Removes a [`MonitorEvent`] by its event ID, acknowledging that it has been processed. /// Generally called by [`chain::Watch::ack_monitor_event`]. - pub fn ack_monitor_event(&self, _event_id: u64) { - // TODO: once events have ids, remove the corresponding event here + pub fn ack_monitor_event(&self, event_id: u64) { + let inner = &mut *self.inner.lock().unwrap(); + inner.ack_monitor_event(event_id); + } + + /// Enables persistent monitor events mode. When enabled, monitor events are retained until + /// explicitly acked rather than cleared on read. + pub(crate) fn set_persistent_events_enabled(&self, enabled: bool) { + self.inner.lock().unwrap().persistent_events_enabled = enabled; } /// Copies [`MonitorEvent`] state from `other` into `self`. @@ -2212,11 +2230,16 @@ impl ChannelMonitor { /// serialization round-trip. #[cfg(any(test, feature = "_test_utils"))] pub fn copy_monitor_event_state(&self, other: &ChannelMonitor) { - let (pending, next_id) = { + let (provided, pending, next_id) = { let other_inner = other.inner.lock().unwrap(); - (other_inner.pending_monitor_events.clone(), other_inner.next_monitor_event_id) + ( + other_inner.provided_monitor_events.clone(), + other_inner.pending_monitor_events.clone(), + other_inner.next_monitor_event_id, + ) }; let mut self_inner = self.inner.lock().unwrap(); + self_inner.provided_monitor_events = provided; self_inner.pending_monitor_events = pending; self_inner.next_monitor_event_id = next_id; } @@ -4619,10 +4642,23 @@ impl ChannelMonitorImpl { ); } + fn ack_monitor_event(&mut self, event_id: u64) { + self.provided_monitor_events.retain(|(id, _)| *id != event_id); + // If this event was generated prior to a restart, it may be in this queue instead + self.pending_monitor_events.retain(|(id, _)| *id != event_id); + } + fn get_and_clear_pending_monitor_events(&mut self) -> Vec<(u64, MonitorEvent)> { - let mut ret = Vec::new(); - mem::swap(&mut ret, &mut self.pending_monitor_events); - ret + if self.persistent_events_enabled { + let mut ret = Vec::new(); + mem::swap(&mut ret, &mut self.pending_monitor_events); + self.provided_monitor_events.extend(ret.iter().cloned()); + ret + } else { + let mut ret = Vec::new(); + mem::swap(&mut ret, &mut self.pending_monitor_events); + ret + } } /// Gets the set of events that are repeated regularly (e.g. those which RBF bump @@ -5949,8 +5985,8 @@ impl ChannelMonitorImpl { if inbound_htlc_expiry > max_expiry_height { continue; } - let duplicate_event = self.pending_monitor_events.iter().any( - |(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update { + let duplicate_event = self.pending_monitor_events.iter().chain(self.provided_monitor_events.iter()) + .any(|(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == *source } else { false }); if duplicate_event { @@ -6366,7 +6402,7 @@ impl ChannelMonitorImpl { // HTLC resolution backwards to and figure out whether we learned a preimage from it. if let Some((source, payment_hash, amount_msat)) = payment_data { if accepted_preimage_claim { - if !self.pending_monitor_events.iter().any( + if !self.pending_monitor_events.iter().chain(self.provided_monitor_events.iter()).any( |(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == source } else { false }) { self.onchain_events_awaiting_threshold_conf.push(OnchainEventEntry { txid: tx.compute_txid(), @@ -6388,7 +6424,7 @@ impl ChannelMonitorImpl { }), &mut self.next_monitor_event_id); } } else if offered_preimage_claim { - if !self.pending_monitor_events.iter().any( + if !self.pending_monitor_events.iter().chain(self.provided_monitor_events.iter()).any( |(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == source } else { false }) { @@ -6977,6 +7013,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP payment_preimages, pending_monitor_events, + provided_monitor_events: Vec::new(), persistent_events_enabled: persistent_events_enabled.is_some(), next_monitor_event_id, pending_events, diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 89ebb41ce71..3d62573eec5 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -3613,6 +3613,8 @@ impl< our_network_pubkey, current_timestamp, expanded_inbound_key, node_signer.get_receive_auth_key(), secp_ctx.clone(), message_router, logger.clone(), ); + #[cfg(any(test, feature = "_test_utils"))] + let override_persistent_monitor_events = config.override_persistent_monitor_events; ChannelManager { config: RwLock::new(config), @@ -3669,7 +3671,27 @@ impl< logger, - persistent_monitor_events: false, + persistent_monitor_events: { + #[cfg(not(any(test, feature = "_test_utils")))] + { false } + #[cfg(any(test, feature = "_test_utils"))] + { + override_persistent_monitor_events.unwrap_or_else(|| { + use core::hash::{BuildHasher, Hasher}; + match std::env::var("LDK_TEST_PERSISTENT_MON_EVENTS") { + Ok(val) => match val.as_str() { + "1" => true, + "0" => false, + _ => panic!("LDK_TEST_PERSISTENT_MON_EVENTS must be 0 or 1, got: {}", val), + }, + Err(_) => { + let rand_val = std::collections::hash_map::RandomState::new().build_hasher().finish(); + rand_val % 2 == 0 + }, + } + }) + } + }, } } @@ -11614,6 +11636,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ fail_chan!("Already had channel with the new channel_id"); }, hash_map::Entry::Vacant(e) => { + monitor.set_persistent_events_enabled(self.persistent_monitor_events); let monitor_res = self.chain_monitor.watch_channel(monitor.channel_id(), monitor); if let Ok(persist_state) = monitor_res { // There's no problem signing a counterparty's funding transaction if our monitor @@ -11784,6 +11807,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ match chan .funding_signed(&msg, best_block, &self.signer_provider, &self.logger) .and_then(|(funded_chan, monitor)| { + monitor.set_persistent_events_enabled(self.persistent_monitor_events); self.chain_monitor .watch_channel(funded_chan.context.channel_id(), monitor) .map_err(|()| { @@ -12698,6 +12722,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ if let Some(chan) = chan.as_funded_mut() { if let Some(monitor) = monitor_opt { + monitor.set_persistent_events_enabled(self.persistent_monitor_events); let monitor_res = self.chain_monitor.watch_channel(monitor.channel_id(), monitor); if let Ok(persist_state) = monitor_res { diff --git a/lightning/src/ln/monitor_tests.rs b/lightning/src/ln/monitor_tests.rs index d156f874703..438691b71c7 100644 --- a/lightning/src/ln/monitor_tests.rs +++ b/lightning/src/ln/monitor_tests.rs @@ -3594,6 +3594,9 @@ fn do_test_lost_timeout_monitor_events(confirm_tx: CommitmentType, dust_htlcs: b let mut cfg = test_default_channel_config(); cfg.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = true; cfg.channel_handshake_config.negotiate_anchor_zero_fee_commitments = p2a_anchor; + // This test specifically tests lost monitor events, which requires the legacy + // (non-persistent) monitor event behavior. + cfg.override_persistent_monitor_events = Some(false); let cfgs = [Some(cfg.clone()), Some(cfg.clone()), Some(cfg.clone())]; let chanmon_cfgs = create_chanmon_cfgs(3); diff --git a/lightning/src/util/config.rs b/lightning/src/util/config.rs index ebef8c27bca..b6b70554b3f 100644 --- a/lightning/src/util/config.rs +++ b/lightning/src/util/config.rs @@ -1132,6 +1132,10 @@ pub struct UserConfig { /// /// [`ChannelManager::splice_channel`]: crate::ln::channelmanager::ChannelManager::splice_channel pub reject_inbound_splices: bool, + /// If set to `Some`, overrides the random selection of whether to use persistent monitor + /// events. Only available in tests. + #[cfg(any(test, feature = "_test_utils"))] + pub override_persistent_monitor_events: Option, } impl Default for UserConfig { @@ -1148,6 +1152,8 @@ impl Default for UserConfig { enable_htlc_hold: false, hold_outbound_htlcs_at_next_hop: false, reject_inbound_splices: true, + #[cfg(any(test, feature = "_test_utils"))] + override_persistent_monitor_events: None, } } } @@ -1170,6 +1176,8 @@ impl Readable for UserConfig { hold_outbound_htlcs_at_next_hop: Readable::read(reader)?, enable_htlc_hold: Readable::read(reader)?, reject_inbound_splices: Readable::read(reader)?, + #[cfg(any(test, feature = "_test_utils"))] + override_persistent_monitor_events: None, }) } }