Skip to content

Commit

Permalink
Port weighted sample from JavaMetrics ( dropwizard/metrics#421 )
Browse files Browse the repository at this point in the history
  • Loading branch information
etishor committed Sep 24, 2014
1 parent 6224694 commit 9dc6b65
Show file tree
Hide file tree
Showing 11 changed files with 353 additions and 99 deletions.
13 changes: 7 additions & 6 deletions Metrics.sln
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 2013
VisualStudioVersion = 12.0.30501.0
MinimumVisualStudioVersion = 10.0.40219.1
# Visual Studio 2012
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Metrics", "Src\Metrics\Metrics.csproj", "{95E29D40-DBEC-49E2-9CC5-26B88966DADE}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Metrics.Tests", "Src\Metrics.Tests\Metrics.Tests.csproj", "{4E0F8CB9-6919-48B8-8212-BCE75D04E0D9}"
Expand Down Expand Up @@ -166,11 +164,14 @@ Global
{D08A63D0-F440-477C-9565-1264AE0879D0} = {30108C17-0671-4786-90D0-0564718C3F70}
{7416F90D-FFD6-4E34-8638-5234C8B9EC39} = {30108C17-0671-4786-90D0-0564718C3F70}
{A9074D3E-3421-483D-AF98-4AAEEED11ECA} = {30108C17-0671-4786-90D0-0564718C3F70}
{FBFF51D8-52F4-4EEF-8498-122A14B37D6C} = {443860DA-6378-47D0-A4AE-4E79975C5E1C}
{DAF70394-CE63-4D2F-B52B-839941827B4E} = {30108C17-0671-4786-90D0-0564718C3F70}
{18E808CB-98F9-4754-9872-D3D00F9CE72B} = {30108C17-0671-4786-90D0-0564718C3F70}
{828188F9-26C2-4C93-A156-D9EA13D7E2FA} = {30108C17-0671-4786-90D0-0564718C3F70}
{FBFF51D8-52F4-4EEF-8498-122A14B37D6C} = {443860DA-6378-47D0-A4AE-4E79975C5E1C}
{68F25A01-AF00-4D57-9D1A-81213E4EF56F} = {443860DA-6378-47D0-A4AE-4E79975C5E1C}
{025CD6D0-1A1E-4B14-A7DB-AF5DFF887210} = {443860DA-6378-47D0-A4AE-4E79975C5E1C}
{828188F9-26C2-4C93-A156-D9EA13D7E2FA} = {30108C17-0671-4786-90D0-0564718C3F70}
EndGlobalSection
EndGlobalSection
GlobalSection(Performance) = preSolution
HasPerformanceSessions = true
EndGlobalSection
EndGlobal
2 changes: 1 addition & 1 deletion Src/Metrics.Tests/DefaultMetricContextTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public class CustomReservoir : Reservoir
public void Update(long value) { this.values.Add(value); }
public Snapshot Snapshot
{
get { return new Snapshot(this.values); }
get { return new UniformSnapshot(this.values); }
}

public void Reset()
Expand Down
82 changes: 82 additions & 0 deletions Src/Metrics.Tests/ExponentiallyDecayingReservoirTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,5 +95,87 @@ public void EDRlongPeriodsOfInactivityShouldNotCorruptSamplingState()
// TODO: double check the Skip first value - sometimes first value is 2000 - which might or not be correct
finalSnapshot.Values.Skip(1).Should().OnlyContain(v => 3000 <= v && v < 4000);
}

[Fact]
public void EDRSpotLift()
{
TestClock clock = new TestClock();
TestScheduler scheduler = new TestScheduler(clock);
ExponentiallyDecayingReservoir reservoir = new ExponentiallyDecayingReservoir(1000, 0.015, clock, scheduler);

int valuesRatePerMinute = 10;
int valuesIntervalMillis = (int)(TimeUnit.Minutes.ToMilliseconds(1) / valuesRatePerMinute);
// mode 1: steady regime for 120 minutes
for (int i = 0; i < 120 * valuesRatePerMinute; i++)
{
reservoir.Update(177);
clock.Advance(TimeUnit.Milliseconds, valuesIntervalMillis);
}

// switching to mode 2: 10 minutes more with the same rate, but larger value
for (int i = 0; i < 10 * valuesRatePerMinute; i++)
{
reservoir.Update(9999);
clock.Advance(TimeUnit.Milliseconds, valuesIntervalMillis);
}

// expect that quantiles should be more about mode 2 after 10 minutes
reservoir.Snapshot.Median.Should().Be(9999);
}

