Skip to content

Commit

Permalink
Bug fixes (#33)
Browse files Browse the repository at this point in the history
Fix #30 Treat NanoSeconds as default precision
Fix #31 Partial writes raising IndexOutOfRangeException

* Proper implmentation of Chunking.
  • Loading branch information
mvadu authored Jun 4, 2017
1 parent 5ac5c37 commit 418fe4d
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 42 deletions.
19 changes: 19 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,22 @@
## v0.6.1 [6/03/2017]

### Release Notes
Allow points to be passed without explictly setting time or precision. It also fixes an issue with previopus implementation of the chunking support.


### Bugfixes

- [#31](https://github.com/AdysTech/InfluxDB.Client.Net/issues/31): IndexOutOfRangeException thrown for partial writes

### Features

- [#30](https://github.com/AdysTech/InfluxDB.Client.Net/issues/30): Use NanoSeconds as default precision


### Breaking Change
- Library will silently drop points older than retention period. This is similar to InfluDB behavios where it will reject those points with an `{"error":"partial write: points beyond retention policy dropped=225"}`


## v0.6.1 [3/28/2017]

### Release Notes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ It currently supports
</ItemGroup>

<ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="10.0.1" />
<PackageReference Include="Newtonsoft.Json" Version="10.0.2" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
</PropertyGroup>
<ItemGroup>
<Reference Include="Newtonsoft.Json, Version=10.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
<HintPath>..\..\packages\Newtonsoft.Json.10.0.1\lib\net45\Newtonsoft.Json.dll</HintPath>
<HintPath>..\..\packages\Newtonsoft.Json.10.0.2\lib\net45\Newtonsoft.Json.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
Expand Down
4 changes: 2 additions & 2 deletions src/AdysTech.InfluxDB.Client.Net/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,5 @@
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("0.6.1.0")]
[assembly: AssemblyFileVersion("0.6.1.0")]
[assembly: AssemblyVersion("0.6.5.0")]
[assembly: AssemblyFileVersion("0.6.5.0")]
2 changes: 1 addition & 1 deletion src/AdysTech.InfluxDB.Client.Net/packages.config
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Newtonsoft.Json" version="10.0.1" targetFramework="net451" />
<package id="Newtonsoft.Json" version="10.0.2" targetFramework="net451" />
</packages>
49 changes: 32 additions & 17 deletions src/DataStructures/InfluxDBClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using System.Threading.Tasks;
using AdysTech.InfluxDB.Client.Net.DataContracts;
using Newtonsoft.Json;
using System.IO;

namespace AdysTech.InfluxDB.Client.Net
{
Expand Down Expand Up @@ -89,7 +90,7 @@ public int Port

#region private methods

private async Task<HttpResponseMessage> GetAsync(Dictionary<string, string> Query)
private async Task<HttpResponseMessage> GetAsync(Dictionary<string, string> Query, HttpCompletionOption completion = HttpCompletionOption.ResponseContentRead)
{
var querybaseUrl = new Uri($"{InfluxUrl}/query?");
var builder = new UriBuilder(querybaseUrl);
Expand All @@ -102,7 +103,7 @@ private async Task<HttpResponseMessage> GetAsync(Dictionary<string, string> Quer

try
{
HttpResponseMessage response = await _client.GetAsync(builder.Uri);
HttpResponseMessage response = await _client.GetAsync(builder.Uri, completion);

if (response.StatusCode == HttpStatusCode.OK)
{
Expand Down Expand Up @@ -169,9 +170,10 @@ private async Task<bool> PostPointsAsync(string dbName, TimePrecision precision,
line.Remove(line.Length - 1, 1);

ByteArrayContent requestContent = new ByteArrayContent(Encoding.UTF8.GetBytes(line.ToString()));
var endPoint = new Dictionary<string, string>() {
{ "db", dbName },
{ "precision", precisionLiterals[(int)precision] } };
var endPoint = new Dictionary<string, string>() { { "db", dbName } };
if (precision > 0)
endPoint.Add("precision", precisionLiterals[(int)precision]);

if (!String.IsNullOrWhiteSpace(retention))
endPoint.Add("rp", retention);
HttpResponseMessage response = await PostAsync(endPoint, requestContent);
Expand All @@ -195,11 +197,18 @@ private async Task<bool> PostPointsAsync(string dbName, TimePrecision precision,
else
parts = oneLinePattern.Matches(content.Substring(content.IndexOf("partial write:\\n") + 16)).ToList();

if (parts.Count == 0)
throw new InfluxDBException("Partial Write", new Regex(@"\""error\"":\""(.*?)\""").Match(content).Groups[1].Value);

if (parts[1].Contains("\\n"))
l = parts[1].Substring(0, parts[1].IndexOf("\\n")).Unescape();
else
l = parts[1].Unescape();
}
catch (InfluxDBException e)
{
throw e;
}
catch (Exception)
{

Expand Down Expand Up @@ -393,7 +402,7 @@ public async Task<bool> PostPointsAsync(string dbName, IEnumerable<IInfluxDatapo
{
int maxBatchSize = 255;
bool finalResult = true, result;
foreach (var group in Points.GroupBy(p => new { p.Precision, p.Retention?.Name }))
foreach (var group in Points.Where(p => p.Retention == null || p.UtcTimestamp > DateTime.UtcNow - p.Retention.Duration).GroupBy(p => new { p.Precision, p.Retention?.Name }))
{

var pointsGroup = group.AsEnumerable().Select((point, index) => new { Index = index, Point = point })//get the index of each point
Expand Down Expand Up @@ -563,26 +572,32 @@ public async Task<List<IInfluxSeries>> QueryMultiSeriesAsync(string dbName, stri
{ "q", measurementQuery },
{"chunked", "true" },
{"chunk_size", ChunkSize.ToString() },
{ "epoch", precisionLiterals[(int)precision] } });
{ "epoch", precisionLiterals[(int)precision] } }, HttpCompletionOption.ResponseHeadersRead);
if (response == null) throw new ServiceUnavailableException();
if (response.StatusCode == HttpStatusCode.OK)
{
var results = new List<IInfluxSeries>();
//Hack for https://github.com/influxdata/influxdb/issues/8212
foreach (var str in (await response.Content.ReadAsStringAsync()).Split('\n'))

var stream = await response.Content.ReadAsStreamAsync();

using (var reader = new StreamReader(stream))
{
var rawResult = JsonConvert.DeserializeObject<InfluxResponse>(str);
if (rawResult?.Results[0]?.Series != null)
do
{
foreach (var series in rawResult?.Results[0]?.Series)
var str = await reader.ReadLineAsync();
var rawResult = JsonConvert.DeserializeObject<InfluxResponse>(str);
if (rawResult?.Results[0]?.Series != null)
{
InfluxSeries result = GetInfluxSeries(precision, series);
results.Add(result);
foreach (var series in rawResult?.Results[0]?.Series)
{
InfluxSeries result = GetInfluxSeries(precision, series);
results.Add(result);
}
}
}
if (!rawResult.Results[0].Partial) break;
}
if (!rawResult.Results[0].Partial) break;
} while (!reader.EndOfStream);

}
return results;
}
return null;
Expand Down
34 changes: 17 additions & 17 deletions src/Extensions/EpochHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,35 @@ namespace AdysTech.InfluxDB.Client.Net
{
public static class EpochHelper
{
private static readonly DateTime Origin = new DateTime (new DateTime (1970, 1, 1).Ticks, DateTimeKind.Utc);
private static readonly DateTime Origin = new DateTime(new DateTime(1970, 1, 1).Ticks, DateTimeKind.Utc);

public static long ToEpoch (this DateTime time, TimePrecision precision)
public static long ToEpoch(this DateTime time, TimePrecision precision)
{
TimeSpan t = time - Origin;
switch (precision)
{
case TimePrecision.Hours: return (long) t.TotalHours;
case TimePrecision.Minutes: return (long) t.TotalMinutes;
case TimePrecision.Seconds: return (long) t.TotalSeconds;
case TimePrecision.Milliseconds: return (long) t.TotalMilliseconds;
case TimePrecision.Microseconds: return (long) (t.TotalMilliseconds * 1000);
case TimePrecision.Nanoseconds: return (long) t.Ticks * 100; //1 tick = 100 nano sec
case TimePrecision.Hours: return (long)t.TotalHours;
case TimePrecision.Minutes: return (long)t.TotalMinutes;
case TimePrecision.Seconds: return (long)t.TotalSeconds;
case TimePrecision.Milliseconds: return (long)t.TotalMilliseconds;
case TimePrecision.Microseconds: return (long)(t.TotalMilliseconds * 1000);
case TimePrecision.Nanoseconds:
default: return (long)t.Ticks * 100; //1 tick = 100 nano sec
}
return 0;
}

public static DateTime FromEpoch (this string time, TimePrecision precision)
public static DateTime FromEpoch(this string time, TimePrecision precision)
{
long duration = long.Parse (time);
long duration = long.Parse(time);
DateTime t = Origin;
switch (precision)
{
case TimePrecision.Hours: return t.AddHours (duration);
case TimePrecision.Minutes: return t.AddMinutes (duration);
case TimePrecision.Seconds: return t.AddSeconds (duration);
case TimePrecision.Milliseconds: return t.AddMilliseconds (duration);
case TimePrecision.Microseconds: return t.AddTicks (duration * TimeSpan.TicksPerMillisecond * 1000);
case TimePrecision.Nanoseconds: return t.AddTicks (duration / 100); //1 tick = 100 nano sec
case TimePrecision.Hours: return t.AddHours(duration);
case TimePrecision.Minutes: return t.AddMinutes(duration);
case TimePrecision.Seconds: return t.AddSeconds(duration);
case TimePrecision.Milliseconds: return t.AddMilliseconds(duration);
case TimePrecision.Microseconds: return t.AddTicks(duration * TimeSpan.TicksPerMillisecond * 1000);
case TimePrecision.Nanoseconds: return t.AddTicks(duration / 100); //1 tick = 100 nano sec
}
return t;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.0.0" />
<PackageReference Include="MSTest.TestAdapter" Version="1.1.13" />
<PackageReference Include="MSTest.TestFramework" Version="1.1.13" />
<PackageReference Include="MSTest.TestAdapter" Version="1.1.18" />
<PackageReference Include="MSTest.TestFramework" Version="1.1.18" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion tests/InfluxDB.Client.Net.TestSetup.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
throw "Unable to create DB"
}

$r = Invoke-WebRequest -Method Post -Uri http://localhost:8086/write?db=prereq -Body "test,date=$($(get-date).ToString('yyyyMMdd')),time=$($(get-date).ToString('hhmm')) value=1234"
$r = Invoke-WebRequest -Method Post -Uri http://localhost:8086/write?db=prereq -Body "test,TestDate=$($(get-date).ToString('yyyyMMdd')),TestTime=$($(get-date).ToString('hhmm')) value=1234"
if($r.StatusCode -ne 204)
{
throw "Unable to create Measurement"
Expand Down
70 changes: 70 additions & 0 deletions tests/InfluxDBClientTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -667,5 +667,75 @@ public async Task TestQueryMultiSeriesAsync_Chunking_BySize()
}
}

[TestMethod, TestCategory("Post")]
public async Task TestPostPointsAsync_OlderthanRetention()
{
try
{
var client = new InfluxDBClient(influxUrl, dbUName, dbpwd);
var time = DateTime.Now;
var TestDate = time.ToShortDateString();
var TestTime = time.ToShortTimeString();
var points = new List<IInfluxDatapoint>();
InfluxRetentionPolicy retention = new InfluxRetentionPolicy() { Duration = TimeSpan.FromHours(1) };

for (var i = 0; i < 10; i++)
{
await Task.Delay(1);
var point = new InfluxDatapoint<long>();
point.Retention = retention;
point.UtcTimestamp = DateTime.UtcNow.AddDays(-i);
point.MeasurementName = "RetentionTest";
point.Precision = TimePrecision.Nanoseconds;
point.Tags.Add("TestDate", TestDate);
point.Tags.Add("TestTime", TestTime);
point.Fields.Add("Val", i);
points.Add(point);
}

var r = await client.PostPointsAsync(dbName, points);
Assert.IsTrue(r, "PostPointsAsync retunred false");

Assert.IsTrue(points.Count(p => p.Saved) == 1, "PostPointsAsync saved points older than retention policy");

}
catch (Exception e)
{
Assert.Fail($"Unexpected exception of type {e.GetType()} caught: {e.Message}");
return;
}
}

[TestMethod, TestCategory("Post")]
public async Task TestPostPointsAsync_DefaultTimePrecision()
{
try
{
var client = new InfluxDBClient(influxUrl, dbUName, dbpwd);
var time = DateTime.Now;
var TestDate = time.ToShortDateString();
var TestTime = time.ToShortTimeString();
var points = new List<IInfluxDatapoint>();
for (var i = 0; i < 10; i++)
{
await Task.Delay(1);
var point = new InfluxDatapoint<long>();
point.MeasurementName = "PrecisionTest";
point.Tags.Add("TestDate", TestDate);
point.Tags.Add("TestTime", TestTime);
point.Fields.Add("Val", i);
points.Add(point);
}

var r = await client.PostPointsAsync(dbName, points);
Assert.IsTrue(r, "PostPointsAsync retunred false");
}
catch (Exception e)
{
Assert.Fail($"Unexpected exception of type {e.GetType()} caught: {e.Message}");
return;
}
}

}
}

0 comments on commit 418fe4d

Please sign in to comment.