✨ Introduce StateIteratorProcessor in CDK #33312

Merged
merged 18 commits into from
Jan 3, 2024
Changes from 7 commits
@@ -0,0 +1,70 @@
package io.airbyte.cdk.integrations.source.relationaldb.state;

import com.google.common.collect.AbstractIterator;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStateStats;
import java.time.Instant;
import java.util.Iterator;
import javax.annotation.CheckForNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SourceStateIterator<T> extends AbstractIterator<AirbyteMessage> implements Iterator<AirbyteMessage> {
  private static final Logger LOGGER = LoggerFactory.getLogger(SourceStateIterator.class);
  private final Iterator<T> messageIterator;
  private boolean hasEmittedFinalState = false;
  private long recordCount = 0L;
  private Instant lastCheckpoint = Instant.now();

  private final SourceStateIteratorProcessor<T> sourceStateIteratorProcessor;

  public SourceStateIterator(final Iterator<T> messageIterator,
                             final SourceStateIteratorProcessor<T> sourceStateIteratorProcessor) {
    this.messageIterator = messageIterator;
    this.sourceStateIteratorProcessor = sourceStateIteratorProcessor;
  }

  @CheckForNull
  @Override
  protected AirbyteMessage computeNext() {
    boolean iteratorHasNextValue = false;
    try {
      iteratorHasNextValue = messageIterator.hasNext();
    } catch (final Exception ex) {
      LOGGER.info("Caught exception while trying to get the next from message iterator. Treating hasNext as false.", ex);
    }
    if (iteratorHasNextValue) {
      if (sourceStateIteratorProcessor.shouldEmitStateMessage(recordCount, lastCheckpoint)) {
        final AirbyteStateMessage stateMessage = sourceStateIteratorProcessor.generateStateMessageAtCheckpoint();
        stateMessage.withSourceStats(new AirbyteStateStats().withRecordCount((double) recordCount));

        recordCount = 0L;
        lastCheckpoint = Instant.now();
        return new AirbyteMessage()
            .withType(Type.STATE)
            .withState(stateMessage);
      }
      // Use try-catch to catch Exception that could occur when connection to the database fails
      try {
        final T message = messageIterator.next();
        final AirbyteMessage processedMessage = sourceStateIteratorProcessor.processRecordMessage(message);
        recordCount++;
        return processedMessage;
      } catch (final Exception e) {
        throw new RuntimeException(e);
      }
    } else if (!hasEmittedFinalState) {
      hasEmittedFinalState = true;
      final AirbyteStateMessage finalStateMessage = sourceStateIteratorProcessor.createFinalStateMessage();
      return new AirbyteMessage()
          .withType(Type.STATE)
          .withState(finalStateMessage);
    } else {
      return endOfData();
    }
  }


}
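
The emission order that `computeNext()` produces may be easier to follow in a stripped-down form. The sketch below is not the CDK code: it uses plain strings in place of `AirbyteMessage`, and a fixed record interval (`checkpointEvery`, a hypothetical parameter) in place of the processor's `shouldEmitStateMessage` decision. All class and method names are assumptions made for illustration; it only shows how records, checkpoint states, and the single final state interleave.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Simplified stand-in for SourceStateIterator. All names here are hypothetical.
final class CheckpointingIteratorSketch implements Iterator<String> {

  private final Iterator<String> records;
  private final long checkpointEvery; // assumed policy: checkpoint after this many records
  private long recordCount = 0L;
  private boolean hasEmittedFinalState = false;

  CheckpointingIteratorSketch(final Iterator<String> records, final long checkpointEvery) {
    this.records = records;
    this.checkpointEvery = checkpointEvery;
  }

  @Override
  public boolean hasNext() {
    // One final state is still owed after the records run out.
    return records.hasNext() || !hasEmittedFinalState;
  }

  @Override
  public String next() {
    if (records.hasNext()) {
      // A checkpoint is only considered while another record is pending,
      // mirroring the branch structure of computeNext().
      if (recordCount >= checkpointEvery) {
        final String state = "STATE(recordCount=" + recordCount + ")";
        recordCount = 0L;
        return state;
      }
      recordCount++;
      return "RECORD(" + records.next() + ")";
    }
    if (!hasEmittedFinalState) {
      hasEmittedFinalState = true;
      return "FINAL_STATE";
    }
    throw new NoSuchElementException();
  }

  // Collects everything the iterator emits, in order.
  static List<String> drain(final Iterator<String> iterator) {
    final List<String> out = new ArrayList<>();
    while (iterator.hasNext()) {
      out.add(iterator.next());
    }
    return out;
  }

  public static void main(final String[] args) {
    // Prints [RECORD(a), RECORD(b), STATE(recordCount=2), RECORD(c), FINAL_STATE]
    System.out.println(drain(new CheckpointingIteratorSketch(List.of("a", "b", "c").iterator(), 2)));
  }
}
```

Note that in this sketch, as in the diff, the final state carries no record count; only checkpoint states report how many records preceded them.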
@@ -0,0 +1,30 @@
package io.airbyte.cdk.integrations.source.relationaldb.state;

import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import java.time.Instant;

public interface SourceStateIteratorProcessor<T> {
Contributor

Looking at the implementation of this in MySqlInitialSyncStateIteratorProcessor, this seems to be a wrapper around StateManager. Perhaps we should rename this to SourceStateManager?

A stretch goal is to see if there is a way to combine parts of this code and the various StateManagers, since there is definitely some duplicate code (e.g. by moving certain fields to generics).

Contributor Author

Discussed offline and we decided to keep this as is.

I didn’t make the stateManager change because:

  • Abstracting all the logic from the various state managers is difficult because each has a different function signature for generating the state message.
  • Merging the Processor logic into the stateManager within the same connector is problematic: we would have to create an abstract stateManager class on top of the global and PerStream ones so that we don’t duplicate the Processor logic in them, and since we would be creating another layer anyway, we prefer composition over inheritance.
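
To make the second point concrete, here is a minimal sketch of the composition approach, assuming two state managers whose state-generating methods have different signatures. Every type and method name below is hypothetical and exists only for illustration; none of them is part of the CDK or the connector.

```java
// Hypothetical types for illustration; none of them exist in the CDK.
interface StateProcessorSketch {
  String createFinalState();
}

// Two state managers whose state-generating methods have different signatures.
final class GlobalStateManagerSketch {
  String emitGlobalState() {
    return "global-state";
  }
}

final class PerStreamStateManagerSketch {
  String emitStreamState(final String streamName) {
    return "stream-state:" + streamName;
  }
}

// Composition: each processor holds the manager it needs and adapts its signature,
// so no shared abstract base class is required above the two managers.
final class GlobalProcessorSketch implements StateProcessorSketch {
  private final GlobalStateManagerSketch manager;

  GlobalProcessorSketch(final GlobalStateManagerSketch manager) {
    this.manager = manager;
  }

  @Override
  public String createFinalState() {
    return manager.emitGlobalState();
  }
}

final class PerStreamProcessorSketch implements StateProcessorSketch {
  private final PerStreamStateManagerSketch manager;
  private final String streamName;

  PerStreamProcessorSketch(final PerStreamStateManagerSketch manager, final String streamName) {
    this.manager = manager;
    this.streamName = streamName;
  }

  @Override
  public String createFinalState() {
    return manager.emitStreamState(streamName);
  }
}

final class CompositionDemo {
  public static void main(final String[] args) {
    final StateProcessorSketch processor =
        new PerStreamProcessorSketch(new PerStreamStateManagerSketch(), "users");
    System.out.println(processor.createFinalState());
  }
}
```

The iterator only ever sees the processor interface, so the managers stay free to keep their own method signatures.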

  /**
   * Returns a state message that should be emitted at checkpoint.
   */
  AirbyteStateMessage generateStateMessageAtCheckpoint();

  /**
   * For the incoming record message, this method defines how the connector will consume it.
   */
  AirbyteMessage processRecordMessage(final T message);

  /**
   * At the end of the iteration, this method will be called and it will generate the final state message.
   * @return the final state message
   */
  AirbyteStateMessage createFinalStateMessage();

  /**
   * Determines if the iterator has reached checkpoint or not,
   * based on the time and number of record messages it has processed since the last checkpoint.
   */
  boolean shouldEmitStateMessage(final long recordCount, final Instant lastCheckpoint);
}
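
How an implementation decides `shouldEmitStateMessage` is left to each connector. The following is a minimal sketch of one plausible policy, assuming a checkpoint is due once either a record threshold is reached or a time threshold is exceeded. The class name `CheckpointPolicySketch` and the explicit `now` parameter are hypothetical (the interface method takes only `recordCount` and `lastCheckpoint`); `now` is passed in here so the check is deterministic.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of the PR: one possible checkpoint policy.
final class CheckpointPolicySketch {

  private final Duration syncCheckpointDuration;
  private final long syncCheckpointRecords;

  CheckpointPolicySketch(final Duration syncCheckpointDuration, final long syncCheckpointRecords) {
    this.syncCheckpointDuration = syncCheckpointDuration;
    this.syncCheckpointRecords = syncCheckpointRecords;
  }

  // Assumed rule: emit when enough records were read OR enough time has passed.
  boolean shouldEmitStateMessage(final long recordCount, final Instant lastCheckpoint, final Instant now) {
    final boolean enoughRecords = recordCount >= syncCheckpointRecords;
    final boolean enoughTime = Duration.between(lastCheckpoint, now).compareTo(syncCheckpointDuration) > 0;
    return enoughRecords || enoughTime;
  }

  public static void main(final String[] args) {
    final CheckpointPolicySketch policy = new CheckpointPolicySketch(Duration.ofSeconds(60), 3);
    final Instant start = Instant.parse("2024-01-03T00:00:00Z");
    System.out.println(policy.shouldEmitStateMessage(2, start, start.plusSeconds(10))); // false
    System.out.println(policy.shouldEmitStateMessage(3, start, start.plusSeconds(10))); // true
    System.out.println(policy.shouldEmitStateMessage(0, start, start.plusSeconds(61))); // true
  }
}
```

A real processor would typically read the clock itself and may add further conditions before checkpointing.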
@@ -0,0 +1,87 @@
package io.airbyte.cdk.integrations.source.relationaldb.state;

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStateStats;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import java.util.Iterator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class SourceStateIteratorTest {

  SourceStateIteratorProcessor mockProcessor;
  Iterator<AirbyteMessage> messageIterator;

  SourceStateIterator sourceStateIterator;

  @BeforeEach
  void setup() {
    mockProcessor = mock(SourceStateIteratorProcessor.class);
    messageIterator = mock(Iterator.class);
    sourceStateIterator = new SourceStateIterator(messageIterator, mockProcessor);
  }

  // Provides a way to generate a record message and will verify corresponding spied functions have been called.
  void processRecordMessage() {
    doReturn(true).when(messageIterator).hasNext();
    doReturn(false).when(mockProcessor).shouldEmitStateMessage(anyLong(), any());
    AirbyteMessage message = new AirbyteMessage().withType(Type.RECORD).withRecord(new AirbyteRecordMessage());
    doReturn(message).when(mockProcessor).processRecordMessage(any());
    doReturn(message).when(messageIterator).next();

    assertEquals(message, sourceStateIterator.computeNext());
    verify(mockProcessor, atLeastOnce()).processRecordMessage(message);
    verify(mockProcessor, atLeastOnce()).shouldEmitStateMessage(eq(0L), any());
  }
  @Test
  void testShouldProcessRecordMessage() {
    processRecordMessage();
  }

  @Test
  void testShouldEmitStateMessage() {
    processRecordMessage();
    doReturn(true).when(mockProcessor).shouldEmitStateMessage(anyLong(), any());
    final AirbyteStateMessage stateMessage = new AirbyteStateMessage();
    doReturn(stateMessage).when(mockProcessor).generateStateMessageAtCheckpoint();
    AirbyteMessage expectedMessage = new AirbyteMessage().withType(Type.STATE).withState(stateMessage);
    expectedMessage.getState().withSourceStats(new AirbyteStateStats().withRecordCount(1.0));
    assertEquals(expectedMessage, sourceStateIterator.computeNext());
  }

  @Test
  void testShouldEmitFinalStateMessage() {
    processRecordMessage();
    processRecordMessage();
    doReturn(false).when(messageIterator).hasNext();
    final AirbyteStateMessage stateMessage = new AirbyteStateMessage();
    doReturn(stateMessage).when(mockProcessor).createFinalStateMessage();
    AirbyteMessage expectedMessage = new AirbyteMessage().withType(Type.STATE).withState(stateMessage);
    expectedMessage.getState().withSourceStats(new AirbyteStateStats().withRecordCount(2.0));
    assertEquals(expectedMessage, sourceStateIterator.computeNext());
  }

  @Test
  void testShouldSendEndOfData() {
    processRecordMessage();
    doReturn(false).when(messageIterator).hasNext();
    doReturn(new AirbyteStateMessage()).when(mockProcessor).createFinalStateMessage();
    sourceStateIterator.computeNext();

    // After sending the final state, if iterator was called again, we will return null.
    assertEquals(null, sourceStateIterator.computeNext());
  }
}
3 changes: 2 additions & 1 deletion airbyte-integrations/connectors/source-mysql/build.gradle
@@ -9,7 +9,7 @@ plugins {
airbyteJavaConnector {
cdkVersionRequired = '0.6.4'
features = ['db-sources']
-useLocalCdk = false
+useLocalCdk = true
}

configurations.all {
@@ -30,6 +30,7 @@ dependencies {

implementation 'mysql:mysql-connector-java:8.0.30'
implementation 'org.apache.commons:commons-lang3:3.11'
+implementation project(path: ':airbyte-cdk:java:airbyte-cdk:db-sources')

testImplementation 'org.apache.commons:commons-lang3:3.11'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
@@ -11,8 +11,11 @@
import com.google.common.annotations.VisibleForTesting;
import com.mysql.cj.MysqlType;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
+import io.airbyte.cdk.integrations.debezium.DebeziumIteratorConstants;
import io.airbyte.cdk.integrations.source.relationaldb.DbSourceDiscoverUtil;
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo;
+import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateIterator;
+import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateIteratorProcessor;
import io.airbyte.commons.stream.AirbyteStreamUtils;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
@@ -180,13 +183,15 @@ private AutoCloseableIterator<AirbyteMessage> augmentWithState(final AutoCloseab

final Duration syncCheckpointDuration =
config.get(SYNC_CHECKPOINT_DURATION_PROPERTY) != null ? Duration.ofSeconds(config.get(SYNC_CHECKPOINT_DURATION_PROPERTY).asLong())
-: MySqlInitialSyncStateIterator.SYNC_CHECKPOINT_DURATION;
+: DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION;
final Long syncCheckpointRecords = config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY) != null ? config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY).asLong()
-: MySqlInitialSyncStateIterator.SYNC_CHECKPOINT_RECORDS;
+: DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS;

+final SourceStateIteratorProcessor processor = new MySqlInitialSyncStateIteratorProcessor(pair, initialLoadStateManager, incrementalState,
+syncCheckpointDuration, syncCheckpointRecords);

return AutoCloseableIterators.transformIterator(
-r -> new MySqlInitialSyncStateIterator(r, pair, initialLoadStateManager, incrementalState,
-syncCheckpointDuration, syncCheckpointRecords),
+r -> new SourceStateIterator(r, processor),
recordIterator, pair);
}


This file was deleted.
