Skip to content

Commit d039d43

Browse files
joostjagerclaude
andcommitted
Use async ChainMonitor persister via new_async_beta
Switch from the sync MonitorUpdatingPersister to MonitorUpdatingPersisterAsync for ChainMonitor, enabling non-blocking channel monitor persistence at runtime. Previously, two separate persisters were created during build: an async one (used only to read monitors at startup, then discarded) and a sync one (passed to ChainMonitor::new for runtime persistence). Now a single AsyncPersister is used for both reading and ongoing persistence via ChainMonitor::new_async_beta. A DynStoreRef newtype is introduced to wrap Arc<DynStore> with a direct KVStore implementation, avoiding higher-ranked lifetime issues that arise when Arc<dyn DynStoreTrait> is used as a type parameter in complex generic bounds. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 9443083 commit d039d43

File tree

2 files changed

+51
-32
lines changed

2 files changed

+51
-32
lines changed

src/builder.rs

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,9 @@ use crate::peer_store::PeerStore;
7575
use crate::runtime::{Runtime, RuntimeSpawner};
7676
use crate::tx_broadcaster::TransactionBroadcaster;
7777
use crate::types::{
78-
AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph,
79-
KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, PendingPaymentStore,
80-
Persister, SyncAndAsyncKVStore,
78+
AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreRef, DynStoreWrapper,
79+
GossipSync, Graph, KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager,
80+
PendingPaymentStore, SyncAndAsyncKVStore,
8181
};
8282
use crate::wallet::persist::KVStoreWalletPersister;
8383
use crate::wallet::Wallet;
@@ -1289,8 +1289,8 @@ fn build_with_store_internal(
12891289
));
12901290

