fix(csharp/src/Drivers/Databricks): Correct DatabricksCompositeReader and StatusPoller to Stop/Dispose Appropriately #3217
Can you confirm that, even without this fix, polling stops once the statement is disposed? If not, we need to fix that as well.
// Add the end of results guard to the queue
_downloadQueue.Add(EndOfResultsGuard.Instance, cancellationToken);
_isCompleted = true;
From testing:
Small nit, but I think we need to avoid this here: if the DownloadQueue is full, the exception handling would get stuck. Should I modify the exception handling below, or was there a reason why it is like this? (line 262) @jadewang-db
catch (Exception ex)
{
try
{
_downloadQueue.Add(EndOfResultsGuard.Instance, CancellationToken.None);
}
}
Alternatively, we can create a new CancellationToken with Timeout for this attempt
CancellationToken GetOperationStatusTimeoutToken = ApacheUtility.GetCancellationToken(_requestTimeoutSeconds, ApacheUtility.TimeUnit.Seconds);
thanks, a cancellation token looks good
Oh just saw this comment, let me implement
Actually looks like TryAdd is better suited here
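For illustration, a minimal sketch of why TryAdd fits here, assuming the download queue is a bounded BlockingCollection<T> (the string queue and item names below are hypothetical stand-ins, not the driver's types). On a full queue, Add blocks until a consumer takes an item, while TryAdd with a timeout gives up and returns false:

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical stand-in for the driver's download queue: a bounded collection of capacity 1.
var queue = new BlockingCollection<string>(boundedCapacity: 1);
queue.Add("pending-download"); // the queue is now full

// Add(item, CancellationToken.None) would block here until a consumer removes an item.
// TryAdd with a timeout returns false instead, so an exception handler cannot hang.
bool added = queue.TryAdd("end-of-results", TimeSpan.FromMilliseconds(100));

Console.WriteLine(added ? "guard enqueued" : "queue full, guard skipped");
// prints "queue full, guard skipped"
```

The trade-off is that the guard may not be enqueued, so the consumer side would need another way to observe completion (for example, a completion flag).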
{
    var operationHandle = _statement.OperationHandle;
    if (operationHandle == null) break;
We need to use a timeout token here instead of cancelling when the cancellation token is triggered. If an interrupt is triggered prematurely, the TCLI client may still have unsent or unconsumed results in its buffers, which affects subsequent calls with that client (that is, any future call in the same Session).
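A rough sketch of the idea, assuming the request can be modelled as an async call that accepts a token (FakeRpcAsync is a hypothetical stand-in for a Thrift call such as GetOperationStatus, and the 5-second timeout is an arbitrary illustrative value). The in-flight request gets its own expiring token, and the caller's cancellation is only checked between requests:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

using var callerCts = new CancellationTokenSource();
callerCts.Cancel(); // the caller has already asked to stop

// The request gets a token that only fires after a timeout,
// so a caller cancel cannot interrupt it halfway through.
using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
string status = await FakeRpcAsync(timeoutCts.Token);

// Caller cancellation is honored between requests instead.
Console.WriteLine($"{status}; stop polling: {callerCts.IsCancellationRequested}");
// prints "FINISHED; stop polling: True"

// Hypothetical stand-in for a Thrift RPC; completes after a short delay.
async Task<string> FakeRpcAsync(CancellationToken token)
{
    await Task.Delay(TimeSpan.FromMilliseconds(50), token);
    return "FINISHED";
}
```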
are you able to repro this? should we do this to all the thrift rpc calls in the driver?
I think it is because in THTTPTransport (used by SparkHttpConnection -> DatabricksHttpconnection), a new Stream is created when the request is flushed. If cancellation happens before this, that stream doesn't get discarded:
https://github.com/apache/thrift/blob/master/lib/netstd/Thrift/Transport/Client/THttpTransport.cs#L281
Yes, during testing, got some errors. In the proxy logs, I remember seeing requests sent out with both GetOperationStatus and CloseOperationStatus (in the same request) while testing another PR
I think we are safe in HiveServer2Statement, but we might need to adjust CancellationToken in DatabricksReader, CloudFetchResultFetcher, and DatabricksCompositeReader
Actually, I think this depends a bit on how CancellationToken could be used by PBI, too
@CurtHagenlocher will mashup ever trigger cancellationTokens passed into IArrowStreamReader.ReadNextBatchAsync? Do we need to ensure that the connection still remains usable for subsequent statements?
At least for now, I think we can operate this way:
- If the user cancels the token passed in to ReadNextBatchAsync, we should not break the client
- Dispose() should not break the client either
> @CurtHagenlocher will mashup ever trigger cancellationTokens passed into IArrowStreamReader.ReadNextBatchAsync? Do we need to ensure that the connection still remains usable for subsequent statements?
This is currently unimplemented but we'll need to implement it before GA for parity with the ODBC implementation. What is probably most important for cancellation is query execution, and unless we manage to push forward the proposed ADBC 1.1 API, currently the only way to cancel a running query is to call AdbcStatement.Cancel. There is currently no implementation of this method for any of the C#-implemented drivers :(.
From a Power BI perspective, the most important use of cancellation is for Direct Query because users can generate a lot of queries simply by clicking around in a visual and in-progress queries will need to be cancelled if their output is no longer needed. DQ output tends to be relatively small, so being able to cancel in the middle of reading the output is arguably less important than being able to cancel before the results start coming back.
  // use direct results if available
- if (_statement.HasDirectResults && _statement.DirectResults != null && _statement.DirectResults.__isset.resultSet)
+ if (_statement.HasDirectResults && _statement.DirectResults != null && _statement.DirectResults.__isset.resultSet && statement.DirectResults?.ResultSet != null)
Can this be simplified to if (_statement.HasDirectResults)? It looks like that method is performing the same checks
I think it helps the linter a bit
_operationStatusPollingTask?.Wait();
try
{
    if (_operationStatusPollingTask != null)
Similarly, this check looks redundant because you are already using _operationStatusPollingTask?
ah thanks, good catch
request.StartRowOffset = offset;

// Cancelling mid-request breaks the client; Dispose() should not break the underlying client
CancellationToken expiringToken = ApacheUtility.GetCancellationToken(DatabricksConstants.DefaultCloudFetchRequestTimeoutSeconds, ApacheUtility.TimeUnit.Seconds);
should you respect the connection parameter DatabricksParameters.CloudFetchTimeoutMinutes instead of the default value?
Do you mean I shouldn't create a new constant here?
No, what I meant is that you should check the value of the connection parameter CloudFetchTimeoutMinutes (adbc.databricks.cloudfetch.timeout_minutes), which can be set by the client and customer.
Oh got it, that makes sense, it should be a configurable parameter. To be consistent with the rest of HiveServer2Statement, I'm just using the QueryTimeout parameter (which is what other FetchResultsRequest uses)
I have some changes in a follow-up PR that will make this change easier to do for DatabricksReader, will leave this as a TODO
if (!statement.DirectResults.ResultSet.HasMoreRows)
{
    return;
}
nit: an early return in the middle of the constructor may leave something only partially initialized. Can we do it the other way around?
it's a bit more efficient for linting, otherwise requires a bunch of null checks
What I suggested is below:
if (statement.DirectResults.ResultSet.HasMoreRows)
{
    operationStatusPoller = new DatabricksOperationStatusPoller(statement);
    operationStatusPoller.Start();
}
That way, if you later add initialization of other private variables after this, it will not be skipped when statement.DirectResults.ResultSet.HasMoreRows is false.
Thanks! The linter error needs to be fixed and I made a few small low-priority suggestions.
}
catch (Exception)
{

nit: remove blank line
if (_statement.DirectResults?.ResultSet.HasMoreRows ?? true)
{
    operationStatusPoller = new DatabricksOperationStatusPoller(statement);
    operationStatusPoller.Start();
There are some trailing spaces on this line that the linter doesn't like.
private void StopOperationStatusPoller()
{
    operationStatusPoller?.Stop();
Consider setting to null here instead of the DisposeOperationStatusPoller method to avoid duplicate calls.
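One way this suggestion could look, as a sketch only: PollerSketch and ReaderSketch are hypothetical stand-ins, and disposing inside the stop method is an assumption made to keep the example small. Clearing the field in the stop method makes any later call a no-op:

```csharp
#nullable enable
using System;

var reader = new ReaderSketch();
reader.StopOperationStatusPoller();
reader.StopOperationStatusPoller(); // second call finds null and does nothing
Console.WriteLine(PollerSketch.StopCalls); // prints "1"

// Hypothetical stand-in for the status poller; counts how often Stop runs.
sealed class PollerSketch : IDisposable
{
    public static int StopCalls;
    public void Stop() => StopCalls++;
    public void Dispose() { }
}

// Hypothetical stand-in for the reader that owns the poller.
sealed class ReaderSketch
{
    private PollerSketch? _poller = new PollerSketch();

    public void StopOperationStatusPoller()
    {
        _poller?.Stop();
        _poller?.Dispose();
        _poller = null; // later calls see null, so Stop is never repeated
    }
}
```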
  private Task? _operationStatusPollingTask;

- public DatabricksOperationStatusPoller(IHiveServer2Statement statement, int heartbeatIntervalSeconds = DatabricksConstants.DefaultOperationStatusPollingIntervalSeconds)
+ public DatabricksOperationStatusPoller(IHiveServer2Statement statement, int heartbeatIntervalSeconds = DatabricksConstants.DefaultOperationStatusPollingIntervalSeconds, int requestTimeoutSeconds = DatabricksConstants.DefaultOperationStatusRequestTimeoutSeconds)
- public DatabricksOperationStatusPoller(IHiveServer2Statement statement, int heartbeatIntervalSeconds = DatabricksConstants.DefaultOperationStatusPollingIntervalSeconds, int requestTimeoutSeconds = DatabricksConstants.DefaultOperationStatusRequestTimeoutSeconds)
+ public DatabricksOperationStatusPoller(
+     IHiveServer2Statement statement,
+     int heartbeatIntervalSeconds = DatabricksConstants.DefaultOperationStatusPollingIntervalSeconds,
+     int requestTimeoutSeconds = DatabricksConstants.DefaultOperationStatusRequestTimeoutSeconds)
i.e. split across multiple lines
csharp/test/Drivers/Databricks/Unit/DatabricksOperationStatusPollerTests.cs
public async Task StopStopsPolling()
{
    // Arrange
    var poller = new DatabricksOperationStatusPoller(_mockStatement.Object, _heartbeatIntervalSeconds);
Consider a using block for poller instead of an explicit Dispose. This will ensure that the Dispose happens even if an exception is thrown inside the using block. Applies to many of the tests in this file, and probably not super-important unless the failure to Dispose in one test could cause another test to fail.
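A small sketch of the difference, using a hypothetical PollerSketch in place of DatabricksOperationStatusPoller: with a using block, Dispose still runs when the body throws, which an explicit Dispose call at the end of a test would not guarantee.

```csharp
using System;

try
{
    using (var poller = new PollerSketch())
    {
        // Simulates a failing assertion in the middle of a test.
        throw new InvalidOperationException("assertion failed");
    }
}
catch (InvalidOperationException)
{
    // Dispose already ran when control left the using block.
    Console.WriteLine($"disposed: {PollerSketch.Disposed}"); // prints "disposed: True"
}

// Hypothetical stand-in for the poller under test; records whether Dispose ran.
sealed class PollerSketch : IDisposable
{
    public static bool Disposed;
    public void Dispose() => Disposed = true;
}
```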
Motivation
The following cases are not properly stopping or disposing the status poller:
In addition:
Fixes
DatabricksOperationStatusPoller logic is now managed by DatabricksCompositeReader (moved out of BaseDatabricksReader) so that it handles all cases where null results (indicating completion) are returned.
Disposing DatabricksCompositeReader appropriately disposes the activeReader and statusPoller
TODO
Follow-up PR: when the statement is disposed, it should also dispose the reader (the poller is currently stopped when the OperationHandle is set to null, but this should also happen explicitly)
Need to add some unit tests (follow-up PR: #3243)