This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

ISSUE-9704: [Pulsar SQL] Query key-value schema data error #2239

Open
sijie opened this issue Mar 2, 2021 · 0 comments

sijie commented Mar 2, 2021

Original Issue: apache#9704


Describe the bug
If a producer uses a key-value schema with the SEPARATED encoding type and disables batching, Pulsar SQL cannot read the data. The query fails with "GENERIC_INTERNAL_ERROR: Decoding avro record failed".
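The two KeyValueEncodingType modes place the key in different parts of the message, and that placement is what matters for this bug. The sketch below is a simplified model, not Pulsar's code. It assumes the following layouts:

- INLINE packs both parts into the payload. Each part is written as a 4-byte big-endian length followed by its bytes, key first.
- SEPARATED stores the encoded key as the message key, written as a base64 string and marked with a "base64-encoded" flag. Only the value goes into the payload.

The EncodedMessage class and the encodeInline/encodeSeparated helpers are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

public class KeyValueLayoutSketch {

    // Hypothetical stand-in for what a message carries; NOT a Pulsar class.
    static final class EncodedMessage {
        final String messageKey;  // null when the key lives in the payload
        final boolean keyBase64;  // true when messageKey holds base64 text
        final byte[] payload;

        EncodedMessage(String messageKey, boolean keyBase64, byte[] payload) {
            this.messageKey = messageKey;
            this.keyBase64 = keyBase64;
            this.payload = payload;
        }
    }

    // INLINE (assumed layout): [int32 keyLen][key][int32 valueLen][value] in the payload.
    static EncodedMessage encodeInline(byte[] key, byte[] value) {
        ByteBuffer buf = ByteBuffer.allocate(4 + key.length + 4 + value.length);
        buf.putInt(key.length).put(key).putInt(value.length).put(value);
        return new EncodedMessage(null, false, buf.array());
    }

    // SEPARATED (assumed layout): binary key goes to the message key as base64 text;
    // the payload holds only the value bytes.
    static EncodedMessage encodeSeparated(byte[] key, byte[] value) {
        String b64Key = Base64.getEncoder().encodeToString(key);
        return new EncodedMessage(b64Key, true, value.clone());
    }

    public static void main(String[] args) {
        byte[] key = {0x02, 0x0E, (byte) 0xCC};  // arbitrary binary bytes standing in for an Avro key
        byte[] value = "value".getBytes(StandardCharsets.UTF_8);

        EncodedMessage inline = encodeInline(key, value);
        EncodedMessage separated = encodeSeparated(key, value);

        System.out.println("INLINE payload length:    " + inline.payload.length);
        System.out.println("SEPARATED message key:    " + separated.messageKey);
        System.out.println("SEPARATED payload is value only: " + Arrays.equals(separated.payload, value));
    }
}
```

Under this model, main prints an INLINE payload length of 16 (4 + 3 + 4 + 5 bytes) and the SEPARATED message key Ag7M. A reader of a SEPARATED message therefore has to base64-decode the message key before passing it to the key schema.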

To Reproduce

  1. Produce key-value schema data in SEPARATED encoding mode, with enableBatching set to false:

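import lombok.AllArgsConstructor;
import lombok.Cleanup;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;

// Key and value both use the Avro schema of Stock; SEPARATED stores the key apart from the value payload.
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Schema<KeyValue<Stock, Stock>> schema = Schema.KeyValue(
        Schema.AVRO(Stock.class), Schema.AVRO(Stock.class), KeyValueEncodingType.SEPARATED);
String topic = "kv-schema-test2";

// Batching is disabled, so every message is written as its own entry.
@Cleanup
Producer<KeyValue<Stock, Stock>> producer = pulsarClient
        .newProducer(schema)
        .topic(topic)
        .enableBatching(false)
        .create();

// Send 10 key-value messages, i = 0..9.
for (int i = 0; i < 10; i++) {
    producer.send(new KeyValue<>(
            new Stock(i, "K_STOCK_" + i, i * 100),
            new Stock(i, "V_STOCK_" + i, i * 100)));
}

// @Data alone does not generate the three-argument constructor used above.
// Adding @AllArgsConstructor removes the implicit default constructor, so @NoArgsConstructor restores it.
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Stock {
    private int entryId;
    private String symbol;
    private double sharePrice;
}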
  2. Query the data with Pulsar SQL.
  3. Check the error logs:
2021-02-24T21:01:22.914+0800	INFO	20210224_130122_00004_yzrcx.1.0-0-114	org.apache.pulsar.sql.presto.PulsarRecordCursor	Initializing split with parameters: PulsarSplit{splitId=0, connectorId='pulsar', originSchemaName='kv-schema-test2', schemaName='public/default', tableName='kv-schema-test2', splitSize=5, schema='ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}', schemaType=KEY_VALUE, startPositionEntryId=0, endPositionEntryId=5, startPositionLedgerId=12, endPositionLedgerId=12, schemaInfoProperties={"key.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.type":"AVRO","key.schema.name":"","value.schema.name":"","kv.encoding.type":"SEPARATED","key.schema.type":"AVRO"}}
2021-02-24T21:01:22.914+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	org.apache.pulsar.sql.presto.PulsarRecordCursor	Initializing split with parameters: PulsarSplit{splitId=1, connectorId='pulsar', originSchemaName='kv-schema-test2', schemaName='public/default', tableName='kv-schema-test2', splitSize=4, schema='ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}', schemaType=KEY_VALUE, startPositionEntryId=5, endPositionEntryId=9, startPositionLedgerId=12, endPositionLedgerId=12, schemaInfoProperties={"key.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.type":"AVRO","key.schema.name":"","value.schema.name":"","kv.encoding.type":"SEPARATED","key.schema.type":"AVRO"}}
2021-02-24T21:01:22.959+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr	java.lang.ArrayIndexOutOfBoundsException: -52
2021-02-24T21:01:22.959+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
2021-02-24T21:01:22.959+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
2021-02-24T21:01:22.959+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
2021-02-24T21:01:22.959+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
2021-02-24T21:01:22.959+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:248)
2021-02-24T21:01:22.959+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
2021-02-24T21:01:22.960+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
2021-02-24T21:01:22.960+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
2021-02-24T21:01:22.960+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
2021-02-24T21:01:22.960+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.pulsar.client.impl.schema.generic.GenericAvroReader.read(GenericAvroReader.java:101)
2021-02-24T21:01:22.960+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.pulsar.client.impl.schema.generic.GenericAvroReader.read(GenericAvroReader.java:42)
2021-02-24T21:01:22.960+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:67)
2021-02-24T21:01:22.960+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:65)
2021-02-24T21:01:22.960+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.pulsar.sql.presto.decoder.avro.PulsarAvroRowDecoder.decodeRow(PulsarAvroRowDecoder.java:66)
2021-02-24T21:01:22.960+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.pulsar.sql.presto.PulsarRecordCursor.advanceNextPosition(PulsarRecordCursor.java:499)
2021-02-24T21:01:22.960+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at io.prestosql.spi.connector.RecordPageSource.getNextPage(RecordPageSource.java:90)
2021-02-24T21:01:22.960+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at io.prestosql.operator.TableScanOperator.getOutput(TableScanOperator.java:302)
2021-02-24T21:01:22.960+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at io.prestosql.operator.Driver.processInternal(Driver.java:379)
2021-02-24T21:01:22.960+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at io.prestosql.operator.Driver.lambda$processFor$8(Driver.java:283)
2021-02-24T21:01:22.960+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at io.prestosql.operator.Driver.tryWithLock(Driver.java:675)
2021-02-24T21:01:22.961+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at io.prestosql.operator.Driver.processFor(Driver.java:276)
2021-02-24T21:01:22.961+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at io.prestosql.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1075)
2021-02-24T21:01:22.961+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at io.prestosql.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
2021-02-24T21:01:22.961+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at io.prestosql.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
2021-02-24T21:01:22.961+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at io.prestosql.$gen.Presto_332__testversion____20210224_125443_2.run(Unknown Source)
2021-02-24T21:01:22.961+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
2021-02-24T21:01:22.961+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
2021-02-24T21:01:22.961+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at java.lang.Thread.run(Thread.java:748)
2021-02-24T21:01:22.974+0800	ERROR	SplitRunner-5-106	io.prestosql.execution.executor.TaskExecutor	Error processing Split 20210224_130122_00004_yzrcx.1.0-1 PulsarSplit{splitId=1, connectorId='pulsar', originSchemaName='kv-schema-test2', schemaName='public/default', tableName='kv-schema-test2', splitSize=4, schema='ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}', schemaType=KEY_VALUE, startPositionEntryId=5, endPositionEntryId=9, startPositionLedgerId=12, endPositionLedgerId=12, schemaInfoProperties={"key.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.type":"AVRO","key.schema.name":"","value.schema.name":"","kv.encoding.type":"SEPARATED","key.schema.type":"AVRO"}} (start = 2.86554176341414E8, wall = 53 ms, cpu = 0 ms, wait = 0 ms, calls = 1): GENERIC_INTERNAL_ERROR: Decoding avro record failed.
io.prestosql.spi.PrestoException: Decoding avro record failed.
	at org.apache.pulsar.sql.presto.decoder.avro.PulsarAvroRowDecoder.decodeRow(PulsarAvroRowDecoder.java:70)
	at org.apache.pulsar.sql.presto.PulsarRecordCursor.advanceNextPosition(PulsarRecordCursor.java:499)
	at io.prestosql.spi.connector.RecordPageSource.getNextPage(RecordPageSource.java:90)
	at io.prestosql.operator.TableScanOperator.getOutput(TableScanOperator.java:302)
	at io.prestosql.operator.Driver.processInternal(Driver.java:379)
	at io.prestosql.operator.Driver.lambda$processFor$8(Driver.java:283)
	at io.prestosql.operator.Driver.tryWithLock(Driver.java:675)
	at io.prestosql.operator.Driver.processFor(Driver.java:276)
	at io.prestosql.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1075)
	at io.prestosql.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
	at io.prestosql.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
	at io.prestosql.$gen.Presto_332__testversion____20210224_125443_2.run(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -52
	at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
	at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:248)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
	at org.apache.pulsar.client.impl.schema.generic.GenericAvroReader.read(GenericAvroReader.java:101)
	at org.apache.pulsar.client.impl.schema.generic.GenericAvroReader.read(GenericAvroReader.java:42)
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:67)
	at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:65)
	at org.apache.pulsar.sql.presto.decoder.avro.PulsarAvroRowDecoder.decodeRow(PulsarAvroRowDecoder.java:66)
	... 14 more

Expected behavior
The query returns the data without errors.

Desktop:

  • OS: macOS 11.2.1

Additional context
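This may be related to the following method in RawMessageImpl. When hasBase64EncodedKey() returns false, it returns the UTF-8 bytes of the key string instead of the base64-decoded key bytes: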

public Optional<ByteBuf> getKeyBytes() {
    if (getKey().isPresent()) {
        if (hasBase64EncodedKey()) {
            return Optional.of(Unpooled.wrappedBuffer(Base64.getDecoder().decode(getKey().get())));
        } else {
            return Optional.of(Unpooled.wrappedBuffer(getKey().get().getBytes(StandardCharsets.UTF_8)));
        }
    }
    return Optional.empty();
}
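The suspicion above can be checked in isolation. Assume two things: the producer stored the Avro-encoded key as base64 text, and hasBase64EncodedKey() returns false for these non-batched messages. In that case getKeyBytes() takes the else branch and hands the Avro decoder the UTF-8 bytes of the base64 string.

The sketch below mimics only the two branches of getKeyBytes(), without ByteBuf or Optional. It also hand-encodes the key Stock(5, "K_STOCK_5", 500) following the Avro binary encoding: zig-zag varints for the int, the union index and the string length, and a little-endian 8-byte double. Stock(5, ...) is the message sent with i = 5, assumed here to be entry 5, where split 1 starts. The class and helper names are hypothetical, and no Pulsar or Avro library code is used.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class KeyBytesBranchSketch {

    // Mirrors the two branches of RawMessageImpl.getKeyBytes(), without ByteBuf/Optional.
    static byte[] keyBytes(String key, boolean base64EncodedKey) {
        if (base64EncodedKey) {
            return Base64.getDecoder().decode(key);   // original binary key
        }
        return key.getBytes(StandardCharsets.UTF_8);  // bytes of the base64 text itself
    }

    // Hand-rolled Avro binary encoding of Stock{int entryId; ["null","string"] symbol; double sharePrice}.
    static byte[] avroStock(int entryId, String symbol, double sharePrice) {
        byte[] sym = symbol.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(32 + sym.length).order(ByteOrder.LITTLE_ENDIAN);
        writeZigZag(buf, entryId);     // int field
        writeZigZag(buf, 1);           // union branch 1 = "string"
        writeZigZag(buf, sym.length);  // string length
        buf.put(sym);
        buf.putDouble(sharePrice);     // Avro doubles: 8 bytes, little-endian
        byte[] out = new byte[buf.position()];
        buf.flip();
        buf.get(out);
        return out;
    }

    // Writes a zig-zag encoded varint, as Avro does for int and long.
    static void writeZigZag(ByteBuffer buf, long n) {
        long z = (n << 1) ^ (n >> 63);
        while ((z & ~0x7FL) != 0) {
            buf.put((byte) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        buf.put((byte) z);
    }

    // Reads one zig-zag varint starting at pos[0] and advances pos[0].
    static long readZigZag(byte[] data, int[] pos) {
        long z = 0;
        int shift = 0;
        byte b;
        do {
            b = data[pos[0]++];
            z |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (z >>> 1) ^ -(z & 1);
    }

    public static void main(String[] args) {
        byte[] key = avroStock(5, "K_STOCK_5", 500);
        String stored = Base64.getEncoder().encodeToString(key);

        // Flag ignored: decode the ASCII of the base64 text as if it were the Avro record.
        byte[] wrong = keyBytes(stored, false);
        int[] pos = {0};
        long entryId = readZigZag(wrong, pos);
        long unionIndex = readZigZag(wrong, pos);

        System.out.println("stored key starts with: " + stored.substring(0, 4));
        System.out.println("entryId read as:        " + entryId);
        System.out.println("union index read as:    " + unionIndex);
    }
}
```

Run as written, main reports a stored key starting with CgIS. Decoding the text bytes 'C' and 'g' as zig-zag varints yields an entryId of -34 and a union index of -52 for the symbol field. That is the same index as the ArrayIndexOutOfBoundsException: -52 thrown from Symbol$Alternative.getSymbol in the log. This agrees with the RawMessageImpl hypothesis, though it does not prove it.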