diff --git a/src/Orleans.Transactions/Abstractions/ITransactionAgentStatistics.cs b/src/Orleans.Transactions/Abstractions/ITransactionAgentStatistics.cs new file mode 100644 index 0000000000..8cd6168c16 --- /dev/null +++ b/src/Orleans.Transactions/Abstractions/ITransactionAgentStatistics.cs @@ -0,0 +1,18 @@ + +namespace Orleans.Transactions.Abstractions +{ + public interface ITransactionAgentStatistics + { + void TrackTransactionStarted(); + long TransactionsStarted { get; } + + void TrackTransactionSucceeded(); + long TransactionsSucceeded { get; } + + void TrackTransactionFailed(); + long TransactionsFailed { get; } + + void TrackTransactionThrottled(); + long TransactionsThrottled { get; } + } +} diff --git a/src/Orleans.Transactions/DistributedTM/TransactionAgent.cs b/src/Orleans.Transactions/DistributedTM/TransactionAgent.cs index b9c7981f8a..bfa1cda73e 100644 --- a/src/Orleans.Transactions/DistributedTM/TransactionAgent.cs +++ b/src/Orleans.Transactions/DistributedTM/TransactionAgent.cs @@ -2,56 +2,14 @@ using System.Collections.Generic; using System.Diagnostics; using System.Threading.Tasks; -using Orleans.Runtime; -using Orleans.Concurrency; using System.Linq; -using System.Threading; -using System.Transactions; using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using Orleans.Configuration; +using Orleans.Runtime; +using Orleans.Concurrency; using Orleans.Transactions.Abstractions; namespace Orleans.Transactions { - public class TransactionAgentStatistics - { - public long TransactionStartedCounter { get; set; } - - private const string TransactionStartedPerSecondMetric = "TransactionAgent.TransactionStartedPerSecond"; - //Transaction started recorded at when metrics was reported last time - private long transactionStartedAtLastReported; - private DateTime lastReportTime; - private readonly ITelemetryProducer telemetryProducer; - private readonly PeriodicAction monitor; - public TransactionAgentStatistics(ITelemetryProducer telemetryProducer, IOptions options) - { - this.telemetryProducer = telemetryProducer; - this.lastReportTime = DateTime.UtcNow; - this.monitor = new PeriodicAction(options.Value.PerfCountersWriteInterval, this.ReportMetrics); - } - - public void TryReportMetrics(DateTime timestamp) - { - this.monitor.TryAction(timestamp); - } - - private void ReportMetrics() - { - var now = DateTime.UtcNow; - var txStartedDelta = TransactionStartedCounter - transactionStartedAtLastReported; - double transactionStartedPerSecond; - var timelapseSinceLastReport = now - this.lastReportTime; - if (timelapseSinceLastReport.TotalSeconds <= 1) - transactionStartedPerSecond = txStartedDelta; - else transactionStartedPerSecond = txStartedDelta * 1000 / timelapseSinceLastReport.TotalMilliseconds; - this.telemetryProducer.TrackMetric(TransactionStartedPerSecondMetric, transactionStartedPerSecond); - //record snapshot data of this report - transactionStartedAtLastReported = TransactionStartedCounter; - lastReportTime = now; - } - } - [Reentrant] internal class TransactionAgent : ITransactionAgent { @@ -60,10 +18,10 @@ internal class TransactionAgent : ITransactionAgent private readonly ILogger logger; private readonly Stopwatch stopwatch = Stopwatch.StartNew(); private readonly CausalClock clock; - private readonly TransactionAgentStatistics statistics; + private readonly ITransactionAgentStatistics statistics; private readonly ITransactionOverloadDetector overloadDetector; - public TransactionAgent(IClock clock, ILogger logger, TransactionAgentStatistics statistics, ITransactionOverloadDetector overloadDetector) + public TransactionAgent(IClock clock, ILogger logger, ITransactionAgentStatistics statistics, ITransactionOverloadDetector overloadDetector) { this.clock = new CausalClock(clock); this.logger = logger; @@ -73,20 +31,22 @@ public TransactionAgent(IClock clock, ILogger logger, Transact public Task StartTransaction(bool readOnly, TimeSpan timeout) { - this.statistics.TryReportMetrics(DateTime.UtcNow); if (overloadDetector.IsOverloaded()) + { + this.statistics.TrackTransactionThrottled(); throw new OrleansStartTransactionFailedException(new OrleansTransactionOverloadException()); + } var guid = Guid.NewGuid(); DateTime ts = this.clock.UtcNow(); if (logger.IsEnabled(LogLevel.Trace)) logger.Trace($"{stopwatch.Elapsed.TotalMilliseconds:f2} start transaction {guid} at {ts:o}"); - this.statistics.TransactionStartedCounter++; + this.statistics.TrackTransactionStarted(); return Task.FromResult(new TransactionInfo(guid, ts, ts)); } - public Task Commit(ITransactionInfo info) + public async Task Commit(ITransactionInfo info) { var transactionInfo = (TransactionInfo)info; @@ -108,13 +68,21 @@ public Task Commit(ITransactionInfo info) } } - if (writeParticipants == null) + try { - return CommitReadOnlyTransaction(transactionInfo); + TransactionalStatus status = (writeParticipants == null) + ? await CommitReadOnlyTransaction(transactionInfo) + : await CommitReadWriteTransaction(transactionInfo, writeParticipants); + if (status == TransactionalStatus.Ok) + this.statistics.TrackTransactionSucceeded(); + else + this.statistics.TrackTransactionFailed(); + return status; } - else + catch (Exception) { - return CommitReadWriteTransaction(transactionInfo, writeParticipants); + this.statistics.TrackTransactionFailed(); + throw; } } @@ -188,7 +156,7 @@ private async Task CommitReadWriteTransaction(TransactionIn try { // wait for the TM to commit the transaction - var status = await tmPrepareAndCommitTask; + TransactionalStatus status = await tmPrepareAndCommitTask; if (status != TransactionalStatus.Ok) { @@ -227,9 +195,10 @@ private async Task CommitReadWriteTransaction(TransactionIn public void Abort(ITransactionInfo info, OrleansTransactionAbortedException reason) { + this.statistics.TrackTransactionFailed(); var transactionInfo = (TransactionInfo)info; - var participants = transactionInfo.Participants.Keys.ToList(); + List participants = transactionInfo.Participants.Keys.ToList(); if (logger.IsEnabled(LogLevel.Trace)) logger.Trace($"abort {transactionInfo} {string.Join(",", participants.Select(p => p.ToString()))} {reason}"); diff --git a/src/Orleans.Transactions/DistributedTM/TransactionAgentStatistics.cs b/src/Orleans.Transactions/DistributedTM/TransactionAgentStatistics.cs new file mode 100644 index 0000000000..7b30574fbc --- /dev/null +++ b/src/Orleans.Transactions/DistributedTM/TransactionAgentStatistics.cs @@ -0,0 +1,51 @@ +using System.Threading; +using Orleans.Transactions.Abstractions; + +namespace Orleans.Transactions +{ + public class TransactionAgentStatistics : ITransactionAgentStatistics + { + private long transactionsStarted; + public long TransactionsStarted => transactionsStarted; + + private long transactionsSucceeded; + public long TransactionsSucceeded => transactionsSucceeded; + + private long transactionsFailed; + public long TransactionsFailed => transactionsFailed; + + private long transactionsThrottled; + public long TransactionsThrottled => transactionsThrottled; + + public void TrackTransactionStarted() + { + Interlocked.Increment(ref this.transactionsStarted); + } + + public void TrackTransactionSucceeded() + { + Interlocked.Increment(ref this.transactionsSucceeded); + } + + public void TrackTransactionFailed() + { + Interlocked.Increment(ref this.transactionsFailed); + } + + public void TrackTransactionThrottled() + { + Interlocked.Increment(ref this.transactionsThrottled); + } + + public static ITransactionAgentStatistics Copy(ITransactionAgentStatistics initialStatistics) + { + return new TransactionAgentStatistics + { + transactionsStarted = initialStatistics.TransactionsStarted, + transactionsSucceeded = initialStatistics.TransactionsSucceeded, + transactionsFailed = initialStatistics.TransactionsFailed, + transactionsThrottled = initialStatistics.TransactionsThrottled + }; + } + } +} diff --git a/src/Orleans.Transactions/DistributedTM/TransactionAgentStatisticsReporter.cs b/src/Orleans.Transactions/DistributedTM/TransactionAgentStatisticsReporter.cs new file mode 100644 index 0000000000..c0db87ba72 --- /dev/null +++ b/src/Orleans.Transactions/DistributedTM/TransactionAgentStatisticsReporter.cs @@ -0,0 +1,88 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Options; +using Orleans.Configuration; +using Orleans.Runtime; +using Orleans.Transactions.Abstractions; + +namespace Orleans.Transactions +{ + public class TransactionAgentStatisticsReporter : ILifecycleParticipant + { + private const string TransactionsStartedTotalMetric = "TransactionAgent.TransactionsStarted.Total"; + private const string TransactionsStartedPerSecondMetric = "TransactionAgent.TransactionsStarted.PerSecond"; + + private const string SuccessfulTransactionsTotalMetric = "TransactionAgent.SuccessfulTransactions.Total"; + private const string SuccessfulTransactionsPerSecondMetric = "TransactionAgent.SuccessfulTransactions.PerSecond"; + + private const string FailedTransactionsTotalMetric = "TransactionAgent.FailedTransactions.Total"; + private const string FailedTransactionsPerSecondMetric = "TransactionAgent.FailedTransactions.PerSecond"; + + private const string ThrottledTransactionsTotalMetric = "TransactionAgent.ThrottledTransactions.Total"; + private const string ThrottledTransactionsPerSecondMetric = "TransactionAgent.ThrottledTransactions.PerSecond"; + + private readonly ITransactionAgentStatistics statistics; + private readonly ITelemetryProducer telemetryProducer; + private readonly StatisticsOptions statisticsOptions; + + private ITransactionAgentStatistics lastReported; + private DateTime lastReportTime; + private IDisposable timer; + + public TransactionAgentStatisticsReporter(ITransactionAgentStatistics statistics, ITelemetryProducer telemetryProducer, IOptions options) + { + this.statistics = statistics ?? throw new ArgumentNullException(nameof(statistics)); + this.telemetryProducer = telemetryProducer ?? throw new ArgumentNullException(nameof(statistics)); + this.statisticsOptions = options.Value; + this.lastReported = TransactionAgentStatistics.Copy(statistics); + this.lastReportTime = DateTime.UtcNow; + } + + public void Participate(ISiloLifecycle lifecycle) + { + lifecycle.Subscribe(ServiceLifecycleStage.Active, OnStart, OnStop); + } + + private Task OnStart(CancellationToken tc) + { + this.timer = new Timer(ReportMetrics, null, this.statisticsOptions.PerfCountersWriteInterval, this.statisticsOptions.PerfCountersWriteInterval); + return Task.CompletedTask; + } + + private Task OnStop(CancellationToken tc) + { + this.timer?.Dispose(); + this.timer = null; + return Task.CompletedTask; + } + + + private void ReportMetrics(object ignore) + { + ITransactionAgentStatistics currentReported = TransactionAgentStatistics.Copy(statistics); + var now = DateTime.UtcNow; + TimeSpan reportPeriod = now - this.lastReportTime; + + this.telemetryProducer.TrackMetric(TransactionsStartedTotalMetric, currentReported.TransactionsStarted); + this.telemetryProducer.TrackMetric(TransactionsStartedPerSecondMetric, PerSecond(this.lastReported.TransactionsStarted, currentReported.TransactionsStarted, reportPeriod)); + + this.telemetryProducer.TrackMetric(SuccessfulTransactionsTotalMetric, currentReported.TransactionsSucceeded); + this.telemetryProducer.TrackMetric(SuccessfulTransactionsPerSecondMetric, PerSecond(this.lastReported.TransactionsSucceeded, currentReported.TransactionsSucceeded, reportPeriod)); + + this.telemetryProducer.TrackMetric(FailedTransactionsTotalMetric, currentReported.TransactionsFailed); + this.telemetryProducer.TrackMetric(FailedTransactionsPerSecondMetric, PerSecond(this.lastReported.TransactionsFailed, currentReported.TransactionsFailed, reportPeriod)); + + this.telemetryProducer.TrackMetric(ThrottledTransactionsTotalMetric, currentReported.TransactionsThrottled); + this.telemetryProducer.TrackMetric(ThrottledTransactionsPerSecondMetric, PerSecond(this.lastReported.TransactionsThrottled, currentReported.TransactionsThrottled, reportPeriod)); + + this.lastReportTime = now; + this.lastReported = currentReported; + } + + private long PerSecond(long start, long end, TimeSpan time) + { + return ((end - start) * 1000) / Math.Max(1,(long)time.TotalMilliseconds); + } + } +} diff --git a/src/Orleans.Transactions/DistributedTM/TransactionOverloadDetector.cs b/src/Orleans.Transactions/DistributedTM/TransactionOverloadDetector.cs index 253bac6a65..86e2e7308b 100644 --- a/src/Orleans.Transactions/DistributedTM/TransactionOverloadDetector.cs +++ b/src/Orleans.Transactions/DistributedTM/TransactionOverloadDetector.cs @@ -1,9 +1,6 @@ using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; -using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +using Orleans.Transactions.Abstractions; namespace Orleans.Transactions { @@ -34,28 +31,29 @@ public class TransactionRateLoadSheddingOptions public class TransactionOverloadDetector : ITransactionOverloadDetector { - private readonly TransactionAgentStatistics statistics; + private readonly ITransactionAgentStatistics statistics; private readonly TransactionRateLoadSheddingOptions options; private readonly PeriodicAction monitor; - private long transactionStartedAtLastCheck; + private ITransactionAgentStatistics lastStatistics; private double transactionStartedPerSecond; private DateTime lastCheckTime; private static readonly TimeSpan MetricsCheck = TimeSpan.FromSeconds(15); - public TransactionOverloadDetector(TransactionAgentStatistics statistics, IOptions options) + public TransactionOverloadDetector(ITransactionAgentStatistics statistics, IOptions options) { this.statistics = statistics; this.options = options.Value; this.monitor = new PeriodicAction(MetricsCheck, this.RecordStatistics); - this.transactionStartedAtLastCheck = statistics.TransactionStartedCounter; + this.lastStatistics = TransactionAgentStatistics.Copy(statistics); this.lastCheckTime = DateTime.UtcNow; } private void RecordStatistics() { - long startCounter = this.statistics.TransactionStartedCounter; + ITransactionAgentStatistics current = TransactionAgentStatistics.Copy(this.statistics); DateTime now = DateTime.UtcNow; - this.transactionStartedPerSecond = CalculateTps(this.transactionStartedAtLastCheck, this.lastCheckTime, startCounter, now); - this.transactionStartedAtLastCheck = startCounter; + + this.transactionStartedPerSecond = CalculateTps(this.lastStatistics.TransactionsStarted, this.lastCheckTime, current.TransactionsStarted, now); + this.lastStatistics = current; this.lastCheckTime = now; } @@ -66,8 +64,7 @@ public bool IsOverloaded() DateTime now = DateTime.UtcNow; this.monitor.TryAction(now); - long startCounter = this.statistics.TransactionStartedCounter; - double txPerSecondCurrently = CalculateTps(this.transactionStartedAtLastCheck, this.lastCheckTime, startCounter, now); + double txPerSecondCurrently = CalculateTps(this.lastStatistics.TransactionsStarted, this.lastCheckTime, this.statistics.TransactionsStarted, now); //decaying utilization for tx per second var aggregratedTxPerSecond = (this.transactionStartedPerSecond + (2.0 * txPerSecondCurrently)) / 3.0; diff --git a/src/Orleans.Transactions/Hosting/SiloBuilderExtensions.cs b/src/Orleans.Transactions/Hosting/SiloBuilderExtensions.cs index ee9964d414..7863d96dd4 100644 --- a/src/Orleans.Transactions/Hosting/SiloBuilderExtensions.cs +++ b/src/Orleans.Transactions/Hosting/SiloBuilderExtensions.cs @@ -13,18 +13,18 @@ public static class SiloBuilderExtensions /// /// Configure cluster to use the distributed TM algorithm /// - public static ISiloHostBuilder UseDistributedTM(this ISiloHostBuilder builder) + /// Silo host builder + /// Configure a transaction statistics reporter. Set to false if you want to configure your own transaction statistics reporting or don't want transaction statistics reported + /// + public static ISiloHostBuilder UseDistributedTM(this ISiloHostBuilder builder, bool withStatisticsReporter = true) { - return builder.ConfigureServices(services => services.UseDistributedTM()); + return builder.ConfigureServices(services => services.UseDistributedTM(withStatisticsReporter)); } - /// - /// Configure cluster to use the distributed TM algorithm - /// - public static IServiceCollection UseDistributedTM(this IServiceCollection services) + internal static IServiceCollection UseDistributedTM(this IServiceCollection services, bool withReporter) { services.TryAddSingleton(); - services.TryAddSingleton(); + services.TryAddSingleton(); services.TryAddSingleton(); services.AddSingleton(); services.TryAddSingleton(typeof(ITransactionDataCopier<>), typeof(DefaultTransactionDataCopier<>)); @@ -32,6 +32,8 @@ public static IServiceCollection UseDistributedTM(this IServiceCollection servic services.TryAddTransient(); services.TryAddTransient(); services.AddTransient(typeof(ITransactionalState<>), typeof(TransactionalState<>)); + if (withReporter) + services.AddSingleton, TransactionAgentStatisticsReporter>(); return services; } } diff --git a/test/Benchmarks/Program.cs b/test/Benchmarks/Program.cs index 11b4ca23aa..97c293ec7e 100644 --- a/test/Benchmarks/Program.cs +++ b/test/Benchmarks/Program.cs @@ -38,8 +38,21 @@ class Program "Running Transactions benchmark", () => { - var benchmark = new TransactionBenchmark(); - benchmark.Setup(); + var benchmark = new TransactionBenchmark(2, 20000, 5000); + benchmark.MemorySetup(); + return benchmark; + }, + benchmark => benchmark.RunAsync().GetAwaiter().GetResult(), + benchmark => benchmark.Teardown()); + }, + ["Transactions.Memory.Throttled"] = () => + { + RunBenchmark( + "Running Transactions benchmark", + () => + { + var benchmark = new TransactionBenchmark(2, 200000, 15000); + benchmark.MemoryThrottledSetup(); return benchmark; }, benchmark => benchmark.RunAsync().GetAwaiter().GetResult(), @@ -51,7 +64,33 @@ class Program "Running Transactions benchmark", () => { - var benchmark = new TransactionBenchmark(); + var benchmark = new TransactionBenchmark(2, 20000, 5000); + benchmark.AzureSetup(); + return benchmark; + }, + benchmark => benchmark.RunAsync().GetAwaiter().GetResult(), + benchmark => benchmark.Teardown()); + }, + ["Transactions.Azure.Throttled"] = () => + { + RunBenchmark( + "Running Transactions benchmark", + () => + { + var benchmark = new TransactionBenchmark(2, 200000, 15000); + benchmark.AzureThrottledSetup(); + return benchmark; + }, + benchmark => benchmark.RunAsync().GetAwaiter().GetResult(), + benchmark => benchmark.Teardown()); + }, + ["Transactions.Azure.Overloaded"] = () => + { + RunBenchmark( + "Running Transactions benchmark", + () => + { + var benchmark = new TransactionBenchmark(2, 200000, 15000); benchmark.AzureSetup(); return benchmark; }, @@ -159,8 +198,8 @@ private static void RunBenchmark(string name, Func init, Action benchma var stopWatch = Stopwatch.StartNew(); benchmarkAction(bench); Console.WriteLine($"Elapsed milliseconds: {stopWatch.ElapsedMilliseconds}"); - tearDown(bench); Console.WriteLine("Press any key to continue ..."); + tearDown(bench); Console.ReadLine(); } } diff --git a/test/Benchmarks/Transactions/TransactionBenchmark.cs b/test/Benchmarks/Transactions/TransactionBenchmark.cs index 9fd0c7267e..60167d9bc0 100644 --- a/test/Benchmarks/Transactions/TransactionBenchmark.cs +++ b/test/Benchmarks/Transactions/TransactionBenchmark.cs @@ -5,27 +5,65 @@ using Orleans.TestingHost; using BenchmarkGrainInterfaces.Transaction; using TestExtensions; +using Microsoft.Extensions.DependencyInjection; +using Orleans.Transactions; +using Orleans.Configuration; +using Orleans.Runtime; +using System.Collections.Generic; +using Microsoft.Extensions.Options; +using Microsoft.Extensions.Logging; +using System.Text; namespace Benchmarks.Transactions { public class TransactionBenchmark { private TestCluster host; + private int runs; + private int transactionsPerRun; + private int concurrent; - public void Setup() + public TransactionBenchmark(int runs, int transactionsPerRun, int concurrent) { - var builder = new TestClusterBuilder(); + this.runs = runs; + this.transactionsPerRun = transactionsPerRun; + this.concurrent = concurrent; + } + + public void MemorySetup() + { + var builder = new TestClusterBuilder(4); builder.AddSiloBuilderConfigurator(); builder.AddSiloBuilderConfigurator(); this.host = builder.Build(); this.host.Deploy(); } + public void MemoryThrottledSetup() + { + var builder = new TestClusterBuilder(4); + builder.AddSiloBuilderConfigurator(); + builder.AddSiloBuilderConfigurator(); + builder.AddSiloBuilderConfigurator(); + this.host = builder.Build(); + this.host.Deploy(); + } + public void AzureSetup() { - var builder = new TestClusterBuilder(); + var builder = new TestClusterBuilder(4); + builder.AddSiloBuilderConfigurator(); + builder.AddSiloBuilderConfigurator(); + this.host = builder.Build(); + this.host.Deploy(); + } + + public void AzureThrottledSetup() + { + var builder = new TestClusterBuilder(4); builder.AddSiloBuilderConfigurator(); builder.AddSiloBuilderConfigurator(); + builder.AddSiloBuilderConfigurator(); this.host = builder.Build(); this.host.Deploy(); } @@ -49,32 +87,46 @@ public void Configure(ISiloHostBuilder hostBuilder) } } + public class SiloTransactionThrottlingConfigurator : ISiloBuilderConfigurator + { + public void Configure(ISiloHostBuilder hostBuilder) + { + hostBuilder.Configure(options => + { + options.Enabled = true; + options.Limit = 50; + }); + } + } + public async Task RunAsync() { Console.WriteLine($"Cold Run."); await FullRunAsync(); - Console.WriteLine($"Warm Run."); - await FullRunAsync(); + for(int i=0; i RunAsync(i, 2000, 500))); + int runners = Math.Max(1,(int)Math.Sqrt(concurrent)); + int transactionsPerRunner = Math.Max(1, this.transactionsPerRun / runners); + Report[] reports = await Task.WhenAll(Enumerable.Range(0, runners).Select(i => RunAsync(i, transactionsPerRunner, runners))); Report finalReport = new Report(); foreach (Report report in reports) { finalReport.Succeeded += report.Succeeded; finalReport.Failed += report.Failed; + finalReport.Throttled += report.Throttled; finalReport.Elapsed = TimeSpan.FromMilliseconds(Math.Max(finalReport.Elapsed.TotalMilliseconds, report.Elapsed.TotalMilliseconds)); } Console.WriteLine($"{finalReport.Succeeded} transactions in {finalReport.Elapsed.TotalMilliseconds}ms."); - Console.WriteLine($"{finalReport.Succeeded * 1000 / finalReport.Elapsed.TotalMilliseconds} transactions per second."); + Console.WriteLine($"{(int)(finalReport.Succeeded * 1000 / finalReport.Elapsed.TotalMilliseconds)} transactions per second."); Console.WriteLine($"{finalReport.Failed} transactions failed."); - } - - private Task WarmUpAsync() - { - return Task.WhenAll(Enumerable.Range(0, 100).Select(i => RunAsync(i, 100, 10))); + Console.WriteLine($"{finalReport.Throttled} transactions were throttled."); } public async Task RunAsync(int run, int transactiosPerRun, int concurrentPerRun) @@ -84,7 +136,7 @@ public async Task RunAsync(int run, int transactiosPerRun, int concurren Report report = null; while (report == null) { - await Task.Delay(1000); + await Task.Delay(TimeSpan.FromSeconds(10)); report = await load.TryGetReport(); } return report; @@ -99,9 +151,120 @@ public sealed class SiloTransactionConfigurator : ISiloBuilderConfigurator { public void Configure(ISiloHostBuilder hostBuilder) { - hostBuilder.UseDistributedTM(); + hostBuilder + .UseDistributedTM() + .ConfigureServices(services => services.AddSingleton()) + .Configure(options => options.AddConsumer()) + .Configure(options => + { + options.PerfCountersWriteInterval = TimeSpan.FromSeconds(3); + }); } } } + public class TelemetryConsumer : IMetricTelemetryConsumer + { + private readonly ILogger logger; + + public TelemetryConsumer(ILogger logger) + { + this.logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public void TrackTrace(string message) + { + this.logger.LogInformation(message); + } + + public void TrackTrace(string message, IDictionary properties) + { + TrackTrace(PrintProperties(message, properties)); + } + + public void TrackTrace(string message, Severity severity) + { + TrackTrace(message); + } + + public void TrackTrace(string message, Severity severity, IDictionary properties) + { + TrackTrace(message, properties); + } + + public void TrackMetric(string name, double value, IDictionary properties = null) + { + TrackTrace(PrintProperties(name, value, properties)); + } + + public void TrackMetric(string name, TimeSpan value, IDictionary properties = null) + { + TrackTrace(PrintProperties(name, value, properties)); + } + + public void IncrementMetric(string name) + { + TrackTrace(name + $" - Increment"); + } + + public void IncrementMetric(string name, double value) + { + TrackTrace(PrintProperties(name, value, null)); + } + + public void DecrementMetric(string name) + { + TrackTrace(name + $" - Decrement"); + } + + public void DecrementMetric(string name, double value) + { + TrackTrace(PrintProperties(name, value, null)); + } + + public void Flush() + { + } + + public void Close() + { + } + + private static string PrintProperties(string message, TValue value, IDictionary properties) + { + var sb = new StringBuilder(message + $" - Value: {value}"); + sb = AppendProperties(sb, properties); + return sb.ToString(); + } + + private static string PrintProperties(string message, IDictionary properties) + { + var sb = new StringBuilder(message); + sb = AppendProperties(sb, properties); + return sb.ToString(); + } + + private static StringBuilder AppendProperties(StringBuilder sb, IDictionary properties) + { + if (properties == null || properties.Keys.Count == 0) + return sb; + + sb.Append(" - Properties:"); + sb.Append(" "); + sb.Append("{"); + + foreach (var key in properties.Keys) + { + sb.Append(" "); + sb.Append(key); + sb.Append(" : "); + sb.Append(properties[key]); + sb.Append(","); + } + sb.Remove(sb.Length - 1, 1); + sb.Append(" "); + sb.Append("}"); + return sb; + } + } } \ No newline at end of file diff --git a/test/Grains/BenchmarkGrainInterfaces/Transaction/ILoadGrain.cs b/test/Grains/BenchmarkGrainInterfaces/Transaction/ILoadGrain.cs index 6e5b2c4910..18f5e8f93a 100644 --- a/test/Grains/BenchmarkGrainInterfaces/Transaction/ILoadGrain.cs +++ b/test/Grains/BenchmarkGrainInterfaces/Transaction/ILoadGrain.cs @@ -8,6 +8,7 @@ public class Report { public int Succeeded { get; set; } public int Failed { get; set; } + public int Throttled { get; set; } public TimeSpan Elapsed { get; set; } } diff --git a/test/Grains/BenchmarkGrains/Transaction/LoadGrain.cs b/test/Grains/BenchmarkGrains/Transaction/LoadGrain.cs index a98d180ec8..be6004e463 100644 --- a/test/Grains/BenchmarkGrains/Transaction/LoadGrain.cs +++ b/test/Grains/BenchmarkGrains/Transaction/LoadGrain.cs @@ -1,10 +1,10 @@ using System; using System.Threading.Tasks; using System.Collections.Generic; -using System.Linq; using System.Diagnostics; using Orleans; using BenchmarkGrainInterfaces.Transaction; +using Orleans.Transactions; namespace BenchmarkGrains.Transaction { @@ -15,6 +15,7 @@ public class LoadGrain : Grain, ILoadGrain public Task Generate(int run, int transactions, int conncurrent) { this.runTask = RunGeneration(run, transactions, conncurrent); + this.runTask.Ignore(); return Task.CompletedTask; } @@ -63,7 +64,14 @@ private async Task> ResolvePending(List pending, Report report, { if (t.IsFaulted || t.IsCanceled) { - report.Failed++; + if(t.Exception.Flatten().GetBaseException() is OrleansStartTransactionFailedException) + { + report.Throttled++; + + } else + { + report.Failed++; + } } else if (t.IsCompleted) { @@ -77,9 +85,17 @@ private async Task> ResolvePending(List pending, Report report, return remaining; } - private Task StartTransaction(int index) + private async Task StartTransaction(int index) { - return GrainFactory.GetGrain(Guid.Empty).Run(new List() { index * 2, index * 2 + 1 }); + try + { + await GrainFactory.GetGrain(Guid.Empty).Run(new List() { index * 2, index * 2 + 1 }); + } catch(OrleansStartTransactionFailedException) + { + // Depay before retry + await Task.Delay(TimeSpan.FromSeconds(1)); + throw; + } } } } diff --git a/test/Transactions/Orleans.Transactions.Tests/TransactionOverloadDetectorTests.cs b/test/Transactions/Orleans.Transactions.Tests/TransactionOverloadDetectorTests.cs index 67741c409d..2244f4605b 100644 --- a/test/Transactions/Orleans.Transactions.Tests/TransactionOverloadDetectorTests.cs +++ b/test/Transactions/Orleans.Transactions.Tests/TransactionOverloadDetectorTests.cs @@ -3,8 +3,7 @@ using Microsoft.Extensions.Options; using Xunit.Abstractions; using Xunit; -using Orleans.Configuration; -using Orleans.TestingHost.Utils; +using Orleans.Transactions.Abstractions; namespace Orleans.Transactions.Tests { @@ -26,7 +25,7 @@ public void RateLimitTest(int runTimeInSeconds, double limit) { TimeSpan runTime = TimeSpan.FromSeconds(runTimeInSeconds); TransactionRateLoadSheddingOptions options = new TransactionRateLoadSheddingOptions { Enabled = true, Limit = limit }; - TransactionAgentStatistics statistics = new TransactionAgentStatistics(NullTelemetryProducer.Instance, Options.Create(new StatisticsOptions())); + ITransactionAgentStatistics statistics = new TransactionAgentStatistics(); ITransactionOverloadDetector detector = new TransactionOverloadDetector(statistics, Options.Create(options)); Stopwatch sw = Stopwatch.StartNew(); long total = 0; @@ -35,12 +34,12 @@ public void RateLimitTest(int runTimeInSeconds, double limit) total++; if (!detector.IsOverloaded()) { - statistics.TransactionStartedCounter++; + statistics.TrackTransactionStarted(); } } sw.Stop(); - double averageRate = (statistics.TransactionStartedCounter * 1000) / sw.ElapsedMilliseconds; - this.output.WriteLine($"Average of {averageRate}, with target of {options.Limit}. Performed {statistics.TransactionStartedCounter} transactions of a max of {total} in {sw.ElapsedMilliseconds}ms."); + double averageRate = (statistics.TransactionsStarted * 1000) / sw.ElapsedMilliseconds; + this.output.WriteLine($"Average of {averageRate}, with target of {options.Limit}. Performed {statistics.TransactionsStarted} transactions of a max of {total} in {sw.ElapsedMilliseconds}ms."); // check to make sure average rate is withing rate +- 10% Assert.True(options.Limit * 0.9 <= averageRate); Assert.True(options.Limit * 1.1 >= averageRate);