
SchemaParseException: Can't redefine: list in AvroIndexedRecordConverter #2239

Closed
asfimport opened this issue Oct 10, 2018 · 12 comments
asfimport commented Oct 10, 2018

The following unit test, added to TestAvroSchemaConverter, fails:

@Test
public void testConvertedSchemaToStringCantRedefineList() throws Exception {
  String parquet = "message spark_schema {\n" +
      "  optional group annotation {\n" +
      "    optional group transcriptEffects (LIST) {\n" +
      "      repeated group list {\n" +
      "        optional group element {\n" +
      "          optional group effects (LIST) {\n" +
      "            repeated group list {\n" +
      "              optional binary element (UTF8);\n" +
      "            }\n" +
      "          }\n" +
      "        }\n" +
      "      }\n" +
      "    }\n" +
      "  }\n" +
      "}\n";

  Configuration conf = new Configuration(false);
  AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(conf);
  Schema schema = avroSchemaConverter.convert(MessageTypeParser.parseMessageType(parquet));
  schema.toString();
}

while this one succeeds:

@Test
public void testConvertedSchemaToStringCantRedefineList() throws Exception {
  String parquet = "message spark_schema {\n" +
      "  optional group annotation {\n" +
      "    optional group transcriptEffects (LIST) {\n" +
      "      repeated group list {\n" +
      "        optional group element {\n" +
      "          optional group effects (LIST) {\n" +
      "            repeated group list {\n" +
      "              optional binary element (UTF8);\n" +
      "            }\n" +
      "          }\n" +
      "        }\n" +
      "      }\n" +
      "    }\n" +
      "  }\n" +
      "}\n";
 
  Configuration conf = new Configuration(false);
  conf.setBoolean("parquet.avro.add-list-element-records", false);
  AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(conf);
  Schema schema = avroSchemaConverter.convert(MessageTypeParser.parseMessageType(parquet));
  schema.toString();
}

I don't see a way to influence the code path in AvroIndexedRecordConverter to respect this configuration, which results in the following stack trace downstream:


  Cause: org.apache.avro.SchemaParseException: Can't redefine: list
  at org.apache.avro.Schema$Names.put(Schema.java:1128)
  at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
  at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690)
  at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:805)
  at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
  at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
  at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
  at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
  at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
  at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
  at org.apache.avro.Schema.toString(Schema.java:324)
  at org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(SchemaCompatibility.java:68)
  at org.apache.parquet.avro.AvroRecordConverter.isElementType(AvroRecordConverter.java:866)
  at org.apache.parquet.avro.AvroIndexedRecordConverter$AvroArrayConverter.<init>(AvroIndexedRecordConverter.java:333)
  at org.apache.parquet.avro.AvroIndexedRecordConverter.newConverter(AvroIndexedRecordConverter.java:172)
  at org.apache.parquet.avro.AvroIndexedRecordConverter.<init>(AvroIndexedRecordConverter.java:94)
  at org.apache.parquet.avro.AvroIndexedRecordConverter.newConverter(AvroIndexedRecordConverter.java:168)
  at org.apache.parquet.avro.AvroIndexedRecordConverter.<init>(AvroIndexedRecordConverter.java:94)
  at org.apache.parquet.avro.AvroIndexedRecordConverter.<init>(AvroIndexedRecordConverter.java:66)
  at org.apache.parquet.avro.AvroCompatRecordMaterializer.<init>(AvroCompatRecordMaterializer.java:34)
  at org.apache.parquet.avro.AvroReadSupport.newCompatMaterializer(AvroReadSupport.java:144)
  at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:136)
  at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:204)
  at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:182)
  at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
...

See also these downstream issues:
https://issues.apache.org/jira/browse/SPARK-25588
bigdatagenomics/adam#2058

Reporter: Michael Heuer
Assignee: Nándor Kollár / @nandorKollar

Note: This issue was originally created as PARQUET-1441. Please see the migration documentation for further details.


Michael Heuer:
I've found I can get a similar stack trace going through AvroRecordConverter instead of AvroIndexedRecordConverter, by setting parquet.avro.compatible to false:

    val job = HadoopUtil.newJob(sc)
    val conf = ContextUtil.getConfiguration(job)
    conf.setBoolean("parquet.avro.compatible", false)

  Cause: org.apache.avro.SchemaParseException: Can't redefine: list
  at org.apache.avro.Schema$Names.put(Schema.java:1128)
  at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
  at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690)
  at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:805)
  at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
  at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
  at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
  at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
  at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
  at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
  at org.apache.avro.Schema.toString(Schema.java:324)
  at org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(SchemaCompatibility.java:68)
  at org.apache.parquet.avro.AvroRecordConverter.isElementType(AvroRecordConverter.java:866)
  at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:475)
  at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:289)
  at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:141)
  at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:279)
  at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:141)
  at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95)
  at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
  at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
  at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:204)
  at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:182)
  at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
...


antonkulaga:
Any updates on this? This bug blocks the ADAM library and hence blocks most bioinformaticians using Spark.


Thiruvalluvan M. G.:
The behavior here is due to the following. For every group in the Parquet schema, AvroSchemaConverter constructs a record schema in Avro. Avro record schemas have a required name attribute, and AvroSchemaConverter assigns the group's name as the name of the Avro record schema. In this example there are two groups with the same name, list. Since Avro requires schema names to be unique, the two records named list cause Avro to complain. A simple fix is to remove the conflict, for example by renaming the second list to list2:

@Test
public void testConvertedSchemaToStringCantRedefineList() throws Exception {
  String parquet = "message spark_schema {\n" +
      "  optional group annotation {\n" +
      "    optional group transcriptEffects (LIST) {\n" +
      "      repeated group list {\n" +
      "        optional group element {\n" +
      "          optional group effects (LIST) {\n" +
      "            repeated group list2 {\n" +
      "              optional binary element (UTF8);\n" +
      "            }\n" +
      "          }\n" +
      "        }\n" +
      "      }\n" +
      "    }\n" +
      "  }\n" +
      "}\n";

  Configuration conf = new Configuration(false);
  AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(conf);
  Schema schema = avroSchemaConverter.convert(MessageTypeParser.parseMessageType(parquet));
  schema.toString();
}

I've verified that this indeed fixes this test.

[~heuermh] If this workaround solves your problem, please resolve this issue and also the corresponding issue in Avro AVRO-2272. If this workaround is not possible for you to implement, please let us know why. Thank you.


Nándor Kollár / @nandorKollar:
The workaround proposed above should work; however, it is not 100% compliant with the Parquet LogicalTypes spec, which states that "The middle level, named list, must be a repeated group with a single field named element." Fortunately, due to the backward-compatibility rules for nested types, it doesn't cause an error.


Thiruvalluvan M. G.:
Great point @nandorKollar! Here is something we can do without breaking the rules of either system. Parquet wants the name of the repeated group to be list, and Avro does not allow more than one list. Can we modify AvroSchemaConverter to attach a unique suffix (say, a number) to the name of each generated Avro record schema? Essentially, we'd automatically do what I proposed doing manually earlier. In order not to surprise those who use only one list, we can keep the name of the first list as list; for every other occurrence we append an increasing number starting with 1. Will that work?
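The suffix scheme above could be sketched as a small helper class (hypothetical, not part of AvroSchemaConverter; here the suffix is the occurrence count, so the second list becomes list2, matching the manual rename shown earlier):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the proposed de-duplication: the first occurrence
// of a record name stays unchanged; later occurrences get a numeric suffix,
// so "list", "list", "list" become "list", "list2", "list3".
public class UniqueNames {
    private final Map<String, Integer> seen = new HashMap<>();

    public String uniquify(String name) {
        int count = seen.merge(name, 1, Integer::sum);
        return count == 1 ? name : name + count;
    }

    public static void main(String[] args) {
        UniqueNames names = new UniqueNames();
        System.out.println(names.uniquify("list"));  // list
        System.out.println(names.uniquify("list"));  // list2
    }
}
```

The counter is per converted message, so names stay stable across conversions of the same schema.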


Nándor Kollár / @nandorKollar:
Well, I think the correct solution is setting parquet.avro.add-list-element-records to false, like in the second test case in the attached PR.


Michael Heuer:
Note, as mentioned above, that while parquet.avro.add-list-element-records=false works in the unit tests, it does not appear to work with AvroIndexedRecordConverter, which is what we hit downstream in Spark.

As for workarounds, I'm afraid we're so far downstream that I'm not sure we would be able to use one. We use Avro AVDL to generate Java objects for persisting Spark RDDs to Parquet, and separately to generate Scala products for persisting Spark Datasets to Parquet. Spark generates the schema for these Datasets-as-Parquet. Up until Spark version 2.4.0, which bumped Parquet to version 1.10 and Avro to 1.8.2, we could write out Datasets-as-Parquet and read in RDDs-as-Parquet without trouble (the two different schemas were considered compatible).


Nándor Kollár / @nandorKollar:
After a bit more investigation, I think this is probably a Parquet issue; however, it isn't clear to me how this can be a regression. The failing code paths in Parquet and in Avro were committed long ago. What I can think of as a possibility is that Spark might have recently moved from a 2-level list structure to 3-level lists.

The unit test attached to this PR doesn't reflect the problem, because I think it tests the correct behaviour: in the converter one can switch between 2- and 3-level lists with the parquet.avro.add-list-element-records property. The test in the Spark Jira is a lot more informative.

I think the problem is that AvroRecordConverter tries to decide between 3- and 2-level lists by first trying to interpret the schema as 2-level and checking compatibility with the expected Avro schema. Normally the two are incompatible (if the file was written as 3-level), so Parquet concludes that it is a 3-level list. This works fine when lists are not nested inside other lists, but if we try to represent the 3-level nested-list Parquet structure as 2-level, the resulting 2-level Avro schema is not even a valid Avro schema!
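This last point can be reproduced with Avro alone (a sketch, assuming the Avro 1.9+ Schema API and independent of parquet-avro): two distinct record schemas sharing the name list can be constructed programmatically, but serializing them in a single schema tree fails exactly like the Names.put frame at the top of the stack trace above.

```java
import java.util.Arrays;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;

// Two *distinct* record schemas that share the full name "list" can be
// built in memory, but Schema.toString() walks both while serializing to
// JSON and rejects the duplicate name.
public class RedefineListRepro {
    public static void main(String[] args) {
        Schema inner = Schema.createRecord("list", null, null, false);
        inner.setFields(Arrays.asList(
            new Schema.Field("element", Schema.create(Schema.Type.STRING), null)));

        Schema outer = Schema.createRecord("list", null, null, false);
        outer.setFields(Arrays.asList(
            new Schema.Field("element", inner, null)));

        try {
            outer.toString();
        } catch (SchemaParseException e) {
            System.out.println(e.getMessage()); // Can't redefine: list
        }
    }
}
```

This is why the failure only surfaces at schema serialization time (Schema.toString inside SchemaCompatibility), not when the schemas are first constructed.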


Nándor Kollár / @nandorKollar:
Looks like this compatibility check broke this scenario. The commit wasn't in when 1.8.1 was released, but it is backported to all later branches (including 1.8.x, so if 1.8.2 gets released, this case will break there too).


Michael Heuer:
Sorry, which compatibility check and commit? I'm also confused by the version numbers in your comment; both Parquet and Avro have made 1.8.2 releases.

The regression is complicated and perhaps not worth discussing here: with Spark moving to Parquet 1.10 and Avro 1.8.2, our previous workaround of pinning parquet-avro to 1.8.1 no longer works. That workaround was necessary because Spark depended on Parquet 1.8.2 and Avro 1.7.x, which were incompatible with each other.


Nándor Kollár / @nandorKollar:
The compatibility check introduced by PARQUET-651 in AvroRecordConverter. I was referring to the Parquet version: 1.8.1 doesn't have this change, while 1.8.2 already does.


Nándor Kollár / @nandorKollar:
Ok, so it looks like there is a more fundamental problem related to this. Parquet allows using the same name in nested structures, while Avro doesn't allow it for records. For example, a file with this Parquet schema

message Message {
    optional group a1 {
        required float a2;
        optional group a1 {
            required float a4;
        }
    }
}

is not readable via AvroParquetReader. Of course this particular case could easily be solved by renaming the inner a1 to something else, but for lists this doesn't work. I think using Avro namespaces during schema conversion could fix this bug.
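The namespace idea might look like the following (a sketch, assuming the Avro 1.9+ Schema API; the namespace values here are made up for illustration): deriving each nested record's namespace from its path in the schema makes the full names unique, so both records can keep the simple name a1.

```java
import java.util.Arrays;
import org.apache.avro.Schema;

// With path-derived namespaces the two records become "Message.a1" and
// "Message.a1.a1" -- distinct full names, so serialization succeeds even
// though both simple names are "a1". (Nullability from the Parquet
// "optional" fields is omitted to keep the sketch short.)
public class NamespaceFix {
    public static void main(String[] args) {
        Schema innerA1 = Schema.createRecord("a1", null, "Message.a1", false);
        innerA1.setFields(Arrays.asList(
            new Schema.Field("a4", Schema.create(Schema.Type.FLOAT), null)));

        Schema outerA1 = Schema.createRecord("a1", null, "Message", false);
        outerA1.setFields(Arrays.asList(
            new Schema.Field("a2", Schema.create(Schema.Type.FLOAT), null),
            new Schema.Field("a1", innerA1, null)));

        // Serializes without "Can't redefine: a1".
        System.out.println(outerA1.toString(true));
    }
}
```

The same trick would apply to the nested list case: each repeated group named list would get a namespace reflecting its nesting depth.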
