Skip to content

Commit

Permalink
feat: Optimized mapping of measurements into PointData (#70)
Browse files Browse the repository at this point in the history
  • Loading branch information
bednar authored Apr 6, 2020
1 parent 3317c8c commit 7ce73b4
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 16 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## 1.7.0 [unreleased]

### Features
1. [#70](https://github.com/influxdata/influxdb-client-csharp/pull/70): Optimized mapping of measurements into `PointData`

### Bugs
1. [#69](https://github.com/influxdata/influxdb-client-csharp/pull/69): Write buffer uses correct flush interval and batch size under heavy load

## 1.6.0 [2020-03-13]
Expand Down
77 changes: 77 additions & 0 deletions Client.Test/ItWriteManyMeasurements.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using InfluxDB.Client.Api.Domain;
using InfluxDB.Client.Core;
using NUnit.Framework;

namespace InfluxDB.Client.Test
{
public class HistoryBarConstant
{
public static readonly string Bucket = "my-bucket";
public static readonly string OrgId = "my-org";
}

[Measurement("history_bar_3")]
public class HistoryBar
{
[Column("value")] public double? Value { get; set; }

[Column(IsTimestamp = true)] public DateTime Date { get; set; }
}

[TestFixture]
[Ignore("Only example")]
public class ItWriteManyMeasurements
{
private static readonly int MaxBarsPerRequest = 50_000;
private static readonly int CountToWrite = 2_000_000;
private List<HistoryBar> bars = new List<HistoryBar>();

[SetUp]
public void SetUp()
{
for (var i = 0; i < CountToWrite; i++)
{
bars.Add(new HistoryBar {Value = i, Date = DateTime.UnixEpoch.Add(TimeSpan.FromSeconds(i))});
}
}

[Test]
public async Task Write()
{
var m_client = InfluxDBClientFactory.Create("http://localhost:9999", "my-token".ToCharArray());
var api = m_client.GetWriteApi(WriteOptions.CreateNew().BatchSize(MaxBarsPerRequest).FlushInterval(10_000).Build());

var start = 0;
for (;;)
{
var historyBars = bars.Skip(start).Take(MaxBarsPerRequest).ToArray();
if (historyBars.Length == 0)
{
break;
}
if (start != 0) {
Trace.WriteLine("Delaying...");
await Task.Delay(100);
}

start += MaxBarsPerRequest;
Trace.WriteLine(
$"Add bars to buffer From: {historyBars.First().Date}, To: {historyBars.Last().Date}. Remaining {CountToWrite-start}");
api.WriteMeasurements(HistoryBarConstant.Bucket, HistoryBarConstant.OrgId, WritePrecision.S,
historyBars);
}

Trace.WriteLine("Flushing data...");

m_client.Dispose();

Trace.WriteLine("Finished");
}

}
}
25 changes: 25 additions & 0 deletions Client.Test/MeasurementMapperTest.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using InfluxDB.Client.Api.Domain;
using InfluxDB.Client.Core;
using InfluxDB.Client.Internal;
Expand Down Expand Up @@ -66,6 +69,28 @@ public void DefaultToString()

Assert.AreEqual("poco,tag=value value=\"to-string\"", lineProtocol);
}

[Test]
public void HeavyLoad()
{
var measurements = new List<Poco>();

for (var i = 0; i < 500_000; i++)
{
measurements.Add(new Poco{Value = i, Tag = "Europe", Timestamp = DateTime.UnixEpoch.Add(TimeSpan.FromSeconds(i))});
}

var stopWatch = new Stopwatch();
stopWatch.Start();

var enumerable = measurements.Select(it => _mapper.ToPoint(it, WritePrecision.S).ToLineProtocol());
var _ = string.Join("\n", enumerable);

var ts = stopWatch.Elapsed;
var elapsedTime = $"{ts.Hours:00}:{ts.Minutes:00}:{ts.Seconds:00}.{ts.Milliseconds / 10:00}";

Assert.LessOrEqual(ts.Seconds, 10, $"Elapsed time: {elapsedTime}");
}

private class MyClass
{
Expand Down
52 changes: 36 additions & 16 deletions Client/Internal/MeasurementMapper.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Runtime.CompilerServices;
using InfluxDB.Client.Api.Domain;
Expand All @@ -12,18 +15,28 @@
"95804a1aeeb0de18ac3728782f9dc8dbae2e806167a8bb64c0402278edcefd78c13dbe7f8d13de36eb362" +
"21ec215c66ee2dfe7943de97b869c5eea4d92f92d345ced67de5ac8fc3cd2f8dd7e3c0c53bdb0cc433af8" +
"59033d069cad397a7")]

namespace InfluxDB.Client.Internal
{
internal class PropertyInfoColumn
{
internal PropertyInfo Property;
internal Column Column;
}

internal class MeasurementMapper
{
private IDictionary<string, PropertyInfoColumn[]> CACHE = new ConcurrentDictionary<string, PropertyInfoColumn[]>();

internal PointData ToPoint<TM>(TM measurement, WritePrecision precision)
{
Arguments.CheckNotNull(measurement, nameof(measurement));
Arguments.CheckNotNull(precision, nameof(precision));

var measurementAttribute = (Measurement) measurement.GetType()
.GetCustomAttribute(typeof(Measurement));

var measurementType = measurement.GetType();
CacheMeasurementClass(measurementType);

var measurementAttribute = (Measurement) measurementType.GetCustomAttribute(typeof(Measurement));
if (measurementAttribute == null)
{
throw new InvalidOperationException(
Expand All @@ -32,26 +45,20 @@ internal PointData ToPoint<TM>(TM measurement, WritePrecision precision)

var point = PointData.Measurement(measurementAttribute.Name);

foreach (var property in measurement.GetType().GetProperties())
foreach (var propertyInfo in CACHE[measurementType.Name])
{
var column = (Column) property.GetCustomAttribute(typeof(Column));
if (column == null)
{
continue;
}

var value = property.GetValue(measurement);
var value = propertyInfo.Property.GetValue(measurement);
if (value == null)
{
continue;
}

var name = !string.IsNullOrEmpty(column.Name) ? column.Name : property.Name;
if (column.IsTag)
var name = !string.IsNullOrEmpty(propertyInfo.Column.Name) ? propertyInfo.Column.Name : propertyInfo.Property.Name;
if (propertyInfo.Column.IsTag)
{
point.Tag(name, value.ToString());
}
else if (column.IsTimestamp)
else if (propertyInfo.Column.IsTimestamp)
{
if (value is long l)
{
Expand All @@ -60,11 +67,11 @@ internal PointData ToPoint<TM>(TM measurement, WritePrecision precision)
else if (value is TimeSpan span)
{
point.Timestamp(span, precision);
}
}
else if (value is DateTime date)
{
point.Timestamp(date, precision);
}
}
else if (value is DateTimeOffset offset)
{
point.Timestamp(offset, precision);
Expand Down Expand Up @@ -137,5 +144,18 @@ internal PointData ToPoint<TM>(TM measurement, WritePrecision precision)

return point;
}

private void CacheMeasurementClass(Type measurementType)
{
if (CACHE.ContainsKey(measurementType.Name))
{
return;
}

CACHE[measurementType.Name] = measurementType.GetProperties()
.Select(property => new PropertyInfoColumn {Column = (Column) property.GetCustomAttribute(typeof(Column)), Property = property})
.Where(propertyInfo => propertyInfo.Column != null)
.ToArray();
}
}
}

0 comments on commit 7ce73b4

Please sign in to comment.