Skip to content

Commit

Permalink
Removed cache from MaturedBlocksProvider and reduced batch side, adde…
Browse files Browse the repository at this point in the history
…d 60s timeout (stratisproject#340)

* Revert "Add caching to MaturedBlocksProvider (stratisproject#334)"

This reverts commit 4ffdacb.

* exception logging

* Reduce batch size

* make sure we don't trigger timeout

* improved tests by not asserting logs

* fixes per review
  • Loading branch information
noescape00 authored and bokobza committed Dec 20, 2018
1 parent d863a00 commit 8182b55
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Stratis.FederatedPeg.Features.FederationGateway.RestClients
public interface IFederationGatewayClient
{
/// <summary><see cref="FederationGatewayController.PushCurrentBlockTip"/></summary>
Task PushCurrentBlockTipAsync(BlockTipModel model, CancellationToken cancellation = default(CancellationToken));
Task<HttpResponseMessage> PushCurrentBlockTipAsync(BlockTipModel model, CancellationToken cancellation = default(CancellationToken));

/// <summary><see cref="FederationGatewayController.GetMaturedBlockDepositsAsync"/></summary>
Task<List<MaturedBlockDepositsModel>> GetMaturedBlockDepositsAsync(MaturedBlockRequestModel model, CancellationToken cancellation = default(CancellationToken));
Expand All @@ -27,7 +27,7 @@ public FederationGatewayClient(ILoggerFactory loggerFactory, IFederationGatewayS
}

/// <inheritdoc />
public Task PushCurrentBlockTipAsync(BlockTipModel model, CancellationToken cancellation = default(CancellationToken))
public Task<HttpResponseMessage> PushCurrentBlockTipAsync(BlockTipModel model, CancellationToken cancellation = default(CancellationToken))
{
return this.SendPostRequestAsync(model, FederationGatewayRouteEndPoint.PushCurrentBlockTip, cancellation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ public abstract class RestApiClientBase
public const int RetryCount = 3;

/// <summary>Delay between retries.</summary>
public const int AttemptDelayMs = 1000;
private const int AttemptDelayMs = 1000;

public const int TimeoutMs = 60_000;

private readonly RetryPolicy policy;

Expand All @@ -35,7 +37,7 @@ public RestApiClientBase(ILoggerFactory loggerFactory, IFederationGatewaySetting

this.endpointUrl = $"http://localhost:{settings.CounterChainApiPort}/api/FederationGateway";

this.policy = Policy.Handle<Exception>().WaitAndRetryAsync(retryCount: RetryCount, sleepDurationProvider:
this.policy = Policy.Handle<HttpRequestException>().WaitAndRetryAsync(retryCount: RetryCount, sleepDurationProvider:
attemptNumber =>
{
// Intervals between new attempts are growing.
Expand All @@ -50,12 +52,17 @@ public RestApiClientBase(ILoggerFactory loggerFactory, IFederationGatewaySetting

protected async Task<HttpResponseMessage> SendPostRequestAsync<Model>(Model requestModel, string apiMethodName, CancellationToken cancellation) where Model : class
{
if (requestModel == null)
throw new ArgumentException($"{nameof(requestModel)} can't be null.");

var publicationUri = new Uri($"{this.endpointUrl}/{apiMethodName}");

HttpResponseMessage response = null;

using (HttpClient client = this.httpClientFactory.CreateClient())
{
client.Timeout = TimeSpan.FromMilliseconds(TimeoutMs);

var request = new JsonContent(requestModel);

try
Expand All @@ -77,10 +84,10 @@ await this.policy.ExecuteAsync(async token =>
this.logger.LogTrace("(-)[CANCELLED]:null");
return null;
}
catch (Exception ex)
catch (HttpRequestException ex)
{
this.logger.LogError("The counter-chain daemon is not ready to receive API calls at this time ({0})", publicationUri);
this.logger.LogDebug(ex, "Failed to send {0}", requestModel);
this.logger.LogError("Failed to send a message. Exception: '{0}'.", ex);
return new HttpResponseMessage() { ReasonPhrase = ex.Message, StatusCode = HttpStatusCode.InternalServerError };
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using NBitcoin;
using Stratis.Bitcoin.Consensus;
using Stratis.Bitcoin.Primitives;
using Stratis.FederatedPeg.Features.FederationGateway.Interfaces;
using Stratis.FederatedPeg.Features.FederationGateway.Models;
using Stratis.FederatedPeg.Features.FederationGateway.RestClients;

namespace Stratis.FederatedPeg.Features.FederationGateway.SourceChain
{
Expand All @@ -31,15 +32,12 @@ public class MaturedBlocksProvider : IMaturedBlocksProvider

private readonly ILogger logger;

private Dictionary<int, MaturedBlockDepositsModel> depositCache;

public MaturedBlocksProvider(ILoggerFactory loggerFactory, IDepositExtractor depositExtractor, IConsensusManager consensusManager)
{
this.depositExtractor = depositExtractor;
this.consensusManager = consensusManager;

this.logger = loggerFactory.CreateLogger(this.GetType().FullName);
this.depositCache = new Dictionary<int, MaturedBlockDepositsModel>();
}

/// <inheritdoc />
Expand All @@ -54,51 +52,30 @@ public async Task<List<MaturedBlockDepositsModel>> GetMaturedDepositsAsync(int b
throw new InvalidOperationException($"Block height {blockHeight} submitted is not mature enough. Blocks less than a height of {matureTipHeight} can be processed.");
}

// Cache clean-up.
lock (this.depositCache)
{
// The requested height gives away the fact that the peer is probably no longer interested in cached entries below that height.
// Keep an additional 1,000 blocks anyway in case there are some parallel request that are still executing for lower heights.
foreach (int i in this.depositCache.Where(d => d.Key < (blockHeight - 1000)).Select(d => d.Key).ToArray())
this.depositCache.Remove(i);
}

var maturedBlocks = new List<MaturedBlockDepositsModel>();

// Don't spend to much time that the requester may give up.
DateTime deadLine = DateTime.Now.AddSeconds(30);
// Half of the timeout. We will also need time to convert it to json.
int maxTimeCollectionCanTakeMs = RestApiClientBase.TimeoutMs / 2;
var cancellation = new CancellationTokenSource(maxTimeCollectionCanTakeMs);

for (int i = blockHeight; (i <= matureTipHeight) && (i < blockHeight + maxBlocks); i++)
{
MaturedBlockDepositsModel maturedBlockDeposits = null;
ChainedHeader currentHeader = consensusTip.GetAncestor(i);

// First try the cache.
lock (this.depositCache)
{
this.depositCache.TryGetValue(i, out maturedBlockDeposits);
}
ChainedHeaderBlock block = await this.consensusManager.GetBlockDataAsync(currentHeader.HashBlock).ConfigureAwait(false);

MaturedBlockDepositsModel maturedBlockDeposits = this.depositExtractor.ExtractBlockDeposits(block);

// If not in cache..
if (maturedBlockDeposits == null)
{
ChainedHeader currentHeader = consensusTip.GetAncestor(i);
ChainedHeaderBlock block = await this.consensusManager.GetBlockDataAsync(currentHeader.HashBlock).ConfigureAwait(false);
maturedBlockDeposits = this.depositExtractor.ExtractBlockDeposits(block);

if (maturedBlockDeposits == null)
throw new InvalidOperationException($"Unable to get deposits for block at height {currentHeader.Height}");

// Save this so that we don't need to scan the block again.
lock (this.depositCache)
{
this.depositCache[i] = maturedBlockDeposits;
}
}
throw new InvalidOperationException($"Unable to get deposits for block at height {currentHeader.Height}");

maturedBlocks.Add(maturedBlockDeposits);

if (DateTime.Now >= deadLine)
if (cancellation.IsCancellationRequested && maturedBlocks.Count > 0)
{
this.logger.LogDebug("Stop matured blocks collection because it's taking too long. Send what we've collected.");
break;
}
}

return maturedBlocks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class MaturedBlocksSyncManager : IMaturedBlocksSyncManager
private Task blockRequestingTask;

/// <summary>The maximum amount of blocks to request at a time from alt chain.</summary>
private const int MaxBlocksToRequest = 1000;
private const int MaxBlocksToRequest = 100;

/// <summary>When we are fully synced we stop asking for more blocks for this amount of time.</summary>
private const int RefreshDelayMs = 10_000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,8 @@ public async Task SendBlockTip_Should_Log_Error_When_Failing_To_Send_IBlockTipAs
{
var blockTip = new BlockTipModel(TestingValues.GetUint256(), TestingValues.GetPositiveInt(), TestingValues.GetPositiveInt());

await this.createClient(true).PushCurrentBlockTipAsync(blockTip).ConfigureAwait(false);

this.logger.Received(1).Log<object>(LogLevel.Error, 0, Arg.Any<object>(), Arg.Is<Exception>(e => e == null), Arg.Any<Func<object, Exception, string>>());
HttpResponseMessage result = await this.createClient(true).PushCurrentBlockTipAsync(blockTip).ConfigureAwait(false);
Assert.False(result.IsSuccessStatusCode);
}

/// <inheritdoc />
Expand Down
2 changes: 1 addition & 1 deletion src/Stratis.FederatedPeg.Tests/Utils/TestingHttpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ private static void PrepareHttpClient(ref HttpMessageHandler httpMessageHandler,
object sendCall = httpMessageHandler.Protected("SendAsync", Arg.Any<HttpRequestMessage>(), Arg.Any<CancellationToken>());

if (failingClient)
sendCall.ThrowsForAnyArgs(new Exception("failed"));
sendCall.ThrowsForAnyArgs(new HttpRequestException("failed"));
else
sendCall.Returns(Task.FromResult(new HttpResponseMessage(HttpStatusCode.OK)));

Expand Down

0 comments on commit 8182b55

Please sign in to comment.