12911291
let peer_storage_key = keys_manager.get_peer_storage_key();
1292-
let monitor_reader = Arc::new(AsyncPersister::new(
1293-
Arc::clone(&kv_store),
1292+
let persister = Arc::new(AsyncPersister::new(
1293+
DynStoreRef(Arc::clone(&kv_store)),
12941294
RuntimeSpawner::new(Arc::clone(&runtime)),
12951295
Arc::clone(&logger),
12961296
PERSISTER_MAX_PENDING_UPDATES,
@@ -1303,9 +1303,9 @@ fn build_with_store_internal(
13031303
// Read ChannelMonitors and the NetworkGraph
13041304
let kv_store_ref = Arc::clone(&kv_store);
13051305
let logger_ref = Arc::clone(&logger);
1306-
let (monitor_read_res, network_graph_res) = runtime.block_on(async move {
1306+
let (monitor_read_res, network_graph_res) = runtime.block_on(async {
13071307
tokio::join!(
1308-
monitor_reader.read_all_channel_monitors_with_updates_parallel(),
1308+
persister.read_all_channel_monitors_with_updates_parallel(),
13091309
read_network_graph(&*kv_store_ref, logger_ref),
13101310
)
13111311
});
@@ -1323,23 +1323,16 @@ fn build_with_store_internal(
13231323
},
13241324
};
13251325

1326-
let persister = Arc::new(Persister::new(
1327-
Arc::clone(&kv_store),
1328-
Arc::clone(&logger),
1329-
PERSISTER_MAX_PENDING_UPDATES,
1330-
Arc::clone(&keys_manager),
1331-
Arc::clone(&keys_manager),
1332-
Arc::clone(&tx_broadcaster),
1333-
Arc::clone(&fee_estimator),
1334-
));
1326+
let persister = Arc::try_unwrap(persister)
1327+
.unwrap_or_else(|_| panic!("Arc<AsyncPersister> should have no other references"));
13351328

13361329
// Initialize the ChainMonitor
1337-
let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(
1330+
let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new_async_beta(
13381331
Some(Arc::clone(&chain_source)),
13391332
Arc::clone(&tx_broadcaster),
13401333
Arc::clone(&logger),
13411334
Arc::clone(&fee_estimator),
1342-
Arc::clone(&persister),
1335+
persister,
13431336
Arc::clone(&keys_manager),
13441337
peer_storage_key,
13451338
true,

src/types.rs

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@ use lightning::routing::gossip;
2323
use lightning::routing::router::DefaultRouter;
2424
use lightning::routing::scoring::{CombinedScorer, ProbabilisticScoringFeeParameters};
2525
use lightning::sign::InMemorySigner;
26-
use lightning::util::persist::{
27-
KVStore, KVStoreSync, MonitorUpdatingPersister, MonitorUpdatingPersisterAsync,
28-
};
26+
use lightning::util::persist::{KVStore, KVStoreSync, MonitorUpdatingPersisterAsync};
2927
use lightning::util::ser::{Readable, Writeable, Writer};
3028
use lightning::util::sweep::OutputSweeper;
3129
use lightning_block_sync::gossip::GossipVerifier;
@@ -135,6 +133,35 @@ impl<'a> KVStoreSync for dyn DynStoreTrait + 'a {
135133

136134
pub(crate) type DynStore = dyn DynStoreTrait;
137135

136+
#[derive(Clone)]
137+
pub(crate) struct DynStoreRef(pub(crate) Arc<DynStore>);
138+
139+
impl KVStore for DynStoreRef {
140+
fn read(
141+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
142+
) -> impl Future<Output = Result<Vec<u8>, bitcoin::io::Error>> + Send + 'static {
143+
DynStoreTrait::read_async(&*self.0, primary_namespace, secondary_namespace, key)
144+
}
145+
146+
fn write(
147+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
148+
) -> impl Future<Output = Result<(), bitcoin::io::Error>> + Send + 'static {
149+
DynStoreTrait::write_async(&*self.0, primary_namespace, secondary_namespace, key, buf)
150+
}
151+
152+
fn remove(
153+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
154+
) -> impl Future<Output = Result<(), bitcoin::io::Error>> + Send + 'static {
155+
DynStoreTrait::remove_async(&*self.0, primary_namespace, secondary_namespace, key, lazy)
156+
}
157+
158+
fn list(
159+
&self, primary_namespace: &str, secondary_namespace: &str,
160+
) -> impl Future<Output = Result<Vec<String>, bitcoin::io::Error>> + Send + 'static {
161+
DynStoreTrait::list_async(&*self.0, primary_namespace, secondary_namespace)
162+
}
163+
}
164+
138165
pub(crate) struct DynStoreWrapper<T: SyncAndAsyncKVStore + Send + Sync>(pub(crate) T);
139166

140167
impl<T: SyncAndAsyncKVStore + Send + Sync> DynStoreTrait for DynStoreWrapper<T> {
@@ -188,7 +215,7 @@ impl<T: SyncAndAsyncKVStore + Send + Sync> DynStoreTrait for DynStoreWrapper<T>
188215
}
189216

190217
pub(crate) type AsyncPersister = MonitorUpdatingPersisterAsync<
191-
Arc<DynStore>,
218+
DynStoreRef,
192219
RuntimeSpawner,
193220
Arc<Logger>,
194221
Arc<KeysManager>,
@@ -197,22 +224,21 @@ pub(crate) type AsyncPersister = MonitorUpdatingPersisterAsync<
197224
Arc<OnchainFeeEstimator>,
198225
>;
199226

200-
pub type Persister = MonitorUpdatingPersister<
201-
Arc<DynStore>,
202-
Arc<Logger>,
203-
Arc<KeysManager>,
204-
Arc<KeysManager>,
205-
Arc<Broadcaster>,
206-
Arc<OnchainFeeEstimator>,
207-
>;
208-
209227
pub(crate) type ChainMonitor = chainmonitor::ChainMonitor<
210228
InMemorySigner,
211229
Arc<ChainSource>,
212230
Arc<Broadcaster>,
213231
Arc<OnchainFeeEstimator>,
214232
Arc<Logger>,
215-
Arc<Persister>,
233+
chainmonitor::AsyncPersister<
234+
DynStoreRef,
235+
RuntimeSpawner,
236+
Arc<Logger>,
237+
Arc<KeysManager>,
238+
Arc<KeysManager>,
239+
Arc<Broadcaster>,
240+
Arc<OnchainFeeEstimator>,
241+
>,
216242
Arc<KeysManager>,
217243
>;
218244

0 commit comments

Comments
 (0)