Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #82: QueryMultiSeriesAsync should return "partial" #85

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# ASP.NET Core
# Build and test ASP.NET Core projects targeting .NET Core.
# Add steps that run tests, create a NuGet package, deploy, and more:
# https://docs.microsoft.com/azure/devops/pipelines/languages/dotnet-core

# the build will trigger on any changes to the master branch
trigger:
- master

# the build will run on a Microsoft hosted agent, using the lastest Windows VM Image
pool:
vmImage: 'windows-latest'

# these variables are available throughout the build file
# just the build configuration is defined, in this case we are building Release packages
variables:
buildConfiguration: 'Release'

#The build has 3 seperate tasks run under 1 step
steps:

# The first task is the dotnet command build, pointing to our csproj file
- task: DotNetCoreCLI@2
displayName: 'dotnet build'
inputs:
command: 'build'
arguments: '--configuration $(buildConfiguration)'
projects: 'src/AdysTech.InfluxDB.Client.Net.Core/AdysTech.InfluxDB.Client.Net.Core.csproj'

# The second task is dotnet pack command again pointing to the csproj file
# The nobuild means the project will not be compiled before running pack, because its already built in above step
- task: DotNetCoreCLI@2
displayName: "dotnet pack"
inputs:
command: 'pack'
arguments: '--configuration $(buildConfiguration)'
packagesToPack: 'src/AdysTech.InfluxDB.Client.Net.Core/AdysTech.InfluxDB.Client.Net.Core.csproj'
nobuild: true
versioningScheme: 'off'

# The last task is a nuget command, nuget push
# This will push any .nupkg files to the 'TestFeed' artifact feed
# allowPackageConflicts allows us to build the same version and not throw an error when trying to push
# instead it just ingores the latest package unless the version changes
- task: NuGetCommand@2
displayName: 'nuget push'
inputs:
command: 'push'
feedsToUse: 'select'
packagesToPush: '$(Build.ArtifactStagingDirectory)/**/*.nupkg;!$(Build.ArtifactStagingDirectory)/**/*.symbols.nupkg'
nuGetFeedType: 'internal'
publishVstsFeed: 'herrenknecht-up'
versioningScheme: 'off'
allowPackageConflicts: true
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<Description>.Net client for InfluxDB. Supports all InfluxDB version from 0.9 onwards. Supports both .Net 4.6.1+ and .Net Core 2.0+.</Description>
<Description>Herrenknecht specific .Net client for InfluxDB. Supports all InfluxDB version from 0.9 onwards. Supports both .Net 4.6.1+ and .Net Core 2.0+.</Description>
<Product>AdysTech.InfluxDB.Client.Net</Product>
<Company>AdysTech</Company>
<Authors>AdysTech;mvadu</Authors>
<Version>0.9.0.0</Version>
<PackageId>AdysTech.InfluxDB.Client.Net.Core</PackageId>
<Authors>AdysTech;mvadu;Herrenknecht AG</Authors>
<Version>0.9.0.4</Version>
<PackageId>AdysTech.InfluxDB.Client.Net.Core.HK</PackageId>
<Copyright>© AdysTech 2016-2019</Copyright>
<PackageProjectUrl>https://github.com/AdysTech/InfluxDB.Client.Net</PackageProjectUrl>
<RepositoryUrl>https://github.com/AdysTech/InfluxDB.Client.Net</RepositoryUrl>
Expand Down Expand Up @@ -45,6 +45,7 @@ It currently supports
<Compile Include="..\DataStructures\InfluxMeasurement.cs" Link="DataStructures\InfluxMeasurement.cs" />
<Compile Include="..\DataStructures\InfluxRetentionPolicy.cs" Link="DataStructures\InfluxRetentionPolicy.cs" />
<Compile Include="..\DataStructures\InfluxSeries.cs" Link="DataStructures\InfluxSeries.cs" />
<Compile Include="..\DataStructures\InfluxResult.cs" Link="DataStructures\InfluxResult.cs" />
<Compile Include="..\DataStructures\InfluxValueField.cs" Link="DataStructures\InfluxValueField.cs" />
<Compile Include="..\DataStructures\IInfluxValueField.cs" Link="DataStructures\IInfluxValueField.cs" />
<Compile Include="..\DataStructures\ServiceUnavailableException.cs" Link="DataStructures\ServiceUnavailableException.cs" />
Expand All @@ -64,4 +65,4 @@ It currently supports
<PackageReference Include="Newtonsoft.Json" Version="10.0.3" />
</ItemGroup>

</Project>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@
<Compile Include="..\DataStructures\InfluxRetentionPolicy.cs">
<Link>DataStructures\InfluxRetentionPolicy.cs</Link>
</Compile>
<Compile Include="..\DataStructures\InfluxResult.cs">
<Link>DataStructures\InfluxResult.cs</Link>
</Compile>
<Compile Include="..\DataStructures\InfluxSeries.cs">
<Link>DataStructures\InfluxSeries.cs</Link>
</Compile>
Expand Down
88 changes: 85 additions & 3 deletions src/DataStructures/InfluxDBClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,34 @@ private async Task<HttpResponseMessage> GetAsync(Dictionary<string, string> Quer
return null;
}

