Skip to content

Commit

Permalink
update for code review comments #3
Browse files Browse the repository at this point in the history
  • Loading branch information
birschick-bq committed Sep 3, 2024
1 parent 9c6f3ed commit 3c5a4db
Show file tree
Hide file tree
Showing 10 changed files with 51 additions and 65 deletions.
20 changes: 10 additions & 10 deletions csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ internal abstract class HiveServer2Connection : AdbcConnection
private readonly Lazy<string> _vendorVersion;
private readonly Lazy<string> _vendorName;

public HiveServer2Connection(IReadOnlyDictionary<string, string> properties)
internal HiveServer2Connection(IReadOnlyDictionary<string, string> properties)
{
Properties = properties;
// Note: "LazyThreadSafetyMode.PublicationOnly" is thread-safe initialization where
Expand All @@ -47,18 +47,18 @@ public HiveServer2Connection(IReadOnlyDictionary<string, string> properties)
_vendorName = new Lazy<string>(() => GetInfoTypeStringValue(TGetInfoType.CLI_DBMS_NAME), LazyThreadSafetyMode.PublicationOnly);
}

public TCLIService.Client Client
internal TCLIService.Client Client
{
get { return _client ?? throw new InvalidOperationException("connection not open"); }
}

public string VendorVersion => _vendorVersion.Value;
internal string VendorVersion => _vendorVersion.Value;

public string VendorName => _vendorName.Value;
internal string VendorName => _vendorName.Value;

public IReadOnlyDictionary<string, string> Properties { get; }
internal IReadOnlyDictionary<string, string> Properties { get; }

public async Task OpenAsync()
internal async Task OpenAsync()
{
TTransport transport = await CreateTransportAsync();
TProtocol protocol = await CreateProtocolAsync(transport);
Expand All @@ -69,17 +69,17 @@ public async Task OpenAsync()
SessionHandle = session.SessionHandle;
}

public TSessionHandle? SessionHandle { get; private set; }
internal TSessionHandle? SessionHandle { get; private set; }

protected abstract Task<TTransport> CreateTransportAsync();

protected abstract Task<TProtocol> CreateProtocolAsync(TTransport transport);

protected abstract TOpenSessionReq CreateSessionRequest();

public abstract SchemaParser SchemaParser { get; }
internal abstract SchemaParser SchemaParser { get; }

public abstract IArrowArrayStream NewReader<T>(T statement, Schema schema) where T : HiveServer2Statement;
internal abstract IArrowArrayStream NewReader<T>(T statement, Schema schema) where T : HiveServer2Statement;

public override IArrowArrayStream GetObjects(GetObjectsDepth depth, string? catalogPattern, string? dbSchemaPattern, string? tableNamePattern, IReadOnlyList<string>? tableTypes, string? columnNamePattern)
{
Expand Down Expand Up @@ -135,7 +135,7 @@ public override void Dispose()
}
}

public static async Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TOperationHandle operationHandle, TCLIService.IAsync client, CancellationToken cancellationToken = default)
internal static async Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TOperationHandle operationHandle, TCLIService.IAsync client, CancellationToken cancellationToken = default)
{
TGetResultSetMetadataReq request = new(operationHandle);
TGetResultSetMetadataResp response = await client.GetResultSetMetadata(request, cancellationToken);
Expand Down
4 changes: 2 additions & 2 deletions csharp/src/Drivers/Apache/Impala/ImpalaConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ public override IArrowArrayStream GetTableTypes()

public override Schema GetTableSchema(string? catalog, string? dbSchema, string tableName) => throw new System.NotImplementedException();

public override SchemaParser SchemaParser => throw new NotImplementedException();
internal override SchemaParser SchemaParser => throw new NotImplementedException();

public override IArrowArrayStream NewReader<T>(T statement, Schema schema) => throw new NotImplementedException();
internal override IArrowArrayStream NewReader<T>(T statement, Schema schema) => throw new NotImplementedException();
}
}
22 changes: 11 additions & 11 deletions csharp/src/Drivers/Apache/Spark/SparkConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -996,17 +996,17 @@ protected static Uri GetBaseAddress(string? uri, string? hostName, string? path,
protected abstract void ValidateConnection();
protected abstract void ValidateAuthentication();

public abstract Task<TRowSet> GetRowSetAsync(TGetTableTypesResp response);
public abstract Task<TRowSet> GetRowSetAsync(TGetColumnsResp response);
public abstract Task<TRowSet> GetRowSetAsync(TGetTablesResp response);
public abstract Task<TRowSet> GetRowSetAsync(TGetCatalogsResp getCatalogsResp);
public abstract Task<TRowSet> GetRowSetAsync(TGetSchemasResp getSchemasResp);
public abstract Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetSchemasResp response);
public abstract Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetCatalogsResp response);
public abstract Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetColumnsResp response);
public abstract Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetTablesResp response);

