Skip to content

Commit d3d068e

Browse files
authored
fix(csharp/src/Drivers/Databricks): Reader Refactors (#3254)
Some refactors to make the reader code easier to unit test (follow-up PR: #3255): 1. Move reader-related logic to Databricks/Reader. 2. TracingStatement now implements the new ITracingStatement interface - this lets us mock IHiveServer2Statement more easily. 3. IOperationStatusPoller.
1 parent a739955 commit d3d068e

24 files changed

+131
-34
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
namespace Apache.Arrow.Adbc.Tracing
19+
{
20+
/// <summary>
21+
/// Interface for statements that support tracing capabilities.
22+
/// This interface provides the essential properties needed by TracingReader
23+
/// and other tracing-aware components.
24+
/// </summary>
25+
public interface ITracingStatement : IActivityTracer
26+
{
27+
}
28+
}

csharp/src/Apache.Arrow.Adbc/Tracing/TracingReader.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ namespace Apache.Arrow.Adbc.Tracing
2424
{
2525
public abstract class TracingReader : IArrowArrayStream, IActivityTracer
2626
{
27-
private readonly TracingStatement _statement;
27+
private readonly ITracingStatement _statement;
2828

29-
protected TracingReader(TracingStatement statement)
29+
protected TracingReader(ITracingStatement statement)
3030
{
3131
_statement = statement;
3232
}

csharp/src/Apache.Arrow.Adbc/Tracing/TracingStatement.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
namespace Apache.Arrow.Adbc.Tracing
2121
{
22-
public abstract class TracingStatement : AdbcStatement, IActivityTracer
22+
public abstract class TracingStatement : AdbcStatement, ITracingStatement
2323
{
2424
private readonly ActivityTrace _trace;
2525

csharp/src/Drivers/Databricks/DatabricksConnection.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
using Apache.Arrow.Adbc.Drivers.Apache.Hive2.Client;
2929
using Apache.Arrow.Adbc.Drivers.Apache.Spark;
3030
using Apache.Arrow.Adbc.Drivers.Databricks.Auth;
31+
using Apache.Arrow.Adbc.Drivers.Databricks.Reader;
3132
using Apache.Arrow.Ipc;
3233
using Apache.Hive.Service.Rpc.Thrift;
3334
using Thrift.Protocol;

csharp/src/Drivers/Databricks/DatabricksStatement.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
using Apache.Arrow.Adbc.Drivers.Apache;
2525
using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
2626
using Apache.Arrow.Adbc.Drivers.Apache.Spark;
27-
using Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch;
2827
using Apache.Arrow.Adbc.Drivers.Databricks.Result;
2928
using Apache.Arrow.Types;
3029
using Apache.Hive.Service.Rpc.Thrift;
@@ -144,6 +143,12 @@ public TSparkDirectResults? DirectResults
144143
// Expose QueryTimeoutSeconds for IHiveServer2Statement
145144
int IHiveServer2Statement.QueryTimeoutSeconds => base.QueryTimeoutSeconds;
146145

146+
// Expose BatchSize through the interface
147+
long IHiveServer2Statement.BatchSize => BatchSize;
148+
149+
// Expose Connection through the interface
150+
HiveServer2Connection IHiveServer2Statement.Connection => Connection;
151+
147152
public override void SetOption(string key, string value)
148153
{
149154
switch (key)

csharp/src/Drivers/Databricks/CloudFetch/IHiveServer2Statement.cs renamed to csharp/src/Drivers/Databricks/IHiveServer2Statement.cs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,16 @@
1515
* limitations under the License.
1616
*/
1717

18+
using Apache.Arrow.Adbc.Tracing;
19+
using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
1820
using Apache.Hive.Service.Rpc.Thrift;
1921

20-
namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
22+
namespace Apache.Arrow.Adbc.Drivers.Databricks
2123
{
2224
/// <summary>
2325
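/// Interface for accessing HiveServer2Statement properties needed by the Databricks readers and CloudFetch components (e.g. BaseDatabricksReader, CloudFetchDownloadManager, CloudFetchResultFetcher).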
2426
/// </summary>
25-
internal interface IHiveServer2Statement
27+
internal interface IHiveServer2Statement : ITracingStatement
2628
{
2729
/// <summary>
2830
/// Gets the operation handle.
@@ -49,5 +51,15 @@ internal interface IHiveServer2Statement
4951
/// Gets the query timeout in seconds.
5052
/// </summary>
5153
int QueryTimeoutSeconds { get; }
54+
55+
/// <summary>
56+
/// Gets the batch size for fetching results.
57+
/// </summary>
58+
long BatchSize { get; }
59+
60+
/// <summary>
61+
/// Gets the connection associated with this statement.
62+
/// </summary>
63+
HiveServer2Connection Connection { get; }
5264
}
5365
}

csharp/src/Drivers/Databricks/BaseDatabricksReader.cs renamed to csharp/src/Drivers/Databricks/Reader/BaseDatabricksReader.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,23 @@
1616
*/
1717

1818
using System;
19+
using Apache.Arrow.Adbc.Drivers.Databricks;
1920
using Apache.Arrow.Adbc.Tracing;
2021

21-
namespace Apache.Arrow.Adbc.Drivers.Databricks
22+
namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader
2223
{
2324
/// <summary>
2425
/// Base class for Databricks readers that handles common functionality of DatabricksReader and CloudFetchReader
2526
/// </summary>
2627
internal abstract class BaseDatabricksReader : TracingReader
2728
{
28-
protected DatabricksStatement statement;
29+
protected IHiveServer2Statement statement;
2930
protected readonly Schema schema;
3031
protected readonly bool isLz4Compressed;
3132
protected bool hasNoMoreRows = false;
3233
private bool isDisposed;
3334

34-
protected BaseDatabricksReader(DatabricksStatement statement, Schema schema, bool isLz4Compressed)
35+
protected BaseDatabricksReader(IHiveServer2Statement statement, Schema schema, bool isLz4Compressed)
3536
: base(statement)
3637
{
3738
this.schema = schema;

csharp/src/Drivers/Databricks/CloudFetch/Clock.cs renamed to csharp/src/Drivers/Databricks/Reader/CloudFetch/Clock.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
using System;
1919

20-
namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
20+
namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
2121
{
2222
/// <summary>
2323
/// Abstraction for time operations to enable testing with controlled time.

csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloadManager.cs renamed to csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloadManager.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@
2222
using System.Threading;
2323
using System.Threading.Tasks;
2424
using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
25+
using Apache.Arrow.Adbc.Drivers.Databricks;
2526
using Apache.Hive.Service.Rpc.Thrift;
2627

27-
namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
28+
namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
2829
{
2930
/// <summary>
3031
/// Manages the CloudFetch download pipeline.
@@ -41,7 +42,7 @@ internal sealed class CloudFetchDownloadManager : ICloudFetchDownloadManager
4142
private const int DefaultMaxUrlRefreshAttempts = 3;
4243
private const int DefaultUrlExpirationBufferSeconds = 60;
4344

44-
private readonly DatabricksStatement _statement;
45+
private readonly IHiveServer2Statement _statement;
4546
private readonly Schema _schema;
4647
private readonly bool _isLz4Compressed;
4748
private readonly ICloudFetchMemoryBufferManager _memoryManager;
@@ -60,7 +61,7 @@ internal sealed class CloudFetchDownloadManager : ICloudFetchDownloadManager
6061
/// <param name="statement">The HiveServer2 statement.</param>
6162
/// <param name="schema">The Arrow schema.</param>
6263
/// <param name="isLz4Compressed">Whether the results are LZ4 compressed.</param>
63-
public CloudFetchDownloadManager(DatabricksStatement statement, Schema schema, TFetchResultsResp? initialResults, bool isLz4Compressed, HttpClient httpClient)
64+
public CloudFetchDownloadManager(IHiveServer2Statement statement, Schema schema, TFetchResultsResp? initialResults, bool isLz4Compressed, HttpClient httpClient)
6465
{
6566
_statement = statement ?? throw new ArgumentNullException(nameof(statement));
6667
_schema = schema ?? throw new ArgumentNullException(nameof(schema));

csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloader.cs renamed to csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
using System.Threading.Tasks;
2525
using K4os.Compression.LZ4.Streams;
2626

27-
namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
27+
namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
2828
{
2929
/// <summary>
3030
/// Downloads files from URLs.

0 commit comments

Comments
 (0)