[Fact]
public void EDRSpotFall()
{
TestClock clock = new TestClock();
TestScheduler scheduler = new TestScheduler(clock);
ExponentiallyDecayingReservoir reservoir = new ExponentiallyDecayingReservoir(1000, 0.015, clock, scheduler);

int valuesRatePerMinute = 10;
int valuesIntervalMillis = (int)(TimeUnit.Minutes.ToMilliseconds(1) / valuesRatePerMinute);
// mode 1: steady regime for 120 minutes
for (int i = 0; i < 120 * valuesRatePerMinute; i++)
{
reservoir.Update(9998);
clock.Advance(TimeUnit.Milliseconds, valuesIntervalMillis);
}

// switching to mode 2: 10 minutes more with the same rate, but smaller value
for (int i = 0; i < 10 * valuesRatePerMinute; i++)
{
reservoir.Update(178);
clock.Advance(TimeUnit.Milliseconds, valuesIntervalMillis);
}

// expect that quantiles should be more about mode 2 after 10 minutes
reservoir.Snapshot.Percentile95.Should().Be(178);
}

[Fact]
public void EDRQuantiliesShouldBeBasedOnWeights()
{
TestClock clock = new TestClock();
TestScheduler scheduler = new TestScheduler(clock);
ExponentiallyDecayingReservoir reservoir = new ExponentiallyDecayingReservoir(1000, 0.015, clock, scheduler);

for (int i = 0; i < 40; i++)
{
reservoir.Update(177);
}

clock.Advance(TimeUnit.Seconds, 120);

for (int i = 0; i < 10; i++)
{
reservoir.Update(9999);
}

reservoir.Snapshot.Size.Should().Be(50);

// the first added 40 items (177) have weights 1
// the next added 10 items (9999) have weights ~6
// so, it's 40 vs 60 distribution, not 40 vs 10
reservoir.Snapshot.Median.Should().Be(9999);
reservoir.Snapshot.Percentile75.Should().Be(9999);
}
}
}
14 changes: 7 additions & 7 deletions Src/Metrics.Tests/SnapshotTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

