Skip to content

Commit

Permalink
Add support for option_compression
Browse files Browse the repository at this point in the history
Implement lightning/bolts#825

This lets us specify which compression algorithms we support and use one
that our peer supports as well when syncing the network graph.
  • Loading branch information
t-bast committed Sep 15, 2021
1 parent fb0199c commit 94f6c64
Show file tree
Hide file tree
Showing 27 changed files with 411 additions and 248 deletions.
3 changes: 2 additions & 1 deletion eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ eclair {
// Do not enable option_anchor_outputs unless you really know what you're doing.
option_anchor_outputs = disabled
option_anchors_zero_fee_htlc_tx = disabled
option_compression = optional
option_shutdown_anysegwit = optional
trampoline_payment = disabled
keysend = disabled
Expand Down Expand Up @@ -189,7 +190,7 @@ eclair {

sync {
request-node-announcements = true // if true we will ask for node announcements when we receive channel ids that we don't know
encoding-type = zlib // encoding for short_channel_ids and timestamps in query channel sync messages; other possible value is "uncompressed"
preferred-compression-algorithm = zlib // encoding for short_channel_ids and timestamps in query channel sync messages; other possible value is "uncompressed"
channel-range-chunk-size = 1500 // max number of short_channel_ids (+ timestamps + checksums) in reply_channel_range *do not change this unless you know what you are doing*
channel-query-chunk-size = 100 // max number of short_channel_ids in query_short_channel_ids *do not change this unless you know what you are doing*
}
Expand Down
6 changes: 6 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/Features.scala
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,11 @@ object Features {
val mandatory = 22
}

case object CompressionSupport extends Feature {
val rfcName = "option_compression"
val mandatory = 24
}

case object ShutdownAnySegwit extends Feature {
val rfcName = "option_shutdown_anysegwit"
val mandatory = 26
Expand Down Expand Up @@ -230,6 +235,7 @@ object Features {
StaticRemoteKey,
AnchorOutputs,
AnchorOutputsZeroFeeHtlcTx,
CompressionSupport,
ShutdownAnySegwit,
KeySend
)
Expand Down
12 changes: 6 additions & 6 deletions eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import fr.acinq.eclair.router.Graph.WeightRatios
import fr.acinq.eclair.router.PathFindingExperimentConf
import fr.acinq.eclair.router.Router.{MultiPartParams, PathFindingConf, RouterConf, SearchBoundaries}
import fr.acinq.eclair.tor.Socks5ProxyParams
import fr.acinq.eclair.wire.protocol.{Color, EncodingType, NodeAddress}
import fr.acinq.eclair.wire.protocol.{Color, CompressionAlgorithm, NodeAddress}
import grizzled.slf4j.Logging
import scodec.bits.ByteVector

Expand Down Expand Up @@ -213,6 +213,7 @@ object NodeParams extends Logging {
"router.path-finding.ratio-channel-capacity" -> "router.path-finding.default.ratios.channel-capacity",
"router.path-finding.hop-cost-base-msat" -> "router.path-finding.default.hop-cost.fee-base-msat",
"router.path-finding.hop-cost-millionths" -> "router.path-finding.default.hop-cost.fee-proportional-millionths",
"router.sync.encoding-type" -> "router.sync.preferred-compression-algorithm"
)
deprecatedKeyPaths.foreach {
case (old, new_) => require(!config.hasPath(old), s"configuration key '$old' has been replaced by '$new_'")
Expand Down Expand Up @@ -340,15 +341,14 @@ object NodeParams extends Logging {
experimentName = name,
experimentPercentage = config.getInt("percentage"))


def getPathFindingExperimentConf(config: Config): PathFindingExperimentConf = {
val experiments = config.root.asScala.keys.map(name => name -> getPathFindingConf(config.getConfig(name), name))
PathFindingExperimentConf(experiments.toMap)
}

val routerSyncEncodingType = config.getString("router.sync.encoding-type") match {
case "uncompressed" => EncodingType.UNCOMPRESSED
case "zlib" => EncodingType.COMPRESSED_ZLIB
val routerSyncPreferredCompression = config.getString("router.sync.preferred-compression-algorithm") match {
case "uncompressed" => CompressionAlgorithm.Uncompressed
case "zlib" => CompressionAlgorithm.ZlibDeflate
}

NodeParams(
Expand Down Expand Up @@ -429,7 +429,7 @@ object NodeParams extends Logging {
routerBroadcastInterval = FiniteDuration(config.getDuration("router.broadcast-interval").getSeconds, TimeUnit.SECONDS),
networkStatsRefreshInterval = FiniteDuration(config.getDuration("router.network-stats-interval").getSeconds, TimeUnit.SECONDS),
requestNodeAnnouncements = config.getBoolean("router.sync.request-node-announcements"),
encodingType = routerSyncEncodingType,
preferredCompression = routerSyncPreferredCompression,
channelRangeChunkSize = config.getInt("router.sync.channel-range-chunk-size"),
channelQueryChunkSize = config.getInt("router.sync.channel-query-chunk-size"),
pathFindingExperimentConf = getPathFindingExperimentConf(config.getConfig("router.path-finding.experiments"))
Expand Down
2 changes: 1 addition & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ object Peer {
case object GetPeerInfo
case class PeerInfo(nodeId: PublicKey, state: String, address: Option[InetSocketAddress], channels: Int)

case class PeerRoutingMessage(peerConnection: ActorRef, remoteNodeId: PublicKey, message: RoutingMessage) extends RemoteTypes
case class PeerRoutingMessage(peerConnection: ActorRef, remoteNodeId: PublicKey, remoteInit: protocol.Init, message: RoutingMessage) extends RemoteTypes

case class Transition(previousData: Peer.Data, nextData: Peer.Data)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
d.transport ! TransportHandler.Listener(self)
Metrics.PeerConnectionsConnecting.withTag(Tags.ConnectionState, Tags.ConnectionStates.Initializing).increment()
log.info(s"using features=$localFeatures")
val localInit = protocol.Init(localFeatures, TlvStream(InitTlv.Networks(chainHash :: Nil)))
val localInit = protocol.Init(localFeatures, TlvStream(InitTlv.Networks(chainHash :: Nil), InitTlv.CompressionAlgorithms(Set(CompressionAlgorithm.Uncompressed, CompressionAlgorithm.ZlibDeflate))))
d.transport ! localInit
startSingleTimer(INIT_TIMER, InitTimeout, conf.initTimeout)
goto(INITIALIZING) using InitializingData(chainHash, d.pendingAuth, d.remoteNodeId, d.transport, peer, localInit, doSync)
Expand Down Expand Up @@ -234,7 +234,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A

case Event(DelayedRebroadcast(rebroadcast), d: ConnectedData) =>

val thisRemote = RemoteGossip(self, d.remoteNodeId)
val thisRemote = RemoteGossip(self, d.remoteNodeId, d.remoteInit)

/**
* Send and count in a single iteration
Expand Down Expand Up @@ -285,7 +285,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
d.transport ! TransportHandler.ReadAck(msg)
case _ =>
// Note: we don't ack messages here because we don't want them to be stacked in the router's mailbox
router ! Peer.PeerRoutingMessage(self, d.remoteNodeId, msg)
router ! Peer.PeerRoutingMessage(self, d.remoteNodeId, d.remoteInit, msg)
}
stay()

Expand Down Expand Up @@ -347,7 +347,8 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
case Event(DoSync(replacePrevious), d: ConnectedData) =>
val canUseChannelRangeQueries = Features.canUseFeature(d.localInit.features, d.remoteInit.features, Features.ChannelRangeQueries)
val canUseChannelRangeQueriesEx = Features.canUseFeature(d.localInit.features, d.remoteInit.features, Features.ChannelRangeQueriesExtended)
if (canUseChannelRangeQueries || canUseChannelRangeQueriesEx) {
val hasCompatibleCompression = CompressionAlgorithm.select(d.localInit.compressionAlgorithms, d.remoteInit.compressionAlgorithms).nonEmpty
if ((canUseChannelRangeQueries || canUseChannelRangeQueriesEx) && hasCompatibleCompression) {
val flags_opt = if (canUseChannelRangeQueriesEx) Some(QueryChannelRangeTlv.QueryFlags(QueryChannelRangeTlv.QueryFlags.WANT_ALL)) else None
log.info(s"sending sync channel range query with flags_opt=$flags_opt replacePrevious=$replacePrevious")
router ! SendChannelQuery(d.chainHash, d.remoteNodeId, self, replacePrevious, flags_opt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import fr.acinq.eclair.io.Switchboard.RouterPeerConf
import fr.acinq.eclair.io.{ClientSpawner, Peer, PeerConnection, Switchboard}
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
import fr.acinq.eclair.router.Graph.WeightRatios
import fr.acinq.eclair.router.Router.{GossipDecision, MultiPartParams, PathFindingConf, RouteParams, RouterConf, SearchBoundaries, SendChannelQuery}
import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.router._
import fr.acinq.eclair.wire.protocol.CommonCodecs._
import fr.acinq.eclair.wire.protocol.LightningMessageCodecs._
Expand All @@ -50,9 +50,9 @@ object EclairInternalsSerializer {

val searchBoundariesCodec: Codec[SearchBoundaries] = (
("maxFee" | millisatoshi) ::
("maxFeeProportional" | double) ::
("maxRouteLength" | int32) ::
("maxCltv" | int32.as[CltvExpiryDelta])).as[SearchBoundaries]
("maxFeeProportional" | double) ::
("maxRouteLength" | int32) ::
("maxCltv" | int32.as[CltvExpiryDelta])).as[SearchBoundaries]

val relayFeesCodec: Codec[RelayFees] = (
("feeBase" | millisatoshi) ::
Expand All @@ -78,17 +78,19 @@ object EclairInternalsSerializer {
("experimentPercentage" | int32)).as[PathFindingConf]

val pathFindingExperimentConfCodec: Codec[PathFindingExperimentConf] = (
("experiments" | listOfN(int32, pathFindingConfCodec).xmap[Map[String, PathFindingConf]](_.map(e => (e.experimentName -> e)).toMap, _.values.toList))
"experiments" | listOfN(int32, pathFindingConfCodec).xmap[Map[String, PathFindingConf]](_.map(e => e.experimentName -> e).toMap, _.values.toList)
).as[PathFindingExperimentConf]

private val compressionAlgorithmCodec: Codec[CompressionAlgorithm] = discriminated[CompressionAlgorithm].by(uint8)
.typecase(CompressionAlgorithm.Uncompressed.bitPosition, provide(CompressionAlgorithm.Uncompressed))
.typecase(CompressionAlgorithm.ZlibDeflate.bitPosition, provide(CompressionAlgorithm.ZlibDeflate))

val routerConfCodec: Codec[RouterConf] = (
("channelExcludeDuration" | finiteDurationCodec) ::
("routerBroadcastInterval" | finiteDurationCodec) ::
("networkStatsRefreshInterval" | finiteDurationCodec) ::
("requestNodeAnnouncements" | bool(8)) ::
("encodingType" | discriminated[EncodingType].by(uint8)
.typecase(0, provide(EncodingType.UNCOMPRESSED))
.typecase(1, provide(EncodingType.COMPRESSED_ZLIB))) ::
("preferredCompression" | compressionAlgorithmCodec) ::
("channelRangeChunkSize" | int32) ::
("channelQueryChunkSize" | int32) ::
("pathFindingExperimentConf" | pathFindingExperimentConfCodec)).as[RouterConf]
Expand Down Expand Up @@ -161,6 +163,7 @@ object EclairInternalsSerializer {
def peerRoutingMessageCodec(system: ExtendedActorSystem): Codec[PeerRoutingMessage] = (
("peerConnection" | actorRefCodec(system)) ::
("remoteNodeId" | publicKey) ::
("remoteInit" | lengthPrefixedInitCodec) ::
("msg" | lengthPrefixedLightningMessageCodec.downcast[RoutingMessage])).as[PeerRoutingMessage]

val singleChannelDiscoveredCodec: Codec[SingleChannelDiscovered] = (lengthPrefixedChannelAnnouncementCodec :: satoshi :: optional(bool(8), lengthPrefixedChannelUpdateCodec) :: optional(bool(8), lengthPrefixedChannelUpdateCodec)).as[SingleChannelDiscovered]
Expand Down
34 changes: 17 additions & 17 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,13 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
stay() using RouteCalculation.handleRouteRequest(d, nodeParams.routerConf, nodeParams.currentBlockHeight, r)

// Warning: order matters here, this must be the first match for HasChainHash messages !
case Event(PeerRoutingMessage(_, _, routingMessage: HasChainHash), _) if routingMessage.chainHash != nodeParams.chainHash =>
case Event(PeerRoutingMessage(_, _, _, routingMessage: HasChainHash), _) if routingMessage.chainHash != nodeParams.chainHash =>
sender() ! TransportHandler.ReadAck(routingMessage)
log.warning("message {} for wrong chain {}, we're on {}", routingMessage, routingMessage.chainHash, nodeParams.chainHash)
stay()

case Event(PeerRoutingMessage(peerConnection, remoteNodeId, c: ChannelAnnouncement), d) =>
stay() using Validation.handleChannelAnnouncement(d, nodeParams.db.network, watcher, RemoteGossip(peerConnection, remoteNodeId), c)
case Event(PeerRoutingMessage(peerConnection, remoteNodeId, remoteInit, c: ChannelAnnouncement), d) =>
stay() using Validation.handleChannelAnnouncement(d, nodeParams.db.network, watcher, RemoteGossip(peerConnection, remoteNodeId, remoteInit), c)

case Event(r: ValidateResult, d) =>
stay() using Validation.handleChannelValidationResponse(d, nodeParams, watcher, r)
Expand All @@ -240,14 +240,14 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
case Event(n: NodeAnnouncement, d: Data) =>
stay() using Validation.handleNodeAnnouncement(d, nodeParams.db.network, Set(LocalGossip), n)

case Event(PeerRoutingMessage(peerConnection, remoteNodeId, n: NodeAnnouncement), d: Data) =>
stay() using Validation.handleNodeAnnouncement(d, nodeParams.db.network, Set(RemoteGossip(peerConnection, remoteNodeId)), n)
case Event(PeerRoutingMessage(peerConnection, remoteNodeId, remoteInit, n: NodeAnnouncement), d: Data) =>
stay() using Validation.handleNodeAnnouncement(d, nodeParams.db.network, Set(RemoteGossip(peerConnection, remoteNodeId, remoteInit)), n)

case Event(u: ChannelUpdate, d: Data) =>
stay() using Validation.handleChannelUpdate(d, nodeParams.db.network, nodeParams.routerConf, Right(RemoteChannelUpdate(u, Set(LocalGossip))))

case Event(PeerRoutingMessage(peerConnection, remoteNodeId, u: ChannelUpdate), d) =>
stay() using Validation.handleChannelUpdate(d, nodeParams.db.network, nodeParams.routerConf, Right(RemoteChannelUpdate(u, Set(RemoteGossip(peerConnection, remoteNodeId)))))
case Event(PeerRoutingMessage(peerConnection, remoteNodeId, remoteInit, u: ChannelUpdate), d) =>
stay() using Validation.handleChannelUpdate(d, nodeParams.db.network, nodeParams.routerConf, Right(RemoteChannelUpdate(u, Set(RemoteGossip(peerConnection, remoteNodeId, remoteInit)))))

case Event(lcu: LocalChannelUpdate, d: Data) =>
stay() using Validation.handleLocalChannelUpdate(d, nodeParams.db.network, nodeParams.routerConf, nodeParams.nodeId, watcher, lcu)
Expand All @@ -261,19 +261,19 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
case Event(s: SendChannelQuery, d) =>
stay() using Sync.handleSendChannelQuery(d, s)

case Event(PeerRoutingMessage(peerConnection, remoteNodeId, q: QueryChannelRange), d) =>
Sync.handleQueryChannelRange(d.channels, nodeParams.routerConf, RemoteGossip(peerConnection, remoteNodeId), q)
case Event(PeerRoutingMessage(peerConnection, remoteNodeId, remoteInit, q: QueryChannelRange), d) =>
Sync.handleQueryChannelRange(d.channels, nodeParams.routerConf, RemoteGossip(peerConnection, remoteNodeId, remoteInit), q)
stay()

case Event(PeerRoutingMessage(peerConnection, remoteNodeId, r: ReplyChannelRange), d) =>
stay() using Sync.handleReplyChannelRange(d, nodeParams.routerConf, RemoteGossip(peerConnection, remoteNodeId), r)
case Event(PeerRoutingMessage(peerConnection, remoteNodeId, remoteInit, r: ReplyChannelRange), d) =>
stay() using Sync.handleReplyChannelRange(d, nodeParams.routerConf, RemoteGossip(peerConnection, remoteNodeId, remoteInit), r)

case Event(PeerRoutingMessage(peerConnection, remoteNodeId, q: QueryShortChannelIds), d) =>
Sync.handleQueryShortChannelIds(d.nodes, d.channels, RemoteGossip(peerConnection, remoteNodeId), q)
case Event(PeerRoutingMessage(peerConnection, remoteNodeId, remoteInit, q: QueryShortChannelIds), d) =>
Sync.handleQueryShortChannelIds(d.nodes, d.channels, RemoteGossip(peerConnection, remoteNodeId, remoteInit), q)
stay()

case Event(PeerRoutingMessage(peerConnection, remoteNodeId, r: ReplyShortChannelIdsEnd), d) =>
stay() using Sync.handleReplyShortChannelIdsEnd(d, RemoteGossip(peerConnection, remoteNodeId), r)
case Event(PeerRoutingMessage(peerConnection, remoteNodeId, remoteInit, r: ReplyShortChannelIdsEnd), d) =>
stay() using Sync.handleReplyShortChannelIdsEnd(d, RemoteGossip(peerConnection, remoteNodeId, remoteInit), r)

}

Expand Down Expand Up @@ -325,7 +325,7 @@ object Router {
routerBroadcastInterval: FiniteDuration,
networkStatsRefreshInterval: FiniteDuration,
requestNodeAnnouncements: Boolean,
encodingType: EncodingType,
preferredCompression: CompressionAlgorithm,
channelRangeChunkSize: Int,
channelQueryChunkSize: Int,
pathFindingExperimentConf: PathFindingExperimentConf)
Expand Down Expand Up @@ -558,7 +558,7 @@ object Router {
// @formatter:off
sealed trait GossipOrigin
/** Gossip that we received from a remote peer. */
case class RemoteGossip(peerConnection: ActorRef, nodeId: PublicKey) extends GossipOrigin
case class RemoteGossip(peerConnection: ActorRef, nodeId: PublicKey, remoteInit: Init) extends GossipOrigin
/** Gossip that was generated by our node. */
case object LocalGossip extends GossipOrigin

Expand Down
Loading

0 comments on commit 94f6c64

Please sign in to comment.