Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add gzip support #33

Merged
merged 6 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## 0.2.0 [unreleased]

### Features

1. [#32](https://github.com/InfluxCommunity/influxdb3-csharp/pull/33): Add GZIP support

## 0.1.0 [2023-06-09]

- initial release of new client version
20 changes: 20 additions & 0 deletions Client.Test.Integration/QueryWriteTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
using InfluxDB3.Client.Write;
using NUnit.Framework;

using WriteOptions = InfluxDB3.Client.Config.WriteOptions;

namespace InfluxDB3.Client.Test.Integration;

public class QueryWriteTest
Expand Down Expand Up @@ -106,4 +108,22 @@ public async Task CanDisableCertificateValidation()

await client.WritePointAsync(PointData.Measurement("cpu").AddTag("tag", "c"));
}


[Test]
public async Task WriteDataGzipped()
{
using var client = new InfluxDBClient(new InfluxDBClientConfigs
{
HostUrl = _hostUrl,
Database = _database,
AuthToken = _authToken,
WriteOptions = new WriteOptions
{
GzipThreshold = 1
}
});

await client.WritePointAsync(PointData.Measurement("cpu").AddTag("tag", "c").AddField("user", 14.34));
}
}
43 changes: 42 additions & 1 deletion Client.Test/InfluxDBClientWriteTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Threading.Tasks;
using InfluxDB3.Client.Config;
using InfluxDB3.Client.Write;
using WireMock.Matchers;
using WireMock.RequestBuilders;
using WireMock.ResponseBuilders;

Expand Down Expand Up @@ -74,6 +75,43 @@ public async Task BodyNull()
Assert.That(requests[0].RequestMessage.BodyData?.BodyAsString, Is.EqualTo("mem,tag=a field=1"));
}

[Test]
public async Task BodyNonDefaultGzipped()
{
MockServer
.Given(Request.Create().WithPath("/api/v2/write").WithHeader("Content-Encoding", "gzip").UsingPost())
.RespondWith(Response.Create().WithStatusCode(204));

_client = new InfluxDBClient(new InfluxDBClientConfigs
{
HostUrl = MockServerUrl,
Organization = "org",
Database = "database",
WriteOptions = new WriteOptions
{
GzipThreshold = 1
}
});

await _client.WriteRecordAsync("mem,tag=a field=1");
var requests = MockServer.LogEntries.ToList();
Assert.That(requests[0].RequestMessage.BodyData?.BodyAsString, Is.EqualTo("mem,tag=a field=1"));
}

[Test]
public async Task BodyDefaultNotGzipped()
{
MockServer
.Given(Request.Create().WithPath("/api/v2/write").WithHeader("Content-Encoding", ".*", MatchBehaviour.RejectOnMatch).UsingPost())
.RespondWith(Response.Create().WithStatusCode(204));

_client = new InfluxDBClient(MockServerUrl, null, "org", "database");

await _client.WriteRecordAsync("mem,tag=a field=1");
var requests = MockServer.LogEntries.ToList();
Assert.That(requests[0].RequestMessage.BodyData?.BodyAsString, Is.EqualTo("mem,tag=a field=1"));
}

[Test]
public void AlreadyDisposed()
{
Expand Down Expand Up @@ -167,7 +205,10 @@ public async Task PrecisionOptions()
HostUrl = MockServerUrl,
Organization = "org",
Database = "database",
WritePrecision = WritePrecision.Ms
WriteOptions = new WriteOptions
{
Precision = WritePrecision.Ms
}
});
MockServer
.Given(Request.Create().WithPath("/api/v2/write").UsingPost())
Expand Down
15 changes: 10 additions & 5 deletions Client/Config/InfluxDBClientConfigs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@ public string HostUrl
/// </summary>
public string? Database { get; set; }

/// <summary>
/// The default precision to use for the timestamp of points if no precision is specified in the write API call.
/// </summary>
public WritePrecision? WritePrecision { get; set; }

/// <summary>
/// Timeout to wait before the HTTP request times out. Default to '10 seconds'.
/// </summary>
Expand All @@ -58,11 +53,21 @@ public string HostUrl
/// </summary>
public bool DisableServerCertificateValidation { get; set; }

/// <summary>
/// Write options.
/// </summary>
public WriteOptions? WriteOptions { get; set; }

internal void Validate()
{
if (string.IsNullOrEmpty(HostUrl))
{
throw new ArgumentException("The hostname or IP address of the InfluxDB server has to be defined.");
}
}

internal WritePrecision WritePrecision
{
get => WriteOptions != null ? WriteOptions.Precision ?? WritePrecision.Ns : WritePrecision.Ns;
}
}
22 changes: 22 additions & 0 deletions Client/Config/WriteOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using InfluxDB3.Client.Write;

