Skip to content

Commit

Permalink
Backward-compatible fix for "wide table" bulk copy issue (#50)
Browse files Browse the repository at this point in the history
* Backward-compatible fix for "wide table" bulk copy issue (#45)
* Added feature flags for tests (DateTime64, Decimal, IPv6) to disable them on incompatible version
* Switched feature support flag methods to internal
  • Loading branch information
DarkWanderer authored Oct 24, 2020
1 parent 406e730 commit 811ac54
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 38 deletions.
1 change: 0 additions & 1 deletion ClickHouse.Client.Tests/ORM/DapperTests.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Threading.Tasks;
using ClickHouse.Client.ADO;
using NUnit.Framework;
using Dapper;
using System.Linq;
Expand Down
1 change: 0 additions & 1 deletion ClickHouse.Client.Tests/ParameterizedInsertTests.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Threading.Tasks;
using ClickHouse.Client.ADO;
using ClickHouse.Client.Utility;
using NUnit.Framework;

Expand Down
1 change: 0 additions & 1 deletion ClickHouse.Client.Tests/RawResultReaderAsyncTests.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using ClickHouse.Client.ADO;
using NUnit.Framework;

namespace ClickHouse.Client.Tests
Expand Down
2 changes: 1 addition & 1 deletion ClickHouse.Client.Tests/SqlInlineParametersSelectTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ private class OldClickHouseVersionConnection : ClickHouseConnection
{
public OldClickHouseVersionConnection(string connectionString) : base(connectionString) { }

public override Task<bool> SupportsHttpParameters() => Task.FromResult(false);
internal override Task<bool> SupportsHttpParameters() => Task.FromResult(false);
}

private readonly ClickHouseConnection connection;
Expand Down
3 changes: 3 additions & 0 deletions ClickHouse.Client.Tests/SqlSimpleSelectTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ public async Task DateTimeSelectShouldHaveCorrectTimezone()
[Test]
public async Task DateTime64SelectShouldHaveCorrectTimezone()
{
if (!TestUtilities.FeatureFlags.DateTime64Supported)
Assert.Inconclusive("Server does not support DateTime64");

using var reader = await connection.ExecuteReaderAsync("SELECT toDateTime64(1577836800, 3, 'Asia/Sakhalin')");

reader.AssertHasFieldCount(1);
Expand Down
58 changes: 35 additions & 23 deletions ClickHouse.Client.Tests/TestUtilities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Data.Common;
using System.Linq;
using System.Net;
using System.Threading;
using ClickHouse.Client.ADO;
using ClickHouse.Client.Types;
using NUnit.Framework;
Expand All @@ -11,6 +12,23 @@ namespace ClickHouse.Client.Tests
{
public static class TestUtilities
{
public static class FeatureFlags
{
public static bool DateTime64Supported = true;
public static bool IPv6Supported = true;
public static bool DecimalSupported = true;
}

static TestUtilities()
{
using var connection = GetTestClickHouseConnection();
connection.Open();
var serverVersion = Version.Parse(connection.ServerVersion);
FeatureFlags.DateTime64Supported = connection.SupportsDateTime64().Result;
FeatureFlags.IPv6Supported = serverVersion >= new Version(20, 0);
FeatureFlags.DecimalSupported = serverVersion >= new Version(20, 0);
}

/// <summary>
/// Utility method to allow to redirect ClickHouse connections to different machine, in case of Windows development environment
/// </summary>
Expand Down Expand Up @@ -79,21 +97,11 @@ public static IEnumerable<DataTypeSample> GetDataTypeSamples()

yield return new DataTypeSample("IPv4", typeof(IPAddress), "toIPv4('1.2.3.4')", IPAddress.Parse("1.2.3.4"));
yield return new DataTypeSample("IPv4", typeof(IPAddress), "toIPv4('255.255.255.255')", IPAddress.Parse("255.255.255.255"));
yield return new DataTypeSample("IPv6", typeof(IPAddress), "toIPv6('2001:0db8:85a3:0000:0000:8a2e:0370:7334')", IPAddress.Parse("2001:0db8:85a3:0000:0000:8a2e:0370:7334"));

yield return new DataTypeSample("Enum('a' = 1, 'b' = 2)", typeof(string), "CAST('a', 'Enum(\\'a\\' = 1, \\'b\\' = 2)')", "a");
yield return new DataTypeSample("Enum8('a' = -1, 'b' = 127)", typeof(string), "CAST('a', 'Enum8(\\'a\\' = -1, \\'b\\' = 127)')", "a");
yield return new DataTypeSample("Enum16('a' = -32768, 'b' = 32767)", typeof(string), "CAST('a', 'Enum16(\\'a\\' = -32768, \\'b\\' = 32767)')", "a");

yield return new DataTypeSample("Decimal32(3)", typeof(decimal), "toDecimal32(123.45, 3)", new decimal(123.45));
yield return new DataTypeSample("Decimal32(3)", typeof(decimal), "toDecimal32(-123.45, 3)", new decimal(-123.45));

yield return new DataTypeSample("Decimal64(7)", typeof(decimal), "toDecimal64(1.2345, 7)", new decimal(1.2345));
yield return new DataTypeSample("Decimal64(7)", typeof(decimal), "toDecimal64(-1.2345, 7)", new decimal(-1.2345));

yield return new DataTypeSample("Decimal128(9)", typeof(decimal), "toDecimal128(12.34, 9)", new decimal(12.34));
yield return new DataTypeSample("Decimal128(9)", typeof(decimal), "toDecimal128(-12.34, 9)", new decimal(-12.34));

yield return new DataTypeSample("Array(Int32)", typeof(int[]), "array(1, 2, 3)", new[] { 1, 2, 3 });
yield return new DataTypeSample("Array(String)", typeof(int[]), "array('a', 'b', 'c')", new[] { "a", "b", "c" });
yield return new DataTypeSample("Array(Nullable(Int32))", typeof(int?[]), "array(1, 2, NULL)", new int?[] { 1, 2, null });
Expand All @@ -110,20 +118,24 @@ public static IEnumerable<DataTypeSample> GetDataTypeSamples()

yield return new DataTypeSample("Date", typeof(DateTime), "toDateOrNull('1999-11-12')", new DateTime(1999, 11, 12, 0, 0, 0, DateTimeKind.Utc));
yield return new DataTypeSample("DateTime", typeof(DateTime), "toDateTime('1988-08-28 11:22:33')", new DateTime(1988, 08, 28, 11, 22, 33, DateTimeKind.Utc));
yield return new DataTypeSample("DateTime64(7)", typeof(DateTime), "toDateTime64('2043-03-01 18:34:04.4444444', 9)", new DateTime(644444444444444444, DateTimeKind.Utc));
}

[Test]
public static void EnsureAllTypesAreCovered()
{
var testedTypes = GetDataTypeSamples()
.Select(s => s.ClickHouseType)
.Select(TypeConverter.ParseClickHouseType)
.Select(t => t.TypeCode)
.Distinct()
.ToList();

CollectionAssert.AreEquivalent(TypeConverter.RegisteredTypes.Where(tc => tc != ClickHouseTypeCode.Nested).Distinct(), testedTypes);
if (FeatureFlags.DateTime64Supported)
yield return new DataTypeSample("DateTime64(7)", typeof(DateTime), "toDateTime64('2043-03-01 18:34:04.4444444', 9)", new DateTime(644444444444444444, DateTimeKind.Utc));

if (FeatureFlags.DecimalSupported)
{
yield return new DataTypeSample("Decimal32(3)", typeof(decimal), "toDecimal32(123.45, 3)", new decimal(123.45));
yield return new DataTypeSample("Decimal32(3)", typeof(decimal), "toDecimal32(-123.45, 3)", new decimal(-123.45));

yield return new DataTypeSample("Decimal64(7)", typeof(decimal), "toDecimal64(1.2345, 7)", new decimal(1.2345));
yield return new DataTypeSample("Decimal64(7)", typeof(decimal), "toDecimal64(-1.2345, 7)", new decimal(-1.2345));

yield return new DataTypeSample("Decimal128(9)", typeof(decimal), "toDecimal128(12.34, 9)", new decimal(12.34));
yield return new DataTypeSample("Decimal128(9)", typeof(decimal), "toDecimal128(-12.34, 9)", new decimal(-12.34));
}

if (FeatureFlags.IPv6Supported)
yield return new DataTypeSample("IPv6", typeof(IPAddress), "toIPv6('2001:0db8:85a3:0000:0000:8a2e:0370:7334')", IPAddress.Parse("2001:0db8:85a3:0000:0000:8a2e:0370:7334"));
}

[Test]
Expand Down
43 changes: 35 additions & 8 deletions ClickHouse.Client/ADO/ClickHouseConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ internal async Task<HttpResponseMessage> PostSqlQueryAsync(string sqlQuery, Canc
return await HandleError(response, sqlQuery).ConfigureAwait(false);
}

internal async Task PostStreamAsync(Stream data, bool isCompressed, CancellationToken token)
internal async Task PostStreamAsync(string sql, Stream data, bool isCompressed, CancellationToken token)
{
var builder = CreateUriBuilder();
var builder = CreateUriBuilder(sql);
using var postMessage = new HttpRequestMessage(HttpMethod.Post, builder.ToString());
AddDefaultHttpHeaders(postMessage.Headers);

Expand All @@ -173,7 +173,7 @@ internal async Task PostStreamAsync(Stream data, bool isCompressed, Cancellation
}

using var response = await httpClient.SendAsync(postMessage, HttpCompletionOption.ResponseContentRead, token).ConfigureAwait(false);
await HandleError(response, null).ConfigureAwait(false);
await HandleError(response, sql).ConfigureAwait(false);
}

private static async Task<HttpResponseMessage> HandleError(HttpResponseMessage response, string query)
Expand Down Expand Up @@ -255,16 +255,12 @@ public override async Task OpenAsync(CancellationToken token)

public new ClickHouseCommand CreateCommand() => new ClickHouseCommand(this);

protected override DbTransaction BeginDbTransaction(IsolationLevel isolationLevel) => throw new NotSupportedException();

protected override DbCommand CreateDbCommand() => CreateCommand();

/// <summary>
/// Detects whether server supports parameters through URI
/// ClickHouse Release 19.11.3.11, 2019-07-18: New Feature: Added support for prepared statements. #5331 (Alexander) #5630 (alexey-milovidov)
/// </summary>
/// <returns>whether parameters are supported</returns>
public virtual async Task<bool> SupportsHttpParameters()
internal virtual async Task<bool> SupportsHttpParameters()
{
if (State != ConnectionState.Open)
await OpenAsync();
Expand All @@ -273,6 +269,37 @@ public virtual async Task<bool> SupportsHttpParameters()
return Version.Parse(ServerVersion) >= new Version(19, 11, 3, 11);
}

/// <summary>
/// Detects whether server supports putting query into POST body along with binary data
/// Added somewhere in ClickHouse 20.5
/// </summary>
/// <returns>whether parameters are supported</returns>
internal virtual async Task<bool> SupportsInlineQuery()
{
if (State != ConnectionState.Open)
await OpenAsync();
if (string.IsNullOrWhiteSpace(ServerVersion))
throw new InvalidOperationException("Connection does not define server version");
return Version.Parse(ServerVersion) >= new Version(20, 5);
}

/// <summary>
/// 20.1.2.4 Add DateTime64 datatype with configurable sub-second precision. #7170 (Vasily Nemkov)
/// </summary>
/// <returns></returns>
internal virtual async Task<bool> SupportsDateTime64()
{
if (State != ConnectionState.Open)
await OpenAsync();
if (string.IsNullOrWhiteSpace(ServerVersion))
throw new InvalidOperationException("Connection does not define server version");
return Version.Parse(ServerVersion) >= new Version(20, 1, 2, 4);
}

protected override DbTransaction BeginDbTransaction(IsolationLevel isolationLevel) => throw new NotSupportedException();

protected override DbCommand CreateDbCommand() => CreateCommand();

private void AddDefaultHttpHeaders(HttpRequestHeaders headers)
{
headers.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}")));
Expand Down
10 changes: 7 additions & 3 deletions ClickHouse.Client/Copy/ClickHouseBulkCopy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,17 @@ public async Task WriteToServerAsync(IEnumerable<object[]> rows, IReadOnlyCollec
private async Task PushBatch(ICollection<object[]> rows, ClickHouseType[] columnTypes, string[] columnNames, CancellationToken token)
{
var query = $"INSERT INTO {DestinationTableName} ({string.Join(", ", columnNames)}) FORMAT RowBinary";
bool useInlineQuery = await connection.SupportsInlineQuery();

using var stream = new MemoryStream() { Capacity = 512 * 1024 };
using (var gzipStream = new BufferedStream(new GZipStream(stream, CompressionLevel.Fastest, true), 256 * 1024))
{
// Write query at start of data, so that we are not limited by URI length
using (var textWriter = new StreamWriter(gzipStream, Encoding.UTF8, 4 * 1024, true))
if (useInlineQuery)
{
using var textWriter = new StreamWriter(gzipStream, Encoding.UTF8, 4 * 1024, true);
textWriter.WriteLine(query);
query = null; // Query was already written to POST body
}

using var writer = new ExtendedBinaryWriter(gzipStream);
using var streamer = new BinaryStreamWriter(writer);
Expand All @@ -155,7 +159,7 @@ private async Task PushBatch(ICollection<object[]> rows, ClickHouseType[] column
}
stream.Seek(0, SeekOrigin.Begin);

await connection.PostStreamAsync(stream, isCompressed: true, token).ConfigureAwait(false);
await connection.PostStreamAsync(query, stream, true, token).ConfigureAwait(false);
Interlocked.Add(ref rowsWritten, rows.Count);
}
}
Expand Down

0 comments on commit 811ac54

Please sign in to comment.