diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index ff499d049d4..1308901d56f 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -48,13 +48,6 @@ use crate::prelude::*; use crate::sync::{Arc, Mutex}; use bitcoin::hashes::Hash; -fn get_latest_mon_update_id<'a, 'b, 'c>( - node: &Node<'a, 'b, 'c>, channel_id: ChannelId, -) -> (u64, u64) { - let monitor_id_state = node.chain_monitor.latest_monitor_update_id.lock().unwrap(); - monitor_id_state.get(&channel_id).unwrap().clone() -} - #[test] fn test_monitor_and_persister_update_fail() { // Test that if both updating the `ChannelMonitor` and persisting the updated diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 042b388e1a1..ba74e507638 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -50,8 +50,8 @@ use crate::ln::channel_state::{ OutboundHTLCDetails, OutboundHTLCStateDetails, }; use crate::ln::channelmanager::{ - self, ChannelReadyOrder, FundingConfirmedMessage, HTLCFailureMsg, HTLCSource, - OpenChannelMessage, PaymentClaimDetails, PendingHTLCInfo, PendingHTLCStatus, + self, ChannelReadyOrder, FundingConfirmedMessage, HTLCFailureMsg, HTLCPreviousHopData, + HTLCSource, OpenChannelMessage, PaymentClaimDetails, PendingHTLCInfo, PendingHTLCStatus, RAACommitmentOrder, SentHTLCId, BREAKDOWN_TIMEOUT, MAX_LOCAL_BREAKDOWN_TIMEOUT, MIN_CLTV_EXPIRY_DELTA, }; @@ -85,6 +85,7 @@ use crate::util::errors::APIError; use crate::util::logger::{Logger, Record, WithContext}; use crate::util::scid_utils::{block_from_scid, scid_from_parts}; use crate::util::ser::{Readable, ReadableArgs, RequiredWrapper, Writeable, Writer}; +use crate::{impl_readable_for_vec, impl_writeable_for_vec}; use alloc::collections::{btree_map, BTreeMap}; @@ -216,7 +217,7 @@ enum InboundHTLCState { /// Used to rebuild `ChannelManager` HTLC state on restart. Previously the manager would track /// and persist all HTLC forwards and receives itself, but newer LDK versions avoid relying on /// its persistence and instead reconstruct state based on `Channel` and `ChannelMonitor` data. - update_add_htlc_opt: Option, + update_add_htlc: InboundUpdateAdd, }, /// Removed by us and a new commitment_signed was sent (if we were AwaitingRemoteRevoke when we /// created it we would have put it in the holding cell instead). When they next revoke_and_ack @@ -307,6 +308,47 @@ impl InboundHTLCState { } } +/// A field of `InboundHTLCState::Committed` containing the HTLC's `update_add_htlc` message. If +/// the HTLC is a forward and gets irrevocably committed to the outbound edge, we convert to +/// `InboundUpdateAdd::Forwarded`, thus pruning the onion and not persisting it on every +/// `ChannelManager` persist. +/// +/// Useful for reconstructing the pending HTLC set on startup. +#[derive(Debug, Clone)] +pub(super) enum InboundUpdateAdd { + /// The inbound committed HTLC's update_add_htlc message. + WithOnion { update_add_htlc: msgs::UpdateAddHTLC }, + /// This inbound HTLC is a forward that was irrevocably committed to the outbound edge, allowing + /// its onion to be pruned and no longer persisted. + Forwarded { + /// Useful if we need to fail or claim this HTLC backwards after restart, if it's missing in the + /// outbound edge. + hop_data: HTLCPreviousHopData, + /// Useful if we need to claim this HTLC backwards after a restart and it's missing in the + /// outbound edge, to generate an accurate [`Event::PaymentForwarded`]. 
+ ///
+ /// [`Event::PaymentForwarded`]: crate::events::Event::PaymentForwarded
+ outbound_amt_msat: u64,
+ },
+ /// This HTLC was received pre-LDK 0.3, before we started persisting the onion for inbound
+ /// committed HTLCs.
+ Legacy,
+}
+
+impl_writeable_tlv_based_enum_upgradable!(InboundUpdateAdd,
+ (0, WithOnion) => {
+ (0, update_add_htlc, required),
+ },
+ (2, Forwarded) => {
+ (0, hop_data, required),
+ (2, outbound_amt_msat, required),
+ },
+ (4, Legacy) => {},
+);
+
+impl_writeable_for_vec!(&InboundUpdateAdd);
+impl_readable_for_vec!(InboundUpdateAdd);
+
 #[cfg_attr(test, derive(Debug))]
 struct InboundHTLCOutput {
 htlc_id: u64,
@@ -1137,17 +1179,24 @@ pub enum UpdateFulfillCommitFetch {
 /// The return value of `monitor_updating_restored`
 pub(super) struct MonitorRestoreUpdates {
 pub raa: Option<msgs::RevokeAndACK>,
+ /// A `CommitmentUpdate` to be sent to our channel peer.
 pub commitment_update: Option<msgs::CommitmentUpdate>,
 pub commitment_order: RAACommitmentOrder,
 pub accepted_htlcs: Vec<(PendingHTLCInfo, u64)>,
 pub failed_htlcs: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
 pub finalized_claimed_htlcs: Vec<(HTLCSource, Option<AttributionData>)>,
+ /// Inbound update_adds that are now irrevocably committed to this channel and are ready for the
+ /// onion to be processed in order to forward or receive the HTLC.
 pub pending_update_adds: Vec<msgs::UpdateAddHTLC>,
 pub funding_broadcastable: Option<Transaction>,
 pub channel_ready: Option<msgs::ChannelReady>,
 pub channel_ready_order: ChannelReadyOrder,
 pub announcement_sigs: Option<msgs::AnnouncementSignatures>,
 pub tx_signatures: Option<msgs::TxSignatures>,
+ /// The sources of outbound HTLCs that were forwarded and irrevocably committed on this channel
+ /// (the outbound edge), along with their outbound amounts. Useful to store in the inbound HTLC
+ /// to ensure it gets resolved.
+ pub committed_outbound_htlc_sources: Vec<(HTLCPreviousHopData, u64)>,
 }

 /// The return value of `signer_maybe_unblocked`
@@ -7813,19 +7862,94 @@ where
 }

 /// Useful for reconstructing the set of pending HTLCs when deserializing the `ChannelManager`.
- pub(super) fn get_inbound_committed_update_adds(&self) -> Vec<msgs::UpdateAddHTLC> {
+ pub(super) fn inbound_committed_unresolved_htlcs(
+ &self,
+ ) -> Vec<(PaymentHash, InboundUpdateAdd)> {
+ // We don't want to return an HTLC as needing processing if it already has a resolution that's
+ // pending in the holding cell.
+ let htlc_resolution_in_holding_cell = |id: u64| -> bool {
+ self.context.holding_cell_htlc_updates.iter().any(|holding_cell_htlc| {
+ match holding_cell_htlc {
+ HTLCUpdateAwaitingACK::ClaimHTLC { htlc_id, .. } => *htlc_id == id,
+ HTLCUpdateAwaitingACK::FailHTLC { htlc_id, .. } => *htlc_id == id,
+ HTLCUpdateAwaitingACK::FailMalformedHTLC { htlc_id, .. } => *htlc_id == id,
+ HTLCUpdateAwaitingACK::AddHTLC { .. } => false,
+ }
+ })
+ };
+
 self.context
 .pending_inbound_htlcs
 .iter()
- .filter_map(|htlc| match htlc.state {
- InboundHTLCState::Committed { ref update_add_htlc_opt } => {
- update_add_htlc_opt.clone()
+ .filter_map(|htlc| match &htlc.state {
+ InboundHTLCState::Committed { update_add_htlc } => {
+ if htlc_resolution_in_holding_cell(htlc.htlc_id) {
+ return None;
+ }
+ Some((htlc.payment_hash, update_add_htlc.clone()))
 },
 _ => None,
 })
 .collect()
 }

+ /// Useful when reconstructing the set of pending HTLC forwards when deserializing the
+ /// `ChannelManager`. We don't want to cache an HTLC as needing to be forwarded if it's already
+ /// present in the outbound edge, or else we'll double-forward.
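
As an aside, the lifecycle the new `InboundUpdateAdd` enum encodes can be summarized in a standalone sketch (simplified stand-in types, not LDK's real ones): an inbound committed HTLC carries its full onion until the forward is irrevocably committed on the outbound edge, at which point `prune_inbound_htlc_onion` swaps in the compact `Forwarded` record and the onion stops being persisted.

    // Simplified stand-ins for illustration only.
    #[derive(Debug)]
    enum InboundUpdateAddSketch {
        // Full update_add_htlc, onion packet included (1366 bytes on the wire).
        WithOnion { onion: Vec<u8> },
        // Just enough data to fail or claim the HTLC backwards after restart.
        Forwarded { prev_htlc_id: u64, outbound_amt_msat: u64 },
    }

    impl InboundUpdateAddSketch {
        // Mirrors `prune_inbound_htlc_onion`: once the outbound edge has
        // irrevocably committed the forward, drop the onion.
        fn prune_onion(&mut self, prev_htlc_id: u64, outbound_amt_msat: u64) {
            if let InboundUpdateAddSketch::WithOnion { .. } = self {
                *self = InboundUpdateAddSketch::Forwarded { prev_htlc_id, outbound_amt_msat };
            }
        }
    }

    fn main() {
        let mut htlc = InboundUpdateAddSketch::WithOnion { onion: vec![0u8; 1366] };
        htlc.prune_onion(42, 999_000);
        println!("{:?}", htlc); // Forwarded { prev_htlc_id: 42, outbound_amt_msat: 999000 }
    }
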
+ pub(super) fn outbound_htlc_forwards(&self) -> impl Iterator<Item = HTLCPreviousHopData> + '_ {
+ let holding_cell_outbounds =
+ self.context.holding_cell_htlc_updates.iter().filter_map(|htlc| match htlc {
+ HTLCUpdateAwaitingACK::AddHTLC { source, .. } => match source {
+ HTLCSource::PreviousHopData(prev_hop_data) => Some(prev_hop_data.clone()),
+ _ => None,
+ },
+ _ => None,
+ });
+ let committed_outbounds =
+ self.context.pending_outbound_htlcs.iter().filter_map(|htlc| match &htlc.source {
+ HTLCSource::PreviousHopData(prev_hop_data) => Some(prev_hop_data.clone()),
+ _ => None,
+ });
+ holding_cell_outbounds.chain(committed_outbounds)
+ }
+
+ #[cfg(test)]
+ pub(super) fn test_holding_cell_outbound_htlc_forwards_count(&self) -> usize {
+ self.context
+ .holding_cell_htlc_updates
+ .iter()
+ .filter_map(|htlc| match htlc {
+ HTLCUpdateAwaitingACK::AddHTLC { source, .. } => match source {
+ HTLCSource::PreviousHopData(prev_hop_data) => Some(prev_hop_data.clone()),
+ _ => None,
+ },
+ _ => None,
+ })
+ .count()
+ }
+
+ /// This inbound HTLC was irrevocably forwarded to the outbound edge, so we no longer need to
+ /// persist its onion.
+ pub(super) fn prune_inbound_htlc_onion(
+ &mut self, htlc_id: u64, hop_data: HTLCPreviousHopData, outbound_amt_msat: u64,
+ ) {
+ for htlc in self.context.pending_inbound_htlcs.iter_mut() {
+ if htlc.htlc_id == htlc_id {
+ if let InboundHTLCState::Committed { ref mut update_add_htlc } = htlc.state {
+ *update_add_htlc = InboundUpdateAdd::Forwarded { hop_data, outbound_amt_msat };
+ return;
+ }
+ }
+ }
+ debug_assert!(false, "If we go to prune an inbound HTLC it should be present")
+ }
+
+ /// Useful for testing crash scenarios where the holding cell is not persisted.
+ #[cfg(test)]
+ pub(super) fn test_clear_holding_cell(&mut self) {
+ self.context.holding_cell_htlc_updates.clear()
+ }
+
 /// Marks an outbound HTLC which we have received update_fail/fulfill/malformed
 #[inline]
 fn mark_outbound_htlc_removed(
@@ -8786,7 +8910,8 @@ where
 false
 };
 if swap {
- let mut state = InboundHTLCState::Committed { update_add_htlc_opt: None };
+ let mut state =
+ InboundHTLCState::Committed { update_add_htlc: InboundUpdateAdd::Legacy };
 mem::swap(&mut state, &mut htlc.state);

 if let InboundHTLCState::AwaitingRemoteRevokeToAnnounce(resolution) = state {
@@ -8827,9 +8952,8 @@ where
 to_forward_infos.push((forward_info, htlc.htlc_id));
 htlc.state = InboundHTLCState::Committed {
 // HTLCs will only be in state `InboundHTLCResolution::Resolved` if they were
- // received on an old pre-0.0.123 version of LDK. In this case, the HTLC is
- // required to be resolved prior to upgrading to 0.1+ per CHANGELOG.md.
- update_add_htlc_opt: None,
+ // received on LDK prior to 0.1.
+ update_add_htlc: InboundUpdateAdd::Legacy,
 };
 },
 }
@@ -8838,7 +8962,9 @@ where
 log_trace!(logger, " ...promoting inbound AwaitingAnnouncedRemoteRevoke {} to Committed", &htlc.payment_hash);
 pending_update_adds.push(update_add_htlc.clone());
 htlc.state = InboundHTLCState::Committed {
- update_add_htlc_opt: Some(update_add_htlc),
+ update_add_htlc: InboundUpdateAdd::WithOnion {
+ update_add_htlc,
+ },
 };
 },
 }
@@ -9411,6 +9537,14 @@ where
 mem::swap(&mut finalized_claimed_htlcs, &mut self.context.monitor_pending_finalized_fulfills);
 let mut pending_update_adds = Vec::new();
 mem::swap(&mut pending_update_adds, &mut self.context.monitor_pending_update_adds);
+ let committed_outbound_htlc_sources = self.context.pending_outbound_htlcs.iter().filter_map(|htlc| {
+ if let &OutboundHTLCState::LocalAnnounced(_) = &htlc.state {
+ if let HTLCSource::PreviousHopData(prev_hop_data) = &htlc.source {
+ return Some((prev_hop_data.clone(), htlc.amount_msat))
+ }
+ }
+ None
+ }).collect();

 if self.context.channel_state.is_peer_disconnected() {
 self.context.monitor_pending_revoke_and_ack = false;
@@ -9419,7 +9553,7 @@ where
 raa: None, commitment_update: None, commitment_order: RAACommitmentOrder::RevokeAndACKFirst,
 accepted_htlcs, failed_htlcs, finalized_claimed_htlcs, pending_update_adds,
 funding_broadcastable, channel_ready, announcement_sigs, tx_signatures: None,
- channel_ready_order,
+ channel_ready_order, committed_outbound_htlc_sources
 };
 }
@@ -9450,7 +9584,7 @@ where
 MonitorRestoreUpdates {
 raa, commitment_update, commitment_order, accepted_htlcs, failed_htlcs,
 finalized_claimed_htlcs, pending_update_adds, funding_broadcastable, channel_ready,
 announcement_sigs, tx_signatures,
- channel_ready_order,
+ channel_ready_order, committed_outbound_htlc_sources
 }
 }
@@ -14465,7 +14599,7 @@ impl Writeable for FundedChannel {
 }
 }
 let mut removed_htlc_attribution_data: Vec<&Option<AttributionData>> = Vec::new();
- let mut inbound_committed_update_adds: Vec<&Option<msgs::UpdateAddHTLC>> = Vec::new();
+ let mut inbound_committed_update_adds: Vec<&InboundUpdateAdd> = Vec::new();
 (self.context.pending_inbound_htlcs.len() as u64 - dropped_inbound_htlcs).write(writer)?;
 for htlc in self.context.pending_inbound_htlcs.iter() {
 if let &InboundHTLCState::RemoteAnnounced(_) = &htlc.state {
@@ -14485,9 +14619,9 @@ impl Writeable for FundedChannel {
 2u8.write(writer)?;
 htlc_resolution.write(writer)?;
 },
- &InboundHTLCState::Committed { ref update_add_htlc_opt } => {
- 3u8.write(writer)?;
- inbound_committed_update_adds.push(update_add_htlc_opt);
+ &InboundHTLCState::Committed { ref update_add_htlc } => {
+ 3u8.write(writer)?;
+ inbound_committed_update_adds.push(update_add_htlc);
 },
 &InboundHTLCState::LocalRemoved(ref removal_reason) => {
 4u8.write(writer)?;
@@ -14956,7 +15090,7 @@ impl<'a, 'b, 'c, ES: EntropySource, SP: SignerProvider>
 };
 InboundHTLCState::AwaitingAnnouncedRemoteRevoke(resolution)
 },
- 3 => InboundHTLCState::Committed { update_add_htlc_opt: None },
+ 3 => InboundHTLCState::Committed { update_add_htlc: InboundUpdateAdd::Legacy },
 4 => {
 let reason = match <u8 as Readable>::read(reader)?
{ 0 => InboundHTLCRemovalReason::FailRelay(msgs::OnionErrorPacket { @@ -15262,7 +15396,7 @@ impl<'a, 'b, 'c, ES: EntropySource, SP: SignerProvider> let mut pending_outbound_held_htlc_flags_opt: Option>> = None; let mut holding_cell_held_htlc_flags_opt: Option>> = None; - let mut inbound_committed_update_adds_opt: Option>> = None; + let mut inbound_committed_update_adds_opt: Option> = None; let mut holding_cell_accountable: Option> = None; let mut pending_outbound_accountable: Option> = None; @@ -15446,8 +15580,8 @@ impl<'a, 'b, 'c, ES: EntropySource, SP: SignerProvider> if let Some(update_adds) = inbound_committed_update_adds_opt { let mut iter = update_adds.into_iter(); for htlc in pending_inbound_htlcs.iter_mut() { - if let InboundHTLCState::Committed { ref mut update_add_htlc_opt } = htlc.state { - *update_add_htlc_opt = iter.next().ok_or(DecodeError::InvalidValue)?; + if let InboundHTLCState::Committed { ref mut update_add_htlc } = htlc.state { + *update_add_htlc = iter.next().ok_or(DecodeError::InvalidValue)?; } } if iter.next().is_some() { @@ -15814,16 +15948,17 @@ mod tests { use crate::chain::BestBlock; use crate::ln::chan_utils::{self, commit_tx_fee_sat, ChannelTransactionParameters}; use crate::ln::channel::{ - AwaitingChannelReadyFlags, ChannelState, FundedChannel, HTLCCandidate, HTLCInitiator, - HTLCUpdateAwaitingACK, InboundHTLCOutput, InboundHTLCState, InboundV1Channel, - OutboundHTLCOutput, OutboundHTLCState, OutboundV1Channel, + AwaitingChannelReadyFlags, ChannelId, ChannelState, FundedChannel, HTLCCandidate, + HTLCInitiator, HTLCUpdateAwaitingACK, InboundHTLCOutput, InboundHTLCState, + InboundUpdateAdd, InboundV1Channel, OutboundHTLCOutput, OutboundHTLCState, + OutboundV1Channel, }; use crate::ln::channel::{ MAX_FUNDING_SATOSHIS_NO_WUMBO, MIN_THEIR_CHAN_RESERVE_SATOSHIS, TOTAL_BITCOIN_SUPPLY_SATOSHIS, }; use crate::ln::channel_keys::{RevocationBasepoint, RevocationKey}; - use crate::ln::channelmanager::{self, HTLCSource, PaymentId}; + use crate::ln::channelmanager::{self, HTLCPreviousHopData, HTLCSource, PaymentId}; use crate::ln::funding::FundingTxInput; use crate::ln::msgs; use crate::ln::msgs::{ChannelUpdate, UnsignedChannelUpdate, MAX_VALUE_MSAT}; @@ -15847,6 +15982,7 @@ mod tests { use bitcoin::amount::Amount; use bitcoin::constants::ChainHash; use bitcoin::hashes::sha256::Hash as Sha256; + use bitcoin::hashes::sha256d::Hash as Sha256d; use bitcoin::hashes::Hash; use bitcoin::hex::FromHex; use bitcoin::locktime::absolute::LockTime; @@ -15856,9 +15992,27 @@ mod tests { use bitcoin::secp256k1::{ecdsa::Signature, Secp256k1}; use bitcoin::secp256k1::{PublicKey, SecretKey}; use bitcoin::transaction::{Transaction, TxOut, Version}; + use bitcoin::Txid; use bitcoin::{ScriptBuf, WPubkeyHash, WitnessProgram, WitnessVersion}; use std::cmp; + fn dummy_prev_hop_data() -> HTLCPreviousHopData { + let txid_hash = Sha256d::from_bytes_ref(&[0; 32]); + HTLCPreviousHopData { + prev_outbound_scid_alias: 0, + user_channel_id: None, + htlc_id: 0, + incoming_packet_shared_secret: [0; 32], + phantom_shared_secret: None, + trampoline_shared_secret: None, + blinded_failure: None, + channel_id: ChannelId([0; 32]), + outpoint: OutPoint { txid: Txid::from_raw_hash(*txid_hash), index: 0 }, + counterparty_node_id: None, + cltv_expiry: None, + } + } + #[test] #[rustfmt::skip] fn test_channel_state_order() { @@ -16061,7 +16215,7 @@ mod tests { amount_msat: htlc_amount_msat, payment_hash: PaymentHash(Sha256::hash(&[42; 32]).to_byte_array()), cltv_expiry: 300000000, - state: InboundHTLCState::Committed 
{ update_add_htlc_opt: None }, + state: InboundHTLCState::Committed { update_add_htlc: InboundUpdateAdd::Forwarded { hop_data: dummy_prev_hop_data(), outbound_amt_msat: 0 } }, }); node_a_chan.context.pending_outbound_htlcs.push(OutboundHTLCOutput { @@ -16910,7 +17064,12 @@ mod tests { amount_msat: 1000000, cltv_expiry: 500, payment_hash: PaymentHash::from(payment_preimage_0), - state: InboundHTLCState::Committed { update_add_htlc_opt: None }, + state: InboundHTLCState::Committed { + update_add_htlc: InboundUpdateAdd::Forwarded { + hop_data: dummy_prev_hop_data(), + outbound_amt_msat: 0, + }, + }, }); let payment_preimage_1 = @@ -16920,7 +17079,12 @@ mod tests { amount_msat: 2000000, cltv_expiry: 501, payment_hash: PaymentHash::from(payment_preimage_1), - state: InboundHTLCState::Committed { update_add_htlc_opt: None }, + state: InboundHTLCState::Committed { + update_add_htlc: InboundUpdateAdd::Forwarded { + hop_data: dummy_prev_hop_data(), + outbound_amt_msat: 0, + }, + }, }); let payment_preimage_2 = @@ -16962,7 +17126,12 @@ mod tests { amount_msat: 4000000, cltv_expiry: 504, payment_hash: PaymentHash::from(payment_preimage_4), - state: InboundHTLCState::Committed { update_add_htlc_opt: None }, + state: InboundHTLCState::Committed { + update_add_htlc: InboundUpdateAdd::Forwarded { + hop_data: dummy_prev_hop_data(), + outbound_amt_msat: 0, + }, + }, }); // commitment tx with all five HTLCs untrimmed (minimum feerate) @@ -17351,7 +17520,12 @@ mod tests { amount_msat: 2000000, cltv_expiry: 501, payment_hash: PaymentHash::from(payment_preimage_1), - state: InboundHTLCState::Committed { update_add_htlc_opt: None }, + state: InboundHTLCState::Committed { + update_add_htlc: InboundUpdateAdd::Forwarded { + hop_data: dummy_prev_hop_data(), + outbound_amt_msat: 0, + }, + }, }); chan.context.pending_outbound_htlcs.clear(); @@ -17604,7 +17778,12 @@ mod tests { amount_msat: 5000000, cltv_expiry: 920150, payment_hash: PaymentHash::from(htlc_in_preimage), - state: InboundHTLCState::Committed { update_add_htlc_opt: None }, + state: InboundHTLCState::Committed { + update_add_htlc: InboundUpdateAdd::Forwarded { + hop_data: dummy_prev_hop_data(), + outbound_amt_msat: 0, + }, + }, })); chan.context.pending_outbound_htlcs.extend( @@ -17668,7 +17847,12 @@ mod tests { amount_msat, cltv_expiry: 920150, payment_hash: PaymentHash::from(htlc_in_preimage), - state: InboundHTLCState::Committed { update_add_htlc_opt: None }, + state: InboundHTLCState::Committed { + update_add_htlc: InboundUpdateAdd::Forwarded { + hop_data: dummy_prev_hop_data(), + outbound_amt_msat: 0, + }, + }, }, )); @@ -17735,7 +17919,12 @@ mod tests { amount_msat: 100000, cltv_expiry: 920125, payment_hash: htlc_0_in_hash, - state: InboundHTLCState::Committed { update_add_htlc_opt: None }, + state: InboundHTLCState::Committed { + update_add_htlc: InboundUpdateAdd::Forwarded { + hop_data: dummy_prev_hop_data(), + outbound_amt_msat: 0, + }, + }, }); let htlc_1_in_preimage = @@ -17753,7 +17942,12 @@ mod tests { amount_msat: 49900000, cltv_expiry: 920125, payment_hash: htlc_1_in_hash, - state: InboundHTLCState::Committed { update_add_htlc_opt: None }, + state: InboundHTLCState::Committed { + update_add_htlc: InboundUpdateAdd::Forwarded { + hop_data: dummy_prev_hop_data(), + outbound_amt_msat: 0, + }, + }, }); chan.context.pending_outbound_htlcs.extend( @@ -17806,7 +18000,12 @@ mod tests { amount_msat: 30000, payment_hash, cltv_expiry: 920125, - state: InboundHTLCState::Committed { update_add_htlc_opt: None }, + state: InboundHTLCState::Committed { + 
update_add_htlc: InboundUpdateAdd::Forwarded {
+ hop_data: dummy_prev_hop_data(),
+ outbound_amt_msat: 0,
+ },
+ },
 },
 ));

@@ -17848,7 +18047,12 @@ mod tests {
 amount_msat: 29525,
 payment_hash,
 cltv_expiry: 920125,
- state: InboundHTLCState::Committed { update_add_htlc_opt: None },
+ state: InboundHTLCState::Committed {
+ update_add_htlc: InboundUpdateAdd::Forwarded {
+ hop_data: dummy_prev_hop_data(),
+ outbound_amt_msat: 0,
+ },
+ },
 },
 ));

@@ -17886,7 +18090,12 @@ mod tests {
 amount_msat: 29525,
 payment_hash,
 cltv_expiry: 920125,
- state: InboundHTLCState::Committed { update_add_htlc_opt: None },
+ state: InboundHTLCState::Committed {
+ update_add_htlc: InboundUpdateAdd::Forwarded {
+ hop_data: dummy_prev_hop_data(),
+ outbound_amt_msat: 0,
+ },
+ },
 },
 ));

@@ -17924,7 +18133,12 @@ mod tests {
 amount_msat: 29753,
 payment_hash,
 cltv_expiry: 920125,
- state: InboundHTLCState::Committed { update_add_htlc_opt: None },
+ state: InboundHTLCState::Committed {
+ update_add_htlc: InboundUpdateAdd::Forwarded {
+ hop_data: dummy_prev_hop_data(),
+ outbound_amt_msat: 0,
+ },
+ },
 },
 ));

@@ -17977,7 +18191,12 @@ mod tests {
 amount_msat,
 cltv_expiry,
 payment_hash,
- state: InboundHTLCState::Committed { update_add_htlc_opt: None },
+ state: InboundHTLCState::Committed {
+ update_add_htlc: InboundUpdateAdd::Forwarded {
+ hop_data: dummy_prev_hop_data(),
+ outbound_amt_msat: 0,
+ },
+ },
 }),
 );

diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index d32a0347316..1a3ea03c475 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -58,9 +58,9 @@ use crate::ln::chan_utils::selected_commitment_sat_per_1000_weight;
 use crate::ln::channel::QuiescentAction;
 use crate::ln::channel::{
 self, hold_time_since, Channel, ChannelError, ChannelUpdateStatus, DisconnectResult,
- FundedChannel, FundingTxSigned, InboundV1Channel, OutboundV1Channel, PendingV2Channel,
- ReconnectionMsg, ShutdownResult, SpliceFundingFailed, StfuResponse, UpdateFulfillCommitFetch,
- WithChannelContext,
+ FundedChannel, FundingTxSigned, InboundUpdateAdd, InboundV1Channel, OutboundV1Channel,
+ PendingV2Channel, ReconnectionMsg, ShutdownResult, SpliceFundingFailed, StfuResponse,
+ UpdateFulfillCommitFetch, WithChannelContext,
 };
 use crate::ln::channel_state::ChannelDetails;
 use crate::ln::funding::SpliceContribution;
@@ -1407,6 +1407,7 @@ enum PostMonitorUpdateChanResume {
 decode_update_add_htlcs: Option<(u64, Vec<msgs::UpdateAddHTLC>)>,
 finalized_claimed_htlcs: Vec<(HTLCSource, Option<AttributionData>)>,
 failed_htlcs: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
+ committed_outbound_htlc_sources: Vec<(HTLCPreviousHopData, u64)>,
 },
 }

@@ -9553,6 +9554,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
 decode_update_add_htlcs: Option<(u64, Vec<msgs::UpdateAddHTLC>)>,
 finalized_claimed_htlcs: Vec<(HTLCSource, Option<AttributionData>)>,
 failed_htlcs: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
+ committed_outbound_htlc_sources: Vec<(HTLCPreviousHopData, u64)>,
 ) {
 // If the channel belongs to a batch funding transaction, the progress of the batch
 // should be updated as we have received funding_signed and persisted the monitor.
@@ -9618,6 +9620,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
 };
 self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver, None);
 }
+ self.prune_persisted_inbound_htlc_onions(committed_outbound_htlc_sources);
 }

 fn handle_monitor_update_completion_actions<
@@ -10092,10 +10095,72 @@ This indicates a bug inside LDK.
Please report this error at https://github.com/ decode_update_add_htlcs, finalized_claimed_htlcs: updates.finalized_claimed_htlcs, failed_htlcs: updates.failed_htlcs, + committed_outbound_htlc_sources: updates.committed_outbound_htlc_sources, } } } + /// We store inbound committed HTLCs' onions in `Channel`s for use in reconstructing the pending + /// HTLC set on `ChannelManager` read. If an HTLC has been irrevocably forwarded to the outbound + /// edge, we no longer need to persist the inbound edge's onion and can prune it here. + fn prune_persisted_inbound_htlc_onions( + &self, committed_outbound_htlc_sources: Vec<(HTLCPreviousHopData, u64)>, + ) { + let per_peer_state = self.per_peer_state.read().unwrap(); + for (source, outbound_amt_msat) in committed_outbound_htlc_sources { + let counterparty_node_id = match source.counterparty_node_id.as_ref() { + Some(id) => id, + None => continue, + }; + let mut peer_state = + match per_peer_state.get(counterparty_node_id).map(|state| state.lock().unwrap()) { + Some(peer_state) => peer_state, + None => continue, + }; + + if let Some(chan) = + peer_state.channel_by_id.get_mut(&source.channel_id).and_then(|c| c.as_funded_mut()) + { + chan.prune_inbound_htlc_onion(source.htlc_id, source, outbound_amt_msat); + } + } + } + + #[cfg(test)] + pub(crate) fn test_holding_cell_outbound_htlc_forwards_count( + &self, cp_id: PublicKey, chan_id: ChannelId, + ) -> usize { + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state = per_peer_state.get(&cp_id).map(|state| state.lock().unwrap()).unwrap(); + let chan = peer_state.channel_by_id.get(&chan_id).and_then(|c| c.as_funded()).unwrap(); + chan.test_holding_cell_outbound_htlc_forwards_count() + } + + #[cfg(test)] + /// Useful to check that we prune inbound HTLC onions once they are irrevocably forwarded to the + /// outbound edge, see [`Self::prune_persisted_inbound_htlc_onions`]. + pub(crate) fn test_get_inbound_committed_htlcs_with_onion( + &self, cp_id: PublicKey, chan_id: ChannelId, + ) -> usize { + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state = per_peer_state.get(&cp_id).map(|state| state.lock().unwrap()).unwrap(); + let chan = peer_state.channel_by_id.get(&chan_id).and_then(|c| c.as_funded()).unwrap(); + chan.inbound_committed_unresolved_htlcs() + .iter() + .filter(|(_, htlc)| matches!(htlc, InboundUpdateAdd::WithOnion { .. })) + .count() + } + + #[cfg(test)] + /// Useful for testing crash scenarios where the holding cell of a channel is not persisted. + pub(crate) fn test_clear_channel_holding_cell(&self, cp_id: PublicKey, chan_id: ChannelId) { + let per_peer_state = self.per_peer_state.read().unwrap(); + let mut peer_state = per_peer_state.get(&cp_id).map(|state| state.lock().unwrap()).unwrap(); + let chan = + peer_state.channel_by_id.get_mut(&chan_id).and_then(|c| c.as_funded_mut()).unwrap(); + chan.test_clear_holding_cell(); + } + /// Completes channel resumption after locks have been released. /// /// Processes the [`PostMonitorUpdateChanResume`] returned by @@ -10121,6 +10186,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ decode_update_add_htlcs, finalized_claimed_htlcs, failed_htlcs, + committed_outbound_htlc_sources, } => { self.post_monitor_update_unlock( channel_id, @@ -10131,6 +10197,7 @@ This indicates a bug inside LDK. 
Please report this error at https://github.com/
 decode_update_add_htlcs,
 finalized_claimed_htlcs,
 failed_htlcs,
+ committed_outbound_htlc_sources,
 );
 },
 }
@@ -16496,6 +16563,17 @@ pub fn provided_init_features(config: &UserConfig) -> InitFeatures {
 const SERIALIZATION_VERSION: u8 = 1;
 const MIN_SERIALIZATION_VERSION: u8 = 1;

+// We plan to start writing this version in 0.5.
+//
+// LDK 0.5+ will reconstruct the set of pending HTLCs from `Channel{Monitor}` data that started
+// being written in 0.3, ignoring legacy `ChannelManager` HTLC maps on read and not writing them.
+// LDK 0.5+ will automatically fail to read if the pending HTLC set cannot be reconstructed, i.e.
+// if we were last written with pending HTLCs on 0.2 or earlier, or if the new 0.3+ fields are
+// missing.
+//
+// If 0.3 or 0.4 reads this manager version, it knows that the legacy maps were not written and
+// acts accordingly.
+const RECONSTRUCT_HTLCS_FROM_CHANS_VERSION: u8 = 5;
+
 impl_writeable_tlv_based!(PhantomRouteHints, {
 (2, channels, required_vec),
 (4, phantom_scid, required),
@@ -17255,6 +17333,8 @@ pub(super) struct ChannelManagerData {
 in_flight_monitor_updates: Option>>,
 peer_storage_dir: Option<Vec<(PublicKey, Vec<u8>)>>,
 async_receive_offer_cache: AsyncReceiveOfferCache,
+ // The `ChannelManager` version that was written.
+ version: u8,
 }

 /// Arguments for deserializing [`ChannelManagerData`].
@@ -17271,7 +17351,7 @@ impl<'a, ES: EntropySource, SP: SignerProvider, L: Logger>
 fn read<R: Read>(
 reader: &mut R, args: ChannelManagerDataReadArgs<'a, ES, SP, L>,
 ) -> Result<Self, DecodeError> {
- let _ver = read_ver_prefix!(reader, SERIALIZATION_VERSION);
+ let version = read_ver_prefix!(reader, SERIALIZATION_VERSION);

 let chain_hash: ChainHash = Readable::read(reader)?;
 let best_block_height: u32 = Readable::read(reader)?;
@@ -17293,21 +17373,26 @@
 channels.push(channel);
 }

- let forward_htlcs_count: u64 = Readable::read(reader)?;
- let mut forward_htlcs_legacy: HashMap<u64, Vec<HTLCForwardInfo>> =
- hash_map_with_capacity(cmp::min(forward_htlcs_count as usize, 128));
- for _ in 0..forward_htlcs_count {
- let short_channel_id = Readable::read(reader)?;
- let pending_forwards_count: u64 = Readable::read(reader)?;
- let mut pending_forwards = Vec::with_capacity(cmp::min(
- pending_forwards_count as usize,
- MAX_ALLOC_SIZE / mem::size_of::<HTLCForwardInfo>(),
- ));
- for _ in 0..pending_forwards_count {
- pending_forwards.push(Readable::read(reader)?);
- }
- forward_htlcs_legacy.insert(short_channel_id, pending_forwards);
- }
+ let forward_htlcs_legacy: HashMap<u64, Vec<HTLCForwardInfo>> =
+ if version < RECONSTRUCT_HTLCS_FROM_CHANS_VERSION {
+ let forward_htlcs_count: u64 = Readable::read(reader)?;
+ let mut fwds = hash_map_with_capacity(cmp::min(forward_htlcs_count as usize, 128));
+ for _ in 0..forward_htlcs_count {
+ let short_channel_id = Readable::read(reader)?;
+ let pending_forwards_count: u64 = Readable::read(reader)?;
+ let mut pending_forwards = Vec::with_capacity(cmp::min(
+ pending_forwards_count as usize,
+ MAX_ALLOC_SIZE / mem::size_of::<HTLCForwardInfo>(),
+ ));
+ for _ in 0..pending_forwards_count {
+ pending_forwards.push(Readable::read(reader)?);
+ }
+ fwds.insert(short_channel_id, pending_forwards);
+ }
+ fwds
+ } else {
+ new_hash_map()
+ };

 let claimable_htlcs_count: u64 = Readable::read(reader)?;
 let mut claimable_htlcs_list =
@@ -17518,6 +17603,7 @@
 in_flight_monitor_updates,
 peer_storage_dir,
 async_receive_offer_cache,
+ version,
 })
 }
 }
@@ -17629,6 +17715,15 @@ pub struct ChannelManagerReadArgs<
 ///
 /// This is not exported to
bindings users because we have no HashMap bindings
 pub channel_monitors: HashMap<ChannelId, &'a ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>>,
+
+ /// Whether the `ChannelManager` should attempt to reconstruct its set of pending HTLCs from
+ /// `Channel{Monitor}` data rather than its own persisted maps, which is planned to become
+ /// the default behavior in upcoming versions.
+ ///
+ /// If `None`, whether we reconstruct or use the legacy maps will be decided randomly during
+ /// `ChannelManager::from_channel_manager_data`.
+ #[cfg(test)]
+ pub reconstruct_manager_from_monitors: Option<bool>,
 }

 impl<
@@ -17666,6 +17761,8 @@
 channel_monitors: hash_map_from_iter(
 channel_monitors.drain(..).map(|monitor| (monitor.channel_id(), monitor)),
 ),
+ #[cfg(test)]
+ reconstruct_manager_from_monitors: None,
 }
 }
 }
@@ -17810,6 +17907,7 @@
 mut in_flight_monitor_updates,
 peer_storage_dir,
 async_receive_offer_cache,
+ version: _version,
 } = data;

 let empty_peer_state = || PeerState {
@@ -18093,7 +18191,7 @@
 }

 // Post-deserialization processing
- let mut decode_update_add_htlcs = new_hash_map();
+ let mut decode_update_add_htlcs: HashMap<u64, Vec<msgs::UpdateAddHTLC>> = new_hash_map();
 if fake_scid_rand_bytes.is_none() {
 fake_scid_rand_bytes = Some(args.entropy_source.get_secure_random_bytes());
 }
@@ -18353,34 +18451,54 @@
 // `reconstruct_manager_from_monitors` is set below. Currently it is only set in tests, randomly
 // to ensure the legacy codepaths also have test coverage.
 #[cfg(not(test))]
- let reconstruct_manager_from_monitors = false;
+ let reconstruct_manager_from_monitors = _version >= RECONSTRUCT_HTLCS_FROM_CHANS_VERSION;
 #[cfg(test)]
- let reconstruct_manager_from_monitors = {
- use core::hash::{BuildHasher, Hasher};
-
- match std::env::var("LDK_TEST_REBUILD_MGR_FROM_MONITORS") {
- Ok(val) => match val.as_str() {
- "1" => true,
- "0" => false,
- _ => panic!("LDK_TEST_REBUILD_MGR_FROM_MONITORS must be 0 or 1, got: {}", val),
- },
- Err(_) => {
- let rand_val =
- std::collections::hash_map::RandomState::new().build_hasher().finish();
- if rand_val % 2 == 0 {
- true
- } else {
- false
- }
- },
- }
- };
+ let reconstruct_manager_from_monitors =
+ args.reconstruct_manager_from_monitors.unwrap_or_else(|| {
+ use core::hash::{BuildHasher, Hasher};
+
+ match std::env::var("LDK_TEST_REBUILD_MGR_FROM_MONITORS") {
+ Ok(val) => match val.as_str() {
+ "1" => true,
+ "0" => false,
+ _ => panic!(
+ "LDK_TEST_REBUILD_MGR_FROM_MONITORS must be 0 or 1, got: {}",
+ val
+ ),
+ },
+ Err(_) => {
+ let rand_val =
+ std::collections::hash_map::RandomState::new().build_hasher().finish();
+ if rand_val % 2 == 0 {
+ true
+ } else {
+ false
+ }
+ },
+ }
+ });

 // If there's any preimages for forwarded HTLCs hanging around in ChannelMonitors we
 // should ensure we try them again on the inbound edge. We put them here and do so after we
 // have a fully-constructed `ChannelManager` at the end.
 let mut pending_claims_to_replay = Vec::new();

+ // If we find an inbound HTLC that claims to already be forwarded to the outbound edge, we
+ // store an identifier for it here and verify that it is either (a) present in the outbound
+ // edge or (b) removed from the outbound edge via claim. If it's in neither of these states, we
+ // infer that it was removed from the outbound edge via fail, and fail it backwards to ensure
+ // that it is handled.
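
Condensing the classification described in the comment above into a standalone sketch (hypothetical stand-in types; the real logic further below is interleaved with monitor and claim iteration):

    #[derive(Clone, Copy, PartialEq)]
    struct HtlcKey {
        htlc_id: u64,
        prev_outbound_scid_alias: u64,
    }

    enum StartupResolution {
        PendingOutbound,       // (a) still present in the outbound edge: nothing to do
        ReplayClaim([u8; 32]), // (b) removed via claim: replay the claim backwards
        FailBackwards,         // neither: infer an outbound fail, fail backwards
    }

    fn classify(
        htlc: HtlcKey, outbound_edge: &[HtlcKey], claimed: &[(HtlcKey, [u8; 32])],
    ) -> StartupResolution {
        if outbound_edge.contains(&htlc) {
            return StartupResolution::PendingOutbound;
        }
        if let Some((_, preimage)) = claimed.iter().find(|(key, _)| *key == htlc) {
            return StartupResolution::ReplayClaim(*preimage);
        }
        StartupResolution::FailBackwards
    }
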
+ let mut already_forwarded_htlcs = Vec::new(); + let prune_forwarded_htlc = + |already_forwarded_htlcs: &mut Vec<(PaymentHash, HTLCPreviousHopData, u64)>, + prev_hop: &HTLCPreviousHopData| { + if let Some(idx) = already_forwarded_htlcs.iter().position(|(_, htlc, _)| { + prev_hop.htlc_id == htlc.htlc_id + && prev_hop.prev_outbound_scid_alias == htlc.prev_outbound_scid_alias + }) { + already_forwarded_htlcs.swap_remove(idx); + } + }; { // If we're tracking pending payments, ensure we haven't lost any by looking at the // ChannelMonitor data for any channels for which we do not have authorative state @@ -18403,16 +18521,38 @@ impl< if reconstruct_manager_from_monitors { if let Some(chan) = peer_state.channel_by_id.get(channel_id) { if let Some(funded_chan) = chan.as_funded() { + let scid_alias = funded_chan.context.outbound_scid_alias(); let inbound_committed_update_adds = - funded_chan.get_inbound_committed_update_adds(); - if !inbound_committed_update_adds.is_empty() { - // Reconstruct `ChannelManager::decode_update_add_htlcs` from the serialized - // `Channel`, as part of removing the requirement to regularly persist the - // `ChannelManager`. - decode_update_add_htlcs.insert( - funded_chan.context.outbound_scid_alias(), - inbound_committed_update_adds, - ); + funded_chan.inbound_committed_unresolved_htlcs(); + for (payment_hash, htlc) in inbound_committed_update_adds { + match htlc { + InboundUpdateAdd::WithOnion { update_add_htlc } => { + // Reconstruct `ChannelManager::decode_update_add_htlcs` from the serialized + // `Channel` as part of removing the requirement to regularly persist the + // `ChannelManager`. + match decode_update_add_htlcs.entry(scid_alias) { + hash_map::Entry::Occupied(mut entry) => { + entry.get_mut().push(update_add_htlc); + }, + hash_map::Entry::Vacant(entry) => { + entry.insert(vec![update_add_htlc]); + }, + } + }, + InboundUpdateAdd::Forwarded { + hop_data, + outbound_amt_msat, + } => { + already_forwarded_htlcs.push(( + payment_hash, + hop_data, + outbound_amt_msat, + )); + }, + InboundUpdateAdd::Legacy => { + return Err(DecodeError::InvalidValue) + }, + } } } } @@ -18456,48 +18596,70 @@ impl< let mut peer_state_lock = peer_state_mtx.lock().unwrap(); let peer_state = &mut *peer_state_lock; is_channel_closed = !peer_state.channel_by_id.contains_key(channel_id); + if reconstruct_manager_from_monitors && !is_channel_closed { + if let Some(chan) = peer_state.channel_by_id.get(channel_id) { + if let Some(funded_chan) = chan.as_funded() { + for prev_hop in funded_chan.outbound_htlc_forwards() { + dedup_decode_update_add_htlcs( + &mut decode_update_add_htlcs, + &prev_hop, + "HTLC already forwarded to the outbound edge", + &args.logger, + ); + prune_forwarded_htlc(&mut already_forwarded_htlcs, &prev_hop); + } + } + } + } } - for (htlc_source, (htlc, preimage_opt)) in monitor.get_all_current_outbound_htlcs() - { - let logger = - WithChannelMonitor::from(&args.logger, monitor, Some(htlc.payment_hash)); - let htlc_id = SentHTLCId::from_source(&htlc_source); - match htlc_source { - HTLCSource::PreviousHopData(prev_hop_data) => { - let pending_forward_matches_htlc = |info: &PendingAddHTLCInfo| { - info.prev_funding_outpoint == prev_hop_data.outpoint - && info.prev_htlc_id == prev_hop_data.htlc_id - }; - // If `reconstruct_manager_from_monitors` is set, we always add all inbound committed - // HTLCs to `decode_update_add_htlcs` in the above loop, but we need to prune from - // those added HTLCs if they were already forwarded to the outbound edge. 
Otherwise, - // we'll double-forward. - if reconstruct_manager_from_monitors { + if is_channel_closed { + for (htlc_source, (htlc, preimage_opt)) in + monitor.get_all_current_outbound_htlcs() + { + let logger = WithChannelMonitor::from( + &args.logger, + monitor, + Some(htlc.payment_hash), + ); + let htlc_id = SentHTLCId::from_source(&htlc_source); + match htlc_source { + HTLCSource::PreviousHopData(prev_hop_data) => { + let pending_forward_matches_htlc = |info: &PendingAddHTLCInfo| { + info.prev_funding_outpoint == prev_hop_data.outpoint + && info.prev_htlc_id == prev_hop_data.htlc_id + }; + + // If `reconstruct_manager_from_monitors` is set, we always add all inbound committed + // HTLCs to `decode_update_add_htlcs` in the above loop, but we need to prune from + // those added HTLCs if they were already forwarded to the outbound edge. Otherwise, + // we'll double-forward. + if reconstruct_manager_from_monitors { + dedup_decode_update_add_htlcs( + &mut decode_update_add_htlcs, + &prev_hop_data, + "HTLC already forwarded to the outbound edge", + &&logger, + ); + prune_forwarded_htlc( + &mut already_forwarded_htlcs, + &prev_hop_data, + ); + } + + // The ChannelMonitor is now responsible for this HTLC's + // failure/success and will let us know what its outcome is. If we + // still have an entry for this HTLC in `forward_htlcs_legacy`, + // `pending_intercepted_htlcs_legacy`, or + // `decode_update_add_htlcs_legacy`, we were apparently not persisted + // after the monitor was when forwarding the payment. dedup_decode_update_add_htlcs( - &mut decode_update_add_htlcs, + &mut decode_update_add_htlcs_legacy, &prev_hop_data, - "HTLC already forwarded to the outbound edge", + "HTLC was forwarded to the closed channel", &&logger, ); - } - - if !is_channel_closed || reconstruct_manager_from_monitors { - continue; - } - // The ChannelMonitor is now responsible for this HTLC's - // failure/success and will let us know what its outcome is. If we - // still have an entry for this HTLC in `forward_htlcs_legacy`, - // `pending_intercepted_htlcs_legacy`, or - // `decode_update_add_htlcs_legacy`, we were apparently not persisted - // after the monitor was when forwarding the payment. - dedup_decode_update_add_htlcs( - &mut decode_update_add_htlcs_legacy, - &prev_hop_data, - "HTLC was forwarded to the closed channel", - &&logger, - ); - forward_htlcs_legacy.retain(|_, forwards| { + forward_htlcs_legacy.retain(|_, forwards| { forwards.retain(|forward| { if let HTLCForwardInfo::AddHTLC(htlc_info) = forward { if pending_forward_matches_htlc(&htlc_info) { @@ -18509,7 +18671,7 @@ impl< }); !forwards.is_empty() }); - pending_intercepted_htlcs_legacy.retain(|intercepted_id, htlc_info| { + pending_intercepted_htlcs_legacy.retain(|intercepted_id, htlc_info| { if pending_forward_matches_htlc(&htlc_info) { log_info!(logger, "Removing pending intercepted HTLC with hash {} as it was forwarded to the closed channel {}", &htlc.payment_hash, &monitor.channel_id()); @@ -18521,113 +18683,111 @@ impl< false } else { true } }); - }, - HTLCSource::OutboundRoute { - payment_id, - session_priv, - path, - bolt12_invoice, - .. 
- } => { - if !is_channel_closed { - continue; - } - if let Some(preimage) = preimage_opt { - let pending_events = Mutex::new(pending_events_read); - let update = PaymentCompleteUpdate { - counterparty_node_id: monitor.get_counterparty_node_id(), - channel_funding_outpoint: monitor.get_funding_txo(), - channel_id: monitor.channel_id(), - htlc_id, - }; - let mut compl_action = Some( + }, + HTLCSource::OutboundRoute { + payment_id, + session_priv, + path, + bolt12_invoice, + .. + } => { + if let Some(preimage) = preimage_opt { + let pending_events = Mutex::new(pending_events_read); + let update = PaymentCompleteUpdate { + counterparty_node_id: monitor.get_counterparty_node_id(), + channel_funding_outpoint: monitor.get_funding_txo(), + channel_id: monitor.channel_id(), + htlc_id, + }; + let mut compl_action = Some( EventCompletionAction::ReleasePaymentCompleteChannelMonitorUpdate(update) ); - pending_outbounds.claim_htlc( - payment_id, - preimage, - bolt12_invoice, - session_priv, - path, - true, - &mut compl_action, - &pending_events, - &logger, - ); - // If the completion action was not consumed, then there was no - // payment to claim, and we need to tell the `ChannelMonitor` - // we don't need to hear about the HTLC again, at least as long - // as the PaymentSent event isn't still sitting around in our - // event queue. - let have_action = if compl_action.is_some() { - let pending_events = pending_events.lock().unwrap(); - pending_events.iter().any(|(_, act)| *act == compl_action) - } else { - false - }; - if !have_action && compl_action.is_some() { - let mut peer_state = per_peer_state + pending_outbounds.claim_htlc( + payment_id, + preimage, + bolt12_invoice, + session_priv, + path, + true, + &mut compl_action, + &pending_events, + &logger, + ); + // If the completion action was not consumed, then there was no + // payment to claim, and we need to tell the `ChannelMonitor` + // we don't need to hear about the HTLC again, at least as long + // as the PaymentSent event isn't still sitting around in our + // event queue. + let have_action = if compl_action.is_some() { + let pending_events = pending_events.lock().unwrap(); + pending_events.iter().any(|(_, act)| *act == compl_action) + } else { + false + }; + if !have_action && compl_action.is_some() { + let mut peer_state = per_peer_state .get(&counterparty_node_id) .map(|state| state.lock().unwrap()) .expect( "Channels originating a preimage must have peer state", ); - let update_id = peer_state + let update_id = peer_state .closed_channel_monitor_update_ids .get_mut(channel_id) .expect( "Channels originating a preimage must have a monitor", ); - // Note that for channels closed pre-0.1, the latest - // update_id is `u64::MAX`. - *update_id = update_id.saturating_add(1); - - pending_background_events.push( - BackgroundEvent::MonitorUpdateRegeneratedOnStartup { - counterparty_node_id: monitor - .get_counterparty_node_id(), - funding_txo: monitor.get_funding_txo(), - channel_id: monitor.channel_id(), - update: ChannelMonitorUpdate { - update_id: *update_id, - channel_id: Some(monitor.channel_id()), - updates: vec![ + // Note that for channels closed pre-0.1, the latest + // update_id is `u64::MAX`. 
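
(A brief aside on the `saturating_add` just below: channels closed before 0.1 persist `u64::MAX` as their latest update_id, and saturating addition pins that value in place rather than wrapping.)

    let legacy_update_id = u64::MAX; // channel closed pre-0.1
    assert_eq!(legacy_update_id.saturating_add(1), u64::MAX); // stays pinned, no overflow
    assert_eq!(41u64.saturating_add(1), 42); // post-0.1 update_ids increment normally
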
+ *update_id = update_id.saturating_add(1); + + pending_background_events.push( + BackgroundEvent::MonitorUpdateRegeneratedOnStartup { + counterparty_node_id: monitor + .get_counterparty_node_id(), + funding_txo: monitor.get_funding_txo(), + channel_id: monitor.channel_id(), + update: ChannelMonitorUpdate { + update_id: *update_id, + channel_id: Some(monitor.channel_id()), + updates: vec![ ChannelMonitorUpdateStep::ReleasePaymentComplete { htlc: htlc_id, }, ], + }, }, - }, - ); + ); + } + pending_events_read = pending_events.into_inner().unwrap(); } - pending_events_read = pending_events.into_inner().unwrap(); - } - }, + }, + } } - } - for (htlc_source, payment_hash) in monitor.get_onchain_failed_outbound_htlcs() { - let logger = - WithChannelMonitor::from(&args.logger, monitor, Some(payment_hash)); - log_info!( - logger, - "Failing HTLC with payment hash {} as it was resolved on-chain.", - payment_hash - ); - let completion_action = Some(PaymentCompleteUpdate { - counterparty_node_id: monitor.get_counterparty_node_id(), - channel_funding_outpoint: monitor.get_funding_txo(), - channel_id: monitor.channel_id(), - htlc_id: SentHTLCId::from_source(&htlc_source), - }); + for (htlc_source, payment_hash) in monitor.get_onchain_failed_outbound_htlcs() { + let logger = + WithChannelMonitor::from(&args.logger, monitor, Some(payment_hash)); + log_info!( + logger, + "Failing HTLC with payment hash {} as it was resolved on-chain.", + payment_hash + ); + let completion_action = Some(PaymentCompleteUpdate { + counterparty_node_id: monitor.get_counterparty_node_id(), + channel_funding_outpoint: monitor.get_funding_txo(), + channel_id: monitor.channel_id(), + htlc_id: SentHTLCId::from_source(&htlc_source), + }); - failed_htlcs.push(( - htlc_source, - payment_hash, - monitor.get_counterparty_node_id(), - monitor.channel_id(), - LocalHTLCFailureReason::OnChainTimeout, - completion_action, - )); + failed_htlcs.push(( + htlc_source, + payment_hash, + monitor.get_counterparty_node_id(), + monitor.channel_id(), + LocalHTLCFailureReason::OnChainTimeout, + completion_action, + )); + } } // Whether the downstream channel was closed or not, try to re-apply any payment @@ -19009,6 +19169,7 @@ impl< "HTLC was failed backwards during manager read", &args.logger, ); + prune_forwarded_htlc(&mut already_forwarded_htlcs, prev_hop_data); } } @@ -19154,9 +19315,47 @@ impl< }; let mut processed_claims: HashSet> = new_hash_set(); - for (_, monitor) in args.channel_monitors.iter() { + for (channel_id, monitor) in args.channel_monitors.iter() { for (payment_hash, (payment_preimage, payment_claims)) in monitor.get_stored_preimages() { + // If we have unresolved inbound committed HTLCs that were already forwarded to the + // outbound edge and removed via claim, we need to make sure to claim them backwards via + // adding them to `pending_claims_to_replay`. + for (hash, hop_data, outbound_amt_msat) in + mem::take(&mut already_forwarded_htlcs).drain(..) 
+ { + if hash != payment_hash { + already_forwarded_htlcs.push((hash, hop_data, outbound_amt_msat)); + continue; + } + let new_pending_claim = !pending_claims_to_replay.iter().any(|(src, _, _, _, _, _, _)| { + matches!(src, HTLCSource::PreviousHopData(hop) if hop.htlc_id == hop_data.htlc_id && hop.prev_outbound_scid_alias == hop_data.prev_outbound_scid_alias) + }); + if new_pending_claim { + let counterparty_node_id = monitor.get_counterparty_node_id(); + let is_channel_closed = channel_manager + .per_peer_state + .read() + .unwrap() + .get(&counterparty_node_id) + .map_or(true, |peer_state_mtx| { + !peer_state_mtx + .lock() + .unwrap() + .channel_by_id + .contains_key(channel_id) + }); + pending_claims_to_replay.push(( + HTLCSource::PreviousHopData(hop_data), + payment_preimage, + outbound_amt_msat, + is_channel_closed, + counterparty_node_id, + monitor.get_funding_txo(), + *channel_id, + )); + } + } if !payment_claims.is_empty() { for payment_claim in payment_claims { if processed_claims.contains(&payment_claim.mpp_parts) { @@ -19398,6 +19597,17 @@ impl< channel_manager .fail_htlc_backwards_internal(&source, &hash, &reason, receiver, ev_action); } + for (hash, htlc, _) in already_forwarded_htlcs { + let channel_id = htlc.channel_id; + let node_id = htlc.counterparty_node_id; + let source = HTLCSource::PreviousHopData(htlc); + let reason = + HTLCFailReason::reason(LocalHTLCFailureReason::TemporaryNodeFailure, Vec::new()); + let receiver = HTLCHandlingFailureType::Forward { node_id, channel_id }; + // The event completion action is only relevant for HTLCs that originate from our node, not + // forwarded HTLCs. + channel_manager.fail_htlc_backwards_internal(&source, &hash, &reason, receiver, None); + } for ( source, diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index cea9ea45428..36d7c4fd345 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -909,6 +909,8 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> { tx_broadcaster: &broadcaster, logger: &self.logger, channel_monitors, + #[cfg(test)] + reconstruct_manager_from_monitors: None, }, ) .unwrap(); @@ -1268,6 +1270,13 @@ pub fn check_added_monitors>(node: & } } +pub fn get_latest_mon_update_id<'a, 'b, 'c>( + node: &Node<'a, 'b, 'c>, channel_id: ChannelId, +) -> (u64, u64) { + let monitor_id_state = node.chain_monitor.latest_monitor_update_id.lock().unwrap(); + monitor_id_state.get(&channel_id).unwrap().clone() +} + fn claimed_htlc_matches_path<'a, 'b, 'c>( origin_node: &Node<'a, 'b, 'c>, path: &[&Node<'a, 'b, 'c>], htlc: &ClaimedHTLC, ) -> bool { @@ -1300,7 +1309,7 @@ fn check_claimed_htlcs_match_route<'a, 'b, 'c>( pub fn _reload_node<'a, 'b, 'c>( node: &'a Node<'a, 'b, 'c>, config: UserConfig, chanman_encoded: &[u8], - monitors_encoded: &[&[u8]], + monitors_encoded: &[&[u8]], _reconstruct_manager_from_monitors: Option, ) -> TestChannelManager<'b, 'c> { let mut monitors_read = Vec::with_capacity(monitors_encoded.len()); for encoded in monitors_encoded { @@ -1334,6 +1343,8 @@ pub fn _reload_node<'a, 'b, 'c>( tx_broadcaster: node.tx_broadcaster, logger: node.logger, channel_monitors, + #[cfg(test)] + reconstruct_manager_from_monitors: _reconstruct_manager_from_monitors, }, ) .unwrap() @@ -1353,8 +1364,10 @@ pub fn _reload_node<'a, 'b, 'c>( } #[macro_export] -macro_rules! 
reload_node { - ($node: expr, $new_config: expr, $chanman_encoded: expr, $monitors_encoded: expr, $persister: ident, $new_chain_monitor: ident, $new_channelmanager: ident) => { +macro_rules! _reload_node_inner { + ($node: expr, $new_config: expr, $chanman_encoded: expr, $monitors_encoded: expr, $persister: + ident, $new_chain_monitor: ident, $new_channelmanager: ident, $reconstruct_pending_htlcs: expr + ) => { let chanman_encoded = $chanman_encoded; $persister = $crate::util::test_utils::TestPersister::new(); @@ -1368,22 +1381,63 @@ macro_rules! reload_node { ); $node.chain_monitor = &$new_chain_monitor; - $new_channelmanager = - _reload_node(&$node, $new_config, &chanman_encoded, $monitors_encoded); + $new_channelmanager = _reload_node( + &$node, + $new_config, + &chanman_encoded, + $monitors_encoded, + $reconstruct_pending_htlcs, + ); $node.node = &$new_channelmanager; $node.onion_messenger.set_offers_handler(&$new_channelmanager); $node.onion_messenger.set_async_payments_handler(&$new_channelmanager); }; +} + +#[macro_export] +macro_rules! reload_node { + // Reload the node using the node's current config ($node: expr, $chanman_encoded: expr, $monitors_encoded: expr, $persister: ident, $new_chain_monitor: ident, $new_channelmanager: ident) => { let config = $node.node.get_current_config(); - reload_node!( + _reload_node_inner!( + $node, + config, + $chanman_encoded, + $monitors_encoded, + $persister, + $new_chain_monitor, + $new_channelmanager, + None + ); + }; + // Reload the node with the new provided config + ($node: expr, $new_config: expr, $chanman_encoded: expr, $monitors_encoded: expr, $persister: ident, $new_chain_monitor: ident, $new_channelmanager: ident) => { + _reload_node_inner!( + $node, + $new_config, + $chanman_encoded, + $monitors_encoded, + $persister, + $new_chain_monitor, + $new_channelmanager, + None + ); + }; + // Reload the node and have the `ChannelManager` use new codepaths that reconstruct its set of + // pending HTLCs from `Channel{Monitor}` data. + ($node: expr, $chanman_encoded: expr, $monitors_encoded: expr, $persister: + ident, $new_chain_monitor: ident, $new_channelmanager: ident, $reconstruct_pending_htlcs: expr + ) => { + let config = $node.node.get_current_config(); + _reload_node_inner!( $node, config, $chanman_encoded, $monitors_encoded, $persister, $new_chain_monitor, - $new_channelmanager + $new_channelmanager, + $reconstruct_pending_htlcs ); }; } @@ -5172,6 +5226,9 @@ pub struct ReconnectArgs<'a, 'b, 'c, 'd> { pub pending_cell_htlc_claims: (usize, usize), pub pending_cell_htlc_fails: (usize, usize), pub pending_raa: (bool, bool), + /// If true, don't assert that pending messages are empty after the commitment dance completes. + /// Useful when holding cell HTLCs will be released and need to be handled by the caller. 
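
A usage sketch for the flag this doc comment describes, mirroring the new `test_manager_persisted_post_outbound_edge_holding_cell` test later in this diff (node indices are illustrative and assume the usual functional-test harness):

    // Reconnect when a holding cell HTLC will be released by the post-reconnect
    // commitment dance, so post-dance messages are expected rather than asserted away.
    let mut reconnect_args = ReconnectArgs::new(&nodes[1], &nodes[2]);
    reconnect_args.pending_raa = (false, true);
    reconnect_args.pending_responding_commitment_signed = (false, true);
    reconnect_args.allow_post_commitment_dance_msgs = (false, true);
    reconnect_nodes(reconnect_args);

    // The released holding-cell update_add now sits in node 1's message queue and
    // must be drained and handled by the test itself.
    let msgs = nodes[1].node.get_and_clear_pending_msg_events();
    assert_eq!(msgs.len(), 1);
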
+ pub allow_post_commitment_dance_msgs: (bool, bool), } impl<'a, 'b, 'c, 'd> ReconnectArgs<'a, 'b, 'c, 'd> { @@ -5194,6 +5251,7 @@ impl<'a, 'b, 'c, 'd> ReconnectArgs<'a, 'b, 'c, 'd> { pending_cell_htlc_claims: (0, 0), pending_cell_htlc_fails: (0, 0), pending_raa: (false, false), + allow_post_commitment_dance_msgs: (false, false), } } } @@ -5219,6 +5277,7 @@ pub fn reconnect_nodes<'a, 'b, 'c, 'd>(args: ReconnectArgs<'a, 'b, 'c, 'd>) { pending_raa, pending_responding_commitment_signed, pending_responding_commitment_signed_dup_monitor, + allow_post_commitment_dance_msgs, } = args; connect_nodes(node_a, node_b); let reestablish_1 = get_chan_reestablish_msgs!(node_a, node_b); @@ -5402,11 +5461,13 @@ pub fn reconnect_nodes<'a, 'b, 'c, 'd>(args: ReconnectArgs<'a, 'b, 'c, 'd>) { get_event_msg!(node_a, MessageSendEvent::SendRevokeAndACK, node_b_id); // No commitment_signed so get_event_msg's assert(len == 1) passes node_b.node.handle_revoke_and_ack(node_a_id, &as_revoke_and_ack); - assert!(node_b.node.get_and_clear_pending_msg_events().is_empty()); check_added_monitors( &node_b, if pending_responding_commitment_signed_dup_monitor.0 { 0 } else { 1 }, ); + if !allow_post_commitment_dance_msgs.0 { + assert!(node_b.node.get_and_clear_pending_msg_events().is_empty()); + } } } else { assert!(chan_msgs.2.is_none()); @@ -5516,11 +5577,13 @@ pub fn reconnect_nodes<'a, 'b, 'c, 'd>(args: ReconnectArgs<'a, 'b, 'c, 'd>) { get_event_msg!(node_b, MessageSendEvent::SendRevokeAndACK, node_a_id); // No commitment_signed so get_event_msg's assert(len == 1) passes node_a.node.handle_revoke_and_ack(node_b_id, &bs_revoke_and_ack); - assert!(node_a.node.get_and_clear_pending_msg_events().is_empty()); check_added_monitors( &node_a, if pending_responding_commitment_signed_dup_monitor.1 { 0 } else { 1 }, ); + if !allow_post_commitment_dance_msgs.1 { + assert!(node_a.node.get_and_clear_pending_msg_events().is_empty()); + } } } else { assert!(chan_msgs.2.is_none()); diff --git a/lightning/src/ln/reload_tests.rs b/lightning/src/ln/reload_tests.rs index 4fb2753b6be..9aec66eb7b8 100644 --- a/lightning/src/ln/reload_tests.rs +++ b/lightning/src/ln/reload_tests.rs @@ -437,6 +437,7 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() { tx_broadcaster: nodes[0].tx_broadcaster, logger: &logger, channel_monitors: node_0_stale_monitors.iter().map(|monitor| { (monitor.channel_id(), monitor) }).collect(), + reconstruct_manager_from_monitors: None, }) { } else { panic!("If the monitor(s) are stale, this indicates a bug and we should get an Err return"); }; @@ -455,6 +456,7 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() { tx_broadcaster: nodes[0].tx_broadcaster, logger: &logger, channel_monitors: node_0_monitors.iter().map(|monitor| { (monitor.channel_id(), monitor) }).collect(), + reconstruct_manager_from_monitors: None, }).unwrap(); nodes_0_deserialized = nodes_0_deserialized_tmp; assert!(nodes_0_read.is_empty()); @@ -1210,6 +1212,13 @@ fn do_manager_persisted_pre_outbound_edge_forward(intercept_htlc: bool) { let updates = get_htlc_update_msgs(&nodes[0], &nodes[1].node.get_our_node_id()); nodes[1].node.handle_update_add_htlc(nodes[0].node.get_our_node_id(), &updates.update_add_htlcs[0]); do_commitment_signed_dance(&nodes[1], &nodes[0], &updates.commitment_signed, false, false); + // While an inbound HTLC is committed in a channel but not yet forwarded, we store its onion in + // the `Channel` in case we need to remember it on restart. 
Once it's irrevocably forwarded to the + // outbound edge, we can prune it on the inbound edge. + assert_eq!( + nodes[1].node.test_get_inbound_committed_htlcs_with_onion(nodes[0].node.get_our_node_id(), chan_id_1), + 1 + ); // Decode the HTLC onion but don't forward it to the next hop, such that the HTLC ends up in // `ChannelManager::forward_htlcs` or `ChannelManager::pending_intercepted_htlcs`. @@ -1231,6 +1240,13 @@ fn do_manager_persisted_pre_outbound_edge_forward(intercept_htlc: bool) { args_b_c.send_announcement_sigs = (true, true); reconnect_nodes(args_b_c); + // Before an inbound HTLC is irrevocably forwarded, its onion should still be persisted within the + // inbound edge channel. + assert_eq!( + nodes[1].node.test_get_inbound_committed_htlcs_with_onion(nodes[0].node.get_our_node_id(), chan_id_1), + 1 + ); + // Forward the HTLC and ensure we can claim it post-reload. nodes[1].node.process_pending_htlc_forwards(); @@ -1253,6 +1269,12 @@ fn do_manager_persisted_pre_outbound_edge_forward(intercept_htlc: bool) { nodes[2].node.handle_update_add_htlc(nodes[1].node.get_our_node_id(), &updates.update_add_htlcs[0]); do_commitment_signed_dance(&nodes[2], &nodes[1], &updates.commitment_signed, false, false); expect_and_process_pending_htlcs(&nodes[2], false); + // After an inbound HTLC is irrevocably forwarded, its onion should be pruned within the inbound + // edge channel. + assert_eq!( + nodes[1].node.test_get_inbound_committed_htlcs_with_onion(nodes[0].node.get_our_node_id(), chan_id_1), + 0 + ); expect_payment_claimable!(nodes[2], payment_hash, payment_secret, amt_msat, None, nodes[2].node.get_our_node_id()); let path: &[&[_]] = &[&[&nodes[1], &nodes[2]]]; @@ -1318,6 +1340,117 @@ fn test_manager_persisted_post_outbound_edge_forward() { expect_payment_sent(&nodes[0], payment_preimage, None, true, true); } +#[test] +fn test_manager_persisted_post_outbound_edge_holding_cell() { + // Test that we will not double-forward an HTLC after restart if it is already in the outbound + // edge's holding cell, which was previously broken. + let chanmon_cfgs = create_chanmon_cfgs(3); + let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); + let persister; + let new_chain_monitor; + let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]); + let nodes_1_deserialized; + let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs); + + let chan_id_1 = create_announced_chan_between_nodes(&nodes, 0, 1).2; + let chan_id_2 = create_announced_chan_between_nodes(&nodes, 1, 2).2; + send_payment(&nodes[0], &[&nodes[1], &nodes[2]], 5000000); + + // Lock in the HTLC from node_a <> node_b. + let amt_msat = 1000; + let (route, payment_hash, payment_preimage, payment_secret) = get_route_and_payment_hash!(nodes[0], nodes[2], amt_msat); + nodes[0].node.send_payment_with_route(route, payment_hash, RecipientOnionFields::secret_only(payment_secret), PaymentId(payment_hash.0)).unwrap(); + check_added_monitors(&nodes[0], 1); + let updates = get_htlc_update_msgs(&nodes[0], &nodes[1].node.get_our_node_id()); + nodes[1].node.handle_update_add_htlc(nodes[0].node.get_our_node_id(), &updates.update_add_htlcs[0]); + do_commitment_signed_dance(&nodes[1], &nodes[0], &updates.commitment_signed, false, false); + + // Send a 2nd HTLC node_c -> node_b, to force the first HTLC into the holding cell. 
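
For context on the persister call below: returning `ChannelMonitorUpdateStatus::InProgress` marks monitor persistence as still in flight, which blocks further commitment updates on that channel and diverts queued HTLC updates into the holding cell. The matching completion sequence, used verbatim later in this test, is:

    // Allow persistence to succeed again and signal that the stalled update
    // finished, letting the channel resume and eventually free its holding cell.
    chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
    let (latest_update, _) = get_latest_mon_update_id(&nodes[1], chan_id_2);
    nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(chan_id_2, latest_update);
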
+ chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); + let (route_2, payment_hash_2, payment_preimage_2, payment_secret_2) = get_route_and_payment_hash!(nodes[2], nodes[1], amt_msat); + nodes[2].node.send_payment_with_route(route_2, payment_hash_2, RecipientOnionFields::secret_only(payment_secret_2), PaymentId(payment_hash_2.0)).unwrap(); + let send_event = + SendEvent::from_event(nodes[2].node.get_and_clear_pending_msg_events().remove(0)); + nodes[1].node.handle_update_add_htlc(nodes[2].node.get_our_node_id(), &send_event.msgs[0]); + nodes[1].node.handle_commitment_signed_batch_test(nodes[2].node.get_our_node_id(), &send_event.commitment_msg); + assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); + check_added_monitors(&nodes[1], 1); + assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); + + // Add the HTLC to the outbound edge, node_b <> node_c. Force the outbound HTLC into the b<>c + // holding cell. + nodes[1].node.process_pending_htlc_forwards(); + check_added_monitors(&nodes[1], 0); + assert_eq!( + nodes[1].node.test_holding_cell_outbound_htlc_forwards_count(nodes[2].node.get_our_node_id(), chan_id_2), + 1 + ); + + // Disconnect peers and reload the forwarding node_b. + nodes[0].node.peer_disconnected(nodes[1].node.get_our_node_id()); + nodes[2].node.peer_disconnected(nodes[1].node.get_our_node_id()); + + let node_b_encoded = nodes[1].node.encode(); + let chan_0_monitor_serialized = get_monitor!(nodes[1], chan_id_1).encode(); + let chan_1_monitor_serialized = get_monitor!(nodes[1], chan_id_2).encode(); + reload_node!(nodes[1], node_b_encoded, &[&chan_0_monitor_serialized, &chan_1_monitor_serialized], persister, new_chain_monitor, nodes_1_deserialized); + + chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed); + let (latest_update, _) = get_latest_mon_update_id(&nodes[1], chan_id_2); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(chan_id_2, latest_update); + + reconnect_nodes(ReconnectArgs::new(&nodes[1], &nodes[0])); + + // Reconnect b<>c. Node_b has pending RAA + commitment_signed from the incomplete c->b + // commitment dance, plus an HTLC in the holding cell that will be released after the dance. + let mut reconnect_args = ReconnectArgs::new(&nodes[1], &nodes[2]); + reconnect_args.pending_raa = (false, true); + reconnect_args.pending_responding_commitment_signed = (false, true); + // Node_c needs a monitor update to catch up after processing node_b's reestablish. + reconnect_args.expect_renegotiated_funding_locked_monitor_update = (false, true); + // The holding cell HTLC will be released after the commitment dance - handle it below. + reconnect_args.allow_post_commitment_dance_msgs = (false, true); + reconnect_nodes(reconnect_args); + + // The holding cell HTLC was released during the reconnect. Complete its commitment dance. + let holding_cell_htlc_msgs = nodes[1].node.get_and_clear_pending_msg_events(); + assert_eq!(holding_cell_htlc_msgs.len(), 1); + match &holding_cell_htlc_msgs[0] { + MessageSendEvent::UpdateHTLCs { node_id, updates, .. 
} => { + assert_eq!(*node_id, nodes[2].node.get_our_node_id()); + assert_eq!(updates.update_add_htlcs.len(), 1); + nodes[2].node.handle_update_add_htlc(nodes[1].node.get_our_node_id(), &updates.update_add_htlcs[0]); + do_commitment_signed_dance(&nodes[2], &nodes[1], &updates.commitment_signed, false, false); + } + _ => panic!("Unexpected message: {:?}", holding_cell_htlc_msgs[0]), + } + + // Ensure node_b won't double-forward the outbound HTLC (this was previously broken). + nodes[1].node.process_pending_htlc_forwards(); + let msgs = nodes[1].node.get_and_clear_pending_msg_events(); + assert!(msgs.is_empty(), "Expected 0 messages, got {:?}", msgs); + + // The a->b->c HTLC is now committed on node_c. The c->b HTLC is committed on node_b. + // Both payments should now be claimable. + expect_and_process_pending_htlcs(&nodes[2], false); + expect_payment_claimable!(nodes[2], payment_hash, payment_secret, amt_msat, None, nodes[2].node.get_our_node_id()); + expect_payment_claimable!(nodes[1], payment_hash_2, payment_secret_2, amt_msat, None, nodes[1].node.get_our_node_id()); + + // Claim the a->b->c payment on node_c. + let path: &[&[_]] = &[&[&nodes[1], &nodes[2]]]; + do_claim_payment_along_route(ClaimAlongRouteArgs::new(&nodes[0], path, payment_preimage)); + expect_payment_sent(&nodes[0], payment_preimage, None, true, true); + + // Claim the c->b payment on node_b. + nodes[1].node.claim_funds(payment_preimage_2); + expect_payment_claimed!(nodes[1], payment_hash_2, amt_msat); + check_added_monitors(&nodes[1], 1); + let mut update = get_htlc_update_msgs(&nodes[1], &nodes[2].node.get_our_node_id()); + nodes[2].node.handle_update_fulfill_htlc(nodes[1].node.get_our_node_id(), update.update_fulfill_htlcs.remove(0)); + do_commitment_signed_dance(&nodes[2], &nodes[1], &update.commitment_signed, false, false); + expect_payment_sent(&nodes[2], payment_preimage_2, None, true, true); +} + #[test] fn test_reload_partial_funding_batch() { let chanmon_cfgs = create_chanmon_cfgs(3); @@ -1565,3 +1698,306 @@ fn test_peer_storage() { assert!(res.is_err()); } +#[test] +fn outbound_removed_holding_cell_resolved_no_double_forward() { + // Test that if a forwarding node has an HTLC that is fully removed on the outbound edge + // but where the inbound edge resolution is in the holding cell, and we reload the node in this + // state, that node will not double-forward the HTLC. + + let chanmon_cfgs = create_chanmon_cfgs(3); + let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); + let persister; + let new_chain_monitor; + let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]); + let nodes_1_deserialized; + let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs); + + let node_0_id = nodes[0].node.get_our_node_id(); + let node_1_id = nodes[1].node.get_our_node_id(); + let node_2_id = nodes[2].node.get_our_node_id(); + + let chan_0_1 = create_announced_chan_between_nodes(&nodes, 0, 1); + let chan_1_2 = create_announced_chan_between_nodes(&nodes, 1, 2); + + let chan_id_0_1 = chan_0_1.2; + let chan_id_1_2 = chan_1_2.2; + + // Send a payment from nodes[0] to nodes[2] via nodes[1]. + let (route, payment_hash, payment_preimage, payment_secret) = + get_route_and_payment_hash!(nodes[0], nodes[2], 1_000_000); + send_along_route_with_secret( + &nodes[0], route, &[&[&nodes[1], &nodes[2]]], 1_000_000, payment_hash, payment_secret, + ); + + // Claim the payment on nodes[2]. 
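The scenario this test sets up hinges on skipping inbound HTLCs whose resolution is already queued in the holding cell. A minimal model of that filter, with plain `u64` ids standing in for LDK's holding-cell claim/fail entries:

```rust
use std::collections::HashSet;

// An inbound HTLC is only replayed as a forward on reload if no claim or fail
// for its id is already queued in the inbound channel's holding cell.
fn htlcs_to_replay(committed: &[u64], holding_cell_resolutions: &[u64]) -> Vec<u64> {
    let resolved: HashSet<u64> = holding_cell_resolutions.iter().copied().collect();
    committed.iter().copied().filter(|id| !resolved.contains(id)).collect()
}

fn main() {
    // HTLC 7 already has a fulfill queued in the holding cell; only 3 replays.
    assert_eq!(htlcs_to_replay(&[3, 7], &[7]), vec![3]);
}
```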
+ nodes[2].node.claim_funds(payment_preimage);
+ check_added_monitors(&nodes[2], 1);
+ expect_payment_claimed!(nodes[2], payment_hash, 1_000_000);
+
+ // Disconnect nodes[0] from nodes[1] BEFORE processing the fulfill.
+ // This forces the inbound fulfill resolution to go into nodes[1]'s holding cell for the inbound
+ // channel.
+ nodes[0].node.peer_disconnected(node_1_id);
+ nodes[1].node.peer_disconnected(node_0_id);
+
+ // Process the fulfill from nodes[2] to nodes[1].
+ let updates_2_1 = get_htlc_update_msgs(&nodes[2], &node_1_id);
+ nodes[1].node.handle_update_fulfill_htlc(node_2_id, updates_2_1.update_fulfill_htlcs[0].clone());
+ check_added_monitors(&nodes[1], 1);
+ do_commitment_signed_dance(&nodes[1], &nodes[2], &updates_2_1.commitment_signed, false, false);
+ expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(1000), false, false);
+
+ // At this point:
+ // - The outbound HTLC nodes[1]->nodes[2] is resolved and removed
+ // - The inbound HTLC nodes[0]->nodes[1] is still in a Committed state, with the fulfill
+ //   resolution in nodes[1]'s chan_0_1 holding cell
+ let node_1_serialized = nodes[1].node.encode();
+ let mon_0_1_serialized = get_monitor!(nodes[1], chan_id_0_1).encode();
+ let mon_1_2_serialized = get_monitor!(nodes[1], chan_id_1_2).encode();
+
+ // Reload nodes[1].
+ // During deserialization, we previously would not have noticed that the nodes[0]<>nodes[1] HTLC
+ // had a resolution pending in the holding cell, and would have reconstructed the ChannelManager's
+ // pending HTLC state indicating that the HTLC still needed to be forwarded to the outbound edge.
+ reload_node!(
+ nodes[1],
+ node_1_serialized,
+ &[&mon_0_1_serialized, &mon_1_2_serialized],
+ persister,
+ new_chain_monitor,
+ nodes_1_deserialized
+ );
+
+ // Check that nodes[1] doesn't double-forward the HTLC.
+ nodes[1].node.process_pending_htlc_forwards();
+
+ // Reconnect nodes[1] to nodes[0]. The claim should be in nodes[1]'s holding cell.
+ let mut reconnect_args = ReconnectArgs::new(&nodes[1], &nodes[0]);
+ reconnect_args.pending_cell_htlc_claims = (0, 1);
+ reconnect_nodes(reconnect_args);
+
+ // nodes[0] should now have received the fulfill and generate a PaymentSent event.
+ expect_payment_sent(&nodes[0], payment_preimage, None, true, true);
+}
+
+#[test]
+fn test_reload_node_with_preimage_in_monitor_claims_htlc() {
+ // Test that if a forwarding node has an HTLC that was irrevocably removed on the outbound edge
+ // via claim but is still forwarded-and-unresolved in the inbound edge, that HTLC will not be
+ // failed back on the inbound edge on reload.
+ //
+ // For context, the ChannelManager is moving towards reconstructing the pending inbound HTLC set
+ // from Channel data on startup. If we find an inbound HTLC that is flagged as already-forwarded,
+ // we then check that the HTLC is either (a) still present in the outbound edge or (b) removed
+ // from the outbound edge but with a preimage present in the corresponding ChannelMonitor,
+ // indicating that it was removed from the outbound edge via claim. If neither of those is the
+ // case, we infer that the HTLC was removed from the outbound edge via failure and fail the HTLC
+ // backwards.
+ //
+ // Here we ensure that inbound HTLCs in case (b) above will not be failed backwards on manager
+ // reload.
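The (a)/(b) decision described in the comment above can be sketched as a small decision function. Names here are assumptions; the real logic runs during `ChannelManager` deserialization:

```rust
// Sketch of the reload-time classification of an already-forwarded inbound HTLC.
enum Resolution {
    StillPending,             // (a) HTLC still present on the outbound edge
    ClaimBackwards([u8; 32]), // (b) removed, but a preimage is in the monitor
    FailBackwards,            // removed with no preimage: failed downstream
}

fn resolve(outbound_edge_has_htlc: bool, preimage: Option<[u8; 32]>) -> Resolution {
    if outbound_edge_has_htlc {
        Resolution::StillPending
    } else if let Some(p) = preimage {
        Resolution::ClaimBackwards(p)
    } else {
        Resolution::FailBackwards
    }
}

fn main() {
    assert!(matches!(resolve(true, None), Resolution::StillPending));
    assert!(matches!(resolve(false, Some([0x42; 32])), Resolution::ClaimBackwards(_)));
    assert!(matches!(resolve(false, None), Resolution::FailBackwards));
}
```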
+ + let chanmon_cfgs = create_chanmon_cfgs(3); + let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); + let persister; + let new_chain_monitor; + let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]); + let nodes_1_deserialized; + let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs); + + let node_0_id = nodes[0].node.get_our_node_id(); + let node_1_id = nodes[1].node.get_our_node_id(); + let node_2_id = nodes[2].node.get_our_node_id(); + + let chan_0_1 = create_announced_chan_between_nodes(&nodes, 0, 1); + let chan_1_2 = create_announced_chan_between_nodes(&nodes, 1, 2); + + let chan_id_0_1 = chan_0_1.2; + let chan_id_1_2 = chan_1_2.2; + + // Send a payment from nodes[0] to nodes[2] via nodes[1]. + let (route, payment_hash, payment_preimage, payment_secret) = + get_route_and_payment_hash!(nodes[0], nodes[2], 1_000_000); + send_along_route_with_secret( + &nodes[0], route, &[&[&nodes[1], &nodes[2]]], 1_000_000, payment_hash, payment_secret, + ); + + // Claim the payment on nodes[2]. + nodes[2].node.claim_funds(payment_preimage); + check_added_monitors(&nodes[2], 1); + expect_payment_claimed!(nodes[2], payment_hash, 1_000_000); + + // Disconnect nodes[0] from nodes[1] BEFORE processing the fulfill. + // This prevents the claim from propagating back, leaving the inbound HTLC in ::Forwarded state. + nodes[0].node.peer_disconnected(node_1_id); + nodes[1].node.peer_disconnected(node_0_id); + + // Process the fulfill from nodes[2] to nodes[1]. + // This stores the preimage in nodes[1]'s monitor for chan_1_2. + let updates_2_1 = get_htlc_update_msgs(&nodes[2], &node_1_id); + nodes[1].node.handle_update_fulfill_htlc(node_2_id, updates_2_1.update_fulfill_htlcs[0].clone()); + check_added_monitors(&nodes[1], 1); + do_commitment_signed_dance(&nodes[1], &nodes[2], &updates_2_1.commitment_signed, false, false); + expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(1000), false, false); + + // Clear the holding cell's claim entry on chan_0_1 before serialization. + // This simulates a crash where the HTLC was fully removed from the outbound edge but is still + // present on the inbound edge without a resolution. + nodes[1].node.test_clear_channel_holding_cell(node_0_id, chan_id_0_1); + + // At this point: + // - The inbound HTLC on nodes[1] (from nodes[0]) is in ::Forwarded state + // - The preimage IS in nodes[1]'s monitor for chan_1_2 + // - The outbound HTLC to nodes[2] is resolved + // + // Serialize nodes[1] state and monitors before reloading. + let node_1_serialized = nodes[1].node.encode(); + let mon_0_1_serialized = get_monitor!(nodes[1], chan_id_0_1).encode(); + let mon_1_2_serialized = get_monitor!(nodes[1], chan_id_1_2).encode(); + + // Reload nodes[1]. + // During deserialization, we track inbound HTLCs that purport to already be forwarded on the + // outbound edge. If any are entirely missing from the outbound edge with no preimage available, + // they will be failed backwards. Otherwise, as in this case where a preimage is available, the + // payment should be claimed backwards. + reload_node!( + nodes[1], + node_1_serialized, + &[&mon_0_1_serialized, &mon_1_2_serialized], + persister, + new_chain_monitor, + nodes_1_deserialized, + Some(true) + ); + + // When the claim is reconstructed during reload, a PaymentForwarded event is generated. + // This event has next_user_channel_id as None since the outbound HTLC was already removed. + // Fetching events triggers the pending monitor update (adding preimage) to be applied. 
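Worth noting why the event checked below can still report `total_fee_earned_msat: Some(1000)` even though the onion was pruned: the `Forwarded` state retains the outbound amount, so the fee is simply the inbound/outbound difference. The helper name below is an assumption, not LDK's event-construction code:

```rust
// Fee earned on a forward: inbound amount minus outbound amount.
fn forwarded_fee_msat(inbound_amt_msat: u64, outbound_amt_msat: u64) -> Option<u64> {
    inbound_amt_msat.checked_sub(outbound_amt_msat)
}

fn main() {
    // e.g. 1,001,000 msat in and 1,000,000 msat out => 1,000 msat earned.
    assert_eq!(forwarded_fee_msat(1_001_000, 1_000_000), Some(1_000));
}
```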
+ let events = nodes[1].node.get_and_clear_pending_events(); + assert_eq!(events.len(), 1); + match &events[0] { + Event::PaymentForwarded { total_fee_earned_msat: Some(1000), .. } => {}, + _ => panic!("Expected PaymentForwarded event"), + } + check_added_monitors(&nodes[1], 1); + + // Reconnect nodes[1] to nodes[0]. The claim should be in nodes[1]'s holding cell. + let mut reconnect_args = ReconnectArgs::new(&nodes[1], &nodes[0]); + reconnect_args.pending_cell_htlc_claims = (0, 1); + reconnect_nodes(reconnect_args); + + // nodes[0] should now have received the fulfill and generate PaymentSent. + expect_payment_sent(&nodes[0], payment_preimage, None, true, true); +} + +#[test] +fn test_reload_node_without_preimage_fails_htlc() { + // Test that if a forwarding node has an HTLC that was removed on the outbound edge via failure + // but is still forwarded-and-unresolved in the inbound edge, that HTLC will be correctly + // failed back on reload via the already_forwarded_htlcs mechanism. + // + // For context, the ChannelManager reconstructs the pending inbound HTLC set from Channel data + // on startup. If an inbound HTLC is present but flagged as already-forwarded, we check that + // the HTLC is either (a) still present in the outbound edge or (b) removed from the outbound + // edge but with a preimage present in the corresponding ChannelMonitor, indicating it was + // removed via claim. If neither, we infer the HTLC was removed via failure and fail it back. + // + // Here we test the failure case: no preimage is present, so the HTLC should be failed back. + let chanmon_cfgs = create_chanmon_cfgs(3); + let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); + let persister; + let new_chain_monitor; + let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]); + let nodes_1_deserialized; + let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs); + + let node_0_id = nodes[0].node.get_our_node_id(); + let node_1_id = nodes[1].node.get_our_node_id(); + let node_2_id = nodes[2].node.get_our_node_id(); + + let chan_0_1 = create_announced_chan_between_nodes(&nodes, 0, 1); + let chan_1_2 = create_announced_chan_between_nodes(&nodes, 1, 2); + + let chan_id_0_1 = chan_0_1.2; + let chan_id_1_2 = chan_1_2.2; + + // Send a payment from nodes[0] to nodes[2] via nodes[1]. + let (route, payment_hash, _, payment_secret) = + get_route_and_payment_hash!(nodes[0], nodes[2], 1_000_000); + send_along_route_with_secret( + &nodes[0], route, &[&[&nodes[1], &nodes[2]]], 1_000_000, payment_hash, payment_secret, + ); + + // Disconnect nodes[0] from nodes[1] BEFORE processing the failure. + // This prevents the fail from propagating back, leaving the inbound HTLC in ::Forwarded state. + nodes[0].node.peer_disconnected(node_1_id); + nodes[1].node.peer_disconnected(node_0_id); + + // Fail the payment on nodes[2] and process the failure to nodes[1]. + // This removes the outbound HTLC and queues a fail in the holding cell. 
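The holding-cell behavior this test leans on, modeled as a toy (the types are assumptions, not LDK's `HTLCUpdateAwaitingACK` machinery): a queued fail is held while the peer is offline and drained into outbound updates only once the channel is reestablished.

```rust
// Toy holding cell: fails queue up while disconnected, drain on reestablish.
#[derive(Default)]
struct HoldingCell {
    fails: Vec<u64>, // htlc ids with a pending update_fail_htlc
}

impl HoldingCell {
    fn queue_fail(&mut self, htlc_id: u64) {
        self.fails.push(htlc_id);
    }
    fn drain_on_reestablish(&mut self) -> Vec<u64> {
        std::mem::take(&mut self.fails)
    }
}

fn main() {
    let mut cell = HoldingCell::default();
    cell.queue_fail(0);
    assert_eq!(cell.drain_on_reestablish(), vec![0]);
    assert!(cell.fails.is_empty());
}
```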
+ nodes[2].node.fail_htlc_backwards(&payment_hash); + expect_and_process_pending_htlcs_and_htlc_handling_failed( + &nodes[2], &[HTLCHandlingFailureType::Receive { payment_hash }] + ); + check_added_monitors(&nodes[2], 1); + + let updates_2_1 = get_htlc_update_msgs(&nodes[2], &node_1_id); + nodes[1].node.handle_update_fail_htlc(node_2_id, &updates_2_1.update_fail_htlcs[0]); + do_commitment_signed_dance(&nodes[1], &nodes[2], &updates_2_1.commitment_signed, false, false); + expect_and_process_pending_htlcs_and_htlc_handling_failed( + &nodes[1], &[HTLCHandlingFailureType::Forward { node_id: Some(node_2_id), channel_id: chan_id_1_2 }] + ); + + // Clear the holding cell's fail entry on chan_0_1 before serialization. + // This simulates a crash where the HTLC was fully removed from the outbound edge but is still + // present on the inbound edge without a resolution. Otherwise, we would not be able to exercise + // the desired failure paths due to the holding cell failure resolution being present. + nodes[1].node.test_clear_channel_holding_cell(node_0_id, chan_id_0_1); + + // Now serialize. The state has: + // - Inbound HTLC on chan_0_1 in ::Forwarded state + // - Outbound HTLC on chan_1_2 resolved (not present) + // - No preimage in monitors (it was a failure) + // - No holding cell entry for the fail (we cleared it) + let node_1_serialized = nodes[1].node.encode(); + let mon_0_1_serialized = get_monitor!(nodes[1], chan_id_0_1).encode(); + let mon_1_2_serialized = get_monitor!(nodes[1], chan_id_1_2).encode(); + + // Reload nodes[1]. + // The already_forwarded_htlcs mechanism should detect: + // - Inbound HTLC is in ::Forwarded state + // - Outbound HTLC is not present in outbound channel + // - No preimage in monitors + // Therefore it should fail the HTLC backwards. + reload_node!( + nodes[1], + node_1_serialized, + &[&mon_0_1_serialized, &mon_1_2_serialized], + persister, + new_chain_monitor, + nodes_1_deserialized, + Some(true) + ); + + // After reload, nodes[1] should have generated an HTLCHandlingFailed event. + let events = nodes[1].node.get_and_clear_pending_events(); + assert!(!events.is_empty(), "Expected HTLCHandlingFailed event"); + for event in events { + match event { + Event::HTLCHandlingFailed { .. } => {}, + _ => panic!("Unexpected event {:?}", event), + } + } + + // Process the failure so it goes back into chan_0_1's holding cell. + nodes[1].node.process_pending_htlc_forwards(); + check_added_monitors(&nodes[1], 0); // No monitor update yet (peer disconnected) + + // Reconnect nodes[1] to nodes[0]. The fail should be in nodes[1]'s holding cell. + let mut reconnect_args = ReconnectArgs::new(&nodes[1], &nodes[0]); + reconnect_args.pending_cell_htlc_fails = (0, 1); + reconnect_nodes(reconnect_args); + + // nodes[0] should now have received the failure and generate PaymentFailed. + expect_payment_failed_conditions(&nodes[0], payment_hash, false, PaymentFailedConditions::new()); +} diff --git a/lightning/src/util/ser.rs b/lightning/src/util/ser.rs index f821aa5afc0..6579c0353a3 100644 --- a/lightning/src/util/ser.rs +++ b/lightning/src/util/ser.rs @@ -979,13 +979,15 @@ where } } -// Vectors +/// Write number of items in a vec followed by each element, without writing a length-prefix for +/// each element. +#[macro_export] macro_rules! 
impl_writeable_for_vec {
	($ty: ty $(, $name: ident)*) => {
		impl<$($name : Writeable),*> Writeable for Vec<$ty> {
			#[inline]
			fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
-				CollectionLength(self.len() as u64).write(w)?;
+				$crate::util::ser::CollectionLength(self.len() as u64).write(w)?;
				for elem in self.iter() { elem.write(w)?; }
@@ -994,15 +996,21 @@ macro_rules! impl_writeable_for_vec {
			}
		}
	}
}
+/// Read the number of items in a vec followed by each element, without reading a length prefix for
+/// each element.
+///
+/// Each element is read with `MaybeReadable`, meaning if an element cannot be read then it is
+/// skipped without returning `DecodeError::InvalidValue`.
+#[macro_export]
macro_rules! impl_readable_for_vec {
	($ty: ty $(, $name: ident)*) => {
		impl<$($name : Readable),*> Readable for Vec<$ty> {
			#[inline]
-			fn read<R: Read>(r: &mut R) -> Result<Self, DecodeError> {
-				let len: CollectionLength = Readable::read(r)?;
-				let mut ret = Vec::with_capacity(cmp::min(len.0 as usize, MAX_BUF_SIZE / core::mem::size_of::<$ty>()));
+			fn read<R: Read>(r: &mut R) -> Result<Self, DecodeError> {
+				let len: $crate::util::ser::CollectionLength = Readable::read(r)?;
+				let mut ret = Vec::with_capacity(cmp::min(len.0 as usize, $crate::util::ser::MAX_BUF_SIZE / core::mem::size_of::<$ty>()));
				for _ in 0..len.0 {
-					if let Some(val) = MaybeReadable::read(r)? {
+					if let Some(val) = $crate::util::ser::MaybeReadable::read(r)? {
						ret.push(val);
					}
				}
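A self-contained toy model of the vec encoding convention these macros implement: a collection length followed by raw elements, where elements that fail to decode are skipped rather than failing the whole read (the `MaybeReadable` behavior). Note the real LDK `CollectionLength` is a variable-length integer, not the fixed big-endian u64 used here for simplicity:

```rust
// Write: collection length, then each element with no per-element length prefix.
fn write_vec(elems: &[u8]) -> Vec<u8> {
    let mut out = Vec::new();
    out.extend_from_slice(&(elems.len() as u64).to_be_bytes()); // collection length
    out.extend_from_slice(elems);
    out
}

// Read: recover the length, then decode each element, skipping unreadable ones.
fn read_vec(buf: &[u8]) -> Vec<u8> {
    let len = u64::from_be_bytes(buf[..8].try_into().unwrap()) as usize;
    // Model "element decoded to None" as the byte 0xff: skip it, don't error.
    buf[8..8 + len].iter().copied().filter(|b| *b != 0xff).collect()
}

fn main() {
    let encoded = write_vec(&[1, 0xff, 3]);
    assert_eq!(read_vec(&encoded), vec![1, 3]);
}
```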