Skip to content

Commit 70288e6

Browse files
committed
Use BDK events in update_payment_store instead of scanning all transactions
Replace the full transaction list scan in `update_payment_store` with handling of BDK's `WalletEvent` stream during sync. This leverages the new events in BDK 2.2, reduces redundant work, and prepares the foundation for reliable RBF/CPFP tracking via `WalletEvent::TxReplaced`
1 parent 5af6ea0 commit 70288e6

File tree

1 file changed

+208
-45
lines changed

1 file changed

+208
-45
lines changed

src/wallet/mod.rs

Lines changed: 208 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use std::sync::{Arc, Mutex};
1212

1313
use bdk_chain::spk_client::{FullScanRequest, SyncRequest};
1414
use bdk_wallet::descriptor::ExtendedDescriptor;
15+
use bdk_wallet::event::WalletEvent;
1516
#[allow(deprecated)]
1617
use bdk_wallet::SignOptions;
1718
use bdk_wallet::{Balance, KeychainKind, PersistedWallet, Update};
@@ -49,8 +50,10 @@ use crate::config::Config;
4950
use crate::fee_estimator::{ConfirmationTarget, FeeEstimator, OnchainFeeEstimator};
5051
use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger};
5152
use crate::payment::store::ConfirmationStatus;
52-
use crate::payment::{PaymentDetails, PaymentDirection, PaymentStatus};
53-
use crate::types::{Broadcaster, PaymentStore};
53+
use crate::payment::{
54+
PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, PendingPaymentDetails,
55+
};
56+
use crate::types::{Broadcaster, PaymentStore, PendingPaymentStore};
5457
use crate::Error;
5558

