Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DB sources] : Reduce CDC state compression limit to 1MB #35511

Merged
merged 4 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.21.4 | 2024-02-21 | [\#35511](https://github.com/airbytehq/airbyte/pull/35511) | Reduce CDC state compression limit to 1MB |
| 0.21.3 | 2024-02-20 | [\#35394](https://github.com/airbytehq/airbyte/pull/35394) | Add Junit progress information to the test logs |
| 0.21.2 | 2024-02-20 | [\#34978](https://github.com/airbytehq/airbyte/pull/34978) | Reduce log noise in NormalizationLogParser. |
| 0.21.1 | 2024-02-20 | [\#35199](https://github.com/airbytehq/airbyte/pull/35199) | Add thread names to the logs. |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.21.3
version=0.21.4
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
public class AirbyteSchemaHistoryStorage {

private static final Logger LOGGER = LoggerFactory.getLogger(AirbyteSchemaHistoryStorage.class);
private static final long SIZE_LIMIT_TO_COMPRESS_MB = 3;
private static final long SIZE_LIMIT_TO_COMPRESS_MB = 2;
public static final int ONE_MB = 1024 * 1024;
private static final Charset UTF8 = StandardCharsets.UTF_8;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class AirbyteSchemaHistoryStorageTest {

@Test
public void testForContentBiggerThan3MBLimit() throws IOException {
final String contentReadDirectlyFromFile = MoreResources.readResource("dbhistory_greater_than_3_mb.dat");
final String contentReadDirectlyFromFile = MoreResources.readResource("dbhistory_greater_than_2_mb.dat");

final AirbyteSchemaHistoryStorage schemaHistoryStorageFromUncompressedContent = AirbyteSchemaHistoryStorage.initializeDBHistory(
new SchemaHistory<>(Optional.of(Jsons.jsonNode(contentReadDirectlyFromFile)),
Expand All @@ -46,14 +46,14 @@ public void testForContentBiggerThan3MBLimit() throws IOException {
@Test
public void sizeTest() throws IOException {
assertEquals(5.881045341491699,
AirbyteSchemaHistoryStorage.calculateSizeOfStringInMB(MoreResources.readResource("dbhistory_greater_than_3_mb.dat")));
AirbyteSchemaHistoryStorage.calculateSizeOfStringInMB(MoreResources.readResource("dbhistory_greater_than_2_mb.dat")));
assertEquals(0.0038671493530273438,
AirbyteSchemaHistoryStorage.calculateSizeOfStringInMB(MoreResources.readResource("dbhistory_less_than_3_mb.dat")));
AirbyteSchemaHistoryStorage.calculateSizeOfStringInMB(MoreResources.readResource("dbhistory_less_than_2_mb.dat")));
}

@Test
public void testForContentLessThan3MBLimit() throws IOException {
final String contentReadDirectlyFromFile = MoreResources.readResource("dbhistory_less_than_3_mb.dat");
final String contentReadDirectlyFromFile = MoreResources.readResource("dbhistory_less_than_2_mb.dat");

final AirbyteSchemaHistoryStorage schemaHistoryStorageFromUncompressedContent = AirbyteSchemaHistoryStorage.initializeDBHistory(
new SchemaHistory<>(Optional.of(Jsons.jsonNode(contentReadDirectlyFromFile)),
Expand Down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes in this file seem unrelated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted. Not sure why they showed up

Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,116 @@ public class MySqlInitialReadUtil {

private static final Logger LOGGER = LoggerFactory.getLogger(MySqlInitialReadUtil.class);

/**
 * Builds the read iterators for a MySQL sync: the initial (primary key) load iterators for the
 * streams selected by {@code cdcStreamsForInitialPrimaryKeyLoad} and, when {@code isCdc} is set,
 * a lazily created Debezium incremental iterator that is chained after them.
 *
 * @param database database handle; also the origin of the source config
 * @param catalog configured catalog of the sync
 * @param tableNameToTable table metadata handed to the initial load handler
 * @param stateManager state manager holding the previously saved CDC state, if any
 * @param emittedAt timestamp passed to the metadata injectors and the initial load handler
 * @param quoteString identifier quote string forwarded to the initial load helpers
 * @param isCdc when {@code true}, the incremental iterator is appended and everything is
 *        returned as one concatenated iterator; when {@code false}, only the initial load
 *        iterators are returned
 * @return a singleton list holding the concatenated iterator when {@code isCdc} is
 *         {@code true}; otherwise the (possibly empty) list of initial load iterators
 * @throws ConfigErrorException if the saved offset is no longer present on the server and the
 *         {@code replication_method} config either lacks
 *         {@code INVALID_CDC_CURSOR_POSITION_PROPERTY} or sets it to {@code FAIL_SYNC_OPTION}
 */
public static List<AutoCloseableIterator<AirbyteMessage>> getInitialLoadIterators(final JdbcDatabase database,
                                                                                  final ConfiguredAirbyteCatalog catalog,
                                                                                  final Map<String, TableInfo<CommonField<MysqlType>>> tableNameToTable,
                                                                                  final StateManager stateManager,
                                                                                  final Instant emittedAt,
                                                                                  final String quoteString,
                                                                                  final boolean isCdc) {
  final JsonNode sourceConfig = database.getSourceConfig();
  // Collects the iterators for the streams that need to be loaded via primary key sync.
  final List<AutoCloseableIterator<AirbyteMessage>> initialLoadIterator = new ArrayList<>();

  // Construct the initial debezium state for MySQL. If a state already exists we use that one
  // instead, since it is the debezium state associated with the initial sync.
  final MySqlDebeziumStateUtil mySqlDebeziumStateUtil = new MySqlDebeziumStateUtil();
  final JsonNode initialDebeziumState = mySqlDebeziumStateUtil.constructInitialDebeziumState(
      MySqlCdcProperties.getDebeziumProperties(database), catalog, database);

  final JsonNode state =
      (stateManager.getCdcStateManager().getCdcState() == null || stateManager.getCdcStateManager().getCdcState().getState() == null)
          ? initialDebeziumState
          : Jsons.clone(stateManager.getCdcStateManager().getCdcState().getState());

  final Optional<MysqlDebeziumStateAttributes> savedOffset = mySqlDebeziumStateUtil.savedOffset(
      MySqlCdcProperties.getDebeziumProperties(database), catalog, state.get(MYSQL_CDC_OFFSET), sourceConfig);

  final boolean savedOffsetStillPresentOnServer =
      savedOffset.isPresent() && mySqlDebeziumStateUtil.savedOffsetStillPresentOnServer(database, savedOffset.get());

  if (!savedOffsetStillPresentOnServer) {
    AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcCursorInvalidMessage());
    // Fail the sync unless the config explicitly selects something other than FAIL_SYNC_OPTION.
    if (!sourceConfig.get("replication_method").has(INVALID_CDC_CURSOR_POSITION_PROPERTY) || sourceConfig.get("replication_method").get(
        INVALID_CDC_CURSOR_POSITION_PROPERTY).asText().equals(FAIL_SYNC_OPTION)) {
      throw new ConfigErrorException(
          "Saved offset no longer present on the server. Please reset the connection, and then increase binlog retention or reduce sync frequency. See https://docs.airbyte.com/integrations/sources/mysql/mysql-troubleshooting#under-cdc-incremental-mode-there-are-still-full-refresh-syncs for more details.");
    }
    LOGGER.warn("Saved offset no longer present on the server, Airbyte is going to trigger a sync from scratch");
  }

  final InitialLoadStreams initialLoadStreams = cdcStreamsForInitialPrimaryKeyLoad(stateManager.getCdcStateManager(), catalog,
      savedOffsetStillPresentOnServer);

  // Fall back to the freshly constructed debezium state when the saved offset is unusable or
  // when no CDC state has been saved yet.
  final CdcState stateToBeUsed = (!savedOffsetStillPresentOnServer || (stateManager.getCdcStateManager().getCdcState() == null
      || stateManager.getCdcStateManager().getCdcState().getState() == null)) ? new CdcState().withState(initialDebeziumState)
          : stateManager.getCdcStateManager().getCdcState();

  final MySqlCdcConnectorMetadataInjector metadataInjector = MySqlCdcConnectorMetadataInjector.getInstance(emittedAt);

  // If there are streams to sync via primary key load, build the relevant iterators.
  if (!initialLoadStreams.streamsForInitialLoad().isEmpty()) {
    LOGGER.info("Streams to be synced via primary key : {}", initialLoadStreams.streamsForInitialLoad().size());
    LOGGER.info("Streams: {}", prettyPrintConfiguredAirbyteStreamList(initialLoadStreams.streamsForInitialLoad()));
    final MySqlInitialLoadStateManager initialLoadStateManager =
        new MySqlInitialLoadGlobalStateManager(initialLoadStreams,
            initPairToPrimaryKeyInfoMap(database, initialLoadStreams, tableNameToTable, quoteString),
            stateToBeUsed, catalog);
    final MysqlDebeziumStateAttributes stateAttributes = MySqlDebeziumStateUtil.getStateAttributesFromDB(database);

    final MySqlInitialLoadSourceOperations sourceOperations =
        new MySqlInitialLoadSourceOperations(
            Optional.of(new CdcMetadataInjector(emittedAt.toString(), stateAttributes, metadataInjector)));
    final MySqlInitialLoadHandler initialLoadHandler = new MySqlInitialLoadHandler(sourceConfig, database,
        sourceOperations,
        quoteString,
        initialLoadStateManager,
        namespacePair -> Jsons.emptyObject(),
        getTableSizeInfoForStreams(database, initialLoadStreams.streamsForInitialLoad(), quoteString));

    initialLoadIterator.addAll(initialLoadHandler.getIncrementalIterators(
        new ConfiguredAirbyteCatalog().withStreams(initialLoadStreams.streamsForInitialLoad()),
        tableNameToTable,
        emittedAt));
  } else {
    LOGGER.info("No streams will be synced via primary key");
  }

  if (isCdc) {
    final Duration firstRecordWaitTime = RecordWaitTimeUtil.getFirstRecordWaitTime(sourceConfig);
    final Duration subsequentRecordWaitTime = RecordWaitTimeUtil.getSubsequentRecordWaitTime(sourceConfig);
    LOGGER.info("First record waiting time: {} seconds", firstRecordWaitTime.getSeconds());
    // Build the incremental CDC iterators.
    final AirbyteDebeziumHandler<MySqlCdcPosition> handler = new AirbyteDebeziumHandler<>(
        sourceConfig,
        MySqlCdcTargetPosition.targetPosition(database),
        true,
        firstRecordWaitTime,
        subsequentRecordWaitTime,
        AirbyteDebeziumHandler.QUEUE_CAPACITY,
        false);
    final var propertiesManager = new RelationalDbDebeziumPropertiesManager(
        MySqlCdcProperties.getDebeziumProperties(database), sourceConfig, catalog);
    final var eventConverter = new RelationalDbDebeziumEventConverter(metadataInjector, emittedAt);

    final Supplier<AutoCloseableIterator<AirbyteMessage>> incrementalIteratorSupplier = () -> handler.getIncrementalIterators(
        propertiesManager, eventConverter, new MySqlCdcSavedInfoFetcher(stateToBeUsed), new MySqlCdcStateHandler(stateManager));

    // This starts processing the binlogs as soon as the initial sync is complete, which is a bit
    // different from the current cdc syncs: there we finish the current CDC sync once the initial
    // snapshot is complete and the next sync starts processing the binlogs.
    return Collections.singletonList(
        AutoCloseableIterators.concatWithEagerClose(
            Stream
                .of(initialLoadIterator, Collections.singletonList(AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier, null)))
                .flatMap(Collection::stream)
                .collect(Collectors.toList()),
            AirbyteTraceMessageUtility::emitStreamStatusTrace));
  }

  // Non-CDC callers only need the initial load iterators. Without this return the method does not
  // complete normally on every path and fails to compile.
  return initialLoadIterator;
}

/*
* Returns the read iterators associated with : 1. Initial cdc read snapshot via primary key
* queries. 2. Incremental cdc reads via debezium.
Expand Down
Loading