Skip to content

Commit ec41720

Browse files
committed
error handling for operation status poller
1 parent 38dc738 commit ec41720

File tree

3 files changed

+34
-5
lines changed

3 files changed

+34
-5
lines changed

csharp/src/Drivers/Databricks/BaseDatabricksReader.cs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
*/
1717

1818
using System;
19-
using Apache.Arrow.Adbc.Drivers.Apache;
19+
using System.Threading.Tasks;
20+
using System.Threading;
2021
using Apache.Arrow.Adbc.Tracing;
2122

2223
namespace Apache.Arrow.Adbc.Drivers.Databricks
@@ -94,5 +95,35 @@ protected void ThrowIfDisposed()
9495
public override string AssemblyName => DatabricksConnection.s_assemblyName;
9596

9697
public override string AssemblyVersion => DatabricksConnection.s_assemblyVersion;
98+
99+
public override async ValueTask<RecordBatch?> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
100+
{
101+
try
102+
{
103+
var result = await ReadNextRecordBatchInternalAsync(cancellationToken);
104+
105+
// Stop the poller when we've reached the end of results
106+
if (result == null)
107+
{
108+
StopOperationStatusPoller();
109+
}
110+
111+
return result;
112+
}
113+
catch
114+
{
115+
// Stop the poller immediately on any exception to prevent unnecessary polling
116+
StopOperationStatusPoller();
117+
throw;
118+
}
119+
}
120+
121+
/// <summary>
122+
/// Internal method that derived classes implement to read the next record batch.
123+
/// This method is called by ReadNextRecordBatchAsync which provides exception handling.
124+
/// </summary>
125+
/// <param name="cancellationToken">The cancellation token.</param>
126+
/// <returns>The next record batch, or null if there are no more batches.</returns>
127+
protected abstract ValueTask<RecordBatch?> ReadNextRecordBatchInternalAsync(CancellationToken cancellationToken);
97128
}
98129
}

csharp/src/Drivers/Databricks/CloudFetch/CloudFetchReader.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public CloudFetchReader(DatabricksStatement statement, Schema schema, TFetchResu
8181
/// </summary>
8282
/// <param name="cancellationToken">The cancellation token.</param>
8383
/// <returns>The next record batch, or null if there are no more batches.</returns>
84-
public override async ValueTask<RecordBatch?> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
84+
protected override async ValueTask<RecordBatch?> ReadNextRecordBatchInternalAsync(CancellationToken cancellationToken = default)
8585
{
8686
return await this.TraceActivityAsync(async _ =>
8787
{
@@ -155,7 +155,6 @@ public CloudFetchReader(DatabricksStatement statement, Schema schema, TFetchResu
155155
}
156156
}
157157

158-
StopOperationStatusPoller();
159158
// If we get here, there are no more files
160159
return null;
161160
}

csharp/src/Drivers/Databricks/DatabricksReader.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public DatabricksReader(DatabricksStatement statement, Schema schema, TFetchResu
4747
}
4848
}
4949

50-
public override async ValueTask<RecordBatch?> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
50+
protected override async ValueTask<RecordBatch?> ReadNextRecordBatchInternalAsync(CancellationToken cancellationToken = default)
5151
{
5252
return await this.TraceActivity(async activity =>
5353
{
@@ -77,7 +77,6 @@ public DatabricksReader(DatabricksStatement statement, Schema schema, TFetchResu
7777

7878
if (this.hasNoMoreRows)
7979
{
80-
StopOperationStatusPoller();
8180
return null;
8281
}
8382

0 commit comments

Comments
 (0)