-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
🐛 Destination Postgres: fix \u0000(NULL) value processing #5336
Conversation
…f SqlOperations to PostgresSqlOperations.
@subodh1810 would you be able to take a look at this one? |
Could not look into it today, will look into it tomorrow first thing |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have 1 comment.
private List<AirbyteRecordMessage> formatRecords(List<AirbyteRecordMessage> records) { | ||
// Postgres fails if json contains \u0000 unicode (NULL) in a json. | ||
records.forEach(airbyteRecordMessage -> airbyteRecordMessage | ||
.setData(Jsons.deserialize(Jsons.serialize(airbyteRecordMessage.getData()).replaceAll("\\\\u0000", "")))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks fine but my concern is that doing the Jsons.deserialize(Jsons.serialize
for each record here is going to have a performance impact. How about we move this to BufferedStreamConsumer
. We already have a string conversion here so it would save us from doing the serialization twice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point! Thanks ;)
…ecessary serialization
Do we sort of have a timeline when this will be ready? I am currently experiencing this issue and I can't replicate a very important table.Thanks. |
/test connector=destination-postgres
|
@DoNotPanicUA can you trigger the tests for all the connectors that use |
...ors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/DataAdapter.java
Outdated
Show resolved
Hide resolved
adaptValueNodes(null, rootNode, null); | ||
} | ||
|
||
private void adaptValueNodes(String fieldName, JsonNode node, JsonNode parentNode) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Am not sure I follow the logic of this method. When can node.isValueNode()
be true? Also I dont like the fact that fieldName
can be null and we dont have a null check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if an element contains a value - it's a value node. An element also might be an array or object.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@DoNotPanicUA can you put a comment on this method explaining how it works so that its clear for anyone who is reading this code for the first time
...stination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcSqlOperations.java
Outdated
Show resolved
Hide resolved
...stination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcSqlOperations.java
Outdated
Show resolved
Hide resolved
// TODO Truncate json data instead of throwing whole record away? | ||
// or should we upload it into a special rejected record folder in s3 instead? | ||
var emittedAt = Timestamp.from(Instant.ofEpochMilli(recordMessage.getEmittedAt())); | ||
pairToCopier.get(pair).write(id, data, emittedAt); | ||
pairToCopier.get(pair).write(id, Jsons.serialize(recordMessage.getData()), emittedAt); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a problem, we are not going to do Jsons.serialize
twice for Redshift. First in the isValidData
method and then here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
currently, it's designed in a way that we don't have many options. Original implementation does additional serialization for all destinations. Here we have one additional serialization for one destination and only for the Copy flow. So, we already do a significant improvement here.
I propose to create a new issue for the RedShift destination improvement in order to unblock the Postgres issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool! Please create a follow up issue to resolve this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a comment explaining the logic for method adaptValueNodes
adaptValueNodes(null, rootNode, null); | ||
} | ||
|
||
private void adaptValueNodes(String fieldName, JsonNode node, JsonNode parentNode) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@DoNotPanicUA can you put a comment on this method explaining how it works so that its clear for anyone who is reading this code for the first time
/test connector=destination-postgres
|
/test connector=destination-meilisearch
|
/test connector=destination-mssql
|
/test connector=destination-mysql
|
/test connector=destination-oracle
|
/test connector=destination-redshift
|
/test connector=destination-snowflake
|
/publish connector=connectors/destination-postgres
|
What
Fix #3476
In addition, small rework of jdbc-destination and move specific implementation to the PostgresSqlOperations.
// todo (cgardens) - move this into a postgres version of this. this syntax is postgres-specific
How
Replace all \u0000 unicode values from airbyte data messages
Recommended reading order
PostgresSqlOperations.java
Pre-merge Checklist
Updating a connector
Community member or Airbyter
airbyte_secret
./gradlew :airbyte-integrations:connectors:<name>:integrationTest
.README.md
docs/integrations/<source or destination>/<name>.md
including changelog. See changelog exampleAirbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
/test connector=connectors/<name>
command is passing./publish
command described here