public abstract SparkServerType ServerType { get; }
protected abstract Task<TRowSet> GetRowSetAsync(TGetTableTypesResp response);
protected abstract Task<TRowSet> GetRowSetAsync(TGetColumnsResp response);
protected abstract Task<TRowSet> GetRowSetAsync(TGetTablesResp response);
protected abstract Task<TRowSet> GetRowSetAsync(TGetCatalogsResp getCatalogsResp);
protected abstract Task<TRowSet> GetRowSetAsync(TGetSchemasResp getSchemasResp);
protected abstract Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetSchemasResp response);
protected abstract Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetCatalogsResp response);
protected abstract Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetColumnsResp response);
protected abstract Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetTablesResp response);

internal abstract SparkServerType ServerType { get; }

internal struct TableInfo(string type)
{
Expand Down
24 changes: 12 additions & 12 deletions csharp/src/Drivers/Apache/Spark/SparkDatabricksConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ public SparkDatabricksConnection(IReadOnlyDictionary<string, string> properties)
{
}

public override IArrowArrayStream NewReader<T>(T statement, Schema schema) => new SparkDatabricksReader(statement, schema);
internal override IArrowArrayStream NewReader<T>(T statement, Schema schema) => new SparkDatabricksReader(statement, schema);

public override SchemaParser SchemaParser => new DatabricksSchemaParser();
internal override SchemaParser SchemaParser => new DatabricksSchemaParser();

public override SparkServerType ServerType => SparkServerType.Databricks;
internal override SparkServerType ServerType => SparkServerType.Databricks;

protected override TOpenSessionReq CreateSessionRequest()
{
Expand All @@ -45,24 +45,24 @@ protected override TOpenSessionReq CreateSessionRequest()
return req;
}

public override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetSchemasResp response) =>
protected override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetSchemasResp response) =>
Task.FromResult(response.DirectResults.ResultSetMetadata);
public override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetCatalogsResp response) =>
protected override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetCatalogsResp response) =>
Task.FromResult(response.DirectResults.ResultSetMetadata);
public override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetColumnsResp response) =>
protected override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetColumnsResp response) =>
Task.FromResult(response.DirectResults.ResultSetMetadata);
public override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetTablesResp response) =>
protected override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetTablesResp response) =>
Task.FromResult(response.DirectResults.ResultSetMetadata);

