diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 6bd031379..501e4edd6 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -13,6 +13,7 @@ dictionary Config { u64 probing_liquidity_limit_multiplier; AnchorChannelsConfig? anchor_channels_config; RouteParametersConfig? route_parameters; + ForwardedPaymentTrackingMode forwarded_payment_tracking_mode; }; dictionary AnchorChannelsConfig { @@ -189,6 +190,11 @@ interface Node { void remove_payment([ByRef]PaymentId payment_id); BalanceDetails list_balances(); sequence list_payments(); + ForwardedPaymentDetails? forwarded_payment([ByRef]ForwardedPaymentId forwarded_payment_id); + sequence list_forwarded_payments(); + ForwardedPaymentTrackingMode forwarded_payment_tracking_mode(); + ChannelForwardingStats? channel_forwarding_stats([ByRef]ChannelId channel_id); + sequence list_channel_forwarding_stats(); sequence list_peers(); sequence list_channels(); NetworkGraph network_graph(); @@ -486,6 +492,11 @@ enum PaymentStatus { "Failed", }; +enum ForwardedPaymentTrackingMode { + "Detailed", + "Stats", +}; + dictionary LSPFeeLimits { u64? max_total_opening_fee_msat; u64? max_proportional_opening_fee_ppm_msat; @@ -507,6 +518,35 @@ dictionary PaymentDetails { u64 latest_update_timestamp; }; +dictionary ForwardedPaymentDetails { + ForwardedPaymentId id; + ChannelId prev_channel_id; + ChannelId next_channel_id; + UserChannelId? prev_user_channel_id; + UserChannelId? next_user_channel_id; + PublicKey? prev_node_id; + PublicKey? next_node_id; + u64? total_fee_earned_msat; + u64? skimmed_fee_msat; + boolean claim_from_onchain_tx; + u64? outbound_amount_forwarded_msat; + u64 forwarded_at_timestamp; +}; + +dictionary ChannelForwardingStats { + ChannelId channel_id; + PublicKey? counterparty_node_id; + u64 inbound_payments_forwarded; + u64 outbound_payments_forwarded; + u64 total_inbound_amount_msat; + u64 total_outbound_amount_msat; + u64 total_fee_earned_msat; + u64 total_skimmed_fee_msat; + u64 onchain_claims_count; + u64 first_forwarded_at_timestamp; + u64 last_forwarded_at_timestamp; +}; + dictionary RouteParametersConfig { u64? max_total_routing_fee_msat; u32 max_total_cltv_expiry_delta; @@ -894,6 +934,9 @@ typedef string OfferId; [Custom] typedef string PaymentId; +[Custom] +typedef string ForwardedPaymentId; + [Custom] typedef string PaymentHash; diff --git a/src/builder.rs b/src/builder.rs index 5d8a5a7a9..de0eb527a 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -55,13 +55,17 @@ use crate::fee_estimator::OnchainFeeEstimator; use crate::gossip::GossipSource; use crate::io::sqlite_store::SqliteStore; use crate::io::utils::{ - read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph, - read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_pending_payments, - read_scorer, write_node_metrics, + read_channel_forwarding_stats, read_event_queue, read_external_pathfinding_scores_from_cache, + read_forwarded_payments, read_network_graph, read_node_metrics, read_output_sweeper, + read_payments, read_peer_info, read_pending_payments, read_scorer, write_node_metrics, }; use crate::io::vss_store::VssStoreBuilder; use crate::io::{ - self, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + self, CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE, + FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + FORWARDED_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, }; @@ -75,9 +79,10 @@ use crate::peer_store::PeerStore; use crate::runtime::{Runtime, RuntimeSpawner}; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ - AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph, - KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, PendingPaymentStore, - Persister, SyncAndAsyncKVStore, + AsyncPersister, ChainMonitor, ChannelForwardingStatsStore, ChannelManager, DynStore, + DynStoreWrapper, ForwardedPaymentStore, GossipSync, Graph, KeysManager, MessageRouter, + OnionMessenger, PaymentStore, PeerManager, PendingPaymentStore, Persister, + SyncAndAsyncKVStore, }; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; @@ -1060,14 +1065,21 @@ fn build_with_store_internal( let kv_store_ref = Arc::clone(&kv_store); let logger_ref = Arc::clone(&logger); - let (payment_store_res, node_metris_res, pending_payment_store_res) = - runtime.block_on(async move { - tokio::join!( - read_payments(&*kv_store_ref, Arc::clone(&logger_ref)), - read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)), - read_pending_payments(&*kv_store_ref, Arc::clone(&logger_ref)) - ) - }); + let ( + payment_store_res, + forwarded_payment_store_res, + channel_forwarding_stats_res, + node_metris_res, + pending_payment_store_res, + ) = runtime.block_on(async move { + tokio::join!( + read_payments(&*kv_store_ref, Arc::clone(&logger_ref)), + read_forwarded_payments(&*kv_store_ref, Arc::clone(&logger_ref)), + read_channel_forwarding_stats(&*kv_store_ref, Arc::clone(&logger_ref)), + read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)), + read_pending_payments(&*kv_store_ref, Arc::clone(&logger_ref)) + ) + }); // Initialize the status fields. let node_metrics = match node_metris_res { @@ -1096,6 +1108,34 @@ fn build_with_store_internal( }, }; + let forwarded_payment_store = match forwarded_payment_store_res { + Ok(forwarded_payments) => Arc::new(ForwardedPaymentStore::new( + forwarded_payments, + FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + FORWARDED_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read forwarded payment data from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; + + let channel_forwarding_stats_store = match channel_forwarding_stats_res { + Ok(stats) => Arc::new(ChannelForwardingStatsStore::new( + stats, + CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read channel forwarding stats from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; + let (chain_source, chain_tip_opt) = match chain_data_source_config { Some(ChainDataSourceConfig::Esplora { server_url, headers, sync_config }) => { let sync_config = sync_config.unwrap_or(EsploraSyncConfig::default()); @@ -1782,6 +1822,8 @@ fn build_with_store_internal( scorer, peer_store, payment_store, + forwarded_payment_store, + channel_forwarding_stats_store, is_running, node_metrics, om_mailbox, diff --git a/src/config.rs b/src/config.rs index 103b74657..d38542273 100644 --- a/src/config.rs +++ b/src/config.rs @@ -21,6 +21,24 @@ use lightning::util::config::{ use crate::logger::LogLevel; +/// The mode used for tracking forwarded payments. +/// +/// This determines how much detail is stored about payment forwarding activity. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub enum ForwardedPaymentTrackingMode { + /// Store every individual forwarded payment AND track per-channel aggregate statistics. + /// + /// Use this when you need full history of forwarded payments for accounting, debugging, + /// or detailed analytics. + Detailed, + /// Track only per-channel aggregate statistics without storing individual payment records. + /// + /// This is the default mode. Use this to reduce storage requirements when you only need + /// aggregate metrics like total fees earned per channel. + #[default] + Stats, +} + // Config defaults const DEFAULT_NETWORK: Network = Network::Bitcoin; const DEFAULT_BDK_WALLET_SYNC_INTERVAL_SECS: u64 = 80; @@ -127,9 +145,10 @@ pub(crate) const HRN_RESOLUTION_TIMEOUT_SECS: u64 = 5; /// | `probing_liquidity_limit_multiplier` | 3 | /// | `log_level` | Debug | /// | `anchor_channels_config` | Some(..) | -/// | `route_parameters` | None | +/// | `route_parameters` | None | +/// | `forwarded_payment_tracking_mode` | Detailed | /// -/// See [`AnchorChannelsConfig`] and [`RouteParametersConfig`] for more information regarding their +/// See [`AnchorChannelsConfig`], [`RouteParametersConfig`], and [`ForwardedPaymentTrackingMode`] for more information regarding their /// respective default values. /// /// [`Node`]: crate::Node @@ -192,6 +211,10 @@ pub struct Config { /// **Note:** If unset, default parameters will be used, and you will be able to override the /// parameters on a per-payment basis in the corresponding method calls. pub route_parameters: Option, + /// The mode used for tracking forwarded payments. + /// + /// See [`ForwardedPaymentTrackingMode`] for more information on the available modes. + pub forwarded_payment_tracking_mode: ForwardedPaymentTrackingMode, } impl Default for Config { @@ -206,6 +229,7 @@ impl Default for Config { anchor_channels_config: Some(AnchorChannelsConfig::default()), route_parameters: None, node_alias: None, + forwarded_payment_tracking_mode: ForwardedPaymentTrackingMode::default(), } } } diff --git a/src/event.rs b/src/event.rs index 6f0ed8e09..6f7b8914c 100644 --- a/src/event.rs +++ b/src/event.rs @@ -10,6 +10,7 @@ use core::task::{Poll, Waker}; use std::collections::VecDeque; use std::ops::Deref; use std::sync::{Arc, Mutex}; +use std::time::{SystemTime, UNIX_EPOCH}; use bitcoin::blockdata::locktime::absolute::LockTime; use bitcoin::secp256k1::PublicKey; @@ -32,7 +33,7 @@ use lightning_liquidity::lsps2::utils::compute_opening_fee; use lightning_types::payment::{PaymentHash, PaymentPreimage}; use rand::{rng, Rng}; -use crate::config::{may_announce_channel, Config}; +use crate::config::{may_announce_channel, Config, ForwardedPaymentTrackingMode}; use crate::connection::ConnectionManager; use crate::data_store::DataStoreUpdateResult; use crate::fee_estimator::ConfirmationTarget; @@ -45,10 +46,14 @@ use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox; use crate::payment::asynchronous::static_invoice_store::StaticInvoiceStore; use crate::payment::store::{ - PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus, + ChannelForwardingStats, ForwardedPaymentDetails, ForwardedPaymentId, PaymentDetails, + PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus, }; use crate::runtime::Runtime; -use crate::types::{CustomTlvRecord, DynStore, OnionMessenger, PaymentStore, Sweeper, Wallet}; +use crate::types::{ + ChannelForwardingStatsStore, CustomTlvRecord, DynStore, ForwardedPaymentStore, OnionMessenger, + PaymentStore, Sweeper, Wallet, +}; use crate::{ hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerInfo, PeerStore, UserChannelId, @@ -487,6 +492,8 @@ where network_graph: Arc, liquidity_source: Option>>>, payment_store: Arc, + forwarded_payment_store: Arc, + channel_forwarding_stats_store: Arc, peer_store: Arc>, runtime: Arc, logger: L, @@ -506,10 +513,11 @@ where channel_manager: Arc, connection_manager: Arc>, output_sweeper: Arc, network_graph: Arc, liquidity_source: Option>>>, - payment_store: Arc, peer_store: Arc>, - static_invoice_store: Option, onion_messenger: Arc, - om_mailbox: Option>, runtime: Arc, logger: L, - config: Arc, + payment_store: Arc, forwarded_payment_store: Arc, + channel_forwarding_stats_store: Arc, + peer_store: Arc>, static_invoice_store: Option, + onion_messenger: Arc, om_mailbox: Option>, + runtime: Arc, logger: L, config: Arc, ) -> Self { Self { event_queue, @@ -521,6 +529,8 @@ where network_graph, liquidity_source, payment_store, + forwarded_payment_store, + channel_forwarding_stats_store, peer_store, logger, runtime, @@ -1364,9 +1374,103 @@ where .await; } + let prev_channel_id_value = prev_channel_id + .expect("prev_channel_id expected for events generated by LDK versions greater than 0.0.107."); + let next_channel_id_value = next_channel_id + .expect("next_channel_id expected for events generated by LDK versions greater than 0.0.107."); + + let forwarded_at_timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("SystemTime::now() should come after SystemTime::UNIX_EPOCH") + .as_secs(); + + // Calculate inbound amount (outbound + fee) + let inbound_amount_msat = outbound_amount_forwarded_msat + .unwrap_or(0) + .saturating_add(total_fee_earned_msat.unwrap_or(0)); + + // Update per-channel forwarding stats for the inbound channel (prev_channel) + // For new entries, this becomes the initial value; for existing entries, + // these values are used as increments via the to_update() -> update() pattern. + let inbound_stats = ChannelForwardingStats { + channel_id: prev_channel_id_value, + counterparty_node_id: prev_node_id, + inbound_payments_forwarded: 1, + outbound_payments_forwarded: 0, + total_inbound_amount_msat: inbound_amount_msat, + total_outbound_amount_msat: 0, + total_fee_earned_msat: total_fee_earned_msat.unwrap_or(0), + total_skimmed_fee_msat: skimmed_fee_msat.unwrap_or(0), + onchain_claims_count: if claim_from_onchain_tx { 1 } else { 0 }, + first_forwarded_at_timestamp: forwarded_at_timestamp, + last_forwarded_at_timestamp: forwarded_at_timestamp, + }; + self.channel_forwarding_stats_store + .insert_or_update(inbound_stats) + .map_err(|e| { + log_error!( + self.logger, + "Failed to update inbound channel forwarding stats: {e}" + ); + ReplayEvent() + })?; + + // Update per-channel forwarding stats for the outbound channel (next_channel) + let outbound_stats = ChannelForwardingStats { + channel_id: next_channel_id_value, + counterparty_node_id: next_node_id, + inbound_payments_forwarded: 0, + outbound_payments_forwarded: 1, + total_inbound_amount_msat: 0, + total_outbound_amount_msat: outbound_amount_forwarded_msat.unwrap_or(0), + total_fee_earned_msat: 0, + total_skimmed_fee_msat: 0, + onchain_claims_count: 0, + first_forwarded_at_timestamp: forwarded_at_timestamp, + last_forwarded_at_timestamp: forwarded_at_timestamp, + }; + self.channel_forwarding_stats_store + .insert_or_update(outbound_stats) + .map_err(|e| { + log_error!( + self.logger, + "Failed to update outbound channel forwarding stats: {e}" + ); + ReplayEvent() + })?; + + // Only store individual forwarded payment details in Detailed mode + if self.config.forwarded_payment_tracking_mode + == ForwardedPaymentTrackingMode::Detailed + { + // PaymentForwarded does not have a unique id, so we generate a random one here. + let mut id_bytes = [0u8; 32]; + rng().fill(&mut id_bytes); + + let forwarded_payment = ForwardedPaymentDetails { + id: ForwardedPaymentId(id_bytes), + prev_channel_id: prev_channel_id_value, + next_channel_id: next_channel_id_value, + prev_user_channel_id: prev_user_channel_id.map(UserChannelId), + next_user_channel_id: next_user_channel_id.map(UserChannelId), + prev_node_id, + next_node_id, + total_fee_earned_msat, + skimmed_fee_msat, + claim_from_onchain_tx, + outbound_amount_forwarded_msat, + forwarded_at_timestamp, + }; + + self.forwarded_payment_store.insert(forwarded_payment).map_err(|e| { + log_error!(self.logger, "Failed to store forwarded payment: {e}"); + ReplayEvent() + })?; + } + let event = Event::PaymentForwarded { - prev_channel_id: prev_channel_id.expect("prev_channel_id expected for events generated by LDK versions greater than 0.0.107."), - next_channel_id: next_channel_id.expect("next_channel_id expected for events generated by LDK versions greater than 0.0.107."), + prev_channel_id: prev_channel_id_value, + next_channel_id: next_channel_id_value, prev_user_channel_id: prev_user_channel_id.map(UserChannelId), next_user_channel_id: next_user_channel_id.map(UserChannelId), prev_node_id, diff --git a/src/ffi/types.rs b/src/ffi/types.rs index 2a349a967..84b1d5577 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -54,9 +54,11 @@ pub use crate::graph::{ChannelInfo, ChannelUpdateInfo, NodeAnnouncementInfo, Nod pub use crate::liquidity::{LSPS1OrderStatus, LSPS2ServiceConfig}; pub use crate::logger::{LogLevel, LogRecord, LogWriter}; pub use crate::payment::store::{ - ConfirmationStatus, LSPFeeLimits, PaymentDirection, PaymentKind, PaymentStatus, + ChannelForwardingStats, ConfirmationStatus, ForwardedPaymentId, LSPFeeLimits, PaymentDirection, + PaymentKind, PaymentStatus, }; -pub use crate::payment::UnifiedPaymentResult; +pub use crate::payment::{ForwardedPaymentDetails, UnifiedPaymentResult}; +pub use crate::config::ForwardedPaymentTrackingMode; use crate::{hex_utils, SocketAddress, UniffiCustomTypeConverter, UserChannelId}; impl UniffiCustomTypeConverter for PublicKey { @@ -722,6 +724,24 @@ impl UniffiCustomTypeConverter for PaymentId { } } +impl UniffiCustomTypeConverter for ForwardedPaymentId { + type Builtin = String; + + fn into_custom(val: Self::Builtin) -> uniffi::Result { + if let Some(bytes_vec) = hex_utils::to_vec(&val) { + let bytes_res = bytes_vec.try_into(); + if let Ok(bytes) = bytes_res { + return Ok(ForwardedPaymentId(bytes)); + } + } + Err(Error::InvalidPaymentId.into()) + } + + fn from_custom(obj: Self) -> Self::Builtin { + hex_utils::to_string(&obj.0) + } +} + impl UniffiCustomTypeConverter for PaymentHash { type Builtin = String; diff --git a/src/io/mod.rs b/src/io/mod.rs index e080d39f7..d6c4d0c68 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -27,6 +27,15 @@ pub(crate) const PEER_INFO_PERSISTENCE_KEY: &str = "peers"; pub(crate) const PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "payments"; pub(crate) const PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; +/// The forwarded payment information will be persisted under this prefix. +pub(crate) const FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "forwarded_payments"; +pub(crate) const FORWARDED_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; + +/// The channel forwarding stats will be persisted under this prefix. +pub(crate) const CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE: &str = + "channel_forwarding_stats"; +pub(crate) const CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; + /// The node metrics will be persisted under this key. pub(crate) const NODE_METRICS_PRIMARY_NAMESPACE: &str = ""; pub(crate) const NODE_METRICS_SECONDARY_NAMESPACE: &str = ""; diff --git a/src/io/utils.rs b/src/io/utils.rs index d2f70377b..485560bdb 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -39,6 +39,10 @@ use rand::rngs::OsRng; use rand::TryRngCore; use super::*; +use crate::io::{ + CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE, +}; use crate::chain::ChainSource; use crate::config::WALLET_KEYS_SEED_LEN; use crate::fee_estimator::OnchainFeeEstimator; @@ -46,7 +50,7 @@ use crate::io::{ NODE_METRICS_KEY, NODE_METRICS_PRIMARY_NAMESPACE, NODE_METRICS_SECONDARY_NAMESPACE, }; use crate::logger::{log_error, LdkLogger, Logger}; -use crate::payment::PendingPaymentDetails; +use crate::payment::{ChannelForwardingStats, ForwardedPaymentDetails, PendingPaymentDetails}; use crate::peer_store::PeerStore; use crate::types::{Broadcaster, DynStore, KeysManager, Sweeper}; use crate::wallet::ser::{ChangeSetDeserWrapper, ChangeSetSerWrapper}; @@ -223,21 +227,17 @@ where }) } -/// Read previously persisted payments information from the store. -pub(crate) async fn read_payments( - kv_store: &DynStore, logger: L, -) -> Result, std::io::Error> +/// Generic helper to read persisted items from a KV store namespace. +async fn read_objects_from_store( + kv_store: &DynStore, logger: L, primary_namespace: &str, secondary_namespace: &str, +) -> Result, std::io::Error> where + T: Readable, L::Target: LdkLogger, { - let mut res = Vec::new(); + let mut stored_keys = KVStore::list(&*kv_store, primary_namespace, secondary_namespace).await?; - let mut stored_keys = KVStore::list( - &*kv_store, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - ) - .await?; + let mut res = Vec::with_capacity(stored_keys.len()); const BATCH_SIZE: usize = 50; @@ -246,52 +246,44 @@ where // Fill JoinSet with tasks if possible while set.len() < BATCH_SIZE && !stored_keys.is_empty() { if let Some(next_key) = stored_keys.pop() { - let fut = KVStore::read( - &*kv_store, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - &next_key, - ); + let fut = KVStore::read(&*kv_store, primary_namespace, secondary_namespace, &next_key); set.spawn(fut); debug_assert!(set.len() <= BATCH_SIZE); } } + let type_name = std::any::type_name::(); + while let Some(read_res) = set.join_next().await { // Exit early if we get an IO error. let reader = read_res .map_err(|e| { - log_error!(logger, "Failed to read PaymentDetails: {}", e); + log_error!(logger, "Failed to read {type_name}: {e}"); set.abort_all(); e })? .map_err(|e| { - log_error!(logger, "Failed to read PaymentDetails: {}", e); + log_error!(logger, "Failed to read {type_name}: {e}"); set.abort_all(); e })?; // Refill set for every finished future, if we still have something to do. if let Some(next_key) = stored_keys.pop() { - let fut = KVStore::read( - &*kv_store, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - &next_key, - ); + let fut = KVStore::read(&*kv_store, primary_namespace, secondary_namespace, &next_key); set.spawn(fut); debug_assert!(set.len() <= BATCH_SIZE); } // Handle result. - let payment = PaymentDetails::read(&mut &*reader).map_err(|e| { - log_error!(logger, "Failed to deserialize PaymentDetails: {}", e); + let item = T::read(&mut &*reader).map_err(|e| { + log_error!(logger, "Failed to deserialize {type_name}: {e}"); std::io::Error::new( std::io::ErrorKind::InvalidData, - "Failed to deserialize PaymentDetails", + format!("Failed to deserialize {type_name}"), ) })?; - res.push(payment); + res.push(item); } debug_assert!(set.is_empty()); @@ -300,6 +292,54 @@ where Ok(res) } +/// Read previously persisted payments information from the store. +pub(crate) async fn read_payments( + kv_store: &DynStore, logger: L, +) -> Result, std::io::Error> +where + L::Target: LdkLogger, +{ + read_objects_from_store( + kv_store, + logger, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + ) + .await +} + +/// Read previously persisted forwarded payments information from the store. +pub(crate) async fn read_forwarded_payments( + kv_store: &DynStore, logger: L, +) -> Result, std::io::Error> +where + L::Target: LdkLogger, +{ + read_objects_from_store( + kv_store, + logger, + FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + FORWARDED_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + ) + .await +} + +/// Read previously persisted channel forwarding stats from the store. +pub(crate) async fn read_channel_forwarding_stats( + kv_store: &DynStore, logger: L, +) -> Result, std::io::Error> +where + L::Target: LdkLogger, +{ + read_objects_from_store( + kv_store, + logger, + CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE, + ) + .await +} + /// Read `OutputSweeper` state from the store. pub(crate) async fn read_output_sweeper( broadcaster: Arc, fee_estimator: Arc, diff --git a/src/lib.rs b/src/lib.rs index d2222d949..1322e099d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -124,7 +124,8 @@ pub use builder::NodeBuilder as Builder; use chain::ChainSource; use config::{ default_user_config, may_announce_channel, AsyncPaymentsRole, ChannelConfig, Config, - NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL, + ForwardedPaymentTrackingMode, NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, + RGS_SYNC_INTERVAL, }; use connection::ConnectionManager; pub use error::Error as NodeError; @@ -142,6 +143,7 @@ use lightning::events::bump_transaction::{Input, Wallet as LdkWallet}; use lightning::impl_writeable_tlv_based; use lightning::ln::chan_utils::FUNDING_TRANSACTION_WITNESS_WEIGHT; use lightning::ln::channel_state::{ChannelDetails as LdkChannelDetails, ChannelShutdownState}; +use lightning::ln::types::ChannelId; use lightning::ln::channelmanager::PaymentId; use lightning::ln::funding::SpliceContribution; use lightning::ln::msgs::SocketAddress; @@ -153,16 +155,16 @@ use logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use payment::asynchronous::om_mailbox::OnionMessageMailbox; use payment::asynchronous::static_invoice_store::StaticInvoiceStore; use payment::{ - Bolt11Payment, Bolt12Payment, OnchainPayment, PaymentDetails, SpontaneousPayment, - UnifiedPayment, + Bolt11Payment, Bolt12Payment, ChannelForwardingStats, ForwardedPaymentDetails, + ForwardedPaymentId, OnchainPayment, PaymentDetails, SpontaneousPayment, UnifiedPayment, }; use peer_store::{PeerInfo, PeerStore}; use rand::Rng; use runtime::Runtime; use types::{ - Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph, - HRNResolver, KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, - Wallet, + Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelForwardingStatsStore, + ChannelManager, DynStore, ForwardedPaymentStore, Graph, HRNResolver, KeysManager, + OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, Wallet, }; pub use types::{ChannelDetails, CustomTlvRecord, PeerDetails, SyncAndAsyncKVStore, UserChannelId}; pub use { @@ -222,6 +224,8 @@ pub struct Node { scorer: Arc>, peer_store: Arc>>, payment_store: Arc, + forwarded_payment_store: Arc, + channel_forwarding_stats_store: Arc, is_running: Arc>, node_metrics: Arc>, om_mailbox: Option>, @@ -573,6 +577,8 @@ impl Node { Arc::clone(&self.network_graph), self.liquidity_source.clone(), Arc::clone(&self.payment_store), + Arc::clone(&self.forwarded_payment_store), + Arc::clone(&self.channel_forwarding_stats_store), Arc::clone(&self.peer_store), static_invoice_store, Arc::clone(&self.onion_messenger), @@ -1692,6 +1698,72 @@ impl Node { self.payment_store.list_filter(|_| true) } + /// Retrieve the details of a specific forwarded payment with the given id. + /// + /// Returns `Some` if the forwarded payment was known and `None` otherwise. + pub fn forwarded_payment( + &self, forwarded_payment_id: &ForwardedPaymentId, + ) -> Option { + self.forwarded_payment_store.get(forwarded_payment_id) + } + + /// Retrieves all forwarded payments that match the given predicate. + /// + /// For example, to list all forwarded payments that earned at least 1000 msat in fees: + /// ```ignore + /// node.list_forwarded_payments_with_filter(|p| { + /// p.total_fee_earned_msat.unwrap_or(0) >= 1000 + /// }); + /// ``` + pub fn list_forwarded_payments_with_filter bool>( + &self, f: F, + ) -> Vec { + self.forwarded_payment_store.list_filter(f) + } + + /// Retrieves all forwarded payments. + /// + /// **Note:** In [`ForwardedPaymentTrackingMode::Stats`] mode, this will return an empty vector + /// since individual payment records are not stored in that mode. + /// + /// [`ForwardedPaymentTrackingMode::Stats`]: crate::config::ForwardedPaymentTrackingMode::Stats + pub fn list_forwarded_payments(&self) -> Vec { + self.forwarded_payment_store.list_filter(|_| true) + } + + /// Returns the configured forwarded payment tracking mode. + pub fn forwarded_payment_tracking_mode(&self) -> ForwardedPaymentTrackingMode { + self.config.forwarded_payment_tracking_mode + } + + /// Retrieve the forwarding statistics for a specific channel. + /// + /// Returns `Some` if statistics exist for the given channel and `None` otherwise. + pub fn channel_forwarding_stats( + &self, channel_id: &ChannelId, + ) -> Option { + self.channel_forwarding_stats_store.get(channel_id) + } + + /// Retrieves all channel forwarding statistics. + pub fn list_channel_forwarding_stats(&self) -> Vec { + self.channel_forwarding_stats_store.list_filter(|_| true) + } + + /// Retrieves all channel forwarding statistics that match the given predicate. + /// + /// For example, to list stats for all channels that have earned at least 10000 msat in fees: + /// ```ignore + /// node.list_channel_forwarding_stats_with_filter(|s| { + /// s.total_fee_earned_msat >= 10000 + /// }); + /// ``` + pub fn list_channel_forwarding_stats_with_filter bool>( + &self, f: F, + ) -> Vec { + self.channel_forwarding_stats_store.list_filter(f) + } + /// Retrieves a list of known peers. pub fn list_peers(&self) -> Vec { let mut peers = Vec::new(); diff --git a/src/payment/mod.rs b/src/payment/mod.rs index 42b5aff3b..28b4bb31e 100644 --- a/src/payment/mod.rs +++ b/src/payment/mod.rs @@ -22,6 +22,7 @@ pub use onchain::OnchainPayment; pub use pending_payment_store::PendingPaymentDetails; pub use spontaneous::SpontaneousPayment; pub use store::{ - ConfirmationStatus, LSPFeeLimits, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, + ChannelForwardingStats, ConfirmationStatus, ForwardedPaymentDetails, ForwardedPaymentId, + LSPFeeLimits, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, }; pub use unified::{UnifiedPayment, UnifiedPaymentResult}; diff --git a/src/payment/store.rs b/src/payment/store.rs index 15e94190c..2ea81f92c 100644 --- a/src/payment/store.rs +++ b/src/payment/store.rs @@ -7,9 +7,11 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use bitcoin::secp256k1::PublicKey; use bitcoin::{BlockHash, Txid}; use lightning::ln::channelmanager::PaymentId; use lightning::ln::msgs::DecodeError; +use lightning::ln::types::ChannelId; use lightning::offers::offer::OfferId; use lightning::util::ser::{Readable, Writeable}; use lightning::{ @@ -21,6 +23,7 @@ use lightning_types::string::UntrustedString; use crate::data_store::{StorableObject, StorableObjectId, StorableObjectUpdate}; use crate::hex_utils; +use crate::types::UserChannelId; /// Represents a payment. #[derive(Clone, Debug, PartialEq, Eq)] @@ -762,3 +765,289 @@ mod tests { } } } + +/// A unique identifier for a forwarded payment. +/// +/// This will be a randomly generated 32-byte identifier. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct ForwardedPaymentId(pub [u8; 32]); + +impl StorableObjectId for ForwardedPaymentId { + fn encode_to_hex_str(&self) -> String { + hex_utils::to_string(&self.0) + } +} + +impl Writeable for ForwardedPaymentId { + fn write( + &self, writer: &mut W, + ) -> Result<(), lightning::io::Error> { + self.0.write(writer) + } +} + +impl Readable for ForwardedPaymentId { + fn read(reader: &mut R) -> Result { + Ok(Self(Readable::read(reader)?)) + } +} + +/// Details of a payment that has been forwarded through this node. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ForwardedPaymentDetails { + /// A unique identifier for this forwarded payment. + pub id: ForwardedPaymentId, + /// The channel id of the incoming channel between the previous node and us. + pub prev_channel_id: ChannelId, + /// The channel id of the outgoing channel between the next node and us. + pub next_channel_id: ChannelId, + /// The `user_channel_id` of the incoming channel between the previous node and us. + /// + /// This is only None for events generated or serialized by versions prior to 0.3.0. + pub prev_user_channel_id: Option, + /// The `user_channel_id` of the outgoing channel between the next node and us. + /// + /// This will be `None` if the payment was settled via an on-chain transaction or if the + /// event was generated or serialized by versions prior to 0.3.0. + pub next_user_channel_id: Option, + /// The node id of the previous node. + pub prev_node_id: Option, + /// The node id of the next node. + pub next_node_id: Option, + /// The total fee, in milli-satoshis, which was earned as a result of the payment. + /// + /// Note that if we force-closed the channel over which we forwarded an HTLC while the HTLC + /// was pending, the amount the next hop claimed will have been rounded down to the nearest + /// whole satoshi. Thus, the fee calculated here may be higher than expected as we still + /// claimed the full value in millisatoshis from the source. + /// + /// If the channel which sent us the payment has been force-closed, we will claim the funds + /// via an on-chain transaction. In that case we do not yet know the on-chain transaction + /// fees which we will spend and will instead set this to `None`. It is possible duplicate + /// `PaymentForwarded` events are generated for the same payment iff `total_fee_earned_msat` + /// is `None`. + pub total_fee_earned_msat: Option, + /// The share of the total fee, in milli-satoshis, which was withheld in addition to the + /// forwarding fee. + /// + /// This will be `None` if no fee was skimmed from the forwarded HTLC. + pub skimmed_fee_msat: Option, + /// If this is `true`, the forwarded HTLC was claimed by our counterparty via an on-chain + /// transaction. + pub claim_from_onchain_tx: bool, + /// The final amount forwarded, in milli-satoshis, after the fee is deducted. + /// + /// The caveat described above the total_fee_earned_msat field applies here as well. + pub outbound_amount_forwarded_msat: Option, + /// The timestamp, in seconds since start of the UNIX epoch, when the payment was forwarded. + pub forwarded_at_timestamp: u64, +} + +impl_writeable_tlv_based!(ForwardedPaymentDetails, { + (0, id, required), + (2, prev_channel_id, required), + (4, next_channel_id, required), + (6, prev_user_channel_id, option), + (8, next_user_channel_id, option), + (10, prev_node_id, option), + (12, next_node_id, option), + (14, total_fee_earned_msat, option), + (16, skimmed_fee_msat, option), + (18, claim_from_onchain_tx, required), + (20, outbound_amount_forwarded_msat, option), + (22, forwarded_at_timestamp, required), +}); + +/// A no-op update type for [`ForwardedPaymentDetails`]. +/// +/// Forwarded payments are immutable once stored, so updates are not supported. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub(crate) struct ForwardedPaymentDetailsUpdate { + id: ForwardedPaymentId, +} + +impl StorableObjectUpdate for ForwardedPaymentDetailsUpdate { + fn id(&self) -> ForwardedPaymentId { + self.id + } +} + +impl StorableObject for ForwardedPaymentDetails { + type Id = ForwardedPaymentId; + type Update = ForwardedPaymentDetailsUpdate; + + fn id(&self) -> Self::Id { + self.id + } + + fn update(&mut self, _update: &Self::Update) -> bool { + // Forwarded payments are immutable, so updates are no-ops. + false + } + + fn to_update(&self) -> Self::Update { + ForwardedPaymentDetailsUpdate { id: self.id } + } +} + +/// Aggregate statistics for forwarded payments through a single channel. +/// +/// Each channel has one stats entry tracking all forwards where it was either +/// the inbound or outbound channel. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ChannelForwardingStats { + /// The channel this stats entry tracks. + pub channel_id: ChannelId, + /// The counterparty node id for this channel. + pub counterparty_node_id: Option, + /// Number of payments forwarded where this was the inbound channel. + pub inbound_payments_forwarded: u64, + /// Number of payments forwarded where this was the outbound channel. + pub outbound_payments_forwarded: u64, + /// Total amount received on this channel for forwarding (msat). + pub total_inbound_amount_msat: u64, + /// Total amount sent on this channel for forwarding (msat). + pub total_outbound_amount_msat: u64, + /// Total fees earned from forwards where this was the inbound channel (msat). + pub total_fee_earned_msat: u64, + /// Total skimmed fees (msat). + pub total_skimmed_fee_msat: u64, + /// Number of forwards claimed via onchain tx. + pub onchain_claims_count: u64, + /// Timestamp of first forward through this channel. + pub first_forwarded_at_timestamp: u64, + /// Timestamp of most recent forward through this channel. + pub last_forwarded_at_timestamp: u64, +} + +impl_writeable_tlv_based!(ChannelForwardingStats, { + (0, channel_id, required), + (2, counterparty_node_id, option), + (4, inbound_payments_forwarded, required), + (6, outbound_payments_forwarded, required), + (8, total_inbound_amount_msat, required), + (10, total_outbound_amount_msat, required), + (12, total_fee_earned_msat, required), + (14, total_skimmed_fee_msat, required), + (16, onchain_claims_count, required), + (18, first_forwarded_at_timestamp, required), + (20, last_forwarded_at_timestamp, required), +}); + +/// Update type for [`ChannelForwardingStats`] that supports incrementing counters. +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct ChannelForwardingStatsUpdate { + /// The channel ID being updated. + pub channel_id: ChannelId, + /// The counterparty node id (used when creating new entry). + pub counterparty_node_id: Option, + /// Increment for inbound payment count. + pub inbound_payments_increment: u64, + /// Increment for outbound payment count. + pub outbound_payments_increment: u64, + /// Increment for total inbound amount. + pub inbound_amount_increment_msat: u64, + /// Increment for total outbound amount. + pub outbound_amount_increment_msat: u64, + /// Increment for total fee earned. + pub fee_earned_increment_msat: u64, + /// Increment for skimmed fee. + pub skimmed_fee_increment_msat: u64, + /// Increment for onchain claims count. + pub onchain_claims_increment: u64, + /// Current timestamp for updating first/last timestamps. + pub timestamp: u64, +} + +impl StorableObjectUpdate for ChannelForwardingStatsUpdate { + fn id(&self) -> ChannelId { + self.channel_id + } +} + +impl StorableObjectId for ChannelId { + fn encode_to_hex_str(&self) -> String { + hex_utils::to_string(&self.0) + } +} + +impl StorableObject for ChannelForwardingStats { + type Id = ChannelId; + type Update = ChannelForwardingStatsUpdate; + + fn id(&self) -> Self::Id { + self.channel_id + } + + fn update(&mut self, update: &Self::Update) -> bool { + debug_assert_eq!( + self.channel_id, update.channel_id, + "We should only ever update stats for the same channel id" + ); + + let mut updated = false; + + // Update counterparty if not already set + if self.counterparty_node_id.is_none() && update.counterparty_node_id.is_some() { + self.counterparty_node_id = update.counterparty_node_id; + updated = true; + } + + // Increment counters + if update.inbound_payments_increment > 0 { + self.inbound_payments_forwarded += update.inbound_payments_increment; + updated = true; + } + if update.outbound_payments_increment > 0 { + self.outbound_payments_forwarded += update.outbound_payments_increment; + updated = true; + } + if update.inbound_amount_increment_msat > 0 { + self.total_inbound_amount_msat += update.inbound_amount_increment_msat; + updated = true; + } + if update.outbound_amount_increment_msat > 0 { + self.total_outbound_amount_msat += update.outbound_amount_increment_msat; + updated = true; + } + if update.fee_earned_increment_msat > 0 { + self.total_fee_earned_msat += update.fee_earned_increment_msat; + updated = true; + } + if update.skimmed_fee_increment_msat > 0 { + self.total_skimmed_fee_msat += update.skimmed_fee_increment_msat; + updated = true; + } + if update.onchain_claims_increment > 0 { + self.onchain_claims_count += update.onchain_claims_increment; + updated = true; + } + + // Update timestamps + if updated { + if self.first_forwarded_at_timestamp == 0 { + self.first_forwarded_at_timestamp = update.timestamp; + } + self.last_forwarded_at_timestamp = update.timestamp; + } + + updated + } + + fn to_update(&self) -> Self::Update { + // This creates an update representing the current state as increments. + // This is primarily used for insert_or_update behavior. + ChannelForwardingStatsUpdate { + channel_id: self.channel_id, + counterparty_node_id: self.counterparty_node_id, + inbound_payments_increment: self.inbound_payments_forwarded, + outbound_payments_increment: self.outbound_payments_forwarded, + inbound_amount_increment_msat: self.total_inbound_amount_msat, + outbound_amount_increment_msat: self.total_outbound_amount_msat, + fee_earned_increment_msat: self.total_fee_earned_msat, + skimmed_fee_increment_msat: self.total_skimmed_fee_msat, + onchain_claims_increment: self.onchain_claims_count, + timestamp: self.last_forwarded_at_timestamp, + } + } +} diff --git a/src/types.rs b/src/types.rs index b5b1ffed7..178d1b1c1 100644 --- a/src/types.rs +++ b/src/types.rs @@ -39,6 +39,7 @@ use crate::data_store::DataStore; use crate::fee_estimator::OnchainFeeEstimator; use crate::logger::Logger; use crate::message_handler::NodeCustomMessageHandler; +use crate::payment::store::{ChannelForwardingStats, ForwardedPaymentDetails}; use crate::payment::{PaymentDetails, PendingPaymentDetails}; use crate::runtime::RuntimeSpawner; @@ -320,6 +321,8 @@ pub(crate) type BumpTransactionEventHandler = >; pub(crate) type PaymentStore = DataStore>; +pub(crate) type ForwardedPaymentStore = DataStore>; +pub(crate) type ChannelForwardingStatsStore = DataStore>; /// A local, potentially user-provided, identifier of a channel. ///