Skip to content

Commit

Permalink
feat(csharp/src/Drivers): introduce drivers for Apache systems built …
Browse files Browse the repository at this point in the history
…on Thrift (apache#1710)

This PR introduces new drivers built on the Thrift protocol: Hive,
Impala, and Spark. The main focus has been on Spark (including
Databricks) for the initial set of tests.

---------

Co-authored-by: David Coe <coedavid@umich.edu>
Co-authored-by: vikrantpuppala <vikrant.puppala@databricks.com>
Co-authored-by: Gopal Lal <gopal.lal@databricks.com>
Co-authored-by: Vikrant Puppala <vikrantpuppala@gmail.com>
Co-authored-by: Gopal Lal <135012033+gopalldb@users.noreply.github.com>
Co-authored-by: Jade Wang <111902719+jadewang-db@users.noreply.github.com>
Co-authored-by: yunbodeng-db <104732431+yunbodeng-db@users.noreply.github.com>
Co-authored-by: David Li <li.davidm96@gmail.com>
Co-authored-by: Bruce Irschick <bruce.irschick@improving.com>
  • Loading branch information
10 people authored and cocoa-xu committed Apr 24, 2024
1 parent afa6843 commit f23ec51
Show file tree
Hide file tree
Showing 152 changed files with 44,808 additions and 3 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ python/doc/
# Egg metadata
*.egg-info

.vs/
.vscode
.idea/
.pytest_cache/
Expand Down
14 changes: 14 additions & 0 deletions csharp/Apache.Arrow.Adbc.sln
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache.Arrow.Adbc.Drivers.I
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache.Arrow.Adbc.Tests.Drivers.Interop.Snowflake", "test\Drivers\Interop\Snowflake\Apache.Arrow.Adbc.Tests.Drivers.Interop.Snowflake.csproj", "{8BE1EECC-3ACF-41B2-AF7D-1A67196FF6C7}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache.Arrow.Adbc.Drivers.Apache", "src\Drivers\Apache\Apache.Arrow.Adbc.Drivers.Apache.csproj", "{6C0D8BE1-4A23-4C2F-88B1-D2FBEA0B1903}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache.Arrow.Adbc.Tests.Drivers.Apache", "test\Drivers\Apache\Apache.Arrow.Adbc.Tests.Drivers.Apache.csproj", "{714F0BD2-3A92-4D1A-8FAC-D0C0599BE3E3}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -70,6 +74,14 @@ Global
{8BE1EECC-3ACF-41B2-AF7D-1A67196FF6C7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8BE1EECC-3ACF-41B2-AF7D-1A67196FF6C7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8BE1EECC-3ACF-41B2-AF7D-1A67196FF6C7}.Release|Any CPU.Build.0 = Release|Any CPU
{6C0D8BE1-4A23-4C2F-88B1-D2FBEA0B1903}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6C0D8BE1-4A23-4C2F-88B1-D2FBEA0B1903}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6C0D8BE1-4A23-4C2F-88B1-D2FBEA0B1903}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6C0D8BE1-4A23-4C2F-88B1-D2FBEA0B1903}.Release|Any CPU.Build.0 = Release|Any CPU
{714F0BD2-3A92-4D1A-8FAC-D0C0599BE3E3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{714F0BD2-3A92-4D1A-8FAC-D0C0599BE3E3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{714F0BD2-3A92-4D1A-8FAC-D0C0599BE3E3}.Release|Any CPU.ActiveCfg = Release|Any CPU
{714F0BD2-3A92-4D1A-8FAC-D0C0599BE3E3}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -84,6 +96,8 @@ Global
{EA43BB7C-BC00-4701-BDF4-367880C2495C} = {C7290227-E925-47E7-8B6B-A8B171645D58}
{30024B6F-7BC1-4574-BE5A-924FBD6EAF83} = {FEB257A0-4FD3-495E-9A47-9E1649755445}
{8BE1EECC-3ACF-41B2-AF7D-1A67196FF6C7} = {C7290227-E925-47E7-8B6B-A8B171645D58}
{6C0D8BE1-4A23-4C2F-88B1-D2FBEA0B1903} = {FEB257A0-4FD3-495E-9A47-9E1649755445}
{714F0BD2-3A92-4D1A-8FAC-D0C0599BE3E3} = {C7290227-E925-47E7-8B6B-A8B171645D58}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {4795CF16-0FDB-4BE0-9768-5CF31564DC03}
Expand Down
6 changes: 3 additions & 3 deletions csharp/src/Client/SchemaConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ public static DataTable ConvertArrowSchema(Schema schema, AdbcStatement adbcStat
{
if (f.Metadata.TryGetValue("precision", out string precisionValue))
{
if(!string.IsNullOrEmpty(precisionValue))
if (!string.IsNullOrEmpty(precisionValue))
row[SchemaTableColumn.NumericPrecision] = Convert.ToInt32(precisionValue);
}

if(f.Metadata.TryGetValue("scale", out string scaleValue))
if (f.Metadata.TryGetValue("scale", out string scaleValue))
{
if(!string.IsNullOrEmpty(scaleValue))
if (!string.IsNullOrEmpty(scaleValue))
row[SchemaTableColumn.NumericScale] = Convert.ToInt32(scaleValue);
}
}
Expand Down
15 changes: 15 additions & 0 deletions csharp/src/Drivers/Apache/Apache.Arrow.Adbc.Drivers.Apache.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<Project Sdk="Microsoft.NET.Sdk">

<!-- Build definition for the Apache.Arrow.Adbc.Drivers.Apache driver library. -->
<PropertyGroup>
<!-- Multi-targeted: built once for net472 and once for net6.0. -->
<TargetFrameworks>net472;net6.0</TargetFrameworks>
</PropertyGroup>

<ItemGroup>
<!-- Thrift runtime used by the driver sources (Thrift.Protocol / Thrift.Transport). -->
<PackageReference Include="ApacheThrift" Version="0.19.0" />
</ItemGroup>

<ItemGroup>
<!-- Core ADBC project that this driver builds against. -->
<ProjectReference Include="..\..\Apache.Arrow.Adbc\Apache.Arrow.Adbc.csproj" />
</ItemGroup>

</Project>
170 changes: 170 additions & 0 deletions csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Ipc;
using Apache.Hive.Service.Rpc.Thrift;
using Thrift.Protocol;
using Thrift.Transport;

namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
{
/// <summary>
/// Base class for ADBC connections that talk to a server through the
/// <see cref="TCLIService"/> Thrift client. Derived classes supply the Thrift
/// protocol and the open-session request; this class owns the resulting
/// transport, client and session handle.
/// </summary>
public abstract class HiveServer2Connection : AdbcConnection
{
    const string userAgent = "AdbcExperimental/0.0";

    /// <summary>Handle of the operation that polling and schema lookups refer to.</summary>
    protected TOperationHandle operationHandle;

    /// <summary>Connection properties passed to the constructor (may be null).</summary>
    protected IReadOnlyDictionary<string, string> properties;

    internal TTransport transport;
    internal TCLIService.Client client;
    internal TSessionHandle sessionHandle;

    /// <summary>
    /// Creates a connection without any properties.
    /// </summary>
    internal HiveServer2Connection() : this(null)
    {
    }

    /// <summary>
    /// Creates a connection that keeps a reference to the supplied properties.
    /// </summary>
    /// <param name="properties">Connection properties; stored as-is.</param>
    internal HiveServer2Connection(IReadOnlyDictionary<string, string> properties)
    {
        this.properties = properties;
    }

    /// <summary>
    /// Builds the Thrift protocol and client, then opens a session and stores
    /// its handle. Blocks until the open-session call completes.
    /// </summary>
    public void Open()
    {
        TProtocol protocol = CreateProtocol();
        this.transport = protocol.Transport;
        this.client = new TCLIService.Client(protocol);

        TOpenSessionResp openResponse = this.client.OpenSession(CreateSessionRequest()).Result;
        this.sessionHandle = openResponse.SessionHandle;
    }

    /// <summary>Creates the Thrift protocol used to build the client.</summary>
    protected abstract TProtocol CreateProtocol();

    /// <summary>Creates the request sent to the server to open a session.</summary>
    protected abstract TOpenSessionReq CreateSessionRequest();

    /// <summary>Not implemented by this base class.</summary>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    public override IArrowArrayStream GetObjects(GetObjectsDepth depth, string catalogPattern, string dbSchemaPattern, string tableNamePattern, List<string> tableTypes, string columnNamePattern)
    {
        throw new NotImplementedException();
    }

    /// <summary>Not implemented by this base class.</summary>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    public override IArrowArrayStream GetInfo(List<int> codes)
    {
        throw new NotImplementedException();
    }

    /// <summary>Not implemented by this base class.</summary>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    public override IArrowArrayStream GetTableTypes()
    {
        throw new NotImplementedException();
    }

    /// <summary>
    /// Blocks until the operation identified by <see cref="operationHandle"/>
    /// is no longer in the pending or running state. The status is queried
    /// immediately and then again every 500 ms.
    /// </summary>
    protected void PollForResponse()
    {
        TGetOperationStatusResp statusResponse = null;
        do
        {
            // Sleep only between polls, never before the first status request.
            if (statusResponse != null) { Thread.Sleep(500); }
            TGetOperationStatusReq request = new TGetOperationStatusReq(this.operationHandle);
            statusResponse = this.client.GetOperationStatus(request).Result;
        } while (statusResponse.OperationState == TOperationState.PENDING_STATE || statusResponse.OperationState == TOperationState.RUNNING_STATE);
    }

    /// <summary>
    /// Closes the session (if one was opened), then closes the transport and
    /// disposes the client. Calling it again after a completed dispose is a no-op.
    /// </summary>
    /// <remarks>
    /// The transport and client are released even when closing the session
    /// fails; the failure is still propagated to the caller.
    /// </remarks>
    public override void Dispose()
    {
        if (this.client == null)
        {
            return;
        }

        try
        {
            // Open() assigns the client before the session exists, so a failed
            // Open() leaves a client without a session handle; there is nothing
            // to close on the server in that case.
            if (this.sessionHandle != null)
            {
                TCloseSessionReq closeRequest = new TCloseSessionReq(this.sessionHandle);
                this.client.CloseSession(closeRequest).Wait();
            }
        }
        finally
        {
            try
            {
                this.transport?.Close();
            }
            finally
            {
                this.client.Dispose();
                this.transport = null;
                this.client = null;
                this.sessionHandle = null;
            }
        }
    }

    /// <summary>
    /// Requests the result-set metadata for <see cref="operationHandle"/> and
    /// converts the returned schema to an Arrow <see cref="Schema"/>.
    /// Blocks until the metadata call completes.
    /// </summary>
    /// <returns>The Arrow schema produced by <c>SchemaParser.GetArrowSchema</c>.</returns>
    protected Schema GetSchema()
    {
        TGetResultSetMetadataReq request = new TGetResultSetMetadataReq(this.operationHandle);
        TGetResultSetMetadataResp response = this.client.GetResultSetMetadata(request).Result;
        return SchemaParser.GetArrowSchema(response.Schema);
    }

    /// <summary>
    /// Arrow stream that fetches results for the connection's current
    /// operation and replays the Arrow batches contained in each response.
    /// </summary>
    sealed class GetObjectsReader : IArrowArrayStream
    {
        // Set to null once the server reports that no more rows are available.
        HiveServer2Connection connection;
        Schema schema;
        // Batches from the most recent fetch, and the next one to open.
        List<TSparkArrowBatch> batches;
        int index;
        // Reader over the batch currently being drained, if any.
        IArrowReader reader;

        public GetObjectsReader(HiveServer2Connection connection, Schema schema)
        {
            this.connection = connection;
            this.schema = schema;
        }

        public Schema Schema { get { return schema; } }

        /// <summary>
        /// Returns the next record batch, or null when every fetched batch has
        /// been consumed and the server reported no more rows.
        /// </summary>
        public async ValueTask<RecordBatch> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
        {
            while (true)
            {
                // 1. Drain the reader over the current batch.
                if (this.reader != null)
                {
                    RecordBatch next = await this.reader.ReadNextRecordBatchAsync(cancellationToken);
                    if (next != null)
                    {
                        return next;
                    }
                    this.reader = null;
                }

                // 2. Open a reader over the next batch of the last fetch.
                if (this.batches != null && this.index < this.batches.Count)
                {
                    this.reader = new ArrowStreamReader(new ChunkStream(this.schema, this.batches[this.index++].Batch));
                    continue;
                }

                this.batches = null;
                this.index = 0;

                // 3. Nothing buffered and the server has no more rows: end of stream.
                if (this.connection == null)
                {
                    return null;
                }

                // 4. Fetch the next set of results.
                // NOTE(review): 50000 is presumably the maximum row count per fetch - confirm against the Thrift IDL.
                TFetchResultsReq request = new TFetchResultsReq(this.connection.operationHandle, TFetchOrientation.FETCH_NEXT, 50000);
                TFetchResultsResp response = await this.connection.client.FetchResults(request, cancellationToken);
                this.batches = response.Results.ArrowBatches;

                if (!response.HasMoreRows)
                {
                    // Remember that this was the last fetch; the batches just
                    // received are still replayed before null is returned.
                    this.connection = null;
                }
            }
        }

        public void Dispose()
        {
        }
    }
}
}
69 changes: 69 additions & 0 deletions csharp/src/Drivers/Apache/Hive2/HiveServer2Exception.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;

namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
{
/// <summary>
/// Exception type for the HiveServer2 driver. Extends <see cref="AdbcException"/>
/// with a SQL state and a native error code that are assigned through the
/// internal fluent setters.
/// </summary>
public class HiveServer2Exception : AdbcException
{
    // Backing storage for the overridden properties; both keep their default
    // values until the matching setter is called.
    private string _state;
    private int _nativeCode;

    /// <summary>Creates an exception with no message.</summary>
    public HiveServer2Exception()
        : base()
    {
    }

    /// <summary>Creates an exception with the given message.</summary>
    public HiveServer2Exception(string message)
        : base(message)
    {
    }

    /// <summary>Creates an exception with the given message and ADBC status code.</summary>
    public HiveServer2Exception(string message, AdbcStatusCode statusCode)
        : base(message, statusCode)
    {
    }

    /// <summary>Creates an exception with the given message that wraps another exception.</summary>
    public HiveServer2Exception(string message, Exception innerException)
        : base(message, innerException)
    {
    }

    /// <summary>Creates an exception with the given message and ADBC status code that wraps another exception.</summary>
    public HiveServer2Exception(string message, AdbcStatusCode statusCode, Exception innerException)
        : base(message, statusCode, innerException)
    {
    }

    /// <summary>The SQL state last assigned via <see cref="SetSqlState"/>.</summary>
    public override string SqlState => _state;

    /// <summary>The native error code last assigned via <see cref="SetNativeError"/>.</summary>
    public override int NativeError => _nativeCode;

    /// <summary>Stores the SQL state and returns this instance so calls can be chained.</summary>
    internal HiveServer2Exception SetSqlState(string sqlState)
    {
        this._state = sqlState;
        return this;
    }

    /// <summary>Stores the native error code and returns this instance so calls can be chained.</summary>
    internal HiveServer2Exception SetNativeError(int nativeError)
    {
        this._nativeCode = nativeError;
        return this;
    }
}
}
Loading

0 comments on commit f23ec51

Please sign in to comment.