public override Task<TRowSet> GetRowSetAsync(TGetTableTypesResp response) =>
protected override Task<TRowSet> GetRowSetAsync(TGetTableTypesResp response) =>
Task.FromResult(response.DirectResults.ResultSet.Results);
public override Task<TRowSet> GetRowSetAsync(TGetColumnsResp response) =>
protected override Task<TRowSet> GetRowSetAsync(TGetColumnsResp response) =>
Task.FromResult(response.DirectResults.ResultSet.Results);
public override Task<TRowSet> GetRowSetAsync(TGetTablesResp response) =>
protected override Task<TRowSet> GetRowSetAsync(TGetTablesResp response) =>
Task.FromResult(response.DirectResults.ResultSet.Results);
public override Task<TRowSet> GetRowSetAsync(TGetCatalogsResp response) =>
protected override Task<TRowSet> GetRowSetAsync(TGetCatalogsResp response) =>
Task.FromResult(response.DirectResults.ResultSet.Results);
public override Task<TRowSet> GetRowSetAsync(TGetSchemasResp response) =>
protected override Task<TRowSet> GetRowSetAsync(TGetSchemasResp response) =>
Task.FromResult(response.DirectResults.ResultSet.Results);

internal class DatabricksSchemaParser : SchemaParser
Expand Down
24 changes: 12 additions & 12 deletions csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ protected override void ValidateConnection()
};
}

public override IArrowArrayStream NewReader<T>(T statement, Schema schema) => new HiveServer2Reader(statement, schema);
internal override IArrowArrayStream NewReader<T>(T statement, Schema schema) => new HiveServer2Reader(statement, schema);

protected override Task<TTransport> CreateTransportAsync()
{
Expand Down Expand Up @@ -195,23 +195,23 @@ protected override TOpenSessionReq CreateSessionRequest()
return req;
}

public override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetSchemasResp response) =>
protected override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetSchemasResp response) =>
GetResultSetMetadataAsync(response.OperationHandle, Client);
public override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetCatalogsResp response) =>
protected override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetCatalogsResp response) =>
GetResultSetMetadataAsync(response.OperationHandle, Client);
public override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetColumnsResp response) =>
protected override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetColumnsResp response) =>
GetResultSetMetadataAsync(response.OperationHandle, Client);
public override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetTablesResp response) =>
protected override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetTablesResp response) =>
GetResultSetMetadataAsync(response.OperationHandle, Client);
public override Task<TRowSet> GetRowSetAsync(TGetTableTypesResp response) =>
protected override Task<TRowSet> GetRowSetAsync(TGetTableTypesResp response) =>
FetchResultsAsync(response.OperationHandle);
public override Task<TRowSet> GetRowSetAsync(TGetColumnsResp response) =>
protected override Task<TRowSet> GetRowSetAsync(TGetColumnsResp response) =>
FetchResultsAsync(response.OperationHandle);
public override Task<TRowSet> GetRowSetAsync(TGetTablesResp response) =>
protected override Task<TRowSet> GetRowSetAsync(TGetTablesResp response) =>
FetchResultsAsync(response.OperationHandle);
public override Task<TRowSet> GetRowSetAsync(TGetCatalogsResp response) =>
protected override Task<TRowSet> GetRowSetAsync(TGetCatalogsResp response) =>
FetchResultsAsync(response.OperationHandle);
public override Task<TRowSet> GetRowSetAsync(TGetSchemasResp response) =>
protected override Task<TRowSet> GetRowSetAsync(TGetSchemasResp response) =>
FetchResultsAsync(response.OperationHandle);

private async Task<TRowSet> FetchResultsAsync(TOperationHandle operationHandle, long batchSize = BatchSizeDefault, CancellationToken cancellationToken = default)
Expand All @@ -234,9 +234,9 @@ internal static async Task<TFetchResultsResp> FetchNextAsync(TOperationHandle op
return response;
}

public override SchemaParser SchemaParser => new HiveServer2SchemaParser();
internal override SchemaParser SchemaParser => new HiveServer2SchemaParser();

public override SparkServerType ServerType => SparkServerType.Http;
internal override SparkServerType ServerType => SparkServerType.Http;

internal class HiveServer2SchemaParser : SchemaParser
{
Expand Down
2 changes: 1 addition & 1 deletion csharp/src/Drivers/Apache/Spark/SparkStandardConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,6 @@ protected override TOpenSessionReq CreateSessionRequest()
return request;
}

