Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

destination-async-framework: make emission of state from FlushWorkers synchronized #35144

Merged
merged 6 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.20.2 | 2024-02-12 | [\#35111](https://github.com/airbytehq/airbyte/pull/35144) | Make state emission from async framework synchronized. |
| 0.20.1 | 2024-02-11 | [\#35111](https://github.com/airbytehq/airbyte/pull/35111) | Fix GlobalAsyncStateManager stats counting logic. |
| 0.20.0 | 2024-02-09 | [\#34562](https://github.com/airbytehq/airbyte/pull/34562) | Add new test cases to BaseTypingDedupingTest to exercise special characters. |
| 0.19.0 | 2024-02-01 | [\#34745](https://github.com/airbytehq/airbyte/pull/34745) | Reorganize CDK module structure. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public class FlushWorkers implements AutoCloseable {
private final AtomicBoolean isClosing;
private final GlobalAsyncStateManager stateManager;

private final Object LOCK = new Object();

public FlushWorkers(final BufferDequeue bufferDequeue,
final DestinationFlushFunction flushFunction,
final Consumer<AirbyteMessage> outputRecordCollector,
Expand Down Expand Up @@ -238,10 +240,13 @@ public void close() throws Exception {
}

private void emitStateMessages(final List<PartialStateWithDestinationStats> partials) {
for (final PartialStateWithDestinationStats partial : partials) {
final AirbyteMessage message = Jsons.deserialize(partial.stateMessage().getSerialized(), AirbyteMessage.class);
message.getState().setDestinationStats(partial.stats());
outputRecordCollector.accept(message);
synchronized (LOCK) {
for (final PartialStateWithDestinationStats partial : partials) {
final AirbyteMessage message = Jsons.deserialize(partial.stateMessage().getSerialized(), AirbyteMessage.class);
message.getState().setDestinationStats(partial.stats());
log.info("State with arrival number {} emitted from thread {}", partial.stateArrivalNumber(), Thread.currentThread().getName());
outputRecordCollector.accept(message);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,8 @@ public List<PartialStateWithDestinationStats> flushStates() {
if (allRecordsCommitted) {
final StateMessageWithArrivalNumber stateMessage = oldestState.getLeft();
final double flushedRecordsAssociatedWithState = stateIdToCounterForPopulatingDestinationStats.get(oldestStateId).doubleValue();
LOGGER.info("State with arrival number {} emitted", stateMessage.arrivalNumber);
output.add(new PartialStateWithDestinationStats(stateMessage.partialAirbyteStateMessage(),
new AirbyteStateStats().withRecordCount(flushedRecordsAssociatedWithState)));
new AirbyteStateStats().withRecordCount(flushedRecordsAssociatedWithState), stateMessage.arrivalNumber()));
bytesFlushed += oldestState.getRight();

// cleanup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStateStats;

public record PartialStateWithDestinationStats(PartialAirbyteMessage stateMessage, AirbyteStateStats stats) {}
public record PartialStateWithDestinationStats(PartialAirbyteMessage stateMessage, AirbyteStateStats stats, long stateArrivalNumber) {}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.20.1
version=0.20.2
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.20.1'
cdkVersionRequired = '0.20.2'
features = [
'datastore-bigquery',
'db-destinations',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerImageTag: 2.4.7
dockerImageTag: 2.4.8
dockerRepository: airbyte/destination-bigquery
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
githubIssueLabel: destination-bigquery
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.20.1'
cdkVersionRequired = '0.20.2'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerImageTag: 3.5.9
dockerImageTag: 3.5.10
dockerRepository: airbyte/destination-snowflake
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
githubIssueLabel: destination-snowflake
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/destinations/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ tutorials:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.4.7 | 2024-02-23 | [35111](https://github.com/airbytehq/airbyte/pull/35111) | Adopt CDK 0.20.1 |
| 2.4.8 | 2024-02-12 | [35144](https://github.com/airbytehq/airbyte/pull/35144) | Adopt CDK 0.20.2 |
| 2.4.7 | 2024-02-12 | [35111](https://github.com/airbytehq/airbyte/pull/35111) | Adopt CDK 0.20.1 |
| 2.4.6 | 2024-02-09 | [34575](https://github.com/airbytehq/airbyte/pull/34575) | Adopt CDK 0.20.0 |
| 2.4.5 | 2024-02-08 | [34745](https://github.com/airbytehq/airbyte/pull/34745) | Adopt CDK 0.19.0 |
| 2.4.4 | 2024-02-08 | [35027](https://github.com/airbytehq/airbyte/pull/35027) | Upgrade CDK to 0.17.1 |
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/snowflake.md
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ Otherwise, make sure to grant the role the required permissions in the desired n

| Version | Date | Pull Request | Subject |
|:----------------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.5.10 | 2024-02-12 | [35144](https://github.com/airbytehq/airbyte/pull/35144) | Adopt CDK 0.20.2 |
| 3.5.9 | 2024-02-12 | [35111](https://github.com/airbytehq/airbyte/pull/35111) | Adopt CDK 0.20.1 |
| 3.5.8 | 2024-02-09 | [34574](https://github.com/airbytehq/airbyte/pull/34574) | Adopt CDK 0.20.0 |
| 3.5.7 | 2024-02-08 | [34747](https://github.com/airbytehq/airbyte/pull/34747) | Adopt CDK 0.19.0 |
Expand Down
Loading