Skip to content

Commit be06c48

Browse files
committed
nit fix
nit fix
1 parent 1f0240a commit be06c48

File tree

7 files changed

+24
-12
lines changed

7 files changed

+24
-12
lines changed

csharp/src/Drivers/Databricks/BaseDatabricksReader.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
*/
1717

1818
using System;
19-
using System.Threading.Tasks;
20-
using System.Threading;
2119
using Apache.Arrow.Adbc.Tracing;
2220

2321
namespace Apache.Arrow.Adbc.Drivers.Databricks

csharp/src/Drivers/Databricks/CloudFetch/CloudFetchReader.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ protected override void Dispose(bool disposing)
180180
this.downloadManager.Dispose();
181181
this.downloadManager = null;
182182
}
183+
base.Dispose(disposing);
183184
}
184185
}
185186
}

csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
using System.Linq;
2323
using System.Threading;
2424
using System.Threading.Tasks;
25+
using Apache.Arrow.Adbc.Drivers.Apache;
2526
using Apache.Hive.Service.Rpc.Thrift;
2627

2728
namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
@@ -160,8 +161,11 @@ public async Task StopAsync()
160161

161162
request.StartRowOffset = offset;
162163

164+
// Cancelling mid-request breaks the client; Dispose() should not break the underlying client
165+
CancellationToken expiringToken = ApacheUtility.GetCancellationToken(DatabricksConstants.DefaultCloudFetchRequestTimeoutSeconds, ApacheUtility.TimeUnit.Seconds);
166+
163167
// Fetch results
164-
TFetchResultsResp response = await _statement.Client.FetchResults(request, cancellationToken);
168+
TFetchResultsResp response = await _statement.Client.FetchResults(request, expiringToken);
165169

166170
// Process the results
167171
if (response.Status.StatusCode == TStatusCode.SUCCESS_STATUS &&
@@ -247,10 +251,6 @@ private async Task FetchResultsAsync(CancellationToken cancellationToken)
247251
_downloadQueue.Add(EndOfResultsGuard.Instance, cancellationToken);
248252
_isCompleted = true;
249253
}
250-
catch (OperationCanceledException)
251-
{
252-
// downloadQueue could be full, would block indefinitely
253-
}
254254
catch (Exception ex)
255255
{
256256
Debug.WriteLine($"Unhandled error in fetcher: {ex.Message}");
@@ -261,7 +261,7 @@ private async Task FetchResultsAsync(CancellationToken cancellationToken)
261261
// Add the end of results guard to the queue even in case of error
262262
try
263263
{
264-
_downloadQueue.Add(EndOfResultsGuard.Instance, CancellationToken.None);
264+
_downloadQueue.TryAdd(EndOfResultsGuard.Instance, 0);
265265
}
266266
catch (Exception)
267267
{
@@ -286,7 +286,8 @@ private async Task FetchNextResultBatchAsync(long? offset, CancellationToken can
286286
TFetchResultsResp response;
287287
try
288288
{
289-
response = await _statement.Client.FetchResults(request, cancellationToken).ConfigureAwait(false);
289+
CancellationToken expiringToken = ApacheUtility.GetCancellationToken(DatabricksConstants.DefaultCloudFetchRequestTimeoutSeconds, ApacheUtility.TimeUnit.Seconds);
290+
response = await _statement.Client.FetchResults(request, expiringToken).ConfigureAwait(false);
290291
}
291292
catch (Exception ex)
292293
{

csharp/src/Drivers/Databricks/DatabricksCompositeReader.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ protected override void Dispose(bool disposing)
130130
_activeReader?.Dispose();
131131
DisposeOperationStatusPoller();
132132
}
133+
_activeReader = null;
133134
base.Dispose(disposing);
134135
}
135136

csharp/src/Drivers/Databricks/DatabricksOperationStatusPoller.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public void Dispose()
101101
try
102102
{
103103
if (_operationStatusPollingTask != null)
104-
_operationStatusPollingTask?.ConfigureAwait(false).GetAwaiter().GetResult();
104+
_operationStatusPollingTask?.GetAwaiter().GetResult();
105105
}
106106
catch (OperationCanceledException)
107107
{
@@ -110,6 +110,8 @@ public void Dispose()
110110

111111
_internalCts.Dispose();
112112
}
113+
_internalCts = null;
114+
_operationStatusPollingTask = null;
113115
}
114116
}
115117
}

csharp/src/Drivers/Databricks/DatabricksParameters.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,11 @@ public class DatabricksConstants
235235
/// </summary>
236236
public const int DefaultOperationStatusRequestTimeoutSeconds = 30;
237237

238+
/// <summary>
239+
/// Default timeout in seconds for CloudFetch requests
240+
/// </summary>
241+
public const int DefaultCloudFetchRequestTimeoutSeconds = 30;
242+
238243
/// <summary>
239244
/// OAuth grant type constants
240245
/// </summary>

csharp/test/Drivers/Databricks/E2E/StatementTests.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,10 +187,14 @@ public async Task AllStatementTypesDisposeWithoutErrors(string statementType, st
187187
var batch = await queryResult.Stream.ReadNextRecordBatchAsync();
188188
// Note: batch might be null for empty results, that's OK
189189

190+
// test disposing the stream does not throw
191+
var streamException = Record.Exception(() => queryResult.Stream.Dispose());
192+
Assert.Null(streamException);
193+
190194
// The critical test: disposal should not throw any exceptions
191195
// This specifically tests the fix for the GetColumns bug where _directResults wasn't set
192-
var exception = Record.Exception(() => statement.Dispose());
193-
Assert.Null(exception);
196+
var statementException = Record.Exception(() => statement.Dispose());
197+
Assert.Null(statementException);
194198
}
195199
catch (Exception ex)
196200
{

0 commit comments

Comments
 (0)