Skip to content

Commit

Permalink
Add count in state message for incremental syncs (#33005)
Browse files Browse the repository at this point in the history
Co-authored-by: xiaohansong <xiaohansong@users.noreply.github.com>
  • Loading branch information
xiaohansong and xiaohansong authored Jan 10, 2024
1 parent 3de9dc9 commit b290208
Show file tree
Hide file tree
Showing 15 changed files with 70 additions and 27 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.11.4 | 2024-01-09 | [\#33305](https://github.com/airbytehq/airbyte/pull/33305) | Source stats in incremental syncs |
| 0.11.3 | 2023-01-09 | [\#33658](https://github.com/airbytehq/airbyte/pull/33658) | Always fail when debezium fails, even if it happened during the setup phase. |
| 0.11.2 | 2024-01-09 | [\#33969](https://github.com/airbytehq/airbyte/pull/33969) | Destination state stats implementation |
| 0.11.1 | 2024-01-04 | [\#33727](https://github.com/airbytehq/airbyte/pull/33727) | SSH bastion heartbeats for Destinations |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.11.3
version=0.11.4
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.airbyte.cdk.integrations.debezium.internals.DebeziumPropertiesManager.DebeziumConnectorType;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStateStats;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -144,7 +145,7 @@ protected AirbyteMessage computeNext() {

if (cdcStateHandler.isCdcCheckpointEnabled() && sendCheckpointMessage) {
LOGGER.info("Sending CDC checkpoint state message.");
final AirbyteMessage stateMessage = createStateMessage(checkpointOffsetToSend);
final AirbyteMessage stateMessage = createStateMessage(checkpointOffsetToSend, recordsLastSync);
previousCheckpointOffset.clear();
previousCheckpointOffset.putAll(checkpointOffsetToSend);
resetCheckpointValues();
Expand Down Expand Up @@ -182,7 +183,7 @@ protected AirbyteMessage computeNext() {
}

isSyncFinished = true;
return createStateMessage(offsetManager.read());
return createStateMessage(offsetManager.read(), recordsLastSync);
}

/**
Expand All @@ -201,15 +202,17 @@ private void resetCheckpointValues() {
*
* @return {@link AirbyteStateMessage} which includes offset and schema history if used.
*/
private AirbyteMessage createStateMessage(final Map<String, String> offset) {
private AirbyteMessage createStateMessage(final Map<String, String> offset, final long recordCount) {
if (trackSchemaHistory && schemaHistoryManager == null) {
throw new RuntimeException("Schema History Tracking is true but manager is not initialised");
}
if (offsetManager == null) {
throw new RuntimeException("Offset can not be null");
}

return cdcStateHandler.saveState(offset, schemaHistoryManager != null ? schemaHistoryManager.read() : null);
final AirbyteMessage message = cdcStateHandler.saveState(offset, schemaHistoryManager != null ? schemaHistoryManager.read() : null);
message.getState().withSourceStats(new AirbyteStateStats().withRecordCount((double) recordCount));
return message;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStateStats;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import java.util.Iterator;
import java.util.Objects;
Expand Down Expand Up @@ -53,6 +54,8 @@ public class StateDecoratingIterator extends AbstractIterator<AirbyteMessage> im
*/
private final int stateEmissionFrequency;
private int totalRecordCount = 0;
// In between each state message, recordCountInStateMessage will be reset to 0.
private int recordCountInStateMessage = 0;
private boolean emitIntermediateState = false;
private AirbyteMessage intermediateStateMessage = null;
private boolean hasCaughtException = false;
Expand Down Expand Up @@ -128,6 +131,7 @@ protected AirbyteMessage computeNext() {
}

totalRecordCount++;
recordCountInStateMessage++;
// Use try-catch to catch Exception that could occur when connection to the database fails
try {
final AirbyteMessage message = messageIterator.next();
Expand All @@ -139,7 +143,7 @@ protected AirbyteMessage computeNext() {
if (stateEmissionFrequency > 0 && !Objects.equals(currentMaxCursor, initialCursor) && messageIterator.hasNext()) {
// Only create an intermediate state when it is not the first or last record message.
// The last state message will be processed seperately.
intermediateStateMessage = createStateMessage(false, totalRecordCount);
intermediateStateMessage = createStateMessage(false, recordCountInStateMessage);
}
currentMaxCursor = cursorCandidate;
currentMaxCursorRecordCount = 1L;
Expand All @@ -164,7 +168,7 @@ protected AirbyteMessage computeNext() {
return optionalIntermediateMessage.orElse(endOfData());
}
} else if (!hasEmittedFinalState) {
return createStateMessage(true, totalRecordCount);
return createStateMessage(true, recordCountInStateMessage);
} else {
return endOfData();
}
Expand All @@ -185,6 +189,7 @@ protected final Optional<AirbyteMessage> getIntermediateMessage() {
if (emitIntermediateState && intermediateStateMessage != null) {
final AirbyteMessage message = intermediateStateMessage;
intermediateStateMessage = null;
recordCountInStateMessage = 0;
emitIntermediateState = false;
return Optional.of(message);
}
Expand All @@ -196,14 +201,15 @@ protected final Optional<AirbyteMessage> getIntermediateMessage() {
* read up so far
*
* @param isFinalState marker for if the final state of the iterator has been reached
* @param totalRecordCount count of read messages
* @param recordCount count of read messages
* @return AirbyteMessage which includes information on state of records read so far
*/
public AirbyteMessage createStateMessage(final boolean isFinalState, final int totalRecordCount) {
public AirbyteMessage createStateMessage(final boolean isFinalState, final int recordCount) {
final AirbyteStateMessage stateMessage = stateManager.updateAndEmit(pair, currentMaxCursor, currentMaxCursorRecordCount);
final Optional<CursorInfo> cursorInfo = stateManager.getCursorInfo(pair);

// logging once every 100 messages to reduce log verbosity
if (totalRecordCount % 100 == 0) {
if (recordCount % 100 == 0) {
LOGGER.info("State report for stream {} - original: {} = {} (count {}) -> latest: {} = {} (count {})",
pair,
cursorInfo.map(CursorInfo::getOriginalCursorField).orElse(null),
Expand All @@ -213,6 +219,10 @@ public AirbyteMessage createStateMessage(final boolean isFinalState, final int t
cursorInfo.map(CursorInfo::getCursor).orElse(null),
cursorInfo.map(CursorInfo::getCursorRecordCount).orElse(null));
}

if (stateMessage != null) {
stateMessage.withSourceStats(new AirbyteStateStats().withRecordCount((double) recordCount));
}
if (isFinalState) {
hasEmittedFinalState = true;
if (stateManager.getCursor(pair).isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ protected AirbyteMessage computeNext() {
} else if (!hasEmittedFinalState) {
hasEmittedFinalState = true;
final AirbyteStateMessage finalStateMessage = sourceStateIteratorManager.createFinalStateMessage();
finalStateMessage.withSourceStats(new AirbyteStateStats().withRecordCount((double) recordCount));
recordCount = 0L;
return new AirbyteMessage()
.withType(Type.STATE)
.withState(finalStateMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStateStats;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import java.sql.SQLException;
import java.util.Collections;
Expand Down Expand Up @@ -69,7 +70,8 @@ private static AirbyteMessage createStateMessage(final String recordValue) {
return new AirbyteMessage()
.withType(Type.STATE)
.withState(new AirbyteStateMessage()
.withData(Jsons.jsonNode(ImmutableMap.of("cursor", recordValue))));
.withData(Jsons.jsonNode(ImmutableMap.of("cursor", recordValue)))
.withSourceStats(new AirbyteStateStats().withRecordCount(1.0)));
}

private Iterator<AirbyteMessage> createExceptionIterator() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {

protected abstract void assertExpectedStateMessages(final List<AirbyteStateMessage> stateMessages);

protected abstract void assertExpectedStateMessagesWithTotalCount(final List<AirbyteStateMessage> stateMessages, final long totalRecordCount);

@BeforeEach
protected void setup() {
testdb = createTestDatabase();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.v0.AirbyteStateStats;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.AirbyteStreamState;
import io.airbyte.protocol.models.v0.CatalogHelpers;
Expand Down Expand Up @@ -601,7 +602,7 @@ protected List<AirbyteMessage> getExpectedAirbyteMessagesSecondSync(final String
.withCursorField(List.of(COL_ID))
.withCursor("5")
.withCursorRecordCount(1L);
expectedMessages.addAll(createExpectedTestMessages(List.of(state)));
expectedMessages.addAll(createExpectedTestMessages(List.of(state), 2L));
return expectedMessages;
}

Expand Down Expand Up @@ -671,9 +672,9 @@ protected void testReadMultipleTablesIncrementally() throws Exception {
.withCursorRecordCount(1L));

final List<AirbyteMessage> expectedMessagesFirstSync = new ArrayList<>(getTestMessages());
expectedMessagesFirstSync.add(createStateMessage(expectedStateStreams1.get(0), expectedStateStreams1));
expectedMessagesFirstSync.add(createStateMessage(expectedStateStreams1.get(0), expectedStateStreams1, 3L));
expectedMessagesFirstSync.addAll(secondStreamExpectedMessages);
expectedMessagesFirstSync.add(createStateMessage(expectedStateStreams2.get(1), expectedStateStreams2));
expectedMessagesFirstSync.add(createStateMessage(expectedStateStreams2.get(1), expectedStateStreams2, 3L));

setEmittedAtToNull(actualMessagesFirstSync);

Expand Down Expand Up @@ -854,7 +855,7 @@ protected void incrementalCursorCheck(
final List<DbStreamState> expectedStreams = List.of(buildStreamState(airbyteStream, cursorField, endCursorValue));

final List<AirbyteMessage> expectedMessages = new ArrayList<>(expectedRecordMessages);
expectedMessages.addAll(createExpectedTestMessages(expectedStreams));
expectedMessages.addAll(createExpectedTestMessages(expectedStreams, expectedRecordMessages.size()));

assertEquals(expectedMessages.size(), actualMessages.size());
assertTrue(expectedMessages.containsAll(actualMessages));
Expand Down Expand Up @@ -934,15 +935,16 @@ protected List<AirbyteMessage> getTestMessages() {
COL_UPDATED_AT, "2006-10-19")))));
}

protected List<AirbyteMessage> createExpectedTestMessages(final List<DbStreamState> states) {
protected List<AirbyteMessage> createExpectedTestMessages(final List<DbStreamState> states, final long numRecords) {
return states.stream()
.map(s -> new AirbyteMessage().withType(Type.STATE)
.withState(
new AirbyteStateMessage().withType(AirbyteStateType.STREAM)
.withStream(new AirbyteStreamState()
.withStreamDescriptor(new StreamDescriptor().withNamespace(s.getStreamNamespace()).withName(s.getStreamName()))
.withStreamState(Jsons.jsonNode(s)))
.withData(Jsons.jsonNode(new DbState().withCdc(false).withStreams(states)))))
.withData(Jsons.jsonNode(new DbState().withCdc(false).withStreams(states)))
.withSourceStats(new AirbyteStateStats().withRecordCount((double) numRecords))))
.collect(
Collectors.toList());
}
Expand Down Expand Up @@ -1062,15 +1064,16 @@ protected JsonNode extractState(final AirbyteMessage airbyteMessage) {
return Jsons.jsonNode(List.of(airbyteMessage.getState()));
}

protected AirbyteMessage createStateMessage(final DbStreamState dbStreamState, final List<DbStreamState> legacyStates) {
protected AirbyteMessage createStateMessage(final DbStreamState dbStreamState, final List<DbStreamState> legacyStates, final long recordCount) {
return new AirbyteMessage().withType(Type.STATE)
.withState(
new AirbyteStateMessage().withType(AirbyteStateType.STREAM)
.withStream(new AirbyteStreamState()
.withStreamDescriptor(new StreamDescriptor().withNamespace(dbStreamState.getStreamNamespace())
.withName(dbStreamState.getStreamName()))
.withStreamState(Jsons.jsonNode(dbStreamState)))
.withData(Jsons.jsonNode(new DbState().withCdc(false).withStreams(legacyStates))));
.withData(Jsons.jsonNode(new DbState().withCdc(false).withStreams(legacyStates)))
.withSourceStats(new AirbyteStateStats().withRecordCount((double) recordCount)));
}

protected List<String> extractSpecificFieldFromCombinedMessages(final List<AirbyteMessage> messages,
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.10.3'
cdkVersionRequired = '0.11.4'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.3.1
dockerImageTag: 3.3.2
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public void updatePrimaryKeyLoadState(final AirbyteStreamNameNamespacePair pair,
}

@Override
public AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, final JsonNode streamStateForIncrementalRun) {
public AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair,
final JsonNode streamStateForIncrementalRun) {
streamsThatHaveCompletedSnapshot.add(pair);
final List<AirbyteStreamState> streamStates = new ArrayList<>();
streamsThatHaveCompletedSnapshot.forEach(stream -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ public interface MySqlInitialLoadStateManager {
void updatePrimaryKeyLoadState(final AirbyteStreamNameNamespacePair pair, final PrimaryKeyLoadStatus pkLoadStatus);

// Returns the final state message for the initial sync.
AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, final JsonNode streamStateForIncrementalRun);
AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair,
final JsonNode streamStateForIncrementalRun);

// Returns the previous state emitted, represented as a {@link PrimaryKeyLoadStatus} associated with
// the stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,15 @@ protected void assertExpectedStateMessages(final List<AirbyteStateMessage> state
assertStateTypes(stateMessages, 4);
}

@Override
protected void assertExpectedStateMessagesWithTotalCount(final List<AirbyteStateMessage> stateMessages, final long totalRecordCount) {
long actualRecordCount = 0L;
for (final AirbyteStateMessage message : stateMessages) {
actualRecordCount += message.getSourceStats().getRecordCount();
}
assertEquals(actualRecordCount, totalRecordCount);
}

@Override
protected void assertExpectedStateMessagesFromIncrementalSync(final List<AirbyteStateMessage> stateMessages) {
assertEquals(1, stateMessages.size());
Expand Down Expand Up @@ -433,6 +442,7 @@ public void syncWouldWorkWithDBWithInvalidTimezone() throws Exception {

assertExpectedRecords(new HashSet<>(MODEL_RECORDS), recordMessages);
assertExpectedStateMessages(stateMessages);
assertExpectedStateMessagesWithTotalCount(stateMessages, 6);
}

@Test
Expand All @@ -451,6 +461,7 @@ public void testCompositeIndexInitialLoad() throws Exception {
final List<AirbyteStateMessage> stateMessages1 = extractStateMessages(actualRecords1);
assertExpectedRecords(new HashSet<>(MODEL_RECORDS), recordMessages1);
assertExpectedStateMessages(stateMessages1);
assertExpectedStateMessagesWithTotalCount(stateMessages1, 6);

// Re-run the sync with state associated with record w/ id = 15 (second to last record).
// We expect to read 2 records, since in the case of a composite PK we issue a >= query.
Expand Down Expand Up @@ -514,6 +525,8 @@ public void testTwoStreamSync() throws Exception {
final Set<AirbyteRecordMessage> recordMessages1 = extractRecordMessages(actualRecords1);
final List<AirbyteStateMessage> stateMessages1 = extractStateMessages(actualRecords1);
assertEquals(13, stateMessages1.size());
assertExpectedStateMessagesWithTotalCount(stateMessages1, 12);

JsonNode sharedState = null;
StreamDescriptor firstStreamInState = null;
for (int i = 0; i < stateMessages1.size(); i++) {
Expand Down Expand Up @@ -582,6 +595,8 @@ public void testTwoStreamSync() throws Exception {
final List<AirbyteStateMessage> stateMessages2 = extractStateMessages(actualRecords2);

assertEquals(6, stateMessages2.size());
// State was reset to the 7th; thus 5 remaining records were expected to be reloaded.
assertExpectedStateMessagesWithTotalCount(stateMessages2, 5);
for (int i = 0; i < stateMessages2.size(); i++) {
final AirbyteStateMessage stateMessage = stateMessages2.get(i);
assertEquals(AirbyteStateType.GLOBAL, stateMessage.getType());
Expand Down
Loading

0 comments on commit b290208

Please sign in to comment.