Skip to content

Commit 317c9c9

Browse files
authored
feat(csharp/src/Drivers/Databricks): Implement CloudFetchUrlManager to handle presigned URL expiration in CloudFetch (#2855)
### Problem

The Databricks driver's CloudFetch functionality was not properly handling expired cloud file URLs, which could lead to failed downloads and errors during query execution. The system needed a way to track, cache, and refresh presigned URLs before they expire.

### Solution

- Improved the `CloudFetchResultFetcher` class so that it:
  - Manages a cache of cloud file URLs with their expiration times
  - Proactively refreshes URLs that are about to expire
  - Provides thread-safe access to URL information
- Added an `IClock` interface and implementations to facilitate testing with controlled time
- Extended the `IDownloadResult` interface to support URL refreshing and expiration checking
- Updated the namespace from `Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch` to `Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch` for better organization
1 parent aae84d2 commit 317c9c9

17 files changed

+758
-185
lines changed
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
using System;
19+
20+
namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
21+
{
22+
/// <summary>
23+
/// Abstraction for time operations to enable testing with controlled time.
24+
/// </summary>
25+
internal interface IClock
26+
{
27+
/// <summary>
28+
/// Gets the current UTC time.
29+
/// </summary>
30+
DateTime UtcNow { get; }
31+
}
32+
33+
/// <summary>
34+
/// Default implementation that uses system time.
35+
/// </summary>
36+
internal class SystemClock : IClock
37+
{
38+
public DateTime UtcNow => DateTime.UtcNow;
39+
}
40+
}

csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloadManager.cs

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,8 @@
2222
using System.Threading;
2323
using System.Threading.Tasks;
2424
using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
25-
using Apache.Arrow.Adbc.Drivers.Databricks;
2625

27-
namespace Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
26+
namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
2827
{
2928
/// <summary>
3029
/// Manages the CloudFetch download pipeline.
@@ -38,6 +37,8 @@ internal sealed class CloudFetchDownloadManager : ICloudFetchDownloadManager
3837
private const bool DefaultPrefetchEnabled = true;
3938
private const int DefaultFetchBatchSize = 2000000;
4039
private const int DefaultTimeoutMinutes = 5;
40+
private const int DefaultMaxUrlRefreshAttempts = 3;
41+
private const int DefaultUrlExpirationBufferSeconds = 60;
4142

4243
private readonly DatabricksStatement _statement;
4344
private readonly Schema _schema;
@@ -151,6 +152,34 @@ public CloudFetchDownloadManager(DatabricksStatement statement, Schema schema, b
151152
}
152153
}
153154

155+
// Parse URL expiration buffer seconds
156+
int urlExpirationBufferSeconds = DefaultUrlExpirationBufferSeconds;
157+
if (connectionProps.TryGetValue(DatabricksParameters.CloudFetchUrlExpirationBufferSeconds, out string? urlExpirationBufferStr))
158+
{
159+
if (int.TryParse(urlExpirationBufferStr, out int parsedUrlExpirationBuffer) && parsedUrlExpirationBuffer > 0)
160+
{
161+
urlExpirationBufferSeconds = parsedUrlExpirationBuffer;
162+
}
163+
else
164+
{
165+
throw new ArgumentException($"Invalid value for {DatabricksParameters.CloudFetchUrlExpirationBufferSeconds}: {urlExpirationBufferStr}. Expected a positive integer.");
166+
}
167+
}
168+
169+
// Parse max URL refresh attempts
170+
int maxUrlRefreshAttempts = DefaultMaxUrlRefreshAttempts;
171+
if (connectionProps.TryGetValue(DatabricksParameters.CloudFetchMaxUrlRefreshAttempts, out string? maxUrlRefreshAttemptsStr))
172+
{
173+
if (int.TryParse(maxUrlRefreshAttemptsStr, out int parsedMaxUrlRefreshAttempts) && parsedMaxUrlRefreshAttempts > 0)
174+
{
175+
maxUrlRefreshAttempts = parsedMaxUrlRefreshAttempts;
176+
}
177+
else
178+
{
179+
throw new ArgumentException($"Invalid value for {DatabricksParameters.CloudFetchMaxUrlRefreshAttempts}: {maxUrlRefreshAttemptsStr}. Expected a positive integer.");
180+
}
181+
}
182+
154183
// Initialize the memory manager
155184
_memoryManager = new CloudFetchMemoryBufferManager(memoryBufferSizeMB);
156185

@@ -161,23 +190,27 @@ public CloudFetchDownloadManager(DatabricksStatement statement, Schema schema, b
161190
_httpClient = httpClient;
162191
_httpClient.Timeout = TimeSpan.FromMinutes(timeoutMinutes);
163192

164-
// Initialize the result fetcher
193+
// Initialize the result fetcher with URL management capabilities
165194
_resultFetcher = new CloudFetchResultFetcher(
166195
_statement,
167196
_memoryManager,
168197
_downloadQueue,
169-
DefaultFetchBatchSize);
198+
DefaultFetchBatchSize,
199+
urlExpirationBufferSeconds);
170200

171201
// Initialize the downloader
172202
_downloader = new CloudFetchDownloader(
173203
_downloadQueue,
174204
_resultQueue,
175205
_memoryManager,
176206
_httpClient,
207+
_resultFetcher,
177208
parallelDownloads,
178209
_isLz4Compressed,
179210
maxRetries,
180-
retryDelayMs);
211+
retryDelayMs,
212+
maxUrlRefreshAttempts,
213+
urlExpirationBufferSeconds);
181214
}
182215

