Support JsonSchema anyOf when writing Parquet/Avro in S3 destination #4294

Closed
olivermeyer opened this issue Jun 23, 2021 · 7 comments · Fixed by #4613

@olivermeyer
Contributor

Expected Behavior

I have a connection between Salesforce and S3 (Parquet). The expected behaviour is that the sync should work, and data should be written to S3.

Current Behavior

The sync starts but quickly hangs with no further messages in the logs.

Logs

Since the Salesforce connector exposes credentials in plain text in the logs, I cannot post them in full. However, I found the following, which seems relevant:

2021-06-23 14:25:34 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-06-23 14:25:34 WARN i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):78 - {} - Airbyte message consumer: failed.
2021-06-23 14:25:34 ERROR () LineGobbler(voidCall):85 - Exception in thread "main" java.lang.IllegalStateException: Field CreatedDate has no type
2021-06-23 14:25:34 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.destination.s3.parquet.JsonToAvroSchemaConverter.getTypes(JsonToAvroSchemaConverter.java:74)
2021-06-23 14:25:34 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.destination.s3.parquet.JsonToAvroSchemaConverter.getNonNullTypes(JsonToAvroSchemaConverter.java:68)
2021-06-23 14:25:34 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.destination.s3.parquet.JsonToAvroSchemaConverter.getNullableFieldTypes(JsonToAvroSchemaConverter.java:189)
2021-06-23 14:25:34 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.destination.s3.parquet.JsonToAvroSchemaConverter.getAvroSchema(JsonToAvroSchemaConverter.java:137)
2021-06-23 14:25:34 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.destination.s3.writer.ProductionWriterFactory.create(ProductionWriterFactory.java:58)
2021-06-23 14:25:34 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.destination.s3.S3Consumer.startTracked(S3Consumer.java:103)
2021-06-23 14:25:34 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.start(FailureTrackingAirbyteMessageConsumer.java:54)
2021-06-23 14:25:34 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:127)
2021-06-23 14:25:34 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:113)
2021-06-23 14:25:34 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.destination.s3.S3Destination.main(S3Destination.java:49)

Steps to Reproduce

  1. Set up a Salesforce source
  2. Set up an S3 destination, with Parquet format
  3. Set up a connection between the two, syncing the Account stream (might affect other streams as well); trigger the sync and wait

Severity of the bug for you

Critical - CSV is not acceptable as a file format for us, and not having this connection is an immediate showstopper.

Airbyte Version

0.26.2-alpha

Connector Version (if applicable)

Salesforce: 0.2.1
S3: 0.1.6

Additional context

I tried syncing another stream (UserPreference) and ran into the same issue. The logs were similar too:

2021-06-23 14:39:22 ERROR () LineGobbler(voidCall):85 - Exception in thread "main" java.lang.IllegalStateException: Field SystemModstamp has no type

There definitely seems to be a pattern, but I'm not familiar enough with Airbyte's internals to understand it.

I can also confirm that the following works:

  • Reading from another source and writing to S3 in Parquet
  • Reading from Salesforce and writing to S3 in CSV

olivermeyer added the type/bug label Jun 23, 2021
sherifnada added the area/connectors label Jun 23, 2021
@sherifnada
Contributor

@olivermeyer could you download the full log from the UI and share it here?

@tuliren
Contributor

tuliren commented Jun 23, 2021

Here is the schema of the field causing the problem:
https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-salesforce-singer/sample_files/configured_catalog.json#L207

"SystemModstamp": {
  "anyOf": [
    {
      "type": "string",
      "format": "date-time"
    },
    {
      "type": ["string", "null"]
    }
  ]
}

The root cause of this failure is that when writing to Parquet, we perform a Json schema to Parquet schema conversion, and currently we do not support the anyOf, allOf, and oneOf keywords, as documented here:
https://docs.airbyte.io/integrations/destinations/s3#data-schema

Because SystemModstamp is defined only by an anyOf, with no top-level type keyword, the converter fails with "Field SystemModstamp has no type".

On second thought, it is probably not difficult to support these keywords. Although there is no direct equivalent in a Parquet schema, we can use a less stringent type union as a workaround.
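
For illustration, here is a minimal sketch in Java of how such a union could be built (hypothetical code, not the actual fix; convertBranch stands in for the converter's existing per-branch type conversion):

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

import org.apache.avro.Schema;

import com.fasterxml.jackson.databind.JsonNode;

  // Sketch: collapse every "anyOf" branch into a single Avro union.
  static Schema anyOfToUnion(String fieldName, JsonNode fieldDefinition) {
    // Convert each branch, flattening nested unions and dropping nulls
    // (a single null is re-added at the front below).
    Set<Schema> branchTypes = new LinkedHashSet<>();
    for (JsonNode branch : fieldDefinition.get("anyOf")) {
      Schema branchSchema = convertBranch(fieldName, branch); // hypothetical helper
      List<Schema> flattened = branchSchema.isUnion()
          ? branchSchema.getTypes()
          : List.of(branchSchema);
      for (Schema schema : flattened) {
        if (schema.getType() != Schema.Type.NULL) {
          branchTypes.add(schema);
        }
      }
    }
    // Null goes first, matching the nullable unions the converter already emits.
    List<Schema> union = new ArrayList<>();
    union.add(Schema.create(Schema.Type.NULL));
    union.addAll(branchTypes);
    return Schema.createUnion(union);
  }

The LinkedHashSet de-duplicates identical branch schemas while preserving their order, which matters because an Avro union may not contain more than one schema of the same unnamed type.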

tuliren self-assigned this Jun 23, 2021
@olivermeyer
Contributor Author

Thanks for looking into this @tuliren. That makes sense. I'll follow this issue for a fix :-)

sherifnada changed the title from "Salesforce to S3-Parquet connector does not work" to "Support JsonSchema anyOf when writing Parquet/Avro in S3 destination" Jun 24, 2021
@MaxwellJK
Contributor

MaxwellJK commented Jun 30, 2021

This is a workaround I applied to the file

airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/JsonToAvroSchemaConverter.java

and it seems to work. The inline comments explain my basic solution, but I'm happy to provide more info if anybody else is brave enough to give it a go:

  Schema getNullableFieldTypes(String fieldName, JsonNode fieldDefinition) {
    // ******************
    // Workaround: define a new JsonNode from the input parameter, then check
    // whether it contains "anyOf". If it does, read the content of "anyOf"
    // as an ArrayNode and extract its first element, which is then used in
    // the rest of the function. Not ideal, but given the Json schema it
    // works for me: I finally get all the timestamps from Salesforce.
    JsonNode thisFieldDefinition = fieldDefinition;
    if (thisFieldDefinition.get("anyOf") != null) {
      ArrayNode anyOf = (ArrayNode) thisFieldDefinition.get("anyOf");
      thisFieldDefinition = anyOf.elements().next();
    }
    // ******************

    // Filter out null types, which will be added back at the end.
    List<Schema> nonNullFieldTypes = getNonNullTypes(fieldName, thisFieldDefinition.get("type"))
        .stream()
        .flatMap(fieldType -> {
          Schema singleFieldSchema = getSingleFieldType(fieldName, fieldType, thisFieldDefinition, true);
          if (singleFieldSchema.isUnion()) {
            return singleFieldSchema.getTypes().stream();
          } else {
            return Stream.of(singleFieldSchema);
          }
        })
        .distinct()
        .collect(Collectors.toList());

    if (nonNullFieldTypes.isEmpty()) {
      return Schema.create(Schema.Type.NULL);
    } else {
      // Mark every field as nullable to prevent missing value exceptions from Parquet.
      nonNullFieldTypes.add(0, Schema.create(Schema.Type.NULL));
      return Schema.createUnion(nonNullFieldTypes);
    }
  }
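
With the SystemModstamp schema above, this picks the first anyOf branch ({ "type": "string", "format": "date-time" }), which is why the timestamps come through; any information in the later branches is ignored.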

@tuliren
Contributor

tuliren commented Jul 7, 2021

@MaxwellJK, thanks for the workaround. It only handles the first element of the anyOf array, though. PR #4613 should solve the issue generically.

@MaxwellJK
Contributor

Yeah, I know; I just quickly developed it to fix the problem I was having with Salesforce.
I'm glad you've found a better solution though! Well done :)

@olivermeyer
Contributor Author

Thanks @tuliren!