private async Task<HttpResponseMessage> PostQueryAsync(Dictionary<string, string> EndPoint, string query)
{
var querybaseUrl = new Uri(InfluxUrl);
var builder = new UriBuilder(querybaseUrl);
builder.Path += "query";

builder.Query = await new FormUrlEncodedContent(EndPoint).ReadAsStringAsync();

try
{
var body = new StringContent($"q={WebUtility.UrlEncode(query)}", Encoding.UTF8,
"application/x-www-form-urlencoded");

HttpResponseMessage response = await _client.PostAsync(builder.Uri, body);

if (response.StatusCode == HttpStatusCode.Unauthorized || response.StatusCode == HttpStatusCode.BadGateway || (response.StatusCode == HttpStatusCode.InternalServerError && response.ReasonPhrase == "INKApi Error")) //502 Connection refused
throw new UnauthorizedAccessException("InfluxDB needs authentication. Check uname, pwd parameters");

return response;
}
catch (HttpRequestException e)
{
if (e.InnerException.Message == "Unable to connect to the remote server")
throw new ServiceUnavailableException();
}
return null;
}

private async Task<HttpResponseMessage> PostAsync(Dictionary<string, string> EndPoint, byte[] requestContent)
{
var querybaseUrl = new Uri(InfluxUrl);
Expand Down Expand Up @@ -574,6 +602,7 @@ public async Task<List<IInfluxSeries>> QueryMultiSeriesAsync(string dbName, stri
{
var results = new List<IInfluxSeries>();
var rawResult = JsonConvert.DeserializeObject<InfluxResponse>(await response.Content.ReadAsStringAsync());
var partialResult = rawResult.Results?.Any(r => r.Partial == true);

if (rawResult?.Results?.Count > 1)
throw new ArgumentException("The query is resulting in a format, which is not supported by this method yet");
Expand All @@ -582,7 +611,7 @@ public async Task<List<IInfluxSeries>> QueryMultiSeriesAsync(string dbName, stri
{
foreach (var series in rawResult?.Results[0]?.Series)
{
InfluxSeries result = GetInfluxSeries(precision, series);
InfluxSeries result = GetInfluxSeries(precision, series, partialResult);
results.Add(result);
}
}
Expand All @@ -591,6 +620,54 @@ public async Task<List<IInfluxSeries>> QueryMultiSeriesAsync(string dbName, stri
return null;
}

/// <summary>
/// Queries Influx DB and gets multiple time series data back. Ideal for fetching measurement values.
/// The return list is of InfluxResult, that contains multiple InfluxSeries and each element in there will have properties named after columns in series.
/// </summary>
/// <param name="dbName">Name of the database</param>
/// <param name="measurementQuery">Query text, Only results with single series are supported for now</param>
/// <param name="precision">epoch precision of the data set</param>
/// <returns>List of InfluxResult that contains multiple InfluxSeries.</returns>
/// <seealso cref="InfluxResult"/>
public async Task<List<InfluxResult>> QueryMultiSeriesMultiResultAsync(string dbName, string measurementQuery, string retentionPolicy = null, TimePrecision precision = TimePrecision.Nanoseconds)
{
var endPoint = new Dictionary<string, string>() { { "db", dbName }, { "epoch", precisionLiterals[(int)precision] } };

if (retentionPolicy != null)
{
endPoint.Add("rp", retentionPolicy);
}
var response = await PostQueryAsync(endPoint, measurementQuery);

if (response == null) throw new ServiceUnavailableException();
if (response.StatusCode == HttpStatusCode.OK)
{
var multiResult = new List<InfluxResult>();
var rawResult = JsonConvert.DeserializeObject<InfluxResponse>(await response.Content.ReadAsStringAsync());
var partialResult = rawResult.Results?.Any(r => r.Partial);

rawResult?.Results?.ForEach(currentResult =>
{
if (currentResult?.Series != null)
{
var influxResult = new InfluxResult();
influxResult.StatementID = currentResult.StatementID;
influxResult.Partial = currentResult.Partial;

foreach (var series in currentResult.Series)
influxResult.InfluxSeries = GetInfluxSeries(precision, series, partialResult);

if(influxResult.InfluxSeries.Entries.Any())
multiResult.Add(influxResult);
}
});

return multiResult;
}
throw new Exception($"InfluxDB returned status code {response.StatusCode}: " +
$"{await response.Content.ReadAsStringAsync()}");
}

/// <summary>
/// Queries Influx DB and gets a time series data back. Ideal for fetching measurement values.
/// The return list is of InfluxSeries, and each element in there will have properties named after columns in series
Expand Down Expand Up @@ -629,11 +706,12 @@ public async Task<List<IInfluxSeries>> QueryMultiSeriesAsync(string dbName, stri
{
var str = await reader.ReadLineAsync();
var rawResult = JsonConvert.DeserializeObject<InfluxResponse>(str);
var partialResult = rawResult?.Results?.Any(r => r.Partial == true);
if (rawResult?.Results[0]?.Series != null)
{
foreach (var series in rawResult?.Results[0]?.Series)
{
InfluxSeries result = GetInfluxSeries(precision, series);
InfluxSeries result = GetInfluxSeries(precision, series, partialResult);
results.Add(result);
}
}
Expand All @@ -650,9 +728,11 @@ public async Task<List<IInfluxSeries>> QueryMultiSeriesAsync(string dbName, stri
/// </summary>
/// <param name="precision"></param>
/// <param name="series"></param>
/// <param name="partialResult"></param>
/// <param name="SafePropertyNames">If true the first letter of each property name will be Capital, making them safer to use in C#</param>
/// <returns></returns>
private static InfluxSeries GetInfluxSeries(TimePrecision precision, Series series, bool SafePropertyNames = true)
private static InfluxSeries GetInfluxSeries(TimePrecision precision, Series series, bool? partialResult,
bool SafePropertyNames = true)
{
var result = new InfluxSeries()
{
Expand All @@ -661,6 +741,8 @@ private static InfluxSeries GetInfluxSeries(TimePrecision precision, Series seri

result.SeriesName = series.Name;
result.Tags = series.Tags;
result.Partial = partialResult ?? false;

var entries = new List<dynamic>();
for (var row = 0; row < series?.Values?.Count; row++)
{
Expand Down
20 changes: 20 additions & 0 deletions src/DataStructures/InfluxResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
namespace AdysTech.InfluxDB.Client.Net
{
/// <summary>
/// Extend the results returned by Query end point of the InfluxDB engine with the informations of Partial
/// and StatementID.
/// </summary>
public class InfluxResult
{
public int StatementID { get; set; }

public IInfluxSeries InfluxSeries { get; set; }

/// <summary>
/// True if the influx query was answered with a partial response due to e.g. exceeding a configured
/// max-row-limit in the InfluxDB. As we don't know which series was truncated by InfluxDB, all series
/// of the response will be flagged with Partial=true.
/// </summary>
public bool Partial { get; set; }
}
}
14 changes: 14 additions & 0 deletions src/DataStructures/InfluxSeries.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,19 @@ public class InfluxSeries : IInfluxSeries
/// The objects will have columns as Peoperties with their current values
/// </summary>
public IReadOnlyList<dynamic> Entries { get; internal set; }

/// <summary>
/// True if the influx query was answered with a partial response due to e.g. exceeding a configured
/// max-row-limit in the InfluxDB. As we don't know which series was truncated by InfluxDB, all series
/// of the response will be flagged with Partial=true.
/// </summary>
public bool Partial { get; set; }

public InfluxSeries() { }

public InfluxSeries(IReadOnlyList<dynamic> entries)
{
Entries = entries;
}
}
}
10 changes: 10 additions & 0 deletions src/Interfaces/IInfluxDBClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,16 @@ public interface IInfluxDBClient
/// <returns>List of InfluxSeries</returns>
Task<List<IInfluxSeries>> QueryMultiSeriesAsync(string dbName, string measurementQuery, string retentionPolicy = null, TimePrecision precision = TimePrecision.Nanoseconds);

/// <summary>
/// Queries Influx DB and gets multiple time series data back. Ideal for fetching measurement values.
/// The return list is of InfluxResult, that contains multiple InfluxSeries and each element in there will have properties named after columns in series.
/// </summary>
/// <param name="dbName">Name of the database</param>
/// <param name="measurementQuery">Query text, Supports multi series results</param>
/// <param name="retentionPolicy">retention policy containing the measurement</param>
/// <param name="precision">epoch precision of the data set</param>
/// <returns>List of InfluxResult that contains multiple InfluxSeries.</returns>
Task<List<InfluxResult>> QueryMultiSeriesMultiResultAsync(string dbName, string measurementQuery, string retentionPolicy = null, TimePrecision precision = TimePrecision.Nanoseconds);

/// <summary>
/// Queries Influx DB and gets a time series data back. Ideal for fetching measurement values.
Expand Down
7 changes: 7 additions & 0 deletions src/Interfaces/IInfluxSeries.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,12 @@ public interface IInfluxSeries
/// Dictionary of tags, and their respective values.
/// </summary>
IReadOnlyDictionary<string, string> Tags { get; }

/// <summary>
/// True if the influx query was answered with a partial response due to e.g. exceeding a configured
/// max-row-limit in the InfluxDB. As we don't know which series was truncated by InfluxDB, all series
/// of the response will be flagged with Partial=true.
/// </summary>
bool Partial { get; set; }
}
}
Loading