183216
/// <summary>

csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloader.cs

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
using System.Threading.Tasks;
2525
using K4os.Compression.LZ4.Streams;
2626

27-
namespace Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
27+
namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
2828
{
2929
/// <summary>
3030
/// Downloads files from URLs.
@@ -35,10 +35,13 @@ internal sealed class CloudFetchDownloader : ICloudFetchDownloader
3535
private readonly BlockingCollection<IDownloadResult> _resultQueue;
3636
private readonly ICloudFetchMemoryBufferManager _memoryManager;
3737
private readonly HttpClient _httpClient;
38+
private readonly ICloudFetchResultFetcher _resultFetcher;
3839
private readonly int _maxParallelDownloads;
3940
private readonly bool _isLz4Compressed;
4041
private readonly int _maxRetries;
4142
private readonly int _retryDelayMs;
43+
private readonly int _maxUrlRefreshAttempts;
44+
private readonly int _urlExpirationBufferSeconds;
4245
private readonly SemaphoreSlim _downloadSemaphore;
4346
private Task? _downloadTask;
4447
private CancellationTokenSource? _cancellationTokenSource;
@@ -53,29 +56,37 @@ internal sealed class CloudFetchDownloader : ICloudFetchDownloader
5356
/// <param name="resultQueue">The queue to add completed downloads to.</param>
5457
/// <param name="memoryManager">The memory buffer manager.</param>
5558
/// <param name="httpClient">The HTTP client to use for downloads.</param>
59+
/// <param name="resultFetcher">The result fetcher that manages URLs.</param>
5660
/// <param name="maxParallelDownloads">The maximum number of parallel downloads.</param>
5761
/// <param name="isLz4Compressed">Whether the results are LZ4 compressed.</param>
58-
/// <param name="logger">The logger instance.</param>
5962
/// <param name="maxRetries">The maximum number of retry attempts.</param>
6063
/// <param name="retryDelayMs">The delay between retry attempts in milliseconds.</param>
64+
/// <param name="maxUrlRefreshAttempts">The maximum number of URL refresh attempts.</param>
65+
/// <param name="urlExpirationBufferSeconds">Buffer time in seconds before URL expiration to trigger refresh.</param>
6166
public CloudFetchDownloader(
6267
BlockingCollection<IDownloadResult> downloadQueue,
6368
BlockingCollection<IDownloadResult> resultQueue,
6469
ICloudFetchMemoryBufferManager memoryManager,
6570
HttpClient httpClient,
71+
ICloudFetchResultFetcher resultFetcher,
6672
int maxParallelDownloads,
6773
bool isLz4Compressed,
6874
int maxRetries = 3,
69-
int retryDelayMs = 500)
75+
int retryDelayMs = 500,
76+
int maxUrlRefreshAttempts = 3,
77+
int urlExpirationBufferSeconds = 60)
7078
{
7179
_downloadQueue = downloadQueue ?? throw new ArgumentNullException(nameof(downloadQueue));
7280
_resultQueue = resultQueue ?? throw new ArgumentNullException(nameof(resultQueue));
7381
_memoryManager = memoryManager ?? throw new ArgumentNullException(nameof(memoryManager));
7482
_httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient));
83+
_resultFetcher = resultFetcher ?? throw new ArgumentNullException(nameof(resultFetcher));
7584
_maxParallelDownloads = maxParallelDownloads > 0 ? maxParallelDownloads : throw new ArgumentOutOfRangeException(nameof(maxParallelDownloads));
7685
_isLz4Compressed = isLz4Compressed;
7786
_maxRetries = maxRetries > 0 ? maxRetries : throw new ArgumentOutOfRangeException(nameof(maxRetries));
7887
_retryDelayMs = retryDelayMs > 0 ? retryDelayMs : throw new ArgumentOutOfRangeException(nameof(retryDelayMs));
88+
_maxUrlRefreshAttempts = maxUrlRefreshAttempts > 0 ? maxUrlRefreshAttempts : throw new ArgumentOutOfRangeException(nameof(maxUrlRefreshAttempts));
89+
_urlExpirationBufferSeconds = urlExpirationBufferSeconds > 0 ? urlExpirationBufferSeconds : throw new ArgumentOutOfRangeException(nameof(urlExpirationBufferSeconds));
7990
_downloadSemaphore = new SemaphoreSlim(_maxParallelDownloads, _maxParallelDownloads);
8091
_isCompleted = false;
8192
}
@@ -237,6 +248,19 @@ private async Task DownloadFilesAsync(CancellationToken cancellationToken)
237248
break;
238249
}
239250

