diff --git a/CHANGELOG.md b/CHANGELOG.md index 5efb01856..19c748478 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## 1.7.0 [unreleased] +### Features +1. [#70](https://github.com/influxdata/influxdb-client-csharp/pull/70): Optimized mapping of measurements into `PointData` + +### Bugs 1. [#69](https://github.com/influxdata/influxdb-client-csharp/pull/69): Write buffer uses correct flush interval and batch size under heavy load ## 1.6.0 [2020-03-13] diff --git a/Client.Test/ItWriteManyMeasurements.cs b/Client.Test/ItWriteManyMeasurements.cs new file mode 100644 index 000000000..cd86ca8b2 --- /dev/null +++ b/Client.Test/ItWriteManyMeasurements.cs @@ -0,0 +1,77 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading.Tasks; +using InfluxDB.Client.Api.Domain; +using InfluxDB.Client.Core; +using NUnit.Framework; + +namespace InfluxDB.Client.Test +{ + public class HistoryBarConstant + { + public static readonly string Bucket = "my-bucket"; + public static readonly string OrgId = "my-org"; + } + + [Measurement("history_bar_3")] + public class HistoryBar + { + [Column("value")] public double? Value { get; set; } + + [Column(IsTimestamp = true)] public DateTime Date { get; set; } + } + + [TestFixture] + [Ignore("Only example")] + public class ItWriteManyMeasurements + { + private static readonly int MaxBarsPerRequest = 50_000; + private static readonly int CountToWrite = 2_000_000; + private List bars = new List(); + + [SetUp] + public void SetUp() + { + for (var i = 0; i < CountToWrite; i++) + { + bars.Add(new HistoryBar {Value = i, Date = DateTime.UnixEpoch.Add(TimeSpan.FromSeconds(i))}); + } + } + + [Test] + public async Task Write() + { + var m_client = InfluxDBClientFactory.Create("http://localhost:9999", "my-token".ToCharArray()); + var api = m_client.GetWriteApi(WriteOptions.CreateNew().BatchSize(MaxBarsPerRequest).FlushInterval(10_000).Build()); + + var start = 0; + for (;;) + { + var historyBars = bars.Skip(start).Take(MaxBarsPerRequest).ToArray(); + if (historyBars.Length == 0) + { + break; + } + if (start != 0) { + Trace.WriteLine("Delaying..."); + await Task.Delay(100); + } + + start += MaxBarsPerRequest; + Trace.WriteLine( + $"Add bars to buffer From: {historyBars.First().Date}, To: {historyBars.Last().Date}. Remaining {CountToWrite-start}"); + api.WriteMeasurements(HistoryBarConstant.Bucket, HistoryBarConstant.OrgId, WritePrecision.S, + historyBars); + } + + Trace.WriteLine("Flushing data..."); + + m_client.Dispose(); + + Trace.WriteLine("Finished"); + } + + } +} \ No newline at end of file diff --git a/Client.Test/MeasurementMapperTest.cs b/Client.Test/MeasurementMapperTest.cs index 7d21c8960..1806b8596 100644 --- a/Client.Test/MeasurementMapperTest.cs +++ b/Client.Test/MeasurementMapperTest.cs @@ -1,4 +1,7 @@ using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; using InfluxDB.Client.Api.Domain; using InfluxDB.Client.Core; using InfluxDB.Client.Internal; @@ -66,6 +69,28 @@ public void DefaultToString() Assert.AreEqual("poco,tag=value value=\"to-string\"", lineProtocol); } + + [Test] + public void HeavyLoad() + { + var measurements = new List(); + + for (var i = 0; i < 500_000; i++) + { + measurements.Add(new Poco{Value = i, Tag = "Europe", Timestamp = DateTime.UnixEpoch.Add(TimeSpan.FromSeconds(i))}); + } + + var stopWatch = new Stopwatch(); + stopWatch.Start(); + + var enumerable = measurements.Select(it => _mapper.ToPoint(it, WritePrecision.S).ToLineProtocol()); + var _ = string.Join("\n", enumerable); + + var ts = stopWatch.Elapsed; + var elapsedTime = $"{ts.Hours:00}:{ts.Minutes:00}:{ts.Seconds:00}.{ts.Milliseconds / 10:00}"; + + Assert.LessOrEqual(ts.Seconds, 10, $"Elapsed time: {elapsedTime}"); + } private class MyClass { diff --git a/Client/Internal/MeasurementMapper.cs b/Client/Internal/MeasurementMapper.cs index 3fb10c6e9..c25b038bc 100644 --- a/Client/Internal/MeasurementMapper.cs +++ b/Client/Internal/MeasurementMapper.cs @@ -1,5 +1,8 @@ using System; +using System.Collections.Concurrent; +using System.Collections.Generic; using System.Diagnostics; +using System.Linq; using System.Reflection; using System.Runtime.CompilerServices; using InfluxDB.Client.Api.Domain; @@ -12,18 +15,28 @@ "95804a1aeeb0de18ac3728782f9dc8dbae2e806167a8bb64c0402278edcefd78c13dbe7f8d13de36eb362" + "21ec215c66ee2dfe7943de97b869c5eea4d92f92d345ced67de5ac8fc3cd2f8dd7e3c0c53bdb0cc433af8" + "59033d069cad397a7")] + namespace InfluxDB.Client.Internal { + internal class PropertyInfoColumn + { + internal PropertyInfo Property; + internal Column Column; + } + internal class MeasurementMapper { + private IDictionary CACHE = new ConcurrentDictionary(); + internal PointData ToPoint(TM measurement, WritePrecision precision) { Arguments.CheckNotNull(measurement, nameof(measurement)); Arguments.CheckNotNull(precision, nameof(precision)); - var measurementAttribute = (Measurement) measurement.GetType() - .GetCustomAttribute(typeof(Measurement)); - + var measurementType = measurement.GetType(); + CacheMeasurementClass(measurementType); + + var measurementAttribute = (Measurement) measurementType.GetCustomAttribute(typeof(Measurement)); if (measurementAttribute == null) { throw new InvalidOperationException( @@ -32,26 +45,20 @@ internal PointData ToPoint(TM measurement, WritePrecision precision) var point = PointData.Measurement(measurementAttribute.Name); - foreach (var property in measurement.GetType().GetProperties()) + foreach (var propertyInfo in CACHE[measurementType.Name]) { - var column = (Column) property.GetCustomAttribute(typeof(Column)); - if (column == null) - { - continue; - } - - var value = property.GetValue(measurement); + var value = propertyInfo.Property.GetValue(measurement); if (value == null) { continue; } - var name = !string.IsNullOrEmpty(column.Name) ? column.Name : property.Name; - if (column.IsTag) + var name = !string.IsNullOrEmpty(propertyInfo.Column.Name) ? propertyInfo.Column.Name : propertyInfo.Property.Name; + if (propertyInfo.Column.IsTag) { point.Tag(name, value.ToString()); } - else if (column.IsTimestamp) + else if (propertyInfo.Column.IsTimestamp) { if (value is long l) { @@ -60,11 +67,11 @@ internal PointData ToPoint(TM measurement, WritePrecision precision) else if (value is TimeSpan span) { point.Timestamp(span, precision); - } + } else if (value is DateTime date) { point.Timestamp(date, precision); - } + } else if (value is DateTimeOffset offset) { point.Timestamp(offset, precision); @@ -137,5 +144,18 @@ internal PointData ToPoint(TM measurement, WritePrecision precision) return point; } + + private void CacheMeasurementClass(Type measurementType) + { + if (CACHE.ContainsKey(measurementType.Name)) + { + return; + } + + CACHE[measurementType.Name] = measurementType.GetProperties() + .Select(property => new PropertyInfoColumn {Column = (Column) property.GetCustomAttribute(typeof(Column)), Property = property}) + .Where(propertyInfo => propertyInfo.Column != null) + .ToArray(); + } } } \ No newline at end of file