[Pulsar SQL] Query key-value schema data error #9704

Open
gaoran10 opened this issue Feb 25, 2021 · 5 comments
Labels
lifecycle/stale, type/bug

Comments

@gaoran10
Contributor

Describe the bug
If a producer uses the key-value schema in separate (SEPARATED) encoding mode and disables batching, Pulsar SQL queries on the topic fail with a "Decoding avro record failed" error.

To Reproduce

  1. Produce key-value schema data in separate (SEPARATED) encoding mode, with enableBatching set to false:

@Cleanup
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Schema<KeyValue<Stock, Stock>> schema = Schema.KeyValue(
        Schema.AVRO(Stock.class), Schema.AVRO(Stock.class), KeyValueEncodingType.SEPARATED);
String topic = "kv-schema-test2";

@Cleanup
Producer<KeyValue<Stock, Stock>> producer = pulsarClient
        .newProducer(schema)
        .topic(topic)
        .enableBatching(false)
        .create();

for (int i = 0; i < 10; i++) {
    producer.send(new KeyValue<>(
            new Stock(i, "K_STOCK_" + i, i * 100),
            new Stock(i, "V_STOCK_" + i, i * 100)));
}
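
// The all-args constructor is needed by the new Stock(...) calls above;
// the no-arg constructor is kept for deserialization.
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Stock {
    private int entryId;
    private String symbol;
    private double sharePrice;
}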
  2. Query the data with Pulsar SQL.
  3. See the error logs:
2021-02-24T21:01:22.914+0800	INFO	20210224_130122_00004_yzrcx.1.0-0-114	org.apache.pulsar.sql.presto.PulsarRecordCursor	Initializing split with parameters: PulsarSplit{splitId=0, connectorId='pulsar', originSchemaName='kv-schema-test2', schemaName='public/default', tableName='kv-schema-test2', splitSize=5, schema='ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}', schemaType=KEY_VALUE, startPositionEntryId=0, endPositionEntryId=5, startPositionLedgerId=12, endPositionLedgerId=12, schemaInfoProperties={"key.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.type":"AVRO","key.schema.name":"","value.schema.name":"","kv.encoding.type":"SEPARATED","key.schema.type":"AVRO"}}
2021-02-24T21:01:22.914+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	org.apache.pulsar.sql.presto.PulsarRecordCursor	Initializing split with parameters: PulsarSplit{splitId=1, connectorId='pulsar', originSchemaName='kv-schema-test2', schemaName='public/default', tableName='kv-schema-test2', splitSize=4, schema='ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}', schemaType=KEY_VALUE, startPositionEntryId=5, endPositionEntryId=9, startPositionLedgerId=12, endPositionLedgerId=12, schemaInfoProperties={"key.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.type":"AVRO","key.schema.name":"","value.schema.name":"","kv.encoding.type":"SEPARATED","key.schema.type":"AVRO"}}
2021-02-24T21:01:22.959+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr	java.lang.ArrayIndexOutOfBoundsException: -52
2021-02-24T21:01:22.959+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
2021-02-24T21:01:22.959+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
2021-02-24T21:01:22.959+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
2021-02-24T21:01:22.959+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
2021-02-24T21:01:22.959+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:248)
2021-02-24T21:01:22.959+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
2021-02-24T21:01:22.960+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
2021-02-24T21:01:22.960+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
2021-02-24T21:01:22.960+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
2021-02-24T21:01:22.960+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.pulsar.client.impl.schema.generic.GenericAvroReader.read(GenericAvroReader.java:101)
2021-02-24T21:01:22.960+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.pulsar.client.impl.schema.generic.GenericAvroReader.read(GenericAvroReader.java:42)
2021-02-24T21:01:22.960+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:67)
2021-02-24T21:01:22.960+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:65)
2021-02-24T21:01:22.960+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.pulsar.sql.presto.decoder.avro.PulsarAvroRowDecoder.decodeRow(PulsarAvroRowDecoder.java:66)
2021-02-24T21:01:22.960+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at org.apache.pulsar.sql.presto.PulsarRecordCursor.advanceNextPosition(PulsarRecordCursor.java:499)
2021-02-24T21:01:22.960+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at io.prestosql.spi.connector.RecordPageSource.getNextPage(RecordPageSource.java:90)
2021-02-24T21:01:22.960+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at io.prestosql.operator.TableScanOperator.getOutput(TableScanOperator.java:302)
2021-02-24T21:01:22.960+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at io.prestosql.operator.Driver.processInternal(Driver.java:379)
2021-02-24T21:01:22.960+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at io.prestosql.operator.Driver.lambda$processFor$8(Driver.java:283)
2021-02-24T21:01:22.960+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at io.prestosql.operator.Driver.tryWithLock(Driver.java:675)
2021-02-24T21:01:22.961+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at io.prestosql.operator.Driver.processFor(Driver.java:276)
2021-02-24T21:01:22.961+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at io.prestosql.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1075)
2021-02-24T21:01:22.961+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at io.prestosql.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
2021-02-24T21:01:22.961+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at io.prestosql.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
2021-02-24T21:01:22.961+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at io.prestosql.$gen.Presto_332__testversion____20210224_125443_2.run(Unknown Source)
2021-02-24T21:01:22.961+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
2021-02-24T21:01:22.961+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
2021-02-24T21:01:22.961+0800	INFO	20210224_130122_00004_yzrcx.1.0-1-106	stderr		at java.lang.Thread.run(Thread.java:748)
2021-02-24T21:01:22.974+0800	ERROR	SplitRunner-5-106	io.prestosql.execution.executor.TaskExecutor	Error processing Split 20210224_130122_00004_yzrcx.1.0-1 PulsarSplit{splitId=1, connectorId='pulsar', originSchemaName='kv-schema-test2', schemaName='public/default', tableName='kv-schema-test2', splitSize=4, schema='ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}', schemaType=KEY_VALUE, startPositionEntryId=5, endPositionEntryId=9, startPositionLedgerId=12, endPositionLedgerId=12, schemaInfoProperties={"key.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.type":"AVRO","key.schema.name":"","value.schema.name":"","kv.encoding.type":"SEPARATED","key.schema.type":"AVRO"}} (start = 2.86554176341414E8, wall = 53 ms, cpu = 0 ms, wait = 0 ms, calls = 1): GENERIC_INTERNAL_ERROR: Decoding avro record failed.
io.prestosql.spi.PrestoException: Decoding avro record failed.
	at org.apache.pulsar.sql.presto.decoder.avro.PulsarAvroRowDecoder.decodeRow(PulsarAvroRowDecoder.java:70)
	at org.apache.pulsar.sql.presto.PulsarRecordCursor.advanceNextPosition(PulsarRecordCursor.java:499)
	at io.prestosql.spi.connector.RecordPageSource.getNextPage(RecordPageSource.java:90)
	at io.prestosql.operator.TableScanOperator.getOutput(TableScanOperator.java:302)
	at io.prestosql.operator.Driver.processInternal(Driver.java:379)
	at io.prestosql.operator.Driver.lambda$processFor$8(Driver.java:283)
	at io.prestosql.operator.Driver.tryWithLock(Driver.java:675)
	at io.prestosql.operator.Driver.processFor(Driver.java:276)
	at io.prestosql.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1075)
	at io.prestosql.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
	at io.prestosql.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
	at io.prestosql.$gen.Presto_332__testversion____20210224_125443_2.run(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -52
	at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
	at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:248)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
	at org.apache.pulsar.client.impl.schema.generic.GenericAvroReader.read(GenericAvroReader.java:101)
	at org.apache.pulsar.client.impl.schema.generic.GenericAvroReader.read(GenericAvroReader.java:42)
	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:67)
	at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:65)
	at org.apache.pulsar.sql.presto.decoder.avro.PulsarAvroRowDecoder.decodeRow(PulsarAvroRowDecoder.java:66)
	... 14 more

Expected behavior
The query succeeds and returns the produced key-value records.

Desktop (please complete the following information):

  • OS: macOS 11.2.1

Additional context
This may be related to the following method in the class RawMessageImpl:

public Optional<ByteBuf> getKeyBytes() {
    if (getKey().isPresent()) {
        if (hasBase64EncodedKey()) {
            return Optional.of(Unpooled.wrappedBuffer(Base64.getDecoder().decode(getKey().get())));
        } else {
            return Optional.of(Unpooled.wrappedBuffer(getKey().get().getBytes(StandardCharsets.UTF_8)));
        }
    }
    return Optional.empty();
}
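
The -52 in the stack trace looks consistent with this suspicion, although nothing in this thread confirms it as the root cause. The sketch below uses only the JDK (no Pulsar or Avro dependency). The class and helper names are hypothetical, and the Avro binary encoding is hand-rolled from the Avro specification rather than produced by the Avro library. It assumes the key of a SEPARATED key-value message is carried as Base64 text, and shows what a decoder would read if it took the else branch above and parsed the UTF-8 bytes of that text as Avro binary.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical demo class; not part of Pulsar or Avro.
final class KeyBytesMismatch {

    // Avro writes int/long values as zigzag-encoded variable-length integers.
    static void writeZigZagVarint(ByteArrayOutputStream out, long value) {
        long n = (value << 1) ^ (value >> 63);
        while ((n & ~0x7FL) != 0) {
            out.write((int) ((n & 0x7F) | 0x80));
            n >>>= 7;
        }
        out.write((int) n);
    }

    // Reads one zigzag varint starting at offset[0] and advances the offset.
    static long readZigZagVarint(byte[] in, int[] offset) {
        long n = 0;
        int shift = 0;
        int b;
        do {
            b = in[offset[0]++] & 0xFF;
            n |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (n >>> 1) ^ -(n & 1);
    }

    // Hand-encodes a Stock record for the schema shown in the log:
    // int entryId, union ["null","string"] symbol, double sharePrice.
    static byte[] encodeStock(int entryId, String symbol, double sharePrice) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeZigZagVarint(out, entryId);
        writeZigZagVarint(out, 1); // union branch 1 selects "string"
        byte[] text = symbol.getBytes(StandardCharsets.UTF_8);
        writeZigZagVarint(out, text.length);
        out.write(text, 0, text.length);
        byte[] price = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN)
                .putDouble(sharePrice).array();
        out.write(price, 0, price.length);
        return out.toByteArray();
    }

    // Skips entryId and returns the union branch index of the symbol field.
    static long unionIndex(byte[] keyBytes) {
        int[] offset = {0};
        readZigZagVarint(keyBytes, offset); // entryId
        return readZigZagVarint(keyBytes, offset); // union branch index
    }

    // Union index seen when the Base64 text itself is parsed as Avro binary.
    static long misreadUnionIndex(int i) {
        String base64Key = Base64.getEncoder()
                .encodeToString(encodeStock(i, "K_STOCK_" + i, i * 100));
        return unionIndex(base64Key.getBytes(StandardCharsets.UTF_8));
    }

    // Union index seen when the Base64 text is decoded first.
    static long correctUnionIndex(int i) {
        String base64Key = Base64.getEncoder()
                .encodeToString(encodeStock(i, "K_STOCK_" + i, i * 100));
        return unionIndex(Base64.getDecoder().decode(base64Key));
    }

    public static void main(String[] args) {
        // Entry 5 is the first entry of the failing split (startPositionEntryId=5).
        System.out.println("Base64-decoded key, union index: " + correctUnionIndex(5)); // 1
        System.out.println("UTF-8 bytes of Base64 text, union index: " + misreadUnionIndex(5)); // -52
    }
}
```

For the key of entry 5, the Base64 text starts with "Cg". Read as Avro binary, 'C' becomes a bogus entryId and 'g' (0x67) zigzag-decodes to -52, which is the index reported by Symbol$Alternative.getSymbol. Decoding the Base64 text first yields the expected branch index 1.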
@codelipenghui
Contributor

@gaoran10 Is this one fixed by #9685?

@gaoran10
Contributor Author

I think this issue has not been solved yet. The key point is that the producer uses the key-value schema in separate mode with batching disabled; with batching enabled, Pulsar SQL works well.

@codelipenghui
Contributor

@congbobo184 Could you please take another look? @gaoran10 Do you mean that the problem still exists when the key-value schema is used in separate mode with message batching disabled?

@gaoran10
Contributor Author

@gaoran10 Do you mean that the problem still exists when the key-value schema is used in separate mode with message batching disabled?

Yes, I think the problem still exists.

@codelipenghui
Contributor

The issue had no activity for 30 days; marking it with the Stale label.
