Checkpointing: Partial Success in BufferedStreamConsumer (Destination) #3555
This is awesome!
I just realised this approach might not work for copy destinations in their current form since the actual db insert, via the copy command, only happens at the end of the sync. Storing state in the middle of the sync means we would miss some records on the next sync. Does that make sense?
@davinchia I don't think I understand why we would miss records. Could you explain the problematic scenario? My understanding:
|
What I was thinking of:
I think the approach you wrote out makes sense if we always try to copy the uploaded file to the tmp and final table. I might be missing something, but I don't think we are doing that yet. Maybe we should add integration test cases for this?
@davinchia I think we're talking past each other. I don't think the case you're describing is possible given the way the code is written, so one of us is missing something. 😅 Let's discuss offline.
just a couple of changes requested but overall the logic seems sound
* buffered records are flushed out of memory using the user-provided recordWriter. When this flush
* happens, a state message is moved from pending to flushed. On close, if the user-provided onClose
* function is successful, then the flushed state record is considered committed and is then
* emitted. We expect this class to only ever emit either 1 state message (in the case of a full or
shouldn't this class be emitting state messages potentially continuously to allow halfway failure checkpointing?
or is the hangup here on normalization?
Normalization is orthogonal to this.
Emitting state as we go would be another valid strategy for approaching checkpointing in destinations.
The current strategy is to consume records and put them in a temporary table in the destination until something goes wrong. As soon as something goes wrong (or we consume all records), attempt to commit everything in the temporary table into the final tables. This current approach gets us checkpointing behavior, but it is vulnerable to the case where the commit would have been successful for a subset of the flushed records but not all of them. I'm not sure to what extent this is really a high frequency failure point.
The reason to not go for emitting state as we go is just expedience. If we emit state as we go then we will need to be committing records from the temporary tables to the final tables as we go. So now each sync can potentially have multiple temporary tables. It is definitely doable and something we should probably shoot for, but at the time I didn't have appetite for the added complexity.
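To make the lifecycle described above concrete, here is a minimal sketch of the "buffer, flush to a temporary table, commit on close" strategy. It is an illustration under stated assumptions, not the actual `BufferedStreamConsumer`: the class name `CheckpointingSketch`, the in-memory lists standing in for the temporary and final tables, and the use of plain strings for records and state are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of the strategy discussed in this thread.
// Lists stand in for destination tables; strings stand in for records and state messages.
final class CheckpointingSketch {
  private final int maxBuffered;
  private final Consumer<String> stateCollector;
  private final List<String> buffer = new ArrayList<>();     // records still in memory
  private final List<String> tmpTable = new ArrayList<>();   // stand-in for the temporary table
  private final List<String> finalTable = new ArrayList<>(); // stand-in for the final table
  private String pendingState;     // received, but not yet covered by a flush
  private String lastFlushedState; // covered by a flush to the temporary table

  CheckpointingSketch(int maxBuffered, Consumer<String> stateCollector) {
    this.maxBuffered = maxBuffered;
    this.stateCollector = stateCollector;
  }

  void acceptRecord(String record) {
    buffer.add(record);
    if (buffer.size() >= maxBuffered) {
      flush();
    }
  }

  void acceptState(String state) {
    pendingState = state;
  }

  // Moves buffered records to the temporary table and promotes pending state to flushed.
  private void flush() {
    tmpTable.addAll(buffer);
    buffer.clear();
    if (pendingState != null) {
      lastFlushedState = pendingState;
      pendingState = null;
    }
  }

  void close(boolean hasFailed) {
    if (!hasFailed) {
      flush(); // clean run: flush whatever is left in memory
    }
    // Discard only when the sync failed and no state was ever flushed.
    boolean discard = lastFlushedState == null && hasFailed;
    if (!discard) {
      finalTable.addAll(tmpTable); // "commit" the temporary table
    }
    tmpTable.clear();
    // Emit at most one state message, and only after the commit step has run.
    if (lastFlushedState != null) {
      stateCollector.accept(lastFlushedState);
    }
  }

  List<String> finalTable() {
    return new ArrayList<>(finalTable);
  }
}
```

In this model a failed sync that had already flushed a state message still commits the temporary table and emits that one state message (partial success), while a failed sync with no flushed state commits nothing and emits nothing.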
...ava/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java
// if any state message flushed that means we can still go for at least a partial success. if none
// was emitted, if there were still no failures, then we can still succeed. the latter case is full
// refresh.
onClose.accept(lastFlushedState == null && hasFailed);
I've re-read this line like 5 times to map out the boolean table in my head. Do you mind unfurling this e.g:
if (lastFlushedState == null) {
  onClose.accept(hasFailed);
} else {
  outputRecordCollector.accept(lastFlushedState);
  onClose.accept(false);
}
hahaha. no i don't mind at all. it took me a long time to write this expression (which should have been a sign that it was too hard to think about)
if (lastFlushedState == null) {
  onClose.accept(hasFailed);
} else {
  onClose.accept(false);
}
if (lastFlushedState != null) {
  outputRecordCollector.accept(lastFlushedState);
}
why separate them?
Notes from talking with davin:
- hasFailed
- clarify the lastFlushedState concept (especially clarifying in the context of the copy strategy)
Also, do we need to add rethrowing the exception on close?
Spoke offline: went through code and confirmed this works for Copy.
One note here to rename hasFailed to something closer to commit or discard since this no longer represents failure.
@sherifnada they need to be separate because the conditional is not the same. One is lastFlushedState == null and one is lastFlushedState != null.
couldn't it go in the else clause?
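For readers mapping out the boolean table from this thread, the following sketch compares the original one-line expression with the unfurled form, and shows one way the state emission could sit in the `else` branch after `onClose` returns. The class name `CloseLogicSketch` and the use of `java.util.function.Consumer` for `onClose` are assumptions made for the example; the real consumer type in the PR may differ.

```java
import java.util.function.Consumer;

// Hypothetical sketch of the close logic discussed in this thread; not the PR's code.
final class CloseLogicSketch {

  // The original one-liner: the flag passed to onClose.
  static boolean compact(String lastFlushedState, boolean hasFailed) {
    return lastFlushedState == null && hasFailed;
  }

  // The unfurled form proposed in review.
  static boolean unfurled(String lastFlushedState, boolean hasFailed) {
    if (lastFlushedState == null) {
      return hasFailed; // nothing flushed: outcome depends on whether the sync failed
    } else {
      return false;     // something flushed: go for at least a partial success
    }
  }

  // Emission placed in the else branch, after onClose. If onClose throws,
  // the state message is never emitted.
  static void close(String lastFlushedState,
                    boolean hasFailed,
                    Consumer<Boolean> onClose,
                    Consumer<String> outputRecordCollector) {
    if (lastFlushedState == null) {
      onClose.accept(hasFailed);
    } else {
      onClose.accept(false);
      outputRecordCollector.accept(lastFlushedState);
    }
  }

  public static void main(String[] args) {
    String[] states = {null, "state-1"};
    boolean[] failures = {false, true};
    for (String state : states) {
      for (boolean failed : failures) {
        System.out.println("lastFlushedState=" + state + " hasFailed=" + failed
            + " compact=" + compact(state, failed)
            + " unfurled=" + unfurled(state, failed));
      }
    }
  }
}
```

The two forms agree on all four input combinations. Placing the emission after `onClose.accept(false)` in the `else` branch preserves the property that state is emitted only once the close step has completed without an exception.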
}

@Test
void testExceptionDuringOnClose() throws Exception {
this test (and potentially others) should verify that this line throws an exception (also that line should throw an exception)
agreed.
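A rough sketch of what "close rethrows, and the test verifies it" could look like, written in plain Java so it runs without a test framework. Everything here is hypothetical: `RethrowOnCloseSketch`, its `CheckedConsumer` interface, and the `closeFailurePropagates` helper are stand-ins for illustration. In the real test suite, JUnit 5's `assertThrows` would be the more natural way to express the check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: close() logs and rethrows, and a plain-Java check verifies it.
final class RethrowOnCloseSketch {

  // Assumed stand-in for a consumer that may throw a checked exception.
  interface CheckedConsumer<T> {
    void accept(T t) throws Exception;
  }

  static void close(String lastFlushedState,
                    boolean hasFailed,
                    CheckedConsumer<Boolean> onClose,
                    Consumer<String> outputRecordCollector) throws Exception {
    try {
      onClose.accept(lastFlushedState == null && hasFailed);
      // Only reached when onClose completed without an exception.
      if (lastFlushedState != null) {
        outputRecordCollector.accept(lastFlushedState);
      }
    } catch (Exception e) {
      System.err.println("Close failed: " + e.getMessage());
      throw e; // propagate so the process can exit with a non-zero status
    }
  }

  // Returns true if a failing onClose propagates its exception and no state is emitted.
  static boolean closeFailurePropagates() {
    List<String> emitted = new ArrayList<>();
    try {
      close("state-1", false,
          discard -> { throw new IllegalStateException("commit failed"); },
          emitted::add);
      return false; // close swallowed the exception
    } catch (Exception e) {
      return emitted.isEmpty() && "commit failed".equals(e.getMessage());
    }
  }
}
```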
* function is successful, then the flushed state record is considered committed and is then
* emitted. We expect this class to only ever emit either 1 state message (in the case of a full or
* partial success) or 0 state messages (in the case where the onClose step was never reached or did
* not complete without exception).
Spoke offline: make clearer 'flushing' can mean very different things. Link to Copy Strategy implementation.
@davinchia, I added the clarification around close destinations. I started trying to change |
@davinchia also I feel a little saner about this bug. We had mentioned that we would have expected to see this bug really frequently. Really the only case where it appears is if the close function has a failure (which I can convince myself isn't too frequent). Other failures thrown during the lifecycle of the consumer are still thrown and thus cause the process to exit with a non-zero status code. The reason I was confused about this yesterday was because I forgot that |
@sherifnada I think this should be ready for you to take another look.
/test connector=connectors/destination-mssql
/test connector=connectors/destination-mysql
/test connector=connectors/destination-jdbc
local |
/test connector=connectors/destination-local-json
/test connector=connectors/destination-meilisearch
/publish connector=connectors/destination-snowflake
/publish connector=connectors/destination-redshift
/publish connector=connectors/destination-postgres
/publish connector=connectors/destination-oracle
/publish connector=connectors/destination-meilisearch
/publish connector=connectors/destination-local-json
/publish connector=connectors/destination-mysql
/publish connector=connectors/destination-mssql
/publish connector=connectors/destination-csv
/publish connector=connectors/destination-bigquery
/publish connector=connectors/destination-bigquery-denormalized
What
BufferedStreamConsumer
).
Pre-merge Checklist