Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More Transaction Metrics #4781

Merged
merged 2 commits into from
Aug 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

namespace Orleans.Transactions.Abstractions
{
public interface ITransactionAgentStatistics
{
void TrackTransactionStarted();
long TransactionsStarted { get; }

void TrackTransactionSucceeded();
long TransactionsSucceeded { get; }

void TrackTransactionFailed();
long TransactionsFailed { get; }

void TrackTransactionThrottled();
long TransactionsThrottled { get; }
}
}
79 changes: 24 additions & 55 deletions src/Orleans.Transactions/DistributedTM/TransactionAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,56 +2,14 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using Orleans.Runtime;
using Orleans.Concurrency;
using System.Linq;
using System.Threading;
using System.Transactions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Orleans.Configuration;
using Orleans.Runtime;
using Orleans.Concurrency;
using Orleans.Transactions.Abstractions;

namespace Orleans.Transactions
{
public class TransactionAgentStatistics
{
public long TransactionStartedCounter { get; set; }

private const string TransactionStartedPerSecondMetric = "TransactionAgent.TransactionStartedPerSecond";
//Transaction started recorded at when metrics was reported last time
private long transactionStartedAtLastReported;
private DateTime lastReportTime;
private readonly ITelemetryProducer telemetryProducer;
private readonly PeriodicAction monitor;
public TransactionAgentStatistics(ITelemetryProducer telemetryProducer, IOptions<StatisticsOptions> options)
{
this.telemetryProducer = telemetryProducer;
this.lastReportTime = DateTime.UtcNow;
this.monitor = new PeriodicAction(options.Value.PerfCountersWriteInterval, this.ReportMetrics);
}

public void TryReportMetrics(DateTime timestamp)
{
this.monitor.TryAction(timestamp);
}

private void ReportMetrics()
{
var now = DateTime.UtcNow;
var txStartedDelta = TransactionStartedCounter - transactionStartedAtLastReported;
double transactionStartedPerSecond;
var timelapseSinceLastReport = now - this.lastReportTime;
if (timelapseSinceLastReport.TotalSeconds <= 1)
transactionStartedPerSecond = txStartedDelta;
else transactionStartedPerSecond = txStartedDelta * 1000 / timelapseSinceLastReport.TotalMilliseconds;
this.telemetryProducer.TrackMetric(TransactionStartedPerSecondMetric, transactionStartedPerSecond);
//record snapshot data of this report
transactionStartedAtLastReported = TransactionStartedCounter;
lastReportTime = now;
}
}

[Reentrant]
internal class TransactionAgent : ITransactionAgent
{
Expand All @@ -60,10 +18,10 @@ internal class TransactionAgent : ITransactionAgent
private readonly ILogger logger;
private readonly Stopwatch stopwatch = Stopwatch.StartNew();
private readonly CausalClock clock;
private readonly TransactionAgentStatistics statistics;
private readonly ITransactionAgentStatistics statistics;
private readonly ITransactionOverloadDetector overloadDetector;

public TransactionAgent(IClock clock, ILogger<TransactionAgent> logger, TransactionAgentStatistics statistics, ITransactionOverloadDetector overloadDetector)
public TransactionAgent(IClock clock, ILogger<TransactionAgent> logger, ITransactionAgentStatistics statistics, ITransactionOverloadDetector overloadDetector)
{
this.clock = new CausalClock(clock);
this.logger = logger;
Expand All @@ -73,20 +31,22 @@ public TransactionAgent(IClock clock, ILogger<TransactionAgent> logger, Transact

public Task<ITransactionInfo> StartTransaction(bool readOnly, TimeSpan timeout)
{
this.statistics.TryReportMetrics(DateTime.UtcNow);
if (overloadDetector.IsOverloaded())
{
this.statistics.TrackTransactionThrottled();
throw new OrleansStartTransactionFailedException(new OrleansTransactionOverloadException());
}

var guid = Guid.NewGuid();
DateTime ts = this.clock.UtcNow();

if (logger.IsEnabled(LogLevel.Trace))
logger.Trace($"{stopwatch.Elapsed.TotalMilliseconds:f2} start transaction {guid} at {ts:o}");
this.statistics.TransactionStartedCounter++;
this.statistics.TrackTransactionStarted();
return Task.FromResult<ITransactionInfo>(new TransactionInfo(guid, ts, ts));
}

public Task<TransactionalStatus> Commit(ITransactionInfo info)
public async Task<TransactionalStatus> Commit(ITransactionInfo info)
{
var transactionInfo = (TransactionInfo)info;

Expand All @@ -108,13 +68,21 @@ public Task<TransactionalStatus> Commit(ITransactionInfo info)
}
}

if (writeParticipants == null)
try
{
return CommitReadOnlyTransaction(transactionInfo);
TransactionalStatus status = (writeParticipants == null)
? await CommitReadOnlyTransaction(transactionInfo)
: await CommitReadWriteTransaction(transactionInfo, writeParticipants);
if (status == TransactionalStatus.Ok)
this.statistics.TrackTransactionSucceeded();
else
this.statistics.TrackTransactionFailed();
return status;
}
else
catch (Exception)
{
return CommitReadWriteTransaction(transactionInfo, writeParticipants);
this.statistics.TrackTransactionFailed();
throw;
}
}

Expand Down Expand Up @@ -188,7 +156,7 @@ private async Task<TransactionalStatus> CommitReadWriteTransaction(TransactionIn
try
{
// wait for the TM to commit the transaction
var status = await tmPrepareAndCommitTask;
TransactionalStatus status = await tmPrepareAndCommitTask;

if (status != TransactionalStatus.Ok)
{
Expand Down Expand Up @@ -227,9 +195,10 @@ private async Task<TransactionalStatus> CommitReadWriteTransaction(TransactionIn

public void Abort(ITransactionInfo info, OrleansTransactionAbortedException reason)
{
this.statistics.TrackTransactionFailed();
var transactionInfo = (TransactionInfo)info;

var participants = transactionInfo.Participants.Keys.ToList();
List<ITransactionParticipant> participants = transactionInfo.Participants.Keys.ToList();

if (logger.IsEnabled(LogLevel.Trace))
logger.Trace($"abort {transactionInfo} {string.Join(",", participants.Select(p => p.ToString()))} {reason}");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using System.Threading;
using Orleans.Transactions.Abstractions;

namespace Orleans.Transactions
{
public class TransactionAgentStatistics : ITransactionAgentStatistics
{
private long transactionsStarted;
public long TransactionsStarted => transactionsStarted;

private long transactionsSucceeded;
public long TransactionsSucceeded => transactionsSucceeded;

private long transactionsFailed;
public long TransactionsFailed => transactionsFailed;

private long transactionsThrottled;
public long TransactionsThrottled => transactionsThrottled;

public void TrackTransactionStarted()
{
Interlocked.Increment(ref this.transactionsStarted);
}

public void TrackTransactionSucceeded()
{
Interlocked.Increment(ref this.transactionsSucceeded);
}

public void TrackTransactionFailed()
{
Interlocked.Increment(ref this.transactionsFailed);
}

public void TrackTransactionThrottled()
{
Interlocked.Increment(ref this.transactionsThrottled);
}

public static ITransactionAgentStatistics Copy(ITransactionAgentStatistics initialStatistics)
{
return new TransactionAgentStatistics
{
transactionsStarted = initialStatistics.TransactionsStarted,
transactionsSucceeded = initialStatistics.TransactionsSucceeded,
transactionsFailed = initialStatistics.TransactionsFailed,
transactionsThrottled = initialStatistics.TransactionsThrottled
};
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Options;
using Orleans.Configuration;
using Orleans.Runtime;
using Orleans.Transactions.Abstractions;

namespace Orleans.Transactions
{
public class TransactionAgentStatisticsReporter : ILifecycleParticipant<ISiloLifecycle>
{
private const string TransactionsStartedTotalMetric = "TransactionAgent.TransactionsStarted.Total";
private const string TransactionsStartedPerSecondMetric = "TransactionAgent.TransactionsStarted.PerSecond";

private const string SuccessfulTransactionsTotalMetric = "TransactionAgent.SuccessfulTransactions.Total";
private const string SuccessfulTransactionsPerSecondMetric = "TransactionAgent.SuccessfulTransactions.PerSecond";

private const string FailedTransactionsTotalMetric = "TransactionAgent.FailedTransactions.Total";
private const string FailedTransactionsPerSecondMetric = "TransactionAgent.FailedTransactions.PerSecond";

private const string ThrottledTransactionsTotalMetric = "TransactionAgent.ThrottledTransactions.Total";
private const string ThrottledTransactionsPerSecondMetric = "TransactionAgent.ThrottledTransactions.PerSecond";

private readonly ITransactionAgentStatistics statistics;
private readonly ITelemetryProducer telemetryProducer;
private readonly StatisticsOptions statisticsOptions;

private ITransactionAgentStatistics lastReported;
private DateTime lastReportTime;
private IDisposable timer;

public TransactionAgentStatisticsReporter(ITransactionAgentStatistics statistics, ITelemetryProducer telemetryProducer, IOptions<StatisticsOptions> options)
{
this.statistics = statistics ?? throw new ArgumentNullException(nameof(statistics));
this.telemetryProducer = telemetryProducer ?? throw new ArgumentNullException(nameof(statistics));
this.statisticsOptions = options.Value;
this.lastReported = TransactionAgentStatistics.Copy(statistics);
this.lastReportTime = DateTime.UtcNow;
}

public void Participate(ISiloLifecycle lifecycle)
{
lifecycle.Subscribe<TransactionAgentStatisticsReporter>(ServiceLifecycleStage.Active, OnStart, OnStop);
}

private Task OnStart(CancellationToken tc)
{
this.timer = new Timer(ReportMetrics, null, this.statisticsOptions.PerfCountersWriteInterval, this.statisticsOptions.PerfCountersWriteInterval);
return Task.CompletedTask;
}

private Task OnStop(CancellationToken tc)
{
this.timer?.Dispose();
this.timer = null;
return Task.CompletedTask;
}


private void ReportMetrics(object ignore)
{
ITransactionAgentStatistics currentReported = TransactionAgentStatistics.Copy(statistics);
var now = DateTime.UtcNow;
TimeSpan reportPeriod = now - this.lastReportTime;

this.telemetryProducer.TrackMetric(TransactionsStartedTotalMetric, currentReported.TransactionsStarted);
this.telemetryProducer.TrackMetric(TransactionsStartedPerSecondMetric, PerSecond(this.lastReported.TransactionsStarted, currentReported.TransactionsStarted, reportPeriod));

this.telemetryProducer.TrackMetric(SuccessfulTransactionsTotalMetric, currentReported.TransactionsSucceeded);
this.telemetryProducer.TrackMetric(SuccessfulTransactionsPerSecondMetric, PerSecond(this.lastReported.TransactionsSucceeded, currentReported.TransactionsSucceeded, reportPeriod));

this.telemetryProducer.TrackMetric(FailedTransactionsTotalMetric, currentReported.TransactionsFailed);
this.telemetryProducer.TrackMetric(FailedTransactionsPerSecondMetric, PerSecond(this.lastReported.TransactionsFailed, currentReported.TransactionsFailed, reportPeriod));

this.telemetryProducer.TrackMetric(ThrottledTransactionsTotalMetric, currentReported.TransactionsThrottled);
this.telemetryProducer.TrackMetric(ThrottledTransactionsPerSecondMetric, PerSecond(this.lastReported.TransactionsThrottled, currentReported.TransactionsThrottled, reportPeriod));

this.lastReportTime = now;
this.lastReported = currentReported;
}

private long PerSecond(long start, long end, TimeSpan time)
{
return ((end - start) * 1000) / Math.Max(1,(long)time.TotalMilliseconds);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Orleans.Transactions.Abstractions;

namespace Orleans.Transactions
{
Expand Down Expand Up @@ -34,28 +31,29 @@ public class TransactionRateLoadSheddingOptions

public class TransactionOverloadDetector : ITransactionOverloadDetector
{
private readonly TransactionAgentStatistics statistics;
private readonly ITransactionAgentStatistics statistics;
private readonly TransactionRateLoadSheddingOptions options;
private readonly PeriodicAction monitor;
private long transactionStartedAtLastCheck;
private ITransactionAgentStatistics lastStatistics;
private double transactionStartedPerSecond;
private DateTime lastCheckTime;
private static readonly TimeSpan MetricsCheck = TimeSpan.FromSeconds(15);
public TransactionOverloadDetector(TransactionAgentStatistics statistics, IOptions<TransactionRateLoadSheddingOptions> options)
public TransactionOverloadDetector(ITransactionAgentStatistics statistics, IOptions<TransactionRateLoadSheddingOptions> options)
{
this.statistics = statistics;
this.options = options.Value;
this.monitor = new PeriodicAction(MetricsCheck, this.RecordStatistics);
this.transactionStartedAtLastCheck = statistics.TransactionStartedCounter;
this.lastStatistics = TransactionAgentStatistics.Copy(statistics);
this.lastCheckTime = DateTime.UtcNow;
}

private void RecordStatistics()
{
long startCounter = this.statistics.TransactionStartedCounter;
ITransactionAgentStatistics current = TransactionAgentStatistics.Copy(this.statistics);
DateTime now = DateTime.UtcNow;
this.transactionStartedPerSecond = CalculateTps(this.transactionStartedAtLastCheck, this.lastCheckTime, startCounter, now);
this.transactionStartedAtLastCheck = startCounter;

this.transactionStartedPerSecond = CalculateTps(this.lastStatistics.TransactionsStarted, this.lastCheckTime, current.TransactionsStarted, now);
this.lastStatistics = current;
this.lastCheckTime = now;
}

Expand All @@ -66,8 +64,7 @@ public bool IsOverloaded()

DateTime now = DateTime.UtcNow;
this.monitor.TryAction(now);
long startCounter = this.statistics.TransactionStartedCounter;
double txPerSecondCurrently = CalculateTps(this.transactionStartedAtLastCheck, this.lastCheckTime, startCounter, now);
double txPerSecondCurrently = CalculateTps(this.lastStatistics.TransactionsStarted, this.lastCheckTime, this.statistics.TransactionsStarted, now);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where do you track ITransactionAgentStatistic.Throttled ? did I miss anything. I thought it will be tracked here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's tracked in the transaction agent, where the IsOverloaded is called. The agent is responsible for updating it's statistics.

//decaying utilization for tx per second
var aggregratedTxPerSecond = (this.transactionStartedPerSecond + (2.0 * txPerSecondCurrently)) / 3.0;

Expand Down
16 changes: 9 additions & 7 deletions src/Orleans.Transactions/Hosting/SiloBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,27 @@ public static class SiloBuilderExtensions
/// <summary>
/// Configure cluster to use the distributed TM algorithm
/// </summary>
public static ISiloHostBuilder UseDistributedTM(this ISiloHostBuilder builder)
/// <param name="builder">Silo host builder</param>
/// <param name="withStatisticsReporter">Configure a transaction statistics reporter. Set to false if you want to configure your own transaction statistics reporting or don't want transaction statistics reported</param>
/// <returns></returns>
public static ISiloHostBuilder UseDistributedTM(this ISiloHostBuilder builder, bool withStatisticsReporter = true)
{
return builder.ConfigureServices(services => services.UseDistributedTM());
return builder.ConfigureServices(services => services.UseDistributedTM(withStatisticsReporter));
}

/// <summary>
/// Configure cluster to use the distributed TM algorithm
/// </summary>
public static IServiceCollection UseDistributedTM(this IServiceCollection services)
internal static IServiceCollection UseDistributedTM(this IServiceCollection services, bool withReporter)
{
services.TryAddSingleton<IClock,Clock>();
services.TryAddSingleton<TransactionAgentStatistics>();
services.TryAddSingleton<ITransactionAgentStatistics, TransactionAgentStatistics>();
services.TryAddSingleton<ITransactionOverloadDetector,TransactionOverloadDetector>();
services.AddSingleton<ITransactionAgent, TransactionAgent>();
services.TryAddSingleton(typeof(ITransactionDataCopier<>), typeof(DefaultTransactionDataCopier<>));
services.AddSingleton<IAttributeToFactoryMapper<TransactionalStateAttribute>, TransactionalStateAttributeMapper>();
services.TryAddTransient<ITransactionalStateFactory, TransactionalStateFactory>();
services.TryAddTransient<INamedTransactionalStateStorageFactory, NamedTransactionalStateStorageFactory>();
services.AddTransient(typeof(ITransactionalState<>), typeof(TransactionalState<>));
if (withReporter)
services.AddSingleton<ILifecycleParticipant<ISiloLifecycle>, TransactionAgentStatisticsReporter>();
return services;
}
}
Expand Down
Loading