
ArrayIndexOutOfBoundsException in writing Parquet files #168

Closed
bashir2 opened this issue Jun 7, 2021 · 10 comments
Labels
bug (Something isn't working)
P1:must (An issue that definitely needs to be implemented in the near future)

Comments


bashir2 commented Jun 7, 2021

This was reported by @kimaina when running the batch pipeline on the AMPATH DB for some specific dates. The stack trace for the exception is copied below. The error is from this line. To find the root cause we need some FHIR resources that trigger it; but while investigating, I realized we are using a very old version of parquet-column, which we should fix regardless of the root cause (see the dependency sketch after the stack trace).

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ArrayIndexOutOfBoundsException: 9
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348)
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
        at org.openmrs.analytics.FhirEtl.runFhirJdbcFetch(FhirEtl.java:184)
        at org.openmrs.analytics.FhirEtl.main(FhirEtl.java:217)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 9
        at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.startGroup(MessageColumnIO.java:395)
        at org.apache.parquet.avro.AvroWriteSupport.writeRecord(AvroWriteSupport.java:172)
        at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:349)
        at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:278)
        at org.apache.parquet.avro.AvroWriteSupport.access$400(AvroWriteSupport.java:48)
        at org.apache.parquet.avro.AvroWriteSupport$TwoLevelListWriter.writeCollection(AvroWriteSupport.java:547)
        at org.apache.parquet.avro.AvroWriteSupport$ListWriter.writeList(AvroWriteSupport.java:395)
        at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:355)
        at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:278)
        at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
        at org.apache.parquet.avro.AvroWriteSupport.writeRecord(AvroWriteSupport.java:173)
        at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:349)
        at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:278)
        at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
        at org.apache.parquet.avro.AvroWriteSupport.writeRecord(AvroWriteSupport.java:173)
        at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:349)
        at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:278)
        at org.apache.parquet.avro.AvroWriteSupport.access$400(AvroWriteSupport.java:48)
        at org.apache.parquet.avro.AvroWriteSupport$TwoLevelListWriter.writeCollection(AvroWriteSupport.java:547)
        at org.apache.parquet.avro.AvroWriteSupport$ListWriter.writeList(AvroWriteSupport.java:395)
        at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:355)
        at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:278)
        at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
        at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
        at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
        at org.openmrs.analytics.ParquetUtil.write(ParquetUtil.java:186)
        at org.openmrs.analytics.ParquetUtil.writeRecords(ParquetUtil.java:245)
        at org.openmrs.analytics.FetchSearchPageFn.processBundle(FetchSearchPageFn.java:122)
        at org.openmrs.analytics.FetchResources$SearchFn.processElement(FetchResources.java:112)
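
For the parquet-column upgrade, the change is a pom.xml dependency bump along these lines. The Maven coordinates are the real ones; the target version is only an assumed then-recent release:

<!-- Example only: bump parquet-column to a recent release; 1.12.0 is an assumed target version. -->
<dependency>
  <groupId>org.apache.parquet</groupId>
  <artifactId>parquet-column</artifactId>
  <version>1.12.0</version>
</dependency>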
bashir2 added the bug (Something isn't working) and P1:must (An issue that definitely needs to be implemented in the near future) labels Jun 7, 2021
bashir2 added this to the AMPATH_Deployment milestone Jun 7, 2021

kimaina commented Jun 8, 2021

A few updates on this: using --secondsToFlushParquetFiles=120 throws this error after 5 minutes; on the other hand, using --secondsToFlushParquetFiles=30 does not. This only happens when the --activePeriod covers times with many records, especially during data-entry peaks (between 8am and 11:30am).


kimaina commented Jun 14, 2021

Tested the pipeline with --secondsToFlushParquetFiles=30 on one week of data; still getting the same error.


bashir2 commented Jul 12, 2021

I spent some more time on this but have not managed to reproduce the problem yet. Without a reproducible case it is hard to tell what is wrong, but from more reading of the code (and some searches) it seems that a single record (i.e., one FHIR resource) should be the problem; so it is not obvious why setting --secondsToFlushParquetFiles can mask this issue.

@kimaina can you please try to reproduce this with your data and then reduce the --activePeriod until you pinpoint a few resources that cause this? If you can find such a resource, that would be great; if not, running for a very small period with logging enabled may give us some clues. You can use this commit to enable DEBUG logging for org.apache.parquet.io.
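
For reference, a minimal sketch of what such a change does, in log4j2 properties syntax (assuming log4j2 is the logging backend here; the exact file and format in the repo may differ):

# Enable DEBUG logging for the Parquet I/O internals only.
logger.parquetio.name = org.apache.parquet.io
logger.parquetio.level = DEBUG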


kimaina commented Jul 13, 2021

Thanks @bashir2 for following up on this. Let me investigate further and get back to you.

Thanks,
Allan


kimaina commented Jul 13, 2021

A few updates... Running with 1 thread led to this:

ERROR - ParquetUtil.write(190) |2021-07-13T18:07:30,337| Dropping Observation resource with id amrs/ws/fhir2/R4/Observation/c4d02bb3-d466-4f88-9574-bc2813e6ea34 due to exception: org.apache.avro.AvroTypeException: Cannot encode decimal with scale 16 as scale 4 without rounding 
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ArrayIndexOutOfBoundsException: 9
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:371)
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:339)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:219)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
        at org.openmrs.analytics.FhirEtl.runFhirJdbcFetch(FhirEtl.java:185)
        at org.openmrs.analytics.FhirEtl.main(FhirEtl.java:218)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 9
        at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.startGroup(MessageColumnIO.java:402)
        at org.apache.parquet.avro.AvroWriteSupport$ListWriter.writeList(AvroWriteSupport.java:395)
        at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:355)
        at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:278)
        at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
        at org.apache.parquet.avro.AvroWriteSupport.writeRecord(AvroWriteSupport.java:173)
        at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:349)
        at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:278)
        at org.apache.parquet.avro.AvroWriteSupport.access$400(AvroWriteSupport.java:48)
        at org.apache.parquet.avro.AvroWriteSupport$TwoLevelListWriter.writeCollection(AvroWriteSupport.java:549)
        at org.apache.parquet.avro.AvroWriteSupport$ListWriter.writeList(AvroWriteSupport.java:397)
        at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:355)
        at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:278)
        at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
        at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
        at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:301)
        at org.openmrs.analytics.ParquetUtil.write(ParquetUtil.java:186)
        at org.openmrs.analytics.ParquetUtil.writeRecords(ParquetUtil.java:245)
        at org.openmrs.analytics.FetchSearchPageFn.processBundle(FetchSearchPageFn.java:122)
        at org.openmrs.analytics.FetchPatientHistory$1.ProcessElement(FetchPatientHistory.java:54)

Here is the specific obs:

{
    "resourceType": "Observation",
    "id": "c4d02bb3-d466-4f88-9574-bc2813e6ea34",
    "status": "final",
    "category": [
        {
            "coding": [
                {
                    "system": "http://terminology.hl7.org/CodeSystem/observation-category",
                    "code": "laboratory",
                    "display": "Laboratory"
                }
            ]
        }
    ],
    "code": {
        "coding": [
            {
                "code": "a898c87a-1350-11df-a1f1-0026b9348838",
                "display": "BODY SURFACE AREA"
            }
        ]
    },
    "subject": {

    },
    "encounter": {
        "reference": "Encounter/xxx",
        "type": "Encounter"
    },
    "effectiveDateTime": "xxxxx",
    "issued": "xxxx",
    "valueQuantity": {
        "value": 1.7126976771553504
    }
}


bashir2 commented Jul 14, 2021

Thanks @kimaina, but this Observation resource should not fail the pipeline. It has the BigDecimal conversion bug (issue #156), but that bug has a try/catch guard and does not fail the pipeline (as the first line of your log dump shows too, i.e., ERROR - ParquetUtil.write(190) ...). However, the next log lines indicate that you are indeed very close to a resource that demonstrates the ArrayIndexOutOfBoundsException issue.

kimaina added commits to kimaina/openmrs-fhir-analytics that referenced this issue Jul 14, 2021

kimaina commented Jul 14, 2021

Thanks @bashir2.

Please see #188, which reproduces the ArrayIndexOutOfBoundsException of #168.

A potential fix can be found in PR #189.


bashir2 commented Jul 15, 2021

Thanks @kimaina, #188 is really helpful. So the core issue seems to be #156, and the try/catch we added before is creating other issues. The TL;DR is that we have to properly fix #156 (adding another, more generic catch just hides the problem a little more and will probably bite us again).

The longer version is that all four resources you have in the new Observation bundle in #188 have the #156 bug. If you drop any of them, we cannot reproduce #168 anymore. So it seems that when the AvroTypeException in #156 happens, we skip some of the required final steps of writing the Avro record (e.g., this call), and that causes the AvroWriteSupport and its MessageColumnIO to end up in a bad state.
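
To illustrate the failure mode, here is a toy model (not the actual Parquet classes): the record consumer keeps per-level state sized to the schema depth, and a record abandoned mid-write without its matching endGroup() calls leaves the depth counter high, so a later startGroup() indexes past the end of that state:

// Toy model of the unbalanced start/end problem; the real bookkeeping lives in
// org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.
public class UnbalancedGroupsSketch {
  private static final int MAX_DEPTH = 3;    // schema nesting depth (assumed)
  private final boolean[] fieldsWrittenAtLevel = new boolean[MAX_DEPTH];
  private int currentLevel = 0;

  void startGroup() {
    // Throws ArrayIndexOutOfBoundsException once leftover depth exceeds MAX_DEPTH.
    fieldsWrittenAtLevel[currentLevel] = true;
    currentLevel++;
  }

  void endGroup() {
    currentLevel--;
    fieldsWrittenAtLevel[currentLevel] = false;
  }

  public static void main(String[] args) {
    UnbalancedGroupsSketch consumer = new UnbalancedGroupsSketch();
    // Record 1: the AvroTypeException fires after two startGroup() calls, and the
    // catch in ParquetUtil.write() drops the record without the matching endGroup()s.
    consumer.startGroup();
    consumer.startGroup();
    // Record 2 then starts on top of the leftover state.
    consumer.startGroup();
    consumer.startGroup();                   // ArrayIndexOutOfBoundsException: 3
  }
}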

I think I know the root cause of #156, which is a discrepancy between the value scale and the schema scale that Bunsen sets for decimal types (e.g., the valueQuantity of Observation resources in this case). The fix is probably not in our code-base but rather in Bunsen. I will try to see if I can find a proper fix.
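
For reference, the AvroTypeException itself is Avro's standard decimal logical-type check: the value 1.7126976771553504 above has scale 16, while the schema fixes a smaller scale. A self-contained sketch that reproduces the same message with plain Avro (the scale of 4 matches the error message; the precision of 12 is an assumption about the Bunsen-generated schema):

import java.math.BigDecimal;
import java.math.RoundingMode;

import org.apache.avro.AvroTypeException;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class DecimalScaleDemo {
  public static void main(String[] args) {
    // A bytes-backed decimal(12, 4), standing in for the Bunsen-generated schema.
    Schema decimalSchema =
        LogicalTypes.decimal(12, 4).addToSchema(Schema.create(Schema.Type.BYTES));
    LogicalTypes.Decimal logicalType =
        (LogicalTypes.Decimal) decimalSchema.getLogicalType();
    Conversions.DecimalConversion conversion = new Conversions.DecimalConversion();

    // The observed valueQuantity.value has scale 16, not 4.
    BigDecimal value = new BigDecimal("1.7126976771553504");
    try {
      conversion.toBytes(value, decimalSchema, logicalType);
    } catch (AvroTypeException e) {
      // Prints: Cannot encode decimal with scale 16 as scale 4 without rounding
      System.out.println(e.getMessage());
    }

    // Rescaling to the schema's scale avoids the exception (at the cost of precision).
    BigDecimal rescaled = value.setScale(logicalType.getScale(), RoundingMode.HALF_UP);
    conversion.toBytes(rescaled, decimalSchema, logicalType);
  }
}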


kimaina commented Jul 15, 2021

"If you drop any of them, we cannot reproduce #168 anymore."

Precisely!

"Adding another, more generic catch just hides the problem a little more and will probably bite us again."

Agreed, a Bunsen fix should resolve this problem!


bashir2 commented Jul 16, 2021

Let's close this issue and track this under #156.

bashir2 closed this as completed Jul 16, 2021