namespace Metrics.Tests
{
public class SnapshotTest
public class UniformSnapshotTest
{
private readonly Snapshot snapshot = new Snapshot(new long[] { 5, 1, 2, 3, 4 });
private readonly UniformSnapshot snapshot = new UniformSnapshot(new long[] { 5, 1, 2, 3, 4 });

[Fact]
public void SnapshotSmallQuantilesAreTheFirstValue()
Expand Down Expand Up @@ -95,35 +95,35 @@ public void SnapshotCalculatesTheStdDev()
[Fact]
public void SnapshotCalculatesAMinOfZeroForAnEmptySnapshot()
{
Snapshot snapshot = new Snapshot(new long[] { });
Snapshot snapshot = new UniformSnapshot(new long[] { });
snapshot.Min.Should().Be(0);
}

[Fact]
public void SnapshotCalculatesAMaxOfZeroForAnEmptySnapshot()
{
Snapshot snapshot = new Snapshot(new long[] { });
Snapshot snapshot = new UniformSnapshot(new long[] { });
snapshot.Max.Should().Be(0);
}

[Fact]
public void SnapshotCalculatesAMeanOfZeroForAnEmptySnapshot()
{
Snapshot snapshot = new Snapshot(new long[] { });
Snapshot snapshot = new UniformSnapshot(new long[] { });
snapshot.Mean.Should().Be(0);
}

[Fact]
public void SnapshotCalculatesAStdDevOfZeroForAnEmptySnapshot()
{
Snapshot snapshot = new Snapshot(new long[] { });
Snapshot snapshot = new UniformSnapshot(new long[] { });
snapshot.StdDev.Should().Be(0);
}

[Fact]
public void SnapshotCalculatesAStdDevOfZeroForASingletonSnapshot()
{
Snapshot snapshot = new Snapshot(new long[] { 1 });
Snapshot snapshot = new UniformSnapshot(new long[] { 1 });
snapshot.StdDev.Should().Be(0);
}
}
Expand Down
29 changes: 17 additions & 12 deletions Src/Metrics/Core/ExponentiallyDecayingReservoir.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public sealed class ExponentiallyDecayingReservoir : Reservoir, IDisposable
private const double DefaultAlpha = 0.015;
private static readonly TimeSpan RescaleInterval = TimeSpan.FromHours(1);

private readonly ConcurrentDictionary<double, long> values = new ConcurrentDictionary<double, long>();
private readonly ConcurrentDictionary<double, WeightedSample> values = new ConcurrentDictionary<double, WeightedSample>();
private readonly ReaderWriterLockSlim @lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
private readonly double alpha;
private readonly int size;
Expand Down Expand Up @@ -57,7 +57,7 @@ public Snapshot Snapshot
this.@lock.EnterReadLock();
try
{
return new Snapshot(this.values.Values);
return new WeightedSnapshot(this.values.Values);
}
finally
{
Expand Down Expand Up @@ -91,23 +91,23 @@ private void Update(long value, long timestamp)
this.@lock.EnterReadLock();
try
{
long age = timestamp - startTime.Value;
double weighted = Math.Exp(alpha * age);
double priority = weighted / ThreadLocalRandom.NextDouble();
double itemWeight = Math.Exp(alpha * (timestamp - startTime.Value));
var sample = new WeightedSample(value, itemWeight);
double priority = itemWeight / ThreadLocalRandom.NextDouble();

long newCount = count.Increment();
if (newCount <= size)
{
values.AddOrUpdate(priority, value, (k, v) => value);
this.values.AddOrUpdate(priority, sample, (k, v) => sample);
}
else
{
var first = values.First().Key;
if (first < priority)
{
this.values.AddOrUpdate(priority, value, (k, v) => v);
this.values.AddOrUpdate(priority, sample, (k, v) => v);

long removed;
WeightedSample removed;
// ensure we always remove an item
while (!values.TryRemove(first, out removed))
{
Expand Down Expand Up @@ -153,13 +153,18 @@ private void Rescale()
long oldStartTime = startTime.Value;
this.startTime.SetValue(this.clock.Seconds);

double scalingFactor = Math.Exp(-alpha * (startTime.Value - oldStartTime));

var keys = new List<double>(this.values.Keys);
foreach (var key in keys)
{
long value;
this.values.TryRemove(key, out value);
double newKey = key * Math.Exp(-alpha * (startTime.Value - oldStartTime));
values.AddOrUpdate(newKey, value, (k, v) => value);
WeightedSample sample;
if (this.values.TryRemove(key, out sample))
{
double newKey = key * Math.Exp(-alpha * (startTime.Value - oldStartTime));
var newSample = new WeightedSample(sample.Value, sample.Weight * scalingFactor);
values.AddOrUpdate(newKey, newSample, (k, v) => newSample);
}
}
// make sure the counter is in sync with the number of stored samples.
this.count.SetValue(values.Count);
Expand Down
2 changes: 1 addition & 1 deletion Src/Metrics/Core/SlidingWindowReservoir.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public Snapshot Snapshot
{
values[i] = Interlocked.Read(ref this.values[i]);
}
return new Snapshot(values);
return new UniformSnapshot(values);
}
}
}
Expand Down
85 changes: 15 additions & 70 deletions Src/Metrics/Core/Snapshot.cs
Original file line number Diff line number Diff line change
@@ -1,75 +1,20 @@
using System;
using System.Collections.Generic;
using System.Linq;

using System.Collections.Generic;
namespace Metrics.Core
{
public struct Snapshot
public interface Snapshot
{
private readonly long[] values;

public Snapshot(IEnumerable<long> values)
{
this.values = values.OrderBy(v => v).ToArray();
}

public int Size { get { return this.values.Length; } }

public long Max { get { return this.values.LastOrDefault(); } }
public long Min { get { return this.values.FirstOrDefault(); } }
public double Mean { get { return Size == 0 ? 0.0 : this.values.Average(); } }

public double StdDev
{
get
{
if (this.Size <= 1)
{
return 0;
}

double avg = values.Average();
double sum = values.Sum(d => Math.Pow(d - avg, 2));
return Math.Sqrt((sum) / (values.Count() - 1));
}
}

public double Median { get { return GetValue(0.5d); } }
public double Percentile75 { get { return GetValue(0.75d); } }
public double Percentile95 { get { return GetValue(0.95d); } }
public double Percentile98 { get { return GetValue(0.98d); } }
public double Percentile99 { get { return GetValue(0.99d); } }
public double Percentile999 { get { return GetValue(0.999d); } }

public IEnumerable<long> Values { get { return this.values.AsEnumerable(); } }

public double GetValue(double quantile)
{
if (quantile < 0.0 || quantile > 1.0)
{
throw new ArgumentException(string.Format("{0} is not in [0..1]", quantile));
}

if (this.Size == 0)
{
return 0;
}

double pos = quantile * (values.Length + 1);

if (pos < 1)
{
return values[0];
}

if (pos >= values.Length)
{
return values[values.Length - 1];
}

double lower = values[(int)pos - 1];
double upper = values[(int)pos];
return lower + (pos - Math.Floor(pos)) * (upper - lower);
}
double GetValue(double quantile);
long Max { get; }
double Mean { get; }
double Median { get; }
long Min { get; }
double Percentile75 { get; }
double Percentile95 { get; }
double Percentile98 { get; }
double Percentile99 { get; }
double Percentile999 { get; }
int Size { get; }
double StdDev { get; }
IEnumerable<long> Values { get; }
}
}
2 changes: 1 addition & 1 deletion Src/Metrics/Core/UniformReservoir.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public Snapshot Snapshot
{
get
{
return new Snapshot(this.values.Take(Size).Select(v => v.Value));
return new UniformSnapshot(this.values.Take(Size).Select(v => v.Value));
}
}

Expand Down
Loading

0 comments on commit 9dc6b65

Please sign in to comment.