Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add airbyte trace utility to emit analytics messages & emit messages for MongoDB, Postgres & MySQL #35036

Merged
merged 18 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.20.6 | 2024-02-12 | [\#35036](https://github.com/airbytehq/airbyte/pull/35036) | Add trace utility to emit analytics messages. |
| 0.20.5 | 2024-02-13 | [\#34869](https://github.com/airbytehq/airbyte/pull/34869) | Don't emit final state in SourceStateIterator there is an underlying stream failure. |
| 0.20.4 | 2024-02-12 | [\#35042](https://github.com/airbytehq/airbyte/pull/35042) | Use delegate's isDestinationV2 invocation in SshWrappedDestination. |
| 0.20.3 | 2024-02-09 | [\#34580](https://github.com/airbytehq/airbyte/pull/34580) | Support special chars in mysql/mssql database name. |
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.db;

import io.airbyte.protocol.models.v0.AirbyteAnalyticsTraceMessage;

/**
* Utility class to define constants associated with database source connector analytics events.
* Make sure to add the analytics event to
* https://www.notion.so/Connector-Analytics-Events-892a79a49852465f8d59a18bd84c36de
*/
public class DbAnalyticsUtils {

public static final String CDC_CURSOR_INVALID_KEY = "db-sources-cdc-cursor-invalid";

public static AirbyteAnalyticsTraceMessage cdcCursorInvalidMessage() {
return new AirbyteAnalyticsTraceMessage().withType(CDC_CURSOR_INVALID_KEY).withValue("1");
akashkulk marked this conversation as resolved.
Show resolved Hide resolved
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
package io.airbyte.cdk.integrations.base;

import io.airbyte.commons.stream.AirbyteStreamStatusHolder;
import io.airbyte.protocol.models.v0.AirbyteAnalyticsTraceMessage;
import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage;
import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage.FailureType;
import io.airbyte.protocol.models.v0.AirbyteEstimateTraceMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteTraceMessage;
import java.time.Instant;
import java.util.function.Consumer;
import org.apache.commons.lang3.exception.ExceptionUtils;

Expand Down Expand Up @@ -50,6 +52,10 @@ public static void emitEstimateTrace(final long byteEstimate,
.withNamespace(streamNamespace))));
}

public static void emitAnalyticsTrace(final AirbyteAnalyticsTraceMessage airbyteAnalyticsTraceMessage) {
emitMessage(makeAnalyticsTraceAirbyteMessage(airbyteAnalyticsTraceMessage));
}

public static void emitErrorTrace(final Throwable e, final String displayMessage, final FailureType failureType) {
emitMessage(makeErrorTraceAirbyteMessage(e, displayMessage, failureType));
}
Expand Down Expand Up @@ -86,6 +92,14 @@ private static AirbyteMessage makeErrorTraceAirbyteMessage(
.withStackTrace(ExceptionUtils.getStackTrace(e))));
}

private static AirbyteMessage makeAnalyticsTraceAirbyteMessage(final AirbyteAnalyticsTraceMessage airbyteAnalyticsTraceMessage) {
return new AirbyteMessage().withType(Type.TRACE)
akashkulk marked this conversation as resolved.
Show resolved Hide resolved
.withTrace(new AirbyteTraceMessage()
.withAnalytics(airbyteAnalyticsTraceMessage)
.withType(AirbyteTraceMessage.Type.ANALYTICS)
.withEmittedAt((double) Instant.now().toEpochMilli()));
}

private static AirbyteMessage makeStreamStatusTraceAirbyteMessage(final AirbyteStreamStatusHolder airbyteStreamStatusHolder) {
return makeAirbyteMessageFromTraceMessage(airbyteStreamStatusHolder.toTraceMessage());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.20.5
version=0.20.6
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public void setUpOut() {
System.setOut(new PrintStream(outContent, true, StandardCharsets.UTF_8));
}

private void assertJsonNodeIsTraceMessage(JsonNode jsonNode) {
private void assertJsonNodeIsTraceMessage(final JsonNode jsonNode) {
// todo: this check could be better by actually trying to convert the JsonNode to an
// AirbyteTraceMessage instance
Assertions.assertEquals("TRACE", jsonNode.get("type").asText());
Expand All @@ -36,15 +36,15 @@ private void assertJsonNodeIsTraceMessage(JsonNode jsonNode) {
@Test
void testEmitSystemErrorTrace() {
AirbyteTraceMessageUtility.emitSystemErrorTrace(Mockito.mock(RuntimeException.class), "this is a system error");
JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8));
final JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8));
assertJsonNodeIsTraceMessage(outJson);
Assertions.assertEquals("system_error", outJson.get("trace").get("error").get("failure_type").asText());
}

@Test
void testEmitConfigErrorTrace() {
AirbyteTraceMessageUtility.emitConfigErrorTrace(Mockito.mock(RuntimeException.class), "this is a config error");
JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8));
final JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8));
assertJsonNodeIsTraceMessage(outJson);
Assertions.assertEquals("config_error", outJson.get("trace").get("error").get("failure_type").asText());
}
Expand All @@ -58,11 +58,11 @@ void testEmitErrorTrace() {
@Test
void testCorrectStacktraceFormat() {
try {
int x = 1 / 0;
} catch (Exception e) {
final int x = 1 / 0;
} catch (final Exception e) {
AirbyteTraceMessageUtility.emitSystemErrorTrace(e, "you exploded the universe");
}
JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8));
final JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8));
Assertions.assertTrue(outJson.get("trace").get("error").get("stack_trace").asText().contains("\n\tat"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.19.0'
cdkVersionRequired = '0.20.6'
features = ['db-sources', 'datastore-mongo']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerImageTag: 1.2.9
dockerImageTag: 1.2.10
dockerRepository: airbyte/source-mongodb-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
githubIssueLabel: source-mongodb-v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

package io.airbyte.integrations.source.mongodb.cdc;

import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcCursorInvalidMessage;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoDatabase;
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.cdk.integrations.debezium.AirbyteDebeziumHandler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterator;
Expand Down Expand Up @@ -112,6 +115,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(
optSavedOffset.filter(savedOffset -> mongoDbDebeziumStateUtil.isValidResumeToken(savedOffset, mongoClient)).isPresent();

if (!savedOffsetIsValid) {
AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcCursorInvalidMessage());
LOGGER.info("Saved offset is not valid. Airbyte will trigger a full refresh.");
// If the offset in the state is invalid, reset the state to the initial STATE
stateManager.resetState(new MongoDbCdcState(initialDebeziumState, config.getEnforceSchema()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.20.5'
cdkVersionRequired = '0.20.6'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.3.6
dockerImageTag: 3.3.7
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.integrations.source.mysql.initialsync;

import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcCursorInvalidMessage;
import static io.airbyte.integrations.source.mysql.MySqlQueryUtils.getTableSizeInfoForStreams;
import static io.airbyte.integrations.source.mysql.MySqlQueryUtils.prettyPrintConfiguredAirbyteStreamList;
import static io.airbyte.integrations.source.mysql.cdc.MysqlCdcStateConstants.MYSQL_CDC_OFFSET;
Expand Down Expand Up @@ -109,6 +110,7 @@ public static List<AutoCloseableIterator<AirbyteMessage>> getCdcReadIterators(fi
savedOffset.isPresent() && mySqlDebeziumStateUtil.savedOffsetStillPresentOnServer(database, savedOffset.get());

if (!savedOffsetStillPresentOnServer) {
AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcCursorInvalidMessage());
LOGGER.warn("Saved offset no longer present on the server, Airbyte is going to trigger a sync from scratch");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.20.4'
cdkVersionRequired = '0.20.6'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.3.9
dockerImageTag: 3.3.10
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@

package io.airbyte.integrations.source.postgres.cdc;

import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcCursorInvalidMessage;
import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.streamsUnderVacuum;
import static io.airbyte.integrations.source.postgres.PostgresUtils.isDebugMode;
import static io.airbyte.integrations.source.postgres.PostgresUtils.prettyPrintConfiguredAirbyteStreamList;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.cdk.integrations.debezium.AirbyteDebeziumHandler;
import io.airbyte.cdk.integrations.debezium.internals.RelationalDbDebeziumEventConverter;
import io.airbyte.cdk.integrations.debezium.internals.RelationalDbDebeziumPropertiesManager;
Expand Down Expand Up @@ -109,6 +111,7 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin
savedOffset);

if (!savedOffsetAfterReplicationSlotLSN) {
AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcCursorInvalidMessage());
LOGGER.warn("Saved offset is before Replication slot's confirmed_flush_lsn, Airbyte will trigger sync from scratch");
} else if (!isDebugMode(sourceConfig) && PostgresUtils.shouldFlushAfterSync(sourceConfig)) {
// We do not want to acknowledge the WAL logs in debug mode.
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mongodb-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ For more information regarding configuration parameters, please see [MongoDb Doc

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------|
| 1.2.10 | 2024-02-13 | [35036](https://github.com/airbytehq/airbyte/pull/34751) | Emit analytics message for invalid CDC cursor. |
| 1.2.9 | 2024-02-13 | [35114](https://github.com/airbytehq/airbyte/pull/35114) | Extend subsequent cdc record wait time to the duration of initial. Bug Fixes |
| 1.2.8 | 2024-02-08 | [34748](https://github.com/airbytehq/airbyte/pull/34748) | Adopt CDK 0.19.0 |
| 1.2.7 | 2024-02-01 | [34759](https://github.com/airbytehq/airbyte/pull/34759) | Fail sync if initial snapshot for any stream fails. |
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ Any database or table encoding combination of charset and collation is supported

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.3.7 | 2024-02-13 | [35036](https://github.com/airbytehq/airbyte/pull/34751) | Emit analytics message for invalid CDC cursor. |
| 3.3.6 | 2024-02-13 | [34869](https://github.com/airbytehq/airbyte/pull/34573) | Don't emit state in SourceStateIterator when there is an underlying stream failure. |
| 3.3.5 | 2024-02-12 | [34580](https://github.com/airbytehq/airbyte/pull/34580) | Support special chars in db name |
| 3.3.4 | 2024-02-08 | [34750](https://github.com/airbytehq/airbyte/pull/34750) | Adopt CDK 0.19.0 |
Expand Down
5 changes: 3 additions & 2 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,9 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

| Version | Date | Pull Request | Subject |
|---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.3.9 | 2024-02-13 | [35224](https://github.com/airbytehq/airbyte/pull/35224) | Adopt CDK 0.20.4 |
| 3.3.8 | 2024-02-08 | [34751](https://github.com/airbytehq/airbyte/pull/34751) | Adopt CDK 0.19.0 |
| 3.3.10 | 2024-02-13 | [35036](https://github.com/airbytehq/airbyte/pull/34751) | Emit analytics message for invalid CDC cursor. |
| 3.3.9 | 2024-02-13 | [35224](https://github.com/airbytehq/airbyte/pull/35224) | Adopt CDK 0.20.4 |
| 3.3.8 | 2024-02-08 | [34751](https://github.com/airbytehq/airbyte/pull/34751) | Adopt CDK 0.19.0 |
| 3.3.7 | 2024-02-08 | [34781](https://github.com/airbytehq/airbyte/pull/34781) | Add a setting in the setup page to advance the LSN. |
| 3.3.6 | 2024-02-07 | [34892](https://github.com/airbytehq/airbyte/pull/34892) | Adopt CDK v0.16.6 |
| 3.3.5 | 2024-02-07 | [34948](https://github.com/airbytehq/airbyte/pull/34948) | Adopt CDK v0.16.5 |
Expand Down
Loading