-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
JDBC Sources: validate actual source schema #21844
JDBC Sources: validate actual source schema #21844
Conversation
/test connector=connectors/source-clickhouse
Build PassedTest summary info:
|
/test connector=connectors/source-postgres
Build PassedTest summary info:
|
/test connector=connectors/source-mysql
Build PassedTest summary info:
|
/test connector=connectors/source-redshift
Build PassedTest summary info:
|
/test connector=connectors/source-snowflake
Build PassedTest summary info:
|
Affected Connector ReportNOTE
|
Connector | Version | Changelog | Publish |
---|---|---|---|
source-alloydb |
1.0.36 |
✅ | ✅ |
source-alloydb-strict-encrypt |
1.0.36 |
🔵 (ignored) |
🔵 (ignored) |
source-bigquery |
0.2.3 |
✅ | ✅ |
source-clickhouse |
0.1.15 |
✅ | ✅ |
source-clickhouse-strict-encrypt |
0.1.15 |
🔵 (ignored) |
🔵 (ignored) |
source-cockroachdb |
0.1.19 |
✅ | ✅ |
source-cockroachdb-strict-encrypt |
0.1.19 |
🔵 (ignored) |
🔵 (ignored) |
source-db2 |
0.1.17 |
✅ | ✅ |
source-db2-strict-encrypt |
0.1.17 |
🔵 (ignored) |
🔵 (ignored) |
source-dynamodb |
0.1.0 |
✅ | ✅ |
source-jdbc |
0.3.5 |
🔵 (ignored) |
🔵 (ignored) |
source-mongodb-strict-encrypt |
0.1.19 |
🔵 (ignored) |
🔵 (ignored) |
source-mongodb-v2 |
0.1.19 |
✅ | ✅ |
source-mssql |
0.4.28 |
✅ | ✅ |
source-mssql-strict-encrypt |
0.4.28 |
🔵 (ignored) |
🔵 (ignored) |
source-mysql |
1.0.21 |
✅ | ✅ |
source-mysql-strict-encrypt |
1.0.21 |
🔵 (ignored) |
🔵 (ignored) |
source-oracle |
0.3.22 |
✅ | ✅ |
source-oracle-strict-encrypt |
0.3.22 |
🔵 (ignored) |
🔵 (ignored) |
source-postgres |
1.0.42 |
✅ | ✅ |
source-postgres-strict-encrypt |
1.0.42 |
🔵 (ignored) |
🔵 (ignored) |
source-redshift |
0.3.16 |
✅ | ✅ |
source-scaffold-java-jdbc |
0.1.0 |
🔵 (ignored) |
🔵 (ignored) |
source-snowflake |
0.1.29 |
✅ | ✅ |
source-tidb |
0.2.2 |
✅ | ✅ |
- See "Actionable Items" below for how to resolve warnings and errors.
✅ Destinations (0)
Connector | Version | Changelog | Publish |
---|
- See "Actionable Items" below for how to resolve warnings and errors.
✅ Other Modules (0)
Actionable Items
(click to expand)
Category | Status | Actionable Item |
---|---|---|
Version | ❌ mismatch |
The version of the connector is different from its normal variant. Please bump the version of the connector. |
⚠ doc not found |
The connector does not seem to have a documentation file. This can be normal (e.g. basic connector like source-jdbc is not published or documented). Please double-check to make sure that it is not a bug. |
|
Changelog | ⚠ doc not found |
The connector does not seem to have a documentation file. This can be normal (e.g. basic connector like source-jdbc is not published or documented). Please double-check to make sure that it is not a bug. |
❌ changelog missing |
There is no chnagelog for the current version of the connector. If you are the author of the current version, please add a changelog. | |
Publish | ⚠ not in seed |
The connector is not in the seed file (e.g. source_definitions.yaml ), so its publication status cannot be checked. This can be normal (e.g. some connectors are cloud-specific, and only listed in the cloud seed file). Please double-check to make sure that it is not a bug. |
❌ diff seed version |
The connector exists in the seed file, but the latest version is not listed there. This usually means that the latest version is not published. Please use the /publish command to publish the latest version. |
|
||
final JsonNode catalogSchema = stream.getJsonSchema(); | ||
if (!catalogSchema.equals(currentJsonSchema)) { | ||
LOGGER.warn("Source schema changed for table {}! Actual schema: {}. Catalog schema: {}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this an error state?
Should we do something other than warn?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1. Can we actually throw a ConfigErrorException
here so that this is actionable to the user? With a message maybe : "The underlying schema changed for the table. Please refresh your source schema! Source schema changed for table {}! Actual schema: {}. Catalog schema: {}"
Along those lines. In addition, could you add a unit test as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rodireich @akashkulk
I'm not sure that throwing an exception here would be correct in all cases, the sync would fail if the datatype changed from integer to float, but if the change were the reverse (from float to integer), and all values in the database contained only integers, then the sync would be successful, also if the user manually added some columns - the sync will be successful, but in some cases when the column is deleted - the sync will fail
I think that there may be other possible variations of a successful/unsuccessful sync, for example, using an incremental mode, but at the same time, throwing an exception here will lead to a complete fail of the sync, while only one stream can fail, for which the user manually changed structure in the database, and all other streams should not fail
WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. So some changes can go through but other will require to refresh the source schema.
My question:
for non-breaking change (e.g user adds a column) - can we keep on syncing indefinitely with the old schema?
for breaking change (e.g user removes a column) - at what stage will the sync fail, is it deterministic or depending on sync type and other factors.
I think this change is making it better for identifying a state we're unaware of today, until something breaks. I wonder if we there's a good way to know what to do? (maybe it's impossible).
+1 to Akash's suggestion for adding a test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rodireich added the unit test
Answering your questions:
for non-breaking change - yes, sync could be indefinitely with the old schema
for breaking change - i'm not sure that we could determine the exact stage of sync failure . Potentially - it could be
- On normalization step as described here
- During WRITE(avro values mapping to the old avro schema or during json schema formatting in BigQuery Denormalized)
- During READ (at any place where old catalog used)
@VitaliiMaltsev the change looks good. Otherwise the warning would keep getting logged but connection would still fail eventually, right? |
Airbyte Code Coverage
|
/test connector=connectors/source-postgres
Build PassedTest summary info:
|
@@ -180,6 +183,35 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config, | |||
}); | |||
} | |||
|
|||
private void validateSourceSchema(Map<String, TableInfo<CommonField<DataType>>> fullyQualifiedTableNameToInfo, | |||
ConfiguredAirbyteCatalog catalog, | |||
Database database) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can remove passing in the Database
parameter. It's not being used
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can remove passing in the
Database
parameter. It's not being used
removed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The platform already has an auto detect schema change feature which will catch these schema drifts, so I'm not sure if we need this logging.
@@ -253,6 +262,45 @@ public void testCanReadUtf8() throws Exception { | |||
} | |||
} | |||
|
|||
@Test | |||
public void testValuesChangedOnChangedSourceSchema() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I understand what's being tested in this test: IIUC this test is verifying that if the underlying schema is changed, the records read will be different
But won't this always be true? Across two different reads
, even with without the changes in this PR the read will always return different records
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I understand what's being tested in this test: IIUC this test is verifying that if the underlying schema is changed, the records read will be different
But won't this always be true? Across two different
reads
, even with without the changes in this PR the read will always return different records
updated this test with 4 different reads with the same catalog:
1 read - initial sync (integers)
2 read - nothing changed ==> values are same as in 1 read (integers)
3 read - added 1 record into the source table ==> added 1 airbyte record (integers)
4 read - user changed column definition from int to float ==> values in airbyte records become double
This test fully emulate behaviour from this issue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But you aren't actually checking for the actual change you're making here, correct? i.e. you aren't verifying the correct logs are being outputted.
In that case, I think you should remove this test altogether since that change is a logging one (and not a validation)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But you aren't actually checking for the actual change you're making here, correct? i.e. you aren't verifying the correct logs are being outputted.
In that case, I think you should remove this test altogether since that change is a logging one (and not a validation)
removed the test
...elational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java
Show resolved
Hide resolved
I did not find any platform logging if the source schema changed with the comparison actual schema vs catalog shema. Where can i find it? |
@@ -180,6 +183,34 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config, | |||
}); | |||
} | |||
|
|||
private void validateSourceSchema(Map<String, TableInfo<CommonField<DataType>>> fullyQualifiedTableNameToInfo, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be changed to be named logSourceSchemaChange
validate implies that if it isn't the same you'd be throwing an error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also add a quick comment on what this function is doing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also add a quick comment on what this function is doing
renamed this method and added a comment
...elational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java
Show resolved
Hide resolved
It's not in the logs. However, there is a new Review changes interstitial introduced. https://www.loom.com/i/e17df917d481406faed839595d3e97c4 |
* JDBC Sources: validate actual source schema * add unit test * updated test cases * refactoring
What
From 21413
If the schema of the source was manually changed by the user and the structure of the table was changed (columns were added, data types were changed), but the user did not refresh and did not save the new schema in airbyte-db, the sync may fail
How
Validate actual source schema vs catalog schema and log warning if the source schema was changed
Recommended reading order
AbstractDbSource.java
🚨 User Impact 🚨
Are there any breaking changes? What is the end result perceived by the user? If yes, please merge this PR with the 🚨🚨 emoji so changelog authors can further highlight this if needed.
Pre-merge Checklist
Expand the relevant checklist and delete the others.
New Connector
Community member or Airbyter
airbyte_secret
./gradlew :airbyte-integrations:connectors:<name>:integrationTest
.README.md
bootstrap.md
. See description and examplesdocs/integrations/<source or destination>/<name>.md
including changelog. See changelog exampledocs/integrations/README.md
airbyte-integrations/builds.md
Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
/test connector=connectors/<name>
command is passing/publish
command described hereUpdating a connector
Community member or Airbyter
airbyte_secret
./gradlew :airbyte-integrations:connectors:<name>:integrationTest
.README.md
bootstrap.md
. See description and examplesdocs/integrations/<source or destination>/<name>.md
including changelog. See changelog exampleAirbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
/test connector=connectors/<name>
command is passing/publish
command described hereConnector Generator
-scaffold
in their name) have been updated with the latest scaffold by running./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates
then checking in your changesTests
Unit
Put your unit tests output here.
Integration
Put your integration tests output here.
Acceptance
Put your acceptance tests output here.