5659
pub(crate) enum OnchainSendAmount {
@@ -71,18 +74,28 @@ pub(crate) struct Wallet {
7174
payment_store: Arc<PaymentStore>,
7275
config: Arc<Config>,
7376
logger: Arc<Logger>,
77+
pending_payment_store: Arc<PendingPaymentStore>,
7478
}
7579

7680
impl Wallet {
7781
pub(crate) fn new(
7882
wallet: bdk_wallet::PersistedWallet<KVStoreWalletPersister>,
7983
wallet_persister: KVStoreWalletPersister, broadcaster: Arc<Broadcaster>,
8084
fee_estimator: Arc<OnchainFeeEstimator>, payment_store: Arc<PaymentStore>,
81-
config: Arc<Config>, logger: Arc<Logger>,
85+
config: Arc<Config>, logger: Arc<Logger>, pending_payment_store: Arc<PendingPaymentStore>,
8286
) -> Self {
8387
let inner = Mutex::new(wallet);
8488
let persister = Mutex::new(wallet_persister);
85-
Self { inner, persister, broadcaster, fee_estimator, payment_store, config, logger }
89+
Self {
90+
inner,
91+
persister,
92+
broadcaster,
93+
fee_estimator,
94+
payment_store,
95+
config,
96+
logger,
97+
pending_payment_store,
98+
}
8699
}
87100

88101
pub(crate) fn get_full_scan_request(&self) -> FullScanRequest<KeychainKind> {
@@ -114,15 +127,15 @@ impl Wallet {
114127

115128
pub(crate) fn apply_update(&self, update: impl Into<Update>) -> Result<(), Error> {
116129
let mut locked_wallet = self.inner.lock().unwrap();
117-
match locked_wallet.apply_update(update) {
118-
Ok(()) => {
130+
match locked_wallet.apply_update_events(update) {
131+
Ok(events) => {
119132
let mut locked_persister = self.persister.lock().unwrap();
120133
locked_wallet.persist(&mut locked_persister).map_err(|e| {
121134
log_error!(self.logger, "Failed to persist wallet: {}", e);
122135
Error::PersistenceFailed
123136
})?;
124137

125-
self.update_payment_store(&mut *locked_wallet).map_err(|e| {
138+
self.update_payment_store(&mut *locked_wallet, events).map_err(|e| {
126139
log_error!(self.logger, "Failed to update payment store: {}", e);
127140
Error::PersistenceFailed
128141
})?;
@@ -167,42 +180,159 @@ impl Wallet {
167180

168181
fn update_payment_store<'a>(
169182
&self, locked_wallet: &'a mut PersistedWallet<KVStoreWalletPersister>,
183+
mut events: Vec<WalletEvent>,
170184
) -> Result<(), Error> {
171-
for wtx in locked_wallet.transactions() {
172-
let id = PaymentId(wtx.tx_node.txid.to_byte_array());
173-
let txid = wtx.tx_node.txid;
174-
let (payment_status, confirmation_status) = match wtx.chain_position {
175-
bdk_chain::ChainPosition::Confirmed { anchor, .. } => {
176-
let confirmation_height = anchor.block_id.height;
185+
if events.is_empty() {
186+
return Ok(());
187+
}
188+
189+
// Sort events to ensure proper sequencing for data consistency:
190+
// 1. TXReplaced (0) before TxUnconfirmed (1) - Critical for RBF handling
191+
// When a transaction is replaced via RBF, both events fire. Processing
192+
// TXReplaced first stores the replaced transaction, allowing TxUnconfirmed
193+
// to detect and skip duplicate payment record creation.
194+
// 2. TxConfirmed (2) before ChainTipChanged (3) - Ensures height accuracy
195+
// ChainTipChanged updates block height. Processing TxConfirmed first ensures
196+
// it references the correct height for confirmation depth calculations.
197+
// 3. Other events follow in deterministic order for predictable processing
198+
if events.len() > 1 {
199+
events.sort_by_key(|e| match e {
200+
WalletEvent::TxReplaced { .. } => 0,
201+
WalletEvent::TxUnconfirmed { .. } => 1,
202+
WalletEvent::TxConfirmed { .. } => 2,
203+
WalletEvent::ChainTipChanged { .. } => 3,
204+
WalletEvent::TxDropped { .. } => 4,
205+
_ => 5,
206+
});
207+
}
208+
209+
for event in events {
210+
match event {
211+
WalletEvent::TxConfirmed { txid, tx, block_time, .. } => {
177212
let cur_height = locked_wallet.latest_checkpoint().height();
213+
let confirmation_height = block_time.block_id.height;
178214
let payment_status = if cur_height >= confirmation_height + ANTI_REORG_DELAY - 1
179215
{
180216
PaymentStatus::Succeeded
181217
} else {
182218
PaymentStatus::Pending
183219
};
220+
184221
let confirmation_status = ConfirmationStatus::Confirmed {
185-
block_hash: anchor.block_id.hash,
222+
block_hash: block_time.block_id.hash,
186223
height: confirmation_height,
187-
timestamp: anchor.confirmation_time,
224+
timestamp: block_time.confirmation_time,
188225
};
189-
(payment_status, confirmation_status)
226+
227+
let payment_id = self
228+
.find_payment_by_txid(txid)
229+
.unwrap_or_else(|| PaymentId(txid.to_byte_array()));
230+
231+
let payment = self.create_payment_from_tx(
232+
locked_wallet,
233+
txid,
234+
payment_id,
235+
&tx,
236+
payment_status,
237+
confirmation_status,
238+
);
239+
240+
let pending_payment =
241+
self.create_pending_payment_from_tx(payment.clone(), Vec::new());
242+
243+
self.payment_store.insert_or_update(payment)?;
244+
self.pending_payment_store.insert_or_update(pending_payment)?;
190245
},
191-
bdk_chain::ChainPosition::Unconfirmed { .. } => {
192-
(PaymentStatus::Pending, ConfirmationStatus::Unconfirmed)
246+
WalletEvent::ChainTipChanged { new_tip, .. } => {
247+
// Get all payments that are Pending with Confirmed status
248+
let pending_payments: Vec<PendingPaymentDetails> =
249+
self.pending_payment_store.list_filter(|p| {
250+
p.details.status == PaymentStatus::Pending
251+
&& matches!(
252+
p.details.kind,
253+
PaymentKind::Onchain {
254+
status: ConfirmationStatus::Confirmed { .. },
255+
..
256+
}
257+
)
258+
});
259+
260+
for mut payment in pending_payments {
261+
if let PaymentKind::Onchain {
262+
status: ConfirmationStatus::Confirmed { height, .. },
263+
..
264+
} = payment.details.kind
265+
{
266+
let payment_id = payment.details.id;
267+
if new_tip.height >= height + ANTI_REORG_DELAY - 1 {
268+
payment.details.status = PaymentStatus::Succeeded;
269+
self.payment_store.insert_or_update(payment.details)?;
270+
self.pending_payment_store.remove(&payment_id)?;
271+
}
272+
}
273+
}
193274
},
194-
};
195-
196-
let payment = self.create_payment_from_tx(
197-
locked_wallet,
198-
txid,
199-
id,
200-
&wtx.tx_node.tx,
201-
payment_status,
202-
confirmation_status,
203-
);
275+
WalletEvent::TxUnconfirmed { txid, tx, old_block_time: None } => {
276+
let payment_id = self
277+
.find_payment_by_txid(txid)
278+
.unwrap_or_else(|| PaymentId(txid.to_byte_array()));
279+
280+
let payment = self.create_payment_from_tx(
281+
locked_wallet,
282+
txid,
283+
payment_id,
284+
&tx,
285+
PaymentStatus::Pending,
286+
ConfirmationStatus::Unconfirmed,
287+
);
288+
let pending_payment =
289+
self.create_pending_payment_from_tx(payment.clone(), Vec::new());
290+
self.payment_store.insert_or_update(payment)?;
291+
self.pending_payment_store.insert_or_update(pending_payment)?;
292+
},
293+
WalletEvent::TxReplaced { txid, conflicts, tx, .. } => {
294+
let payment_id = self
295+
.find_payment_by_txid(txid)
296+
.unwrap_or_else(|| PaymentId(txid.to_byte_array()));
297+
298+
// Collect all conflict txids
299+
let conflict_txids: Vec<Txid> =
300+
conflicts.iter().map(|(_, conflict_txid)| *conflict_txid).collect();
301+
302+
let payment = self.create_payment_from_tx(
303+
locked_wallet,
304+
txid,
305+
payment_id,
306+
&tx,
307+
PaymentStatus::Pending,
308+
ConfirmationStatus::Unconfirmed,
309+
);
310+
let pending_payment_details = self
311+
.create_pending_payment_from_tx(payment.clone(), conflict_txids.clone());
204312

205-
self.payment_store.insert_or_update(payment)?;
313+
self.pending_payment_store.insert_or_update(pending_payment_details)?;
314+
},
315+
WalletEvent::TxDropped { txid, tx } => {
316+
let payment_id = self
317+
.find_payment_by_txid(txid)
318+
.unwrap_or_else(|| PaymentId(txid.to_byte_array()));
319+
let payment = self.create_payment_from_tx(
320+
locked_wallet,
321+
txid,
322+
payment_id,
323+
&tx,
324+
PaymentStatus::Pending,
325+
ConfirmationStatus::Unconfirmed,
326+
);
327+
let pending_payment =
328+
self.create_pending_payment_from_tx(payment.clone(), Vec::new());
329+
self.payment_store.insert_or_update(payment)?;
330+
self.pending_payment_store.insert_or_update(pending_payment)?;
331+
},
332+
_ => {
333+
continue;
334+
},
335+
};
206336
}
207337

208338
Ok(())
@@ -793,27 +923,60 @@ impl Wallet {
793923
// here to determine the `PaymentKind`, but that's not really satisfactory, so
794924
// we're punting on it until we can come up with a better solution.
795925

796-
let kind = crate::payment::PaymentKind::Onchain { txid, status: confirmation_status };
926+
let kind = PaymentKind::Onchain { txid, status: confirmation_status };
797927

798928
let fee = locked_wallet.calculate_fee(tx).unwrap_or(Amount::ZERO);
799929
let (sent, received) = locked_wallet.sent_and_received(tx);
930+
let fee_sat = fee.to_sat();
931+
800932
let (direction, amount_msat) = if sent > received {
801-
let direction = PaymentDirection::Outbound;
802-
let amount_msat = Some(
803-
sent.to_sat().saturating_sub(fee.to_sat()).saturating_sub(received.to_sat()) * 1000,
804-
);
805-
(direction, amount_msat)
933+
(
934+
PaymentDirection::Outbound,
935+
Some(
936+
(sent.to_sat().saturating_sub(fee_sat).saturating_sub(received.to_sat()))
937+
* 1000,
938+
),
939+
)
806940
} else {
807-
let direction = PaymentDirection::Inbound;
808-
let amount_msat = Some(
809-
received.to_sat().saturating_sub(sent.to_sat().saturating_sub(fee.to_sat())) * 1000,
810-
);
811-
(direction, amount_msat)
941+
(
942+
PaymentDirection::Inbound,
943+
Some(
944+
received.to_sat().saturating_sub(sent.to_sat().saturating_sub(fee_sat)) * 1000,
945+
),
946+
)
812947
};
813948

814-
let fee_paid_msat = Some(fee.to_sat() * 1000);
949+
PaymentDetails::new(
950+
payment_id,
951+
kind,
952+
amount_msat,
953+
Some(fee_sat * 1000),
954+
direction,
955+
payment_status,
956+
)
957+
}
958+
959+
fn create_pending_payment_from_tx(
960+
&self, payment: PaymentDetails, conflicting_txids: Vec<Txid>,
961+
) -> PendingPaymentDetails {
962+
PendingPaymentDetails::new(payment, conflicting_txids)
963+
}
964+
965+
fn find_payment_by_txid(&self, target_txid: Txid) -> Option<PaymentId> {
966+
let direct_payment_id = PaymentId(target_txid.to_byte_array());
967+
if self.pending_payment_store.contains_key(&direct_payment_id) {
968+
return Some(direct_payment_id);
969+
}
970+
971+
if let Some(replaced_details) = self
972+
.pending_payment_store
973+
.list_filter(|p| p.conflicting_txids.contains(&target_txid))
974+
.first()
975+
{
976+
return Some(replaced_details.details.id);
977+
}
815978

816-
PaymentDetails::new(payment_id, kind, amount_msat, fee_paid_msat, direction, payment_status)
979+
None
817980
}
818981
}
819982

@@ -843,9 +1006,9 @@ impl Listen for Wallet {
8431006
);
8441007
}
8451008

846-
match locked_wallet.apply_block(block, height) {
847-
Ok(()) => {
848-
if let Err(e) = self.update_payment_store(&mut *locked_wallet) {
1009+
match locked_wallet.apply_block_events(block, height) {
1010+
Ok(events) => {
1011+
if let Err(e) = self.update_payment_store(&mut *locked_wallet, events) {
8491012
log_error!(self.logger, "Failed to update payment store: {}", e);
8501013
return;
8511014
}

0 commit comments

Comments
 (0)