Skip to content

Commit

Permalink
Fix rare race conditions in integration tests (#1653)
Browse files Browse the repository at this point in the history
When a channel goes to the CLOSED state, the actor will stop itself.

We were previously sending messages to the actor asking for its state,
which returns a failure when the actor is stopped. We can simply listen
to state events to safely get the same result.
  • Loading branch information
t-bast authored Jan 6, 2021
1 parent 7343283 commit 629c2e6
Showing 1 changed file with 47 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,18 +94,23 @@ abstract class ChannelIntegrationSpec extends IntegrationSpec {

/** Disconnect node C from a given F node. */
def disconnectCF(channelId: ByteVector32, sender: TestProbe = TestProbe()): Unit = {
val (stateListenerC, stateListenerF) = (TestProbe(), TestProbe())
nodes("C").system.eventStream.subscribe(stateListenerC.ref, classOf[ChannelStateChanged])
nodes("F").system.eventStream.subscribe(stateListenerF.ref, classOf[ChannelStateChanged])

sender.send(nodes("F").switchboard, Symbol("peers"))
val peers = sender.expectMsgType[Iterable[ActorRef]]
// F's only node is C
peers.head ! Peer.Disconnect(nodes("C").nodeParams.nodeId)

// we then wait for F to be in disconnected state
awaitCond({
sender.send(nodes("F").register, Register.Forward(sender.ref, channelId, CMD_GETSTATE(ActorRef.noSender)))
sender.expectMsgType[RES_GETSTATE[State]].state == OFFLINE
}, max = 20 seconds, interval = 1 second)
Seq(stateListenerC, stateListenerF).foreach(listener => awaitCond({
val channelState = listener.expectMsgType[ChannelStateChanged]
channelState.currentState == OFFLINE && channelState.channelId == channelId
}, max = 20 seconds, interval = 1 second))
}

case class ForceCloseFixture(sender: TestProbe, paymentSender: TestProbe, stateListener: TestProbe, paymentId: UUID, htlc: UpdateAddHtlc, preimage: ByteVector32, minerAddress: String, finalAddressC: String, finalAddressF: String)
case class ForceCloseFixture(sender: TestProbe, paymentSender: TestProbe, stateListenerC: TestProbe, stateListenerF: TestProbe, paymentId: UUID, htlc: UpdateAddHtlc, preimage: ByteVector32, minerAddress: String, finalAddressC: String, finalAddressF: String)

/** Prepare a C <-> F channel for a force-close test by adding an HTLC that will be hodl-ed at F. */
def prepareForceCloseCF(commitmentFormat: Transactions.CommitmentFormat): ForceCloseFixture = {
Expand All @@ -116,9 +121,11 @@ abstract class ChannelIntegrationSpec extends IntegrationSpec {
connect(nodes("C"), nodes("F"), 5000000 sat, 500000000 msat)
generateBlocks(bitcoincli, 6, Some(minerAddress))
awaitAnnouncements(2)
// we subscribe to C's channel state transitions
val stateListener = TestProbe()
nodes("C").system.eventStream.subscribe(stateListener.ref, classOf[ChannelStateChanged])
// we subscribe to channel state transitions
val stateListenerC = TestProbe()
val stateListenerF = TestProbe()
nodes("C").system.eventStream.subscribe(stateListenerC.ref, classOf[ChannelStateChanged])
nodes("F").system.eventStream.subscribe(stateListenerF.ref, classOf[ChannelStateChanged])
// first we make sure we are in sync with current blockchain height
val currentBlockCount = getBlockCount
awaitCond(getBlockCount == currentBlockCount, max = 20 seconds, interval = 1 second)
Expand All @@ -143,7 +150,7 @@ abstract class ChannelIntegrationSpec extends IntegrationSpec {
val dataF = sender.expectMsgType[RES_GETSTATEDATA[DATA_NORMAL]].data
assert(dataF.commitments.commitmentFormat === commitmentFormat)
val finalAddressF = scriptPubKeyToAddress(dataF.commitments.localParams.defaultFinalScriptPubKey)
ForceCloseFixture(sender, paymentSender, stateListener, paymentId, htlc, preimage, minerAddress, finalAddressC, finalAddressF)
ForceCloseFixture(sender, paymentSender, stateListenerC, stateListenerF, paymentId, htlc, preimage, minerAddress, finalAddressC, finalAddressF)
}

def testDownstreamFulfillLocalCommit(commitmentFormat: Transactions.CommitmentFormat): Unit = {
Expand All @@ -158,10 +165,7 @@ abstract class ChannelIntegrationSpec extends IntegrationSpec {
sender.send(nodes("C").register, Register.Forward(sender.ref, htlc.channelId, CMD_FORCECLOSE(sender.ref)))
sender.expectMsgType[RES_SUCCESS[CMD_FORCECLOSE]]
// we then wait for F to detect the unilateral close and go to CLOSING state
awaitCond({
sender.send(nodes("F").register, Register.Forward(sender.ref, htlc.channelId, CMD_GETSTATE(ActorRef.noSender)))
sender.expectMsgType[RES_GETSTATE[State]].state == CLOSING
}, max = 20 seconds, interval = 1 second)
awaitCond(stateListenerF.expectMsgType[ChannelStateChanged].currentState == CLOSING, max = 60 seconds)
// we generate a few blocks to get the commit tx confirmed
generateBlocks(bitcoincli, 3, Some(minerAddress))
// we then fulfill the htlc, which will make F redeem it on-chain
Expand All @@ -181,11 +185,8 @@ abstract class ChannelIntegrationSpec extends IntegrationSpec {
// we generate blocks to make txs confirm
generateBlocks(bitcoincli, 2, Some(minerAddress))
// and we wait for the channel to close
awaitCond(stateListener.expectMsgType[ChannelStateChanged].currentState == CLOSED, max = 60 seconds)
awaitCond({
sender.send(nodes("F").register, Register.Forward(sender.ref, htlc.channelId, CMD_GETSTATE(ActorRef.noSender)))
sender.expectMsgType[RES_GETSTATE[State]].state == CLOSED
}, max = 20 seconds, interval = 1 second)
awaitCond(stateListenerC.expectMsgType[ChannelStateChanged].currentState == CLOSED, max = 60 seconds)
awaitCond(stateListenerF.expectMsgType[ChannelStateChanged].currentState == CLOSED, max = 60 seconds)
awaitAnnouncements(1)
}

Expand All @@ -200,7 +201,8 @@ abstract class ChannelIntegrationSpec extends IntegrationSpec {
// then we have F unilaterally close the channel
sender.send(nodes("F").register, Register.Forward(sender.ref, htlc.channelId, CMD_FORCECLOSE(sender.ref)))
sender.expectMsgType[RES_SUCCESS[CMD_FORCECLOSE]]
awaitCond(stateListener.expectMsgType[ChannelStateChanged].currentState == CLOSING, max = 60 seconds)
awaitCond(stateListenerC.expectMsgType[ChannelStateChanged].currentState == CLOSING, max = 60 seconds)
awaitCond(stateListenerF.expectMsgType[ChannelStateChanged].currentState == CLOSING, max = 60 seconds)
// we generate a few blocks to get the commit tx confirmed
generateBlocks(bitcoincli, 3, Some(minerAddress))
// we then fulfill the htlc (it won't be sent to C, and will be used to pull funds on-chain)
Expand All @@ -220,11 +222,8 @@ abstract class ChannelIntegrationSpec extends IntegrationSpec {
// we generate blocks to make txs confirm
generateBlocks(bitcoincli, 2, Some(minerAddress))
// and we wait for the channel to close
awaitCond(stateListener.expectMsgType[ChannelStateChanged].currentState == CLOSED, max = 60 seconds)
awaitCond({
sender.send(nodes("F").register, Register.Forward(sender.ref, htlc.channelId, CMD_GETSTATE(ActorRef.noSender)))
sender.expectMsgType[RES_GETSTATE[State]].state == CLOSED
}, max = 20 seconds, interval = 1 second)
awaitCond(stateListenerC.expectMsgType[ChannelStateChanged].currentState == CLOSED, max = 60 seconds)
awaitCond(stateListenerF.expectMsgType[ChannelStateChanged].currentState == CLOSED, max = 60 seconds)
awaitAnnouncements(1)
}

Expand All @@ -240,7 +239,8 @@ abstract class ChannelIntegrationSpec extends IntegrationSpec {
disconnectCF(htlc.channelId, sender)
// we generate enough blocks to reach the htlc timeout
generateBlocks(bitcoincli, (htlc.cltvExpiry.toLong - getBlockCount).toInt, Some(minerAddress))
awaitCond(stateListener.expectMsgType[ChannelStateChanged].currentState == CLOSING, max = 60 seconds)
awaitCond(stateListenerC.expectMsgType[ChannelStateChanged].currentState == CLOSING, max = 60 seconds)
awaitCond(stateListenerF.expectMsgType[ChannelStateChanged].currentState == CLOSING, max = 60 seconds)
sender.send(nodes("C").register, Register.Forward(sender.ref, htlc.channelId, CMD_GETSTATEDATA(ActorRef.noSender)))
val Some(localCommit) = sender.expectMsgType[RES_GETSTATEDATA[DATA_CLOSING]].data.localCommitPublished
// we wait until the commit tx has been broadcast
Expand Down Expand Up @@ -270,11 +270,8 @@ abstract class ChannelIntegrationSpec extends IntegrationSpec {
// we generate blocks to make txs confirm
generateBlocks(bitcoincli, 2, Some(minerAddress))
// and we wait for the channel to close
awaitCond(stateListener.expectMsgType[ChannelStateChanged].currentState == CLOSED, max = 60 seconds)
awaitCond({
sender.send(nodes("F").register, Register.Forward(sender.ref, htlc.channelId, CMD_GETSTATE(ActorRef.noSender)))
sender.expectMsgType[RES_GETSTATE[State]].state == CLOSED
}, max = 20 seconds, interval = 1 second)
awaitCond(stateListenerC.expectMsgType[ChannelStateChanged].currentState == CLOSED, max = 60 seconds)
awaitCond(stateListenerF.expectMsgType[ChannelStateChanged].currentState == CLOSED, max = 60 seconds)
awaitAnnouncements(1)
}

Expand Down Expand Up @@ -324,11 +321,8 @@ abstract class ChannelIntegrationSpec extends IntegrationSpec {
// we generate blocks to make tx confirm
generateBlocks(bitcoincli, 2, Some(minerAddress))
// and we wait for the channel to close
awaitCond(stateListener.expectMsgType[ChannelStateChanged].currentState == CLOSED, max = 60 seconds)
awaitCond({
sender.send(nodes("F").register, Register.Forward(sender.ref, htlc.channelId, CMD_GETSTATE(ActorRef.noSender)))
sender.expectMsgType[RES_GETSTATE[State]].state == CLOSED
}, max = 20 seconds, interval = 1 second)
awaitCond(stateListenerC.expectMsgType[ChannelStateChanged].currentState == CLOSED, max = 60 seconds)
awaitCond(stateListenerF.expectMsgType[ChannelStateChanged].currentState == CLOSED, max = 60 seconds)
awaitAnnouncements(1)
}

Expand Down Expand Up @@ -529,27 +523,24 @@ class StandardChannelIntegrationSpec extends ChannelIntegrationSpec {

awaitAnnouncements(2)

val stateListener = TestProbe()
funder.system.eventStream.subscribe(stateListener.ref, classOf[ChannelStateChanged])

// close that wumbo channel
sender.send(nodes("C").register, Register.Forward(sender.ref, channelId, CMD_GETSTATEDATA(ActorRef.noSender)))
sender.send(funder.register, Register.Forward(sender.ref, channelId, CMD_GETSTATEDATA(ActorRef.noSender)))
val commitmentsC = sender.expectMsgType[RES_GETSTATEDATA[DATA_NORMAL]].data.commitments
val finalPubKeyScriptC = commitmentsC.localParams.defaultFinalScriptPubKey
val fundingOutpoint = commitmentsC.commitInput.outPoint
sender.send(nodes("F").register, Register.Forward(sender.ref, channelId, CMD_GETSTATEDATA(ActorRef.noSender)))
sender.send(fundee.register, Register.Forward(sender.ref, channelId, CMD_GETSTATEDATA(ActorRef.noSender)))
val finalPubKeyScriptF = sender.expectMsgType[RES_GETSTATEDATA[DATA_NORMAL]].data.commitments.localParams.defaultFinalScriptPubKey

nodes("F").register ! Register.Forward(sender.ref, channelId, CMD_CLOSE(sender.ref, Some(finalPubKeyScriptF)))
fundee.register ! Register.Forward(sender.ref, channelId, CMD_CLOSE(sender.ref, Some(finalPubKeyScriptF)))
sender.expectMsgType[RES_SUCCESS[CMD_CLOSE]]
// we then wait for C and F to negotiate the closing fee
awaitCond({
sender.send(nodes("C").register, Register.Forward(sender.ref, channelId, CMD_GETSTATE(ActorRef.noSender)))
sender.expectMsgType[RES_GETSTATE[State]].state == CLOSING
}, max = 20 seconds, interval = 1 second)

awaitCond(stateListener.expectMsgType[ChannelStateChanged].currentState == CLOSING, max = 60 seconds)
// and close the channel
generateBlocks(bitcoincli, 2)
awaitCond({
sender.send(nodes("C").register, Register.Forward(sender.ref, channelId, CMD_GETSTATE(ActorRef.noSender)))
sender.expectMsgType[RES_GETSTATE[State]].state == CLOSED
}, max = 20 seconds, interval = 1 second)
awaitCond(stateListener.expectMsgType[ChannelStateChanged].currentState == CLOSED, max = 60 seconds)

val bitcoinClient = new ExtendedBitcoinClient(bitcoinrpcclient)
bitcoinClient.lookForSpendingTx(None, fundingOutpoint.txid, fundingOutpoint.index.toInt).pipeTo(sender.ref)
Expand Down Expand Up @@ -671,27 +662,27 @@ class AnchorOutputChannelIntegrationSpec extends ChannelIntegrationSpec {
val Some(toRemoteOutCNew) = commitTx.txOut.find(_.publicKeyScript == Script.write(toRemoteAddress))

// there is a new commitment index in the channel state
assert(commitmentIndex == initialCommitmentIndex + 1)
assert(commitmentIndex > initialCommitmentIndex)

// script pubkeys of toRemote output remained the same across commitments
assert(toRemoteOutCNew.publicKeyScript == toRemoteOutC.publicKeyScript)
assert(toRemoteOutCNew.amount < toRemoteOutC.amount)

val stateListener = TestProbe()
nodes("C").system.eventStream.subscribe(stateListener.ref, classOf[ChannelStateChanged])

// now let's force close the channel and check the toRemote is what we had at the beginning
sender.send(nodes("F").register, Register.Forward(sender.ref, channelId, CMD_FORCECLOSE(sender.ref)))
sender.expectMsgType[RES_SUCCESS[CMD_FORCECLOSE]]
// we then wait for C to detect the unilateral close and go to CLOSING state
awaitCond({
nodes("C").register ! Register.Forward(sender.ref, channelId, CMD_GETSTATE(ActorRef.noSender))
sender.expectMsgType[RES_GETSTATE[State]].state == CLOSING
}, max = 20 seconds, interval = 1 second)
awaitCond(stateListener.expectMsgType[ChannelStateChanged].currentState == CLOSING, max = 60 seconds)

val bitcoinClient = new ExtendedBitcoinClient(bitcoinrpcclient)
awaitCond({
bitcoinClient.getTransaction(commitTx.txid).pipeTo(sender.ref)
val tx = sender.expectMsgType[Transaction]
bitcoinClient.getTransaction(commitTx.txid).map(tx => Some(tx)).recover(_ => None).pipeTo(sender.ref)
val tx = sender.expectMsgType[Option[Transaction]]
// the unilateral close contains the static toRemote output
tx.txOut.exists(_.publicKeyScript == toRemoteOutC.publicKeyScript)
tx.exists(_.txOut.exists(_.publicKeyScript == toRemoteOutC.publicKeyScript))
}, max = 20 seconds, interval = 1 second)

// bury the unilateral close in a block, C should claim its main output
Expand All @@ -703,10 +694,7 @@ class AnchorOutputChannelIntegrationSpec extends ChannelIntegrationSpec {

// get the claim-remote-output confirmed, then the channel can go to the CLOSED state
generateBlocks(bitcoincli, 2)
awaitCond({
nodes("C").register ! Register.Forward(sender.ref, channelId, CMD_GETSTATE(ActorRef.noSender))
sender.expectMsgType[RES_GETSTATE[State]].state == CLOSED
}, max = 20 seconds, interval = 1 second)
awaitCond(stateListener.expectMsgType[ChannelStateChanged].currentState == CLOSED, max = 60 seconds)
awaitAnnouncements(1)
}

Expand Down

0 comments on commit 629c2e6

Please sign in to comment.