namespace InfluxDB3.Client.Config;

public class WriteOptions
alespour marked this conversation as resolved.
Show resolved Hide resolved
{
/// <summary>
/// The default precision to use for the timestamp of points if no precision is specified in the write API call.
/// </summary>
public WritePrecision? Precision { get; set; }

/// <summary>
/// The threshold in bytes for gzipping the body.
/// </summary>
public int GzipThreshold { get; set; }

internal static readonly WriteOptions DefaultOptions = new()
{
Precision = WritePrecision.Ns,
GzipThreshold = 1000
};
}
27 changes: 18 additions & 9 deletions Client/InfluxDBClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public class InfluxDBClient : IInfluxDBClient
private readonly HttpClient _httpClient;
private readonly FlightSqlClient _flightSqlClient;
private readonly RestClient _restClient;
private readonly GzipHandler _gzipHandler;

/// <summary>
/// This class provides an interface for interacting with an InfluxDB server,
Expand All @@ -107,6 +108,7 @@ public InfluxDBClient(string hostUrl, string? authToken = null, string? organiza
Organization = organization,
Database = database,
AuthToken = authToken,
WriteOptions = WriteOptions.DefaultOptions
})
{
}
Expand All @@ -129,6 +131,7 @@ public InfluxDBClient(InfluxDBClientConfigs configs)
_httpClient = CreateAndConfigureHttpClient(_configs);
_flightSqlClient = new FlightSqlClient(configs: _configs, httpClient: _httpClient);
_restClient = new RestClient(configs: _configs, httpClient: _httpClient);
_gzipHandler = new GzipHandler(configs.WriteOptions != null ? configs.WriteOptions.GzipThreshold : 0);
}

/// <summary>
Expand Down Expand Up @@ -247,15 +250,16 @@ private async Task WriteData(IEnumerable<object> data, string? organization = nu
throw new ObjectDisposedException(nameof(InfluxDBClient));
}

var precisionNotNull = precision ?? _configs.WritePrecision ?? WritePrecision.Ns;
var precisionNotNull = precision ?? _configs.WritePrecision;
var sb = ToLineProtocolBody(data, precisionNotNull);
if (sb.Length == 0)
{
Trace.WriteLine($"The writes: {data} doesn't contains any Line Protocol, skipping");
return;
}

var content = new StringContent(sb.ToString(), Encoding.UTF8, "text/plain");
var body = sb.ToString();
var content = _gzipHandler.Process(body) ?? new StringContent(body, Encoding.UTF8, "text/plain");
var queryParams = new Dictionary<string, string?>()
{
{
Expand Down Expand Up @@ -314,19 +318,24 @@ private static StringBuilder ToLineProtocolBody(IEnumerable<object?> data, Write

internal static HttpClient CreateAndConfigureHttpClient(InfluxDBClientConfigs configs)
{
var handler = new HttpClientHandler
var handler = new HttpClientHandler();
if (handler.SupportsRedirectConfiguration)
{
AllowAutoRedirect = configs.AllowHttpRedirects
};

handler.AllowAutoRedirect = configs.AllowHttpRedirects;
}
if (handler.SupportsAutomaticDecompression)
{
handler.AutomaticDecompression = System.Net.DecompressionMethods.GZip | System.Net.DecompressionMethods.Deflate;
}
if (configs.DisableServerCertificateValidation)
{
handler.ServerCertificateCustomValidationCallback = (_, _, _, _) => true;
}

var client = new HttpClient(handler);

client.Timeout = configs.Timeout;
var client = new HttpClient(handler)
{
Timeout = configs.Timeout
};
client.DefaultRequestHeaders.UserAgent.ParseAdd($"influxdb3-csharp/{AssemblyHelper.GetVersion()}");
if (!string.IsNullOrEmpty(configs.AuthToken))
{
Expand Down
41 changes: 41 additions & 0 deletions Client/Internal/GzipHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net.Http;
using System.Text;

namespace InfluxDB3.Client.Internal;

internal class GzipHandler
{
private readonly int _threshold;

public GzipHandler(int threshold)
{
_threshold = threshold;
}

public HttpContent? Process(string body)
{
if (_threshold > 0 && body.Length < _threshold)
{
return null;
}

using (var msi = new MemoryStream(Encoding.UTF8.GetBytes(body)))
using (var mso = new MemoryStream())
{
using (var gs = new GZipStream(mso, CompressionMode.Compress))
{
msi.CopyTo(gs);
gs.Flush();
}

var content = new ByteArrayContent(mso.ToArray());
content.Headers.Add("Content-Type", "text/plain; charset=utf-8");
content.Headers.Add("Content-Encoding", "gzip");
return content;
}
}
}
Loading