Skip to content
This repository has been archived by the owner on Aug 16, 2021. It is now read-only.

Introduce AsyncLoopFactory and fix flaky blocknotification test #181

Merged
merged 1 commit into from
Jun 22, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public void BlockNotificationFeatureCallsNotifyOnStart()
var chain = new Mock<ConcurrentChain>();
var chainState = new Mock<ChainBehavior.ChainState>(new Mock<FullNode>().Object);
var blockPuller = new Mock<LookaheadBlockPuller>(chain.Object, connectionManager.Object);
var blockNotification = new Mock<BlockNotification>(chain.Object, blockPuller.Object, new Signals());
var blockNotification = new Mock<BlockNotification>(chain.Object, blockPuller.Object, new Signals(), new AsyncLoopFactory());

var blockNotificationFeature = new BlockNotificationFeature(blockNotification.Object, cancellationProvider, connectionManager.Object, blockPuller.Object, chainState.Object, chain.Object);
blockNotificationFeature.Start();
Expand Down
51 changes: 41 additions & 10 deletions Stratis.Bitcoin.Tests/Notifications/BlockNotificationTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using NBitcoin;
using Stratis.Bitcoin.BlockPulling;
using Stratis.Bitcoin.Notifications;
using Stratis.Bitcoin.Tests.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
Expand All @@ -11,13 +12,13 @@

namespace Stratis.Bitcoin.Tests.Notifications
{
public class BlockNotificationTest
public class BlockNotificationTest : LogsTestBase
{
private CancellationTokenSource source;

public BlockNotificationTest()
public BlockNotificationTest() : base()
{
this.source = new CancellationTokenSource();
this.source = new CancellationTokenSource();
}

[Fact]
Expand All @@ -29,7 +30,7 @@ public void NotifyStartHashNotOnChainCompletes()
chain.Setup(c => c.GetBlock(startBlockId))
.Returns((ChainedBlock)null);

var notification = new BlockNotification(chain.Object, new Mock<ILookaheadBlockPuller>().Object, new Signals());
var notification = new BlockNotification(chain.Object, new Mock<ILookaheadBlockPuller>().Object, new Signals(), new AsyncLoopFactory());

notification.Notify(this.source.Token);
}
Expand All @@ -47,7 +48,7 @@ public void NotifySetsPullerLocationToBlockMatchingStartHash()
stub.Setup(s => s.NextBlock(this.source.Token))
.Returns((Block)null);

var notification = new BlockNotification(chain.Object, stub.Object, new Signals());
var notification = new BlockNotification(chain.Object, stub.Object, new Signals(), new AsyncLoopFactory());

notification.Notify(this.source.Token);
notification.SyncFrom(startBlockId);
Expand All @@ -56,8 +57,39 @@ public void NotifySetsPullerLocationToBlockMatchingStartHash()
}

[Fact]
public void NotifyBroadcastsOnNextBlock()
public async Task NotifyWithoutSyncFromRunsWithoutBroadcastingBlocks()
{
this.source = new CancellationTokenSource(100);

var startBlockId = new uint256(156);
var chain = new Mock<ConcurrentChain>();
var header = new BlockHeader();
chain.Setup(c => c.GetBlock(startBlockId))
.Returns(new ChainedBlock(header, 0));

var stub = new Mock<ILookaheadBlockPuller>();
stub.SetupSequence(s => s.NextBlock(this.source.Token))
.Returns(new Block())
.Returns(new Block())
.Returns((Block)null);

var signals = new Mock<ISignals>();
var signalerMock = new Mock<ISignaler<Block>>();
signals.Setup(s => s.Blocks)
.Returns(signalerMock.Object);

var notification = new BlockNotification(chain.Object, stub.Object, signals.Object, new AsyncLoopFactory());

await notification.Notify(this.source.Token);

signalerMock.Verify(s => s.Broadcast(It.IsAny<Block>()), Times.Exactly(0));
}

[Fact]
public async Task NotifyWithSyncFromSetBroadcastsOnNextBlock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

