Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/airbytehq/airbyte into to…
Browse files Browse the repository at this point in the history
…pe/datadog-bldr-cmptbl
  • Loading branch information
topefolorunso committed May 12, 2024
2 parents 9691679 + 4b92f75 commit 933c03a
Show file tree
Hide file tree
Showing 15 changed files with 88 additions and 67 deletions.
20 changes: 16 additions & 4 deletions .github/workflows/regression_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@ on:
workflow_dispatch:
inputs:
connector_name:
description: "Connector name (e.g. source-faker)"
description: Connector name (e.g. source-faker)
required: true
connection_id:
description: "ID of the connection to test"
description: ID of the connection to test; use "auto" to let the connection retriever choose a connection
required: true
pr_url:
description: "URL of the PR containing the code change"
description: URL of the PR containing the code change
required: true
streams:
description: Streams to include in regression tests

jobs:
regression_tests:
Expand Down Expand Up @@ -61,6 +63,16 @@ jobs:
id: fetch_last_commit_id_wd
run: echo "commit_id=$(git rev-parse origin/${{ steps.extract_branch.outputs.branch }})" >> $GITHUB_OUTPUT

- name: Setup Stream Parameters
if: github.event_name == 'workflow_dispatch'
run: |
if [ -z "${{ github.event.inputs.streams }}" ]; then
echo "STREAM_PARAMS=" >> $GITHUB_ENV
else
STREAMS=$(echo "${{ github.event.inputs.streams }}" | sed 's/,/ --connector_regression_tests.selected-streams=/g')
echo "STREAM_PARAMS=--connector_regression_tests.selected-streams=$STREAMS" >> $GITHUB_ENV
fi
- name: Run Regression Tests [WORKFLOW DISPATCH]
if: github.event_name == 'workflow_dispatch' # TODO: consider using the matrix strategy (https://docs.github.com/en/actions/using-jobs/using-a-matrix-for-your-jobs). See https://github.com/airbytehq/airbyte/pull/37659#discussion_r1583380234 for details.
uses: ./.github/actions/run-airbyte-ci
Expand All @@ -77,4 +89,4 @@ jobs:
github_token: ${{ secrets.GH_PAT_MAINTENANCE_OSS }}
s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
subcommand: "connectors --name ${{ github.event.inputs.connector_name }} test --only-step connector_regression_tests --connector_regression_tests.connection-id=${{ github.event.inputs.connection_id }} --connector_regression_tests.pr-url=${{ github.event.inputs.pr_url }}"
subcommand: connectors --name ${{ github.event.inputs.connector_name }} test --only-step connector_regression_tests --connector_regression_tests.connection-id=${{ github.event.inputs.connection_id }} --connector_regression_tests.pr-url=${{ github.event.inputs.pr_url }} ${{ env.STREAM_PARAMS }}
3 changes: 2 additions & 1 deletion airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ corresponds to that version.
### Java CDK

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
|:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.34.4 | 2024-05-10 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | make sure the exceptionHandler always terminates |
| 0.34.3 | 2024-05-10 | [\#38095](https://github.com/airbytehq/airbyte/pull/38095) | Minor changes for databricks connector |
| 0.34.1 | 2024-05-07 | [\#38030](https://github.com/airbytehq/airbyte/pull/38030) | Add support for transient errors |
| 0.34.0 | 2024-05-01 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | Destinations: Remove incremental T+D |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,51 +26,58 @@ class AirbyteExceptionHandler : Thread.UncaughtExceptionHandler {
// the sync."
// from the spec:
// https://docs.google.com/document/d/1ctrj3Yh_GjtQ93aND-WH3ocqGxsmxyC3jfiarrF6NY0/edit#
LOGGER.error(logMessage, throwable)
// Attempt to deinterpolate the error message before emitting a trace message
val mangledMessage: String?
// If any exception in the chain is of a deinterpolatable type, find it and deinterpolate
// its
// message.
// This assumes that any wrapping exceptions are just noise (e.g. runtime exception).
val deinterpolatableException =
ExceptionUtils.getThrowableList(throwable)
.stream()
.filter { t: Throwable ->
THROWABLES_TO_DEINTERPOLATE.stream().anyMatch {
deinterpolatableClass: Class<out Throwable> ->
deinterpolatableClass.isAssignableFrom(t.javaClass)
try {
LOGGER.error(logMessage, throwable)
// Attempt to deinterpolate the error message before emitting a trace message
val mangledMessage: String?
// If any exception in the chain is of a deinterpolatable type, find it and
// deinterpolate
// its
// message.
// This assumes that any wrapping exceptions are just noise (e.g. runtime exception).
val deinterpolatableException =
ExceptionUtils.getThrowableList(throwable)
.stream()
.filter { t: Throwable ->
THROWABLES_TO_DEINTERPOLATE.stream().anyMatch {
deinterpolatableClass: Class<out Throwable> ->
deinterpolatableClass.isAssignableFrom(t.javaClass)
}
}
}
.findFirst()
val messageWasMangled: Boolean
if (deinterpolatableException.isPresent) {
val originalMessage = deinterpolatableException.get().message
mangledMessage =
STRINGS_TO_DEINTERPOLATE
.stream() // Sort the strings longest to shortest, in case any target string is
// a substring of another
// e.g. "airbyte_internal" should be swapped out before "airbyte"
.sorted(Comparator.comparing { obj: String -> obj.length }.reversed())
.reduce(originalMessage) { message: String?, targetString: String? ->
deinterpolate(message, targetString)
}
messageWasMangled = mangledMessage != originalMessage
} else {
mangledMessage = throwable.message
messageWasMangled = false
}
.findFirst()
val messageWasMangled: Boolean
if (deinterpolatableException.isPresent) {
val originalMessage = deinterpolatableException.get().message
mangledMessage =
STRINGS_TO_DEINTERPOLATE
.stream() // Sort the strings longest to shortest, in case any target string
// is
// a substring of another
// e.g. "airbyte_internal" should be swapped out before "airbyte"
.sorted(Comparator.comparing { obj: String -> obj.length }.reversed())
.reduce(originalMessage) { message: String?, targetString: String? ->
deinterpolate(message, targetString)
}
messageWasMangled = mangledMessage != originalMessage
} else {
mangledMessage = throwable.message
messageWasMangled = false
}

if (!messageWasMangled) {
// If we did not modify the message (either not a deinterpolatable class, or we tried to
// deinterpolate but made no changes) then emit our default trace message
AirbyteTraceMessageUtility.emitSystemErrorTrace(throwable, logMessage)
} else {
// If we did modify the message, then emit a custom trace message
AirbyteTraceMessageUtility.emitCustomErrorTrace(throwable.message, mangledMessage)
if (!messageWasMangled) {
// If we did not modify the message (either not a deinterpolatable class, or we
// tried to
// deinterpolate but made no changes) then emit our default trace message
AirbyteTraceMessageUtility.emitSystemErrorTrace(throwable, logMessage)
} else {
// If we did modify the message, then emit a custom trace message
AirbyteTraceMessageUtility.emitCustomErrorTrace(throwable.message, mangledMessage)
}
} catch (t: Throwable) {
LOGGER.error("exception in the exception handler", t)
} finally {
terminate()
}

terminate()
}

// by doing this in a separate method we can mock it to avoid closing the jvm and therefore test
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.34.3
version=0.34.4
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.34.0'
cdkVersionRequired = '0.34.4'
features = [
'db-destinations',
'datastore-bigquery',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerImageTag: 2.4.18
dockerImageTag: 2.4.19
dockerRepository: airbyte/destination-bigquery
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
githubIssueLabel: destination-bigquery
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.33.2'
cdkVersionRequired = '0.34.4'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerImageTag: 3.7.4
dockerImageTag: 3.8.0
dockerRepository: airbyte/destination-snowflake
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
githubIssueLabel: destination-snowflake
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.airbyte.integrations.base.destination.typing_deduping.NoOpTyperDeduperWithV1V2Migrations;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator;
import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeOperationValve;
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration;
import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeDestinationHandler;
Expand Down Expand Up @@ -206,7 +205,6 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
config,
catalog,
true,
new TypeAndDedupeOperationValve(),
typerDeduper,
parsedCatalog,
defaultNamespace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,6 @@
"title": "Disable Final Tables. (WARNING! Unstable option; Columns in raw table schema might change between versions)",
"order": 11
},
"enable_incremental_final_table_updates": {
"type": "boolean",
"default": false,
"description": "When enabled your data will load into your final tables incrementally while your data is still being synced. When Disabled (the default), your data loads into your final tables once at the end of a sync. Note that this option only applies if you elect to create Final tables",
"title": "Enable Loading Data Incrementally to Final Tables",
"order": 12
},
"retention_period_days": {
"type": "integer",
"default": 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.4.0
dockerImageTag: 3.4.1
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,16 +190,23 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin
streamsUnderVacuum.addAll(streamsUnderVacuum(database,
ctidStreams.streamsForCtidSync(), quoteString).result());

final List<ConfiguredAirbyteStream> finalListOfStreamsToBeSyncedViaCtid =
List<ConfiguredAirbyteStream> finalListOfStreamsToBeSyncedViaCtid =
streamsUnderVacuum.isEmpty() ? ctidStreams.streamsForCtidSync()
: ctidStreams.streamsForCtidSync().stream()
.filter(c -> !streamsUnderVacuum.contains(AirbyteStreamNameNamespacePair.fromConfiguredAirbyteSteam(c)))
.toList();
LOGGER.info("Streams to be synced via ctid : {}", finalListOfStreamsToBeSyncedViaCtid.size());
final FileNodeHandler fileNodeHandler = PostgresQueryUtils.fileNodeForStreams(database,
finalListOfStreamsToBeSyncedViaCtid,
quoteString);
final PostgresCtidHandler ctidHandler;
if (!fileNodeHandler.getFailedToQuery().isEmpty()) {
finalListOfStreamsToBeSyncedViaCtid = finalListOfStreamsToBeSyncedViaCtid.stream()
.filter(stream -> !fileNodeHandler.getFailedToQuery().contains(
new AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace())))
.collect(Collectors.toList());
}
LOGGER.info("Streams to be synced via ctid : {}", finalListOfStreamsToBeSyncedViaCtid.size());

try {
ctidHandler =
createInitialLoader(database, finalListOfStreamsToBeSyncedViaCtid, fileNodeHandler, quoteString, ctidStateManager,
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ tutorials:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.4.19 | 2024-05-10 | [38125](https://github.com/airbytehq/airbyte/pull/38125) | adopt latest CDK code |
| 2.4.18 | 2024-05-10 | [38111](https://github.com/airbytehq/airbyte/pull/38111) | No functional changes, deleting unused code |
| 2.4.17 | 2024-05-09 | [38098](https://github.com/airbytehq/airbyte/pull/38098) | Internal build structure change |
| 2.4.16 | 2024-05-08 | [37714](https://github.com/airbytehq/airbyte/pull/37714) | Adopt CDK 0.34.0 |
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/destinations/snowflake.md
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,8 @@ desired namespace.

| Version | Date | Pull Request | Subject |
| :-------------- | :--------- | :--------------------------------------------------------- | :--------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 3.7.4 | 2024-05-07 | [\#38052](https://github.com/airbytehq/airbyte/pull/38052) | Revert problematic optimization |
| 3.8.0 | 2024-05-08 | [\#37715](https://github.com/airbytehq/airbyte/pull/37715) | Remove option for incremental typing and deduping |
| 3.7.4 | 2024-05-07 | [\#38052](https://github.com/airbytehq/airbyte/pull/38052) | Revert problematic optimization |
| 3.7.3 | 2024-05-07 | [\#34612](https://github.com/airbytehq/airbyte/pull/34612) | Adopt CDK 0.33.2 |
| 3.7.2 | 2024-05-06 | [\#37857](https://github.com/airbytehq/airbyte/pull/37857) | Use safe executeMetadata call |
| 3.7.1 | 2024-04-30 | [\#36910](https://github.com/airbytehq/airbyte/pull/36910) | Bump CDK version |
Expand Down
Loading

0 comments on commit 933c03a

Please sign in to comment.