Skip to content

Commit

Permalink
Merge branch 'master' into strosek/test-conn-err-retry
Browse files Browse the repository at this point in the history
  • Loading branch information
maxi297 committed Jun 5, 2024
2 parents f6757f4 + acc854a commit b705ec8
Show file tree
Hide file tree
Showing 1,574 changed files with 123,373 additions and 24,545 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.61.0
current_version = 0.62.0
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-[a-z]+)?
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/connectors_version_increment_check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ jobs:
connectors_ci:
name: Connectors Version Increment Check
runs-on: connector-test-large
if: github.event.pull_request.head.repo.fork != true
timeout-minutes: 10
steps:
- name: Checkout Airbyte
Expand Down
3 changes: 0 additions & 3 deletions .github/workflows/format-fix-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ jobs:
continue-on-error: true
with:
context: "manual"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcs_credentials: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
github_token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/publish-cdk-command-manually.yml
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,8 @@ jobs:
uses: peter-evans/create-pull-request@v6
with:
token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
commit-message: Updating CDK version following release
title: Updating CDK version following release
commit-message: "chore: update CDK version following release"
title: "chore: update CDK version following release"
body: This is an automatically generated PR triggered by a CDK release
branch: automatic-cdk-release
base: master
Expand Down
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ _Screenshot taken from [Airbyte Cloud](https://cloud.airbyte.com/signup)_.
- Create connectors in minutes with our [no-code Connector Builder](https://docs.airbyte.com/connector-development/connector-builder-ui/overview) or [low-code CDK](https://docs.airbyte.com/connector-development/config-based/low-code-cdk-overview).
- Explore popular use cases in our [tutorials](https://airbyte.com/tutorials).
- Orchestrate Airbyte syncs with [Airflow](https://docs.airbyte.com/operator-guides/using-the-airflow-airbyte-operator), [Prefect](https://docs.airbyte.com/operator-guides/using-prefect-task), [Dagster](https://docs.airbyte.com/operator-guides/using-dagster-integration), [Kestra](https://docs.airbyte.com/operator-guides/using-kestra-plugin) or the [Airbyte API](https://reference.airbyte.com/reference/start).
- Easily transform loaded data with [SQL](https://docs.airbyte.com/operator-guides/transformation-and-normalization/transformations-with-sql) or [dbt](https://docs.airbyte.com/operator-guides/transformation-and-normalization/transformations-with-dbt).

Try it out yourself with our [demo app](https://demo.airbyte.io/), visit our [full documentation](https://docs.airbyte.com/) and learn more about [recent announcements](https://airbyte.com/blog-categories/company-updates). See our [registry](https://connectors.airbyte.com/files/generated_reports/connector_registry_report.html) for a full list of connectors already available in Airbyte or Airbyte Cloud.

Expand Down
5 changes: 5 additions & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.36.5 | 2024-06-01 | [\#38792](https://github.com/airbytehq/airbyte/pull/38792) | Throw config exception if no selectable table exists in user provided schemas |
| 0.36.4 | 2024-05-31 | [\#38824](https://github.com/airbytehq/airbyte/pull/38824) | Param marked as non-null to nullable in JdbcDestinationHandler for NPE fix |
| 0.36.2 | 2024-05-29 | [\#38538](https://github.com/airbytehq/airbyte/pull/38357) | Exit connector when encountering a config error. |
| 0.36.0 | 2024-05-29 | [\#38358](https://github.com/airbytehq/airbyte/pull/38358) | Plumb generation_id / sync_id to destinations code |
| 0.35.15 | 2024-05-31 | [\#38824](https://github.com/airbytehq/airbyte/pull/38824) | Param marked as non-null to nullable in JdbcDestinationHandler for NPE fix |
| 0.35.14 | 2024-05-28 | [\#38738](https://github.com/airbytehq/airbyte/pull/38738) | make ThreadCreationInfo cast as nullable |
| 0.35.13 | 2024-05-28 | [\#38632](https://github.com/airbytehq/airbyte/pull/38632) | minor changes to allow conversion of snowflake tests to kotlin |
| 0.35.12 | 2024-05-23 | [\#38638](https://github.com/airbytehq/airbyte/pull/38638) | Minor change to support Snowflake conversion to Kotlin |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ internal constructor(
ConnectorExceptionUtil.getDisplayMessage(rootConfigErrorThrowable),
)
// On receiving a config error, the container should be immediately shut down.
System.exit(1)
} else if (ConnectorExceptionUtil.isTransientError(rootTransientErrorThrowable)) {
AirbyteTraceMessageUtility.emitTransientErrorTrace(
e,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
*/
package io.airbyte.cdk.integrations.base

import java.util.*
import java.util.Locale
import org.apache.commons.lang3.StringUtils

fun upperQuoted(column: String): String {
Expand All @@ -24,41 +24,60 @@ object JavaBaseConstants {
const val COLUMN_NAME_DATA: String = "_airbyte_data"
@JvmField
val LEGACY_RAW_TABLE_COLUMNS: List<String> =
java.util.List.of(COLUMN_NAME_AB_ID, COLUMN_NAME_DATA, COLUMN_NAME_EMITTED_AT)
listOf(COLUMN_NAME_AB_ID, COLUMN_NAME_DATA, COLUMN_NAME_EMITTED_AT)

// destination v2
const val COLUMN_NAME_AB_RAW_ID: String = "_airbyte_raw_id"
const val COLUMN_NAME_AB_LOADED_AT: String = "_airbyte_loaded_at"
const val COLUMN_NAME_AB_EXTRACTED_AT: String = "_airbyte_extracted_at"
const val COLUMN_NAME_AB_META: String = "_airbyte_meta"
const val COLUMN_NAME_AB_GENERATION_ID: String = "_airbyte_generation_id"

const val AIRBYTE_META_SYNC_ID_KEY = "sync_id"

// Meta was introduced later, so to avoid triggering raw table soft-reset in v1->v2
// use this column list.
@JvmField
val V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META: List<String> =
java.util.List.of(
listOf(
COLUMN_NAME_AB_RAW_ID,
COLUMN_NAME_AB_EXTRACTED_AT,
COLUMN_NAME_AB_LOADED_AT,
COLUMN_NAME_DATA,
)
@JvmField
val V2_RAW_TABLE_COLUMN_NAMES: List<String> =
java.util.List.of(
listOf(
COLUMN_NAME_AB_RAW_ID,
COLUMN_NAME_AB_EXTRACTED_AT,
COLUMN_NAME_AB_LOADED_AT,
COLUMN_NAME_DATA,
COLUMN_NAME_AB_META,
)
@JvmField
val V2_RAW_TABLE_COLUMN_NAMES_WITH_GENERATION: List<String> =
listOf(
COLUMN_NAME_AB_RAW_ID,
COLUMN_NAME_AB_EXTRACTED_AT,
COLUMN_NAME_AB_LOADED_AT,
COLUMN_NAME_DATA,
COLUMN_NAME_AB_META,
COLUMN_NAME_AB_GENERATION_ID,
)
@JvmField
val V2_FINAL_TABLE_METADATA_COLUMNS: List<String> =
java.util.List.of(COLUMN_NAME_AB_RAW_ID, COLUMN_NAME_AB_EXTRACTED_AT, COLUMN_NAME_AB_META)
listOf(
COLUMN_NAME_AB_RAW_ID,
COLUMN_NAME_AB_EXTRACTED_AT,
COLUMN_NAME_AB_META,
COLUMN_NAME_AB_GENERATION_ID
)

const val DEFAULT_AIRBYTE_INTERNAL_NAMESPACE: String = "airbyte_internal"
/**
 * Named raw-table column layouts. Each constant exposes, through [rawColumns], the list of
 * column names that was passed to its constructor.
 */
enum class DestinationColumns(val rawColumns: List<String>) {
// Raw id, extracted-at, loaded-at, data and the _airbyte_meta column.
V2_WITH_META(V2_RAW_TABLE_COLUMN_NAMES),
// Same as V2_WITH_META but without the _airbyte_meta column.
V2_WITHOUT_META(V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META),
// V2_WITH_META columns plus the _airbyte_generation_id column.
V2_WITH_GENERATION(V2_RAW_TABLE_COLUMN_NAMES_WITH_GENERATION),
// Legacy layout backed by LEGACY_RAW_TABLE_COLUMNS.
LEGACY(LEGACY_RAW_TABLE_COLUMNS)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBu
protected abstract fun writeRecord(
recordString: String,
airbyteMetaString: String,
emittedAt: Long
generationId: Long,
emittedAt: Long,
)

/**
Expand Down Expand Up @@ -99,7 +100,12 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBu
}

@Throws(Exception::class)
override fun accept(recordString: String, airbyteMetaString: String, emittedAt: Long): Long {
override fun accept(
recordString: String,
airbyteMetaString: String,
generationId: Long,
emittedAt: Long
): Long {
if (!isStarted) {
if (useCompression) {
compressedBuffer = GzipCompressorOutputStream(byteCounter)
Expand All @@ -111,7 +117,7 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBu
}
if (inputStream == null && !isClosed) {
val startCount = byteCounter.count
writeRecord(recordString, airbyteMetaString, emittedAt)
writeRecord(recordString, airbyteMetaString, generationId, emittedAt)
return byteCounter.count - startCount
} else {
throw IllegalCallerException("Buffer is already closed, it cannot accept more messages")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@ interface SerializableBuffer : AutoCloseable {
* @throws Exception
*/
@Throws(Exception::class)
fun accept(recordString: String, airbyteMetaString: String, emittedAt: Long): Long
fun accept(
recordString: String,
airbyteMetaString: String,
generationId: Long,
emittedAt: Long
): Long

/** Flush a buffer implementation. */
@Throws(Exception::class) fun flush()
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.35.14
version=0.36.5
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.function.Consumer
import org.apache.commons.lang3.ThreadUtils
import org.assertj.core.api.AssertionsForClassTypes
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
Expand Down Expand Up @@ -221,32 +220,6 @@ internal class IntegrationRunnerTest {
Mockito.verify(jsonSchemaValidator).validate(any(), any())
}

/**
 * Verifies that a [ConfigErrorException] thrown by `source.read` propagates out of
 * `IntegrationRunner.run` and that `read` was invoked with the expected arguments.
 */
@Test
@Throws(Exception::class)
fun testReadException() {
val intConfig = IntegrationConfig.read(configPath, configuredCatalogPath, statePath)
val configErrorException = ConfigErrorException("Invalid configuration")

// Stub the CLI parser to yield a READ command and make the source fail on read.
Mockito.`when`(cliParser.parse(ARGS)).thenReturn(intConfig)
Mockito.`when`(source.read(CONFIG, CONFIGURED_CATALOG, STATE))
.thenThrow(configErrorException)

// The mocked spec returns CONFIG as its connection specification.
val expectedConnSpec = Mockito.mock(ConnectorSpecification::class.java)
Mockito.`when`(source.spec()).thenReturn(expectedConnSpec)
Mockito.`when`(expectedConnSpec.connectionSpecification).thenReturn(CONFIG)

// A mocked validator is injected into the runner in place of a real one.
val jsonSchemaValidator = Mockito.mock(JsonSchemaValidator::class.java)
// Capture whatever run() throws instead of letting it fail the test directly.
val throwable =
AssertionsForClassTypes.catchThrowable {
IntegrationRunner(cliParser, stdoutConsumer, null, source, jsonSchemaValidator)
.run(ARGS)
}

// The captured throwable must be the config error stubbed above.
AssertionsForClassTypes.assertThat(throwable).isInstanceOf(ConfigErrorException::class.java)
// noinspection resource
Mockito.verify(source).read(CONFIG, CONFIGURED_CATALOG, STATE)
}

@Test
@Throws(Exception::class)
fun testCheckNestedException() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ abstract class JdbcDestinationHandler<DestinationState>(
// Filter for nonNull values in case the query returned NULL (i.e. no unloaded
// records).
val minUnloadedTimestamp: Optional<Timestamp> =
timestampStream.filter { obj: Timestamp -> Objects.nonNull(obj) }.findFirst()
timestampStream.filter { obj: Timestamp? -> Objects.nonNull(obj) }.findFirst()
if (minUnloadedTimestamp.isPresent) {
// Decrement by 1 second since timestamp precision varies between databases.
val ts =
Expand All @@ -145,7 +145,7 @@ abstract class JdbcDestinationHandler<DestinationState>(
// Filter for nonNull values in case the query returned NULL (i.e. no raw records at
// all).
val minUnloadedTimestamp: Optional<Timestamp> =
timestampStream.filter { obj: Timestamp -> Objects.nonNull(obj) }.findFirst()
timestampStream.filter { obj: Timestamp? -> Objects.nonNull(obj) }.findFirst()
return InitialRawTableStatus(
true,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,10 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
includeCdcDeletedAt: Boolean,
streamId: StreamId,
suffix: String?,
records: List<JsonNode>
records: List<JsonNode>,
generationId: Long,
) {
// TODO handle generation ID
val columnNames =
if (includeCdcDeletedAt) FINAL_TABLE_COLUMN_NAMES_CDC else FINAL_TABLE_COLUMN_NAMES
insertRecords(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils
import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifier
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager
import io.airbyte.commons.exceptions.ConfigErrorException
import io.airbyte.commons.functional.CheckedConsumer
import io.airbyte.commons.functional.CheckedFunction
import io.airbyte.commons.json.Jsons
Expand All @@ -55,8 +56,11 @@ import io.airbyte.commons.util.AutoCloseableIterator
import io.airbyte.commons.util.AutoCloseableIterators
import io.airbyte.protocol.models.CommonField
import io.airbyte.protocol.models.JsonSchemaType
import io.airbyte.protocol.models.v0.AirbyteCatalog
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteStream
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair
import io.airbyte.protocol.models.v0.CatalogHelpers
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.airbyte.protocol.models.v0.SyncMode
Expand Down Expand Up @@ -106,6 +110,21 @@ abstract class AbstractJdbcSource<Datatype>(
return false
}

/**
 * Runs the parent class's discovery and then sets `isResumable` on every discovered stream.
 *
 * For each stream the flag is the result of [supportResumableFullRefresh], called with a
 * database created from [config] and the stream's default configured form.
 *
 * @param config connector configuration passed to the parent discovery and to `createDatabase`
 * @return the catalog returned by the parent, with `isResumable` populated on its streams
 */
override fun discover(config: JsonNode): AirbyteCatalog {
    val discoveredCatalog = super.discover(config)
    val jdbcDatabase = createDatabase(config)
    for (airbyteStream: AirbyteStream in discoveredCatalog.streams) {
        airbyteStream.isResumable =
            supportResumableFullRefresh(
                jdbcDatabase,
                CatalogHelpers.toDefaultConfiguredStream(airbyteStream),
            )
    }
    return discoveredCatalog
}

open fun getInitialLoadHandler(
database: JdbcDatabase,
airbyteStream: ConfiguredAirbyteStream,
Expand Down Expand Up @@ -233,6 +252,45 @@ abstract class AbstractJdbcSource<Datatype>(
)
}

/**
 * Checks that the current user can SELECT from at least one table in every configured schema.
 * Connectors may override this function if it takes too long to finish for a particular
 * database source.
 *
 * Schema names are read from the `JdbcUtils.SCHEMAS_KEY` array of [config]. When [config] is
 * null, the key is missing, or its value is not an array, no schema is checked. A schema that
 * contains no tables is logged and skipped.
 *
 * @param config connector configuration; may be null
 * @param database database whose metadata and privileges are inspected
 * @throws ConfigErrorException if a schema contains tables but
 * [getPrivilegesTableForCurrentUser] returns no privileges for it
 */
@Throws(Exception::class)
protected open fun checkUserHasPrivileges(config: JsonNode?, database: JdbcDatabase) {
    val schemas: List<String> =
        config
            ?.get(JdbcUtils.SCHEMAS_KEY)
            ?.takeIf { it.isArray }
            ?.map { it.asText() }
            .orEmpty()

    if (schemas.isEmpty()) {
        LOGGER.info { "No schema has been provided at the moment, skip table permission check." }
        return
    }

    // if UI has schemas specified, check if the user has select access to any table
    for (schema in schemas) {
        LOGGER.info { "Checking if the user can perform select to any table in schema: $schema" }
        // Only the existence of a first row matters; close the metadata ResultSet right away
        // so it is not left open for the remainder of the check.
        val schemaHasTables =
            database.metaData.getTables(null, schema, "%", null).use { it.next() }
        if (!schemaHasTables) {
            LOGGER.info { "Schema $schema does not contain any table." }
            continue
        }
        val privileges = getPrivilegesTableForCurrentUser<JdbcPrivilegeDto>(database, schema)
        if (privileges.isEmpty()) {
            LOGGER.info { "No table from schema $schema is accessible for the user." }
            throw ConfigErrorException(
                "User lacks privileges to SELECT from any of the tables in schema $schema"
            )
        }
    }
}

/**
* Configures a list of operations that can be used to check the connection to the source.
*
Expand All @@ -252,9 +310,10 @@ abstract class AbstractJdbcSource<Datatype>(
CheckedFunction { connection: Connection -> connection.metaData.catalogs },
CheckedFunction { queryResult: ResultSet ->
sourceOperations.rowToJson(queryResult)
}
},
)
}
},
CheckedConsumer { database: JdbcDatabase -> checkUserHasPrivileges(config, database) },
)
}

Expand Down
Loading

0 comments on commit b705ec8

Please sign in to comment.