haa xuint supports async tests?

{
this.source = new CancellationTokenSource(100);

var startBlockId = new uint256(156);
var chain = new Mock<ConcurrentChain>();
var header = new BlockHeader();
Expand All @@ -75,12 +107,11 @@ public void NotifyBroadcastsOnNextBlock()
signals.Setup(s => s.Blocks)
.Returns(signalerMock.Object);

var notification = new BlockNotification(chain.Object, stub.Object, signals.Object);
var notification = new BlockNotification(chain.Object, stub.Object, signals.Object, new AsyncLoopFactory());

notification.Notify(this.source.Token);
notification.SyncFrom(startBlockId);

Thread.Sleep(100);
await notification.Notify(this.source.Token);

signalerMock.Verify(s => s.Broadcast(It.IsAny<Block>()), Times.Exactly(2));
}
}
Expand Down
43 changes: 43 additions & 0 deletions Stratis.Bitcoin/AsyncLoopFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using Stratis.Bitcoin.Utilities;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Stratis.Bitcoin
{
public interface IAsyncLoopFactory
{
IAsyncLoop Create(string name, Func<CancellationToken, Task> loop);

Task Run(string name, Func<CancellationToken, Task> loop, TimeSpan? repeatEvery = null, TimeSpan? startAfter = null);
Task Run(string name, Func<CancellationToken, Task> loop, CancellationToken cancellation, TimeSpan? repeatEvery = null, TimeSpan? startAfter = null);
}

public class AsyncLoopFactory : IAsyncLoopFactory
{
public AsyncLoopFactory()
{
}

public IAsyncLoop Create(string name, Func<CancellationToken, Task> loop)
{
return new AsyncLoop(name, loop);
}

public Task Run(string name, Func<CancellationToken, Task> loop, TimeSpan? repeatEvery = null, TimeSpan? startAfter = null)
{
return new AsyncLoop(name, loop).Run(repeatEvery, startAfter);
}

public Task Run(string name, Func<CancellationToken, Task> loop, CancellationToken cancellation, TimeSpan? repeatEvery = null, TimeSpan? startAfter = null)
{
Guard.NotNull(cancellation, nameof(cancellation));
Guard.NotEmpty(name, nameof(name));
Guard.NotNull(loop, nameof(loop));

return new AsyncLoop(name, loop).Run(cancellation, repeatEvery ?? TimeSpan.FromMilliseconds(1000), startAfter);
}
}
}
11 changes: 7 additions & 4 deletions Stratis.Bitcoin/Notifications/BlockNotification.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,20 @@ public class BlockNotification
{
private readonly ISignals signals;
private ChainedBlock tip;
private IAsyncLoopFactory asyncLoopFactory;

public BlockNotification(ConcurrentChain chain, ILookaheadBlockPuller puller, ISignals signals)
public BlockNotification(ConcurrentChain chain, ILookaheadBlockPuller puller, ISignals signals, IAsyncLoopFactory asyncLoopFactory)
{
Guard.NotNull(chain, nameof(chain));
Guard.NotNull(puller, nameof(puller));
Guard.NotNull(signals, nameof(signals));
Guard.NotNull(asyncLoopFactory, nameof(asyncLoopFactory));

this.Chain = chain;
this.Puller = puller;
this.signals = signals;
}
this.asyncLoopFactory = asyncLoopFactory;
}

public ILookaheadBlockPuller Puller { get; }

Expand Down Expand Up @@ -55,9 +58,9 @@ public void SyncFrom(uint256 startHash)
/// Notifies about blocks, starting from block with hash passed as parameter.
/// </summary>
/// <param name="cancellationToken">A cancellation token</param>
public virtual void Notify(CancellationToken cancellationToken)
public virtual Task Notify(CancellationToken cancellationToken)
{
AsyncLoop.Run("block notifier", token =>
return this.asyncLoopFactory.Run("block notifier", token =>
{
// if the StartHash hasn't been set yet
if (this.StartHash == null)
Expand Down