-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Java Client] Fix messages sent by producers without schema cannot be decoded #15622
[Java Client] Fix messages sent by producers without schema cannot be decoded #15622
Conversation
I'm not sure whether this fix should be cherry-picked to older branches since it changes the current behavior (even if I think it's correct). Since it's a fix at producer side, for messages produced by older version producer, even if they are compatible with the schema, the consumer still cannot consume them. In this PR, I also checked |
if (version != null && version.length == 0) {
schemaFuture.completeExceptionally(new SchemaSerializationException("Empty schema version"));
return schemaFuture;
} This error message added in #14626 could make users more confused. A |
@BewareMyPower @Technoboy- I think this one is related to the discussion in the mailing list? |
@codelipenghui Thanks, I will reply in the email. @Technoboy- Because it still tried to perform topic look up with an empty byte array schema version, which always throws an exception before decoding the message value. #14626 just modifies the error message caused by the unexpected look up. See more details in the PR description and my previous explanation. |
There are still other failed tests caused by this change, I'll fix them soon. |
### Motivation When I tried to consume a topic via a consumer with Avro schema while the topic was produced by a producer without schema, the consumption failed. It's because `MultiVersionSchemaInfoProvider#getSchemaByVersion` doesn't check if `schemaVersion` is an empty byte array. If yes, a `BytesSchemaVersion` of an empty array will be passed to `cache.get` and then passed to `loadSchema`. https://github.com/apache/pulsar/blob/f90ef9c6ad88c4f94ce1fcc682bbf3f3189cbf2a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java#L94-L98 However, `LookupService#getSchema` cannot accept an empty byte array as the version, so `loadSchema` failed. The root cause is that the schema version was set unexpectly when messages were sent by a producer without schema. At broker side, the returned schema version is never null. If the schema version was an empty array, then it means the message doesn't have schema. However, at Java client side, the empty byte array is treated as an existing schema and the schema version field will be set. When consumer receives the message, it will try to load schema whose version is an empty array. ### Modifications - When a producer receives a response whose schema version is an empty byte array, just ignore it. - Make `MesasgeImpl#getSchemaVersion` return null if the schema version is an empty byte array so that the consumer can consume messages produced by older version producers without schema. And return the internal schema for `getRegetReaderSchema` when `getSchemaVersion` returns null. - Fix the existing tests. Since producer without schema won't set the `schema_version` field after this patch, some tests that rely on the precise stats should be modified. - Add `testConsumeAvroMessagesWithoutSchema` to cover the case that messages without schema are compatible with the schema. This patch also modifies the existing behavior when `schemaValidationEnforced` is false and messages are produced by a producer without schema and consumed by a consumer with schema. 1. If the message is incompatible with the schema - Before: `getSchemaVersion` returns an empty array and `getValue` fails with `SerializationException`: > org.apache.commons.lang3.SerializationException: Failed at fetching schema info for EMPTY - After: `getSchemaVersion` returns `null` and `getValue` fails with `SchemaSerializationException`. 2. Otherwise (the message is compatible with the schema) - Before: `getSchemaVersion` returns an empty array and `getValue` fails with `UncheckedExecutionException`. - After: `getSchemaVersion` returns `null` and `getValue` returns the correctly decoded object.
f98f59c
to
c328335
Compare
@codelipenghui @congbobo184 @Technoboy- @mattisonchao Now all required tests passed, PTAL. |
… decoded (#15622) ### Motivation When I tried to consume a topic via a consumer with Avro schema while the topic was produced by a producer without schema, the consumption failed. It's because `MultiVersionSchemaInfoProvider#getSchemaByVersion` doesn't check if `schemaVersion` is an empty byte array. If yes, a `BytesSchemaVersion` of an empty array will be passed to `cache.get` and then passed to `loadSchema`. https://github.com/apache/pulsar/blob/f90ef9c6ad88c4f94ce1fcc682bbf3f3189cbf2a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java#L94-L98 However, `LookupService#getSchema` cannot accept an empty byte array as the version, so `loadSchema` failed. The root cause is that the schema version was set unexpectly when messages were sent by a producer without schema. At broker side, the returned schema version is never null. If the schema version was an empty array, then it means the message doesn't have schema. However, at Java client side, the empty byte array is treated as an existing schema and the schema version field will be set. When consumer receives the message, it will try to load schema whose version is an empty array. ### Modifications - When a producer receives a response whose schema version is an empty byte array, just ignore it. - Make `MesasgeImpl#getSchemaVersion` return null if the schema version is an empty byte array so that the consumer can consume messages produced by older version producers without schema. And return the internal schema for `getRegetReaderSchema` when `getSchemaVersion` returns null. - Fix the existing tests. Since producer without schema won't set the `schema_version` field after this patch, some tests that rely on the precise stats should be modified. - Add `testConsumeAvroMessagesWithoutSchema` to cover the case that messages without schema are compatible with the schema. This patch also modifies the existing behavior when `schemaValidationEnforced` is false and messages are produced by a producer without schema and consumed by a consumer with schema. 1. If the message is incompatible with the schema - Before: `getSchemaVersion` returns an empty array and `getValue` fails with `SerializationException`: > org.apache.commons.lang3.SerializationException: Failed at fetching schema info for EMPTY - After: `getSchemaVersion` returns `null` and `getValue` fails with `SchemaSerializationException`. 2. Otherwise (the message is compatible with the schema) - Before: `getSchemaVersion` returns an empty array and `getValue` fails with `SerializationException`. - After: `getSchemaVersion` returns `null` and `getValue` returns the correctly decoded object. (cherry picked from commit ecd275d)
… decoded (apache#15622) ### Motivation When I tried to consume a topic via a consumer with Avro schema while the topic was produced by a producer without schema, the consumption failed. It's because `MultiVersionSchemaInfoProvider#getSchemaByVersion` doesn't check if `schemaVersion` is an empty byte array. If yes, a `BytesSchemaVersion` of an empty array will be passed to `cache.get` and then passed to `loadSchema`. https://github.com/apache/pulsar/blob/f90ef9c6ad88c4f94ce1fcc682bbf3f3189cbf2a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java#L94-L98 However, `LookupService#getSchema` cannot accept an empty byte array as the version, so `loadSchema` failed. The root cause is that the schema version was set unexpectly when messages were sent by a producer without schema. At broker side, the returned schema version is never null. If the schema version was an empty array, then it means the message doesn't have schema. However, at Java client side, the empty byte array is treated as an existing schema and the schema version field will be set. When consumer receives the message, it will try to load schema whose version is an empty array. ### Modifications - When a producer receives a response whose schema version is an empty byte array, just ignore it. - Make `MesasgeImpl#getSchemaVersion` return null if the schema version is an empty byte array so that the consumer can consume messages produced by older version producers without schema. And return the internal schema for `getRegetReaderSchema` when `getSchemaVersion` returns null. - Fix the existing tests. Since producer without schema won't set the `schema_version` field after this patch, some tests that rely on the precise stats should be modified. - Add `testConsumeAvroMessagesWithoutSchema` to cover the case that messages without schema are compatible with the schema. This patch also modifies the existing behavior when `schemaValidationEnforced` is false and messages are produced by a producer without schema and consumed by a consumer with schema. 1. If the message is incompatible with the schema - Before: `getSchemaVersion` returns an empty array and `getValue` fails with `SerializationException`: > org.apache.commons.lang3.SerializationException: Failed at fetching schema info for EMPTY - After: `getSchemaVersion` returns `null` and `getValue` fails with `SchemaSerializationException`. 2. Otherwise (the message is compatible with the schema) - Before: `getSchemaVersion` returns an empty array and `getValue` fails with `SerializationException`. - After: `getSchemaVersion` returns `null` and `getValue` returns the correctly decoded object. (cherry picked from commit ecd275d) (cherry picked from commit a4e6e2e)
… decoded (#15622) ### Motivation When I tried to consume a topic via a consumer with Avro schema while the topic was produced by a producer without schema, the consumption failed. It's because `MultiVersionSchemaInfoProvider#getSchemaByVersion` doesn't check if `schemaVersion` is an empty byte array. If yes, a `BytesSchemaVersion` of an empty array will be passed to `cache.get` and then passed to `loadSchema`. https://github.com/apache/pulsar/blob/f90ef9c6ad88c4f94ce1fcc682bbf3f3189cbf2a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java#L94-L98 However, `LookupService#getSchema` cannot accept an empty byte array as the version, so `loadSchema` failed. The root cause is that the schema version was set unexpectly when messages were sent by a producer without schema. At broker side, the returned schema version is never null. If the schema version was an empty array, then it means the message doesn't have schema. However, at Java client side, the empty byte array is treated as an existing schema and the schema version field will be set. When consumer receives the message, it will try to load schema whose version is an empty array. ### Modifications - When a producer receives a response whose schema version is an empty byte array, just ignore it. - Make `MesasgeImpl#getSchemaVersion` return null if the schema version is an empty byte array so that the consumer can consume messages produced by older version producers without schema. And return the internal schema for `getRegetReaderSchema` when `getSchemaVersion` returns null. - Fix the existing tests. Since producer without schema won't set the `schema_version` field after this patch, some tests that rely on the precise stats should be modified. - Add `testConsumeAvroMessagesWithoutSchema` to cover the case that messages without schema are compatible with the schema. This patch also modifies the existing behavior when `schemaValidationEnforced` is false and messages are produced by a producer without schema and consumed by a consumer with schema. 1. If the message is incompatible with the schema - Before: `getSchemaVersion` returns an empty array and `getValue` fails with `SerializationException`: > org.apache.commons.lang3.SerializationException: Failed at fetching schema info for EMPTY - After: `getSchemaVersion` returns `null` and `getValue` fails with `SchemaSerializationException`. 2. Otherwise (the message is compatible with the schema) - Before: `getSchemaVersion` returns an empty array and `getValue` fails with `SerializationException`. - After: `getSchemaVersion` returns `null` and `getValue` returns the correctly decoded object. (cherry picked from commit ecd275d)
Motivation
When I tried to consume a topic via a consumer with Avro schema while
the topic was produced by a producer without schema, the consumption
failed. It's because
MultiVersionSchemaInfoProvider#getSchemaByVersion
doesn't check if
schemaVersion
is an empty byte array. If yes, aBytesSchemaVersion
of an empty array will be passed tocache.get
andthen passed to
loadSchema
.pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java
Lines 94 to 98 in f90ef9c
However,
LookupService#getSchema
cannot accept an empty byte array asthe version, so
loadSchema
failed.The root cause is that the schema version was set unexpectly when
messages were sent by a producer without schema. At broker side, the
returned schema version is never null. If the schema version was an
empty array, then it means the message doesn't have schema. However, at
Java client side, the empty byte array is treated as an existing schema
and the schema version field will be set. When consumer receives the
message, it will try to load schema whose version is an empty array.
Modifications
byte array, just ignore it.
MesasgeImpl#getSchemaVersion
return null if the schema versionis an empty byte array so that the consumer can consume messages
produced by older version producers without schema. And return the
internal schema for
getRegetReaderSchema
whengetSchemaVersion
returns null.
schema_version
field after this patch, some tests that rely on theprecise stats should be modified.
testConsumeAvroMessagesWithoutSchema
to cover the case thatmessages without schema are compatible with the schema.
This patch also modifies the existing behavior when
schemaValidationEnforced
is false and messages are produced by aproducer without schema and consumed by a consumer with schema.
If the message is incompatible with the schema
Before:
getSchemaVersion
returns an empty array andgetValue
fails with
SerializationException
:After:
getSchemaVersion
returnsnull
andgetValue
fails withSchemaSerializationException
.Otherwise (the message is compatible with the schema)
getSchemaVersion
returns an empty array andgetValue
fails with
SerializationException
.getSchemaVersion
returnsnull
andgetValue
returns thecorrectly decoded object.
Documentation
Check the box below or label this PR directly.
Need to update docs?
doc-required
(Your PR needs to update docs and you will update later)
no-need-doc
(Please explain why)
doc
(Your PR contains doc changes)
doc-added
(Docs have been already added)