Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions eclair-core/src/main/scala/fr/acinq/eclair/DBChecker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package fr.acinq.eclair

import fr.acinq.eclair.channel.{ChannelDataWithCommitments, PersistentChannelData}
import fr.acinq.eclair.channel.PersistentChannelDataWithKeys
import fr.acinq.eclair.router.Router
import fr.acinq.eclair.wire.protocol.NodeAnnouncement
import grizzled.slf4j.Logging
Expand All @@ -31,18 +31,16 @@ object DBChecker extends Logging {
* - it is compatible with the current version of eclair
* - channel keys can be re-generated from the channel seed
*/
def checkChannelsDB(nodeParams: NodeParams): Seq[PersistentChannelData] = {
def checkChannelsDB(nodeParams: NodeParams): Seq[PersistentChannelDataWithKeys] = {
Try(nodeParams.db.channels.listLocalChannels()) match {
case Success(channels) =>
channels.foreach {
case data: ChannelDataWithCommitments =>
val channelKeys = nodeParams.channelKeyManager.channelKeys(data.channelParams.channelConfig, data.channelParams.localParams.fundingKeyPath)
if (!data.commitments.validateSeed(channelKeys)) {
throw InvalidChannelSeedException(data.channelId)
}
case _ => ()
}
channels
channels.map(channel => {
val channelWithKeys = channel.withChannelKeys(nodeParams)
if (!channelWithKeys.validateSeed()) {
throw InvalidChannelSeedException(channel.channelId)
}
channelWithKeys
})
case Failure(t) => throw IncompatibleDBException(t)
}
}
Expand Down
4 changes: 2 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ class Setup(val datadir: File,

txPublisherFactory = Channel.SimpleTxPublisherFactory(nodeParams, bitcoinClient)
channelFactory = Peer.SimpleChannelFactory(nodeParams, watcher, relayer, bitcoinClient, txPublisherFactory)
pendingChannelsRateLimiter = system.spawn(Behaviors.supervise(PendingChannelsRateLimiter(nodeParams, router.toTyped, channels)).onFailure(typed.SupervisorStrategy.resume), name = "pending-channels-rate-limiter")
pendingChannelsRateLimiter = system.spawn(Behaviors.supervise(PendingChannelsRateLimiter(nodeParams, router.toTyped, channels.map(_.channelData))).onFailure(typed.SupervisorStrategy.resume), name = "pending-channels-rate-limiter")
peerFactory = Switchboard.SimplePeerFactory(nodeParams, bitcoinClient, channelFactory, pendingChannelsRateLimiter, register, router.toTyped)

switchboard = system.actorOf(SimpleSupervisor.props(Switchboard.props(nodeParams, peerFactory), "switchboard", SupervisorStrategy.Resume))
Expand All @@ -403,7 +403,7 @@ class Setup(val datadir: File,
balanceActor = system.spawn(BalanceActor(bitcoinClient, nodeParams.channelConf.minDepth, channelsListener, nodeParams.balanceCheckInterval), name = "balance-actor")
postman = system.spawn(Behaviors.supervise(Postman(nodeParams, switchboard, router.toTyped, register, offerManager)).onFailure(typed.SupervisorStrategy.restart), name = "postman")
peerScorer_opt = if (nodeParams.peerScoringConfig.enabled) {
val statsTracker = system.spawn(Behaviors.supervise(PeerStatsTracker(nodeParams.peerStatsTrackerConfig, nodeParams.db.audit, channels)).onFailure(typed.SupervisorStrategy.restart), name = "peer-stats-tracker")
val statsTracker = system.spawn(Behaviors.supervise(PeerStatsTracker(nodeParams.peerStatsTrackerConfig, nodeParams.db.audit, channels.map(_.channelData))).onFailure(typed.SupervisorStrategy.restart), name = "peer-stats-tracker")
Some(system.spawn(Behaviors.supervise(PeerScorer(nodeParams, bitcoinClient, statsTracker, register)).onFailure(typed.SupervisorStrategy.restart), name = "peer-scorer"))
} else None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ import fr.acinq.eclair.blockchain.fee.{ConfirmationTarget, FeeratePerKw}
import fr.acinq.eclair.channel.Helpers.Closing
import fr.acinq.eclair.channel.fund.InteractiveTxBuilder._
import fr.acinq.eclair.channel.fund.{InteractiveTxBuilder, InteractiveTxSigningSession}
import fr.acinq.eclair.crypto.keymanager.ChannelKeys
import fr.acinq.eclair.io.Peer
import fr.acinq.eclair.reputation.Reputation
import fr.acinq.eclair.transactions.CommitmentSpec
import fr.acinq.eclair.transactions.Transactions._
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelReady, ChannelReestablish, ChannelUpdate, ClosingSigned, CommitSig, FailureReason, FundingCreated, FundingSigned, Init, LiquidityAds, OnionRoutingPacket, OpenChannel, OpenDualFundedChannel, Shutdown, SpliceInit, Stfu, TxInitRbf, TxSignatures, UpdateAddHtlc, UpdateFailHtlc, UpdateFailMalformedHtlc, UpdateFulfillHtlc}
import fr.acinq.eclair.{Alias, BlockHeight, CltvExpiry, CltvExpiryDelta, Features, InitFeature, MilliSatoshi, MilliSatoshiLong, RealShortChannelId, TimestampMilli, UInt64}
import fr.acinq.eclair.{Alias, BlockHeight, CltvExpiry, CltvExpiryDelta, Features, InitFeature, MilliSatoshi, MilliSatoshiLong, NodeParams, RealShortChannelId, TimestampMilli, UInt64}
import scodec.bits.ByteVector

import java.util.UUID
Expand Down Expand Up @@ -549,7 +550,22 @@ case object Nothing extends TransientChannelData {
sealed trait PersistentChannelData extends ChannelData {
def remoteNodeId: PublicKey
def channelParams: ChannelParams
def withChannelKeys(nodeParams: NodeParams): PersistentChannelDataWithKeys = {
val channelKeys = nodeParams.channelKeyManager.channelKeys(channelParams.channelConfig, channelParams.localParams.fundingKeyPath)
PersistentChannelDataWithKeys(this, channelKeys)
}
}

case class PersistentChannelDataWithKeys(channelData: PersistentChannelData, channelKeys: ChannelKeys) {
val channelId: ByteVector32 = channelData.channelId
val remoteNodeId: PublicKey = channelData.remoteNodeId

def validateSeed(): Boolean = channelData match {
case c: ChannelDataWithCommitments => c.commitments.validateSeed(channelKeys)
case _: DATA_WAIT_FOR_DUAL_FUNDING_SIGNED => true
}
}

sealed trait ChannelDataWithoutCommitments extends PersistentChannelData {
val channelId: ByteVector32 = channelParams.channelId
val remoteNodeId: PublicKey = channelParams.remoteNodeId
Expand Down
9 changes: 4 additions & 5 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,12 @@ class Peer(val nodeParams: NodeParams,
case Event(init: Init, _) =>
pendingOnTheFlyFunding = init.pendingOnTheFlyFunding
val channels = init.storedChannels.map { state =>
val channelKeys = nodeParams.channelKeyManager.channelKeys(state.channelParams.channelConfig, state.channelParams.localParams.fundingKeyPath)
val channel = spawnChannel(channelKeys)
channel ! INPUT_RESTORED(state)
val channel = spawnChannel(state.channelKeys)
channel ! INPUT_RESTORED(state.channelData)
FinalChannelId(state.channelId) -> channel
}.toMap
// We only connect to nodes with whom we have a channel, which aren't mobile wallets.
val autoReconnect = init.storedChannels.exists(c => !c.channelParams.remoteParams.initFeatures.hasFeature(Features.WakeUpNotificationClient))
val autoReconnect = init.storedChannels.exists(c => !c.channelData.channelParams.remoteParams.initFeatures.hasFeature(Features.WakeUpNotificationClient))
context.system.eventStream.publish(PeerCreated(self, remoteNodeId))
// When we restart, we will attempt to reconnect right away, but then we'll wait.
// We don't fetch our peer's features from the DB: if the connection succeeds, we will get them from their init message, which saves a DB call.
Expand Down Expand Up @@ -1121,7 +1120,7 @@ object Peer {
case object DISCONNECTED extends State
case object CONNECTED extends State

case class Init(storedChannels: Set[PersistentChannelData], pendingOnTheFlyFunding: Map[ByteVector32, OnTheFlyFunding.Pending])
case class Init(storedChannels: Set[PersistentChannelDataWithKeys], pendingOnTheFlyFunding: Map[ByteVector32, OnTheFlyFunding.Pending])
case class Connect(nodeId: PublicKey, address_opt: Option[NodeAddress], replyTo: ActorRef, isPersistent: Boolean) {
def uri: Option[NodeURI] = address_opt.map(NodeURI(nodeId, _))
}
Expand Down
12 changes: 6 additions & 6 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ class Switchboard(nodeParams: NodeParams, peerFactory: Switchboard.PeerFactory)
// Check if channels that are still in CLOSING state have actually been closed. This can happen when the app is stopped
// just after a channel state has transitioned to CLOSED and before it has effectively been removed.
// Closed channels will be removed, other channels will be restored.
val (channels, closedChannels) = init.channels.partition(c => Closing.isClosed(c, None).isEmpty)
val (channels, closedChannels) = init.channels.partition(c => Closing.isClosed(c.channelData, None).isEmpty)
closedChannels.foreach(c => {
log.info("channel {} was closed before restarting, updating the DB", c.channelId)
val closingData_opt = (c, Closing.isClosed(c, None)) match {
val closingData_opt = (c.channelData, Closing.isClosed(c.channelData, None)) match {
case (c: DATA_CLOSING, Some(closingType)) => Some(DATA_CLOSED(c, closingType))
case _ => None
}
Expand All @@ -71,7 +71,7 @@ class Switchboard(nodeParams: NodeParams, peerFactory: Switchboard.PeerFactory)
(peersWithOnTheFlyFunding -- peersWithChannels.keySet).foreach {
case (remoteNodeId, pending) => createOrGetPeer(remoteNodeId, Set.empty, pending)
}
val peerCapacities = channels.map {
val peerCapacities = channels.map(_.channelData).map {
case channelData: ChannelDataWithoutCommitments => (channelData.remoteNodeId, 0L)
case channelData: ChannelDataWithCommitments => (channelData.remoteNodeId, channelData.commitments.capacity.toLong)
}.groupMapReduce[PublicKey, Long](_._1)(_._2)(_ + _)
Expand Down Expand Up @@ -152,9 +152,9 @@ class Switchboard(nodeParams: NodeParams, peerFactory: Switchboard.PeerFactory)
*/
def getPeer(remoteNodeId: PublicKey): Option[ActorRef] = context.child(peerActorName(remoteNodeId))

def createPeer(remoteNodeId: PublicKey): ActorRef = peerFactory.spawn(context, remoteNodeId)
private def createPeer(remoteNodeId: PublicKey): ActorRef = peerFactory.spawn(context, remoteNodeId)

def createOrGetPeer(remoteNodeId: PublicKey, offlineChannels: Set[PersistentChannelData], pendingOnTheFlyFunding: Map[ByteVector32, OnTheFlyFunding.Pending]): ActorRef = {
private def createOrGetPeer(remoteNodeId: PublicKey, offlineChannels: Set[PersistentChannelDataWithKeys], pendingOnTheFlyFunding: Map[ByteVector32, OnTheFlyFunding.Pending]): ActorRef = {
getPeer(remoteNodeId) match {
case Some(peer) => peer
case None =>
Expand Down Expand Up @@ -190,7 +190,7 @@ object Switchboard {
def peerActorName(remoteNodeId: PublicKey): String = s"peer-$remoteNodeId"

// @formatter:off
case class Init(channels: Seq[PersistentChannelData])
case class Init(channels: Seq[PersistentChannelDataWithKeys])

case object GetPeers
case class GetPeerInfo(replyTo: typed.ActorRef[PeerInfoResponse], remoteNodeId: PublicKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ class PostRestartHtlcCleaner(nodeParams: NodeParams, register: ActorRef, initial
val channels = listLocalChannels(init.channels)
val onTheFlyPayments = nodeParams.db.liquidity.listPendingOnTheFlyPayments().values.flatten.toSet
val nonStandardIncomingHtlcs: Seq[IncomingHtlc] = nodeParams.pluginParams.collect { case p: CustomCommitmentsPlugin => p.getIncomingHtlcs(nodeParams, log) }.flatten
val htlcsIn: Seq[IncomingHtlc] = getIncomingHtlcs(channels, nodeParams.db.payments, nodeParams.privateKey, nodeParams.features) ++ nonStandardIncomingHtlcs
val htlcsIn: Seq[IncomingHtlc] = getIncomingHtlcs(channels.map(_.channelData), nodeParams.db.payments, nodeParams.privateKey, nodeParams.features) ++ nonStandardIncomingHtlcs
val nonStandardRelayedOutHtlcs: Map[Origin.Cold, Set[(ByteVector32, Long)]] = nodeParams.pluginParams.collect { case p: CustomCommitmentsPlugin => p.getHtlcsRelayedOut(htlcsIn, nodeParams, log) }.flatten.toMap
val relayedOut: Map[Origin.Cold, Set[(ByteVector32, Long)]] = getHtlcsRelayedOut(nodeParams, channels, htlcsIn) ++ nonStandardRelayedOutHtlcs
val relayedOut: Map[Origin.Cold, Set[(ByteVector32, Long)]] = getHtlcsRelayedOut(channels, htlcsIn) ++ nonStandardRelayedOutHtlcs

val settledHtlcs: Set[(ByteVector32, Long)] = nodeParams.db.pendingCommands.listSettlementCommands().map { case (channelId, cmd) => (channelId, cmd.id) }.toSet
val notRelayed = htlcsIn.filterNot(htlcIn => {
Expand Down Expand Up @@ -310,7 +310,7 @@ object PostRestartHtlcCleaner {

def props(nodeParams: NodeParams, register: ActorRef, initialized: Option[Promise[Done]] = None): Props = Props(new PostRestartHtlcCleaner(nodeParams, register, initialized))

case class Init(channels: Seq[PersistentChannelData])
case class Init(channels: Seq[PersistentChannelDataWithKeys])

case object GetBrokenHtlcs

Expand Down Expand Up @@ -405,10 +405,9 @@ object PostRestartHtlcCleaner {
.toMap

/** @return pending outgoing HTLCs, grouped by their upstream origin. */
private def getHtlcsRelayedOut(nodeParams: NodeParams, channels: Seq[PersistentChannelData], htlcsIn: Seq[IncomingHtlc])(implicit log: LoggingAdapter): Map[Origin.Cold, Set[(ByteVector32, Long)]] = {
private def getHtlcsRelayedOut(channels: Seq[PersistentChannelDataWithKeys], htlcsIn: Seq[IncomingHtlc])(implicit log: LoggingAdapter): Map[Origin.Cold, Set[(ByteVector32, Long)]] = {
val htlcsOut = channels
.collect { case c: ChannelDataWithCommitments => c }
.flatMap { c =>
.collect { case PersistentChannelDataWithKeys(c: ChannelDataWithCommitments, channelKeys) =>
// Filter out HTLCs that will never reach the blockchain or have already been settled on-chain.
val htlcsToIgnore: Set[Long] = c match {
case d: DATA_CLOSING =>
Expand All @@ -429,7 +428,6 @@ object PostRestartHtlcCleaner {
case Some(_: Closing.MutualClose) => Set.empty
case None => Set.empty
}
val channelKeys = nodeParams.channelKeyManager.channelKeys(d.commitments.channelParams.channelConfig, d.commitments.localChannelParams.fundingKeyPath)
val timedOutHtlcs: Set[Long] = (closingType_opt match {
case Some(c: Closing.LocalClose) => confirmedTxs.flatMap(tx => Closing.trimmedOrTimedOutHtlcs(channelKeys, d.commitments.latest, c.localCommit, tx))
case Some(c: Closing.RemoteClose) => confirmedTxs.flatMap(tx => Closing.trimmedOrTimedOutHtlcs(channelKeys, d.commitments.latest, c.remoteCommit, tx))
Expand All @@ -444,7 +442,7 @@ object PostRestartHtlcCleaner {
c.commitments.originChannels.collect {
case (outgoingHtlcId, origin: Origin.Cold) if !htlcsToIgnore.contains(outgoingHtlcId) => (origin, c.channelId, outgoingHtlcId)
}
}
}.flatten
groupByOrigin(htlcsOut, htlcsIn)
}

Expand All @@ -454,8 +452,8 @@ object PostRestartHtlcCleaner {
* and before it has effectively been removed. Such closed channels will automatically be removed once the channel is
* restored.
*/
private def listLocalChannels(channels: Seq[PersistentChannelData]): Seq[PersistentChannelData] =
channels.filterNot(c => Closing.isClosed(c, None).isDefined)
private def listLocalChannels(channels: Seq[PersistentChannelDataWithKeys]): Seq[PersistentChannelDataWithKeys] =
channels.filterNot(c => Closing.isClosed(c.channelData, None).isDefined)

/**
* We store [[CMD_FULFILL_HTLC]]/[[CMD_FAIL_HTLC]]/[[CMD_FAIL_MALFORMED_HTLC]] in a database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ object MinimalNodeFixture extends Assertions with Eventually with IntegrationPat
val switchboard = system.actorOf(Switchboard.props(nodeParams, peerFactory), "switchboard")
val paymentFactory = PaymentInitiator.SimplePaymentFactory(nodeParams, router, register)
val paymentInitiator = system.actorOf(PaymentInitiator.props(nodeParams, paymentFactory), "payment-initiator")
val channels = nodeParams.db.channels.listLocalChannels()
val channels = nodeParams.db.channels.listLocalChannels().map(_.withChannelKeys(nodeParams))
val postman = system.spawn(Behaviors.supervise(Postman(nodeParams, switchboard, router.toTyped, register, offerManager)).onFailure(typed.SupervisorStrategy.restart), name = "postman")
switchboard ! Switchboard.Init(channels)
relayer ! PostRestartHtlcCleaner.Init(channels)
Expand Down
Loading
Loading