public override SparkServerType ServerType => SparkServerType.Standard;
internal override SparkServerType ServerType => SparkServerType.Standard;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ namespace Apache.Hive.Service.Rpc.Thrift

public partial class TBinaryColumn : TBase
{
private const int IntSize = 4;

public BinaryArray Values { get; set; }

public TBinaryColumn()
Expand Down Expand Up @@ -84,14 +82,12 @@ public TBinaryColumn DeepCopy()

values = new ArrowBuffer.Builder<byte>();
int offset = 0;
offsetBuffer = new byte[(length + 1) * IntSize];
offsetBuffer = new byte[(length + 1) * sizeof(int)];
var memory = offsetBuffer.AsMemory();
var typedMemory = Unsafe.As<Memory<byte>, Memory<int>>(ref memory).Slice(0, length + 1);

for(int _i197 = 0; _i197 < length; ++_i197)
{
//typedMemory.Span[_i197] = offset;
StreamExtensions.WriteInt32LittleEndian(offset, memory.Span, _i197 * IntSize);
StreamExtensions.WriteInt32LittleEndian(offset, memory.Span, _i197 * sizeof(int));
var size = await iprot.ReadI32Async(cancellationToken);
offset += size;

Expand All @@ -110,12 +106,7 @@ public TBinaryColumn DeepCopy()
await transport.ReadExactlyAsync(tmp.AsMemory(0, size), cancellationToken);
values.Append(tmp.AsMemory(0, size).Span);
}
#if NET6_0_OR_GREATER
typedMemory.Span[length] = offset;
#else
BitConverter.GetBytes(offset).CopyTo(offsetBuffer, length * IntSize);
#endif
StreamExtensions.WriteInt32LittleEndian(offset, memory.Span, length * IntSize);
StreamExtensions.WriteInt32LittleEndian(offset, memory.Span, length * sizeof(int));

await iprot.ReadListEndAsync(cancellationToken);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ public TI16Column DeepCopy()

buffer = new byte[length * sizeof(short)];
var memory = buffer.AsMemory();
var typedMemory = Unsafe.As<Memory<byte>, Memory<short>>(ref memory).Slice(0, length);
iprot.Transport.CheckReadBytesAvailable(buffer.Length);
await transport.ReadExactlyAsync(memory, cancellationToken);
for (int _i152 = 0; _i152 < length; ++_i152)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ public TI32Column DeepCopy()
length = _list160.Count;
buffer = new byte[length * sizeof(int)];
var memory = buffer.AsMemory();
var typedMemory = Unsafe.As<Memory<byte>, Memory<int>>(ref memory).Slice(0, length);
iprot.Transport.CheckReadBytesAvailable(buffer.Length);
await transport.ReadExactlyAsync(memory, cancellationToken);
for (int _i161 = 0; _i161 < length; ++_i161)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,9 @@ public TStringColumn DeepCopy()
int offset = 0;
offsetBuffer = new byte[(length + 1) * 4];
var memory = offsetBuffer.AsMemory();
var typedMemory = Unsafe.As<Memory<byte>, Memory<int>>(ref memory).Slice(0, length + 1);

for(int _i188 = 0; _i188 < length; ++_i188)
{
//typedMemory.Span[_i188] = offset;
StreamExtensions.WriteInt32LittleEndian(offset, memory.Span, _i188 * 4);

var size = await iprot.ReadI32Async(cancellationToken);
Expand All @@ -109,7 +107,6 @@ public TStringColumn DeepCopy()
await transport.ReadExactlyAsync(tmp.AsMemory(0, size), cancellationToken);
values.Append(tmp.AsMemory(0, size).Span);
}
//typedMemory.Span[length] = offset;
StreamExtensions.WriteInt32LittleEndian(offset, memory.Span, length * 4);

await iprot.ReadListEndAsync(cancellationToken);
Expand Down

0 comments on commit 3c5a4db

Please sign in to comment.