251+
// Check if the URL is expired or about to expire
252+
if (downloadResult.IsExpiredOrExpiringSoon(_urlExpirationBufferSeconds))
253+
{
254+
// Get a refreshed URL before starting the download
255+
var refreshedLink = await _resultFetcher.GetUrlAsync(downloadResult.Link.StartRowOffset, cancellationToken);
256+
if (refreshedLink != null)
257+
{
258+
// Update the download result with the refreshed link
259+
downloadResult.UpdateWithRefreshedLink(refreshedLink);
260+
Trace.TraceInformation($"Updated URL for file at offset {refreshedLink.StartRowOffset} before download");
261+
}
262+
}
263+
240264
// Acquire a download slot
241265
await _downloadSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
242266

@@ -341,6 +365,37 @@ private async Task DownloadFileAsync(IDownloadResult downloadResult, Cancellatio
341365
HttpCompletionOption.ResponseHeadersRead,
342366
cancellationToken).ConfigureAwait(false);
343367

368+
// Check if the response indicates an expired URL (typically 403 or 401)
369+
if (response.StatusCode == System.Net.HttpStatusCode.Forbidden ||
370+
response.StatusCode == System.Net.HttpStatusCode.Unauthorized)
371+
{
372+
// If we've already tried refreshing too many times, fail
373+
if (downloadResult.RefreshAttempts >= _maxUrlRefreshAttempts)
374+
{
375+
throw new InvalidOperationException($"Failed to download file after {downloadResult.RefreshAttempts} URL refresh attempts.");
376+
}
377+
378+
// Try to refresh the URL
379+
var refreshedLink = await _resultFetcher.GetUrlAsync(downloadResult.Link.StartRowOffset, cancellationToken);
380+
if (refreshedLink != null)
381+
{
382+
// Update the download result with the refreshed link
383+
downloadResult.UpdateWithRefreshedLink(refreshedLink);
384+
url = refreshedLink.FileLink;
385+
sanitizedUrl = SanitizeUrl(url);
386+
387+
Trace.TraceInformation($"URL for file at offset {refreshedLink.StartRowOffset} was refreshed after expired URL response");
388+
389+
// Continue to the next retry attempt with the refreshed URL
390+
continue;
391+
}
392+
else
393+
{
394+
// If refresh failed, throw an exception
395+
throw new InvalidOperationException("Failed to refresh expired URL.");
396+
}
397+
}
398+
344399
response.EnsureSuccessStatusCode();
345400

346401
// Log the download size if available from response headers

csharp/src/Drivers/Databricks/CloudFetch/CloudFetchMemoryBufferManager.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
using System.Threading;
2020
using System.Threading.Tasks;
2121

22-
namespace Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
22+
namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
2323
{
2424
/// <summary>
2525
/// Manages memory allocation for prefetched files.

csharp/src/Drivers/Databricks/CloudFetch/CloudFetchReader.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
using System.Net.Http;
2222
using System.Threading;
2323
using System.Threading.Tasks;
24-
using Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch;
25-
using Apache.Arrow.Adbc.Drivers.Databricks;
2624
using Apache.Arrow.Ipc;
2725
using Apache.Hive.Service.Rpc.Thrift;
2826

0 commit comments

Comments
 (0)