Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avro schema incompatible with schema upload vs python producer #9571

Open
markussteininger opened this issue Feb 12, 2021 · 11 comments
Open
Labels
lifecycle/stale type/bug The PR fixed a bug or issue reported a bug

Comments

@markussteininger
Copy link

Describe the bug
Hi i just want to know if i do something wrong or if someone has identify following behavior (and found some solution for fixing it)
I create a topic and upload the Avro Schema as Yaml

pulsar/bin/pulsar-admin topics create persistent://schema/test3/foo22
/pulsar/bin/pulsar-admin schemas upload persistent://schema/test3/foo22 -f /git_files/schema/schema-validation.avro

Content of Avro Yaml Schema:
{"type": "AVRO","schema":"{\"name\":\"Validation\",\"type\":\"record\",\"fields\":[{\"name\":\"messagetimestamp\",\"type\":\"string\"}]}","properties": {}}

Schema looks like that with schemas get

{
  "version": 0,
  "schemaInfo": {
    "name": "foo22",
    "schema": {
      "name": "Validation",
      "type": "record",
      "fields": [
        {
          "name": "messagetimestamp",
          "type": "string"
        }
      ]
    },
    "type": "AVRO",
    "properties": {}
  }
}

set-is-allow-auot-update-schema is set to disable(!) to cover that nowone is “destroying” or topic
but if i know use a python producer the schema is failing with “incompatible schema”
python code:

class Validation(Record):
    messagetimestamp = String(required=True)
producer = client.create_producer(topic, schema=AvroSchema(Validation))
inputString='Test55'
producer.send(Validation(
    messagetimestamp = inputString
    ))
print("Message send")
client.close()

same behaviour with python pulsar functions .
if i turn on the auto schema update a new schema is created but total same structure:

{
  "version": 1,
  "schemaInfo": {
    "name": "foo22",
    "schema": {
      "name": "Validation",
      "type": "record",
      "fields": [
        {
          "name": "messagetimestamp",
          "type": "string"
        }
      ]
    },
    "type": "AVRO",
    "properties": {}
  }
}

Any idea why that happens and how can i avoid that multiple producer (written in Java, Python) are changing the schema all time?!
To Reproduce
Steps to reproduce the behavior:

  1. create topic
  2. upload avro Schema (with yml reference -f yml file)
  3. deactivate auto schema update
  4. create python consumer (incompatible schema)
  5. enable auto schema update
  6. check schema (schema version changed)

Expected behavior
Schema version shouldn't be updated if python schema is the same as given in the schema upload (could it be that there is some hidden character given?!)

@ta1meng
Copy link

ta1meng commented Feb 18, 2021

This might be a duplicate of #8510.

I ran into the same issues using the Java client.

Primary issue: I believe that the above two schema versions are not identical. They likely contain a whitespace difference. Unfortunately, Pulsar seems to consider two schema definitions that differ by whitespace different, and compatible.

If aligning the whitespacing of the schema definitions resolve your problem, please mark this ticket a duplicate of #8510.

--

You are right to disable auto updates. If the above worked, you don't have to read the below text. I'm sharing with you how I set the various schema policies for your reference. I've not had anyone review these policy settings, so if you see something dubious, feel free to ask me to elaborate.

bin/pulsar-admin namespaces set-is-allow-auto-update-schema --disable <tenant/namespace>

bin/pulsar-admin namespaces set-schema-compatibility-strategy <tenant/namespace> --compatibility FORWARD_TRANSITIVE

bin/pulsar-admin namespaces set-schema-validation-enforce --enable <tenant/namespace>

bin/pulsar-admin namespaces set-schema-autoupdate-strategy <tenant/namespace> --disabled

The schema related policies for my namespace look like:

bin/pulsar-admin namespaces policies <tenant/namespace>

"schema_auto_update_compatibility_strategy" : "AutoUpdateDisabled", "schema_compatibility_strategy" : "FORWARD_TRANSITIVE", "is_allow_auto_update_schema" : false, "schema_validation_enforced" : true

@codelipenghui
Copy link
Contributor

codelipenghui commented Mar 2, 2021

@ta1meng could you please verify if #9612 is fixed this problem?

@ta1meng
Copy link

ta1meng commented Mar 2, 2021

I'm not working on Pulsar this sprint but I've inserted a task into the sprint to do this testing. I'll post an update here once I've done the testing.

@ta1meng
Copy link

ta1meng commented Mar 24, 2021

I downloaded Pulsar 2.7.1 and checked one case. I uploaded an Avro schema through pulsar-admin. The schema had an extra newline in it.

In Pulsar 2.7.0 that newline made it impossible for the producer application to publish to the topic with that schema registered. The client side error I got was:

org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Schema not found and schema auto updating is disabled.

In Pulsar 2.7.1, the extra newline made no difference. Producing with a schema without that extra newline succeeded!

@ta1meng
Copy link

ta1meng commented Apr 26, 2021

@codelipenghui @congbobo184 Today I tried the Python client as well, against Pulsar 2.7.1. The same use case worked for me in Python, as expected (because the fix in #9612 was a server-side change).

@ta1meng
Copy link

ta1meng commented Apr 27, 2021

Retracting the previous comment. I got the Python client to work only against required float fields. I was unable to get it to work against required string fields.

While testing, I fixed two bugs locally. There seems to be another bug concerning string fields.

All of the bugs give the "IncompatibleSchema" error.

It's late today and I'll resume testing tomorrow.

@ta1meng
Copy link

ta1meng commented Apr 28, 2021

I went deeper today and now believe that the original issue, in its exact form, was resolved by #9612.

Here are all the cases I got working, including workarounds where I encountered bugs.

All schemas were uploaded through pulsar-admin. Producers and consumers were disallowed from updating schemas.

CAVEAT: Pulsar does not support "default": null fully. The workaround is to remove it from any Avro schema that contains it. When "default": null is removed, the resulting behavior is the same as if it were specified.

{
    "type": "AVRO",
    "schema": "{\"name\":\"Latitude\",\"type\":\"record\",\"fields\":[{\"name\":\"latitude\",\"type\":\"float\"}]}",
    "properties": {}
}
class Latitude(Record):
   latitude = Float(required=True)
{
    "type": "AVRO",
    "schema": "{\"name\":\"Latitude\",\"type\":\"record\",\"fields\":[{\"name\":\"latitude\",\"type\":\"float\"},{\"name\":\"longitude\",\"type\":\"float\"}]}",
    "properties": {}
}
class Latitude(Record):
   latitude = Float(required=True)
   longitude = Float(required=True)
{
    "type": "AVRO",
    "schema": "{\"name\":\"Latitude\",\"type\":\"record\",\"fields\":[{\"name\":\"latitude\",\"type\":\"float\"},{\"name\":\"longitude\",\"type\":\"float\"},{\"name\":\"fieldId\",\"type\":\"string\"},{\"name\":\"action\",\"type\":[\"null\",\"string\"]}]}",
    "properties": {}
}
class Latitude(Record):
   latitude = Float(required=True)
   longitude = Float(required=True)
   fieldId = String(required=True)
   action = String(required=False)
{
    "type": "AVRO",
    "schema": "{\"name\":\"Location\",\"type\":\"record\",\"fields\":[{\"name\":\"fieldId\",\"type\":\"string\"}]}",
    "properties": {}
}
class Location(Record):
   fieldId = String(required=True)
{
    "type": "AVRO",
    "schema": "{\"name\":\"Location\",\"type\":\"record\",\"fields\":[{\"name\":\"fieldId\",\"type\":\"string\"},{\"name\":\"address\",\"type\":\"string\"}]}",
    "properties": {}
}
class Location(Record):
   fieldId = String(required=True)
   address = String(required=True)
   @classmethod
   def schema(cls):
       schema = {
           'name': str(cls.__name__),
           'type': 'record',
           'fields': []
       }

       # Do NOT sort the keys!!
       for name in cls._fields.keys():
           field = cls._fields[name]
           field_type = field.schema() if field._required else ['null', field.schema()]
           schema['fields'].append({
               'name': name,
               'type': field_type
           })
       return schema
{
    "type": "AVRO",
    "schema": "{\"name\":\"Location\",\"type\":\"record\",\"fields\":[{\"name\":\"fieldId\",\"type\":\"string\"},{\"name\":\"address\",\"type\":\"string\"},{\"name\":\"action\",\"type\":[\"string\",\"null\"],\"default\":\"planting\"}]}",
    "properties": {}
}
class Location(Record):
   fieldId = String(required=True)
   address = String(required=True)
   action = String(required=False, default='planting')
   @classmethod
   def schema(cls):
       schema = {
           'name': str(cls.__name__),
           'type': 'record',
           'fields': []
       }

       # Do NOT sort the keys!!
       for name in cls._fields.keys():
           field = cls._fields[name]
           field_type = field.schema() if field._required else (
               # HACK for default values
               [field.schema(), 'null'] if field._default != None else ['null', field.schema()]
           )
           schema['fields'].append({
               'name': name,
               'type': field_type
           } if field._default == None else {
               # HACK for default values
               'name': name,
               'type': field_type,
               'default': field._default
           })
       return schema

@ta1meng
Copy link

ta1meng commented Apr 28, 2021

@codelipenghui @congbobo184 while the original issue was fixed, there were three bugs that I encountered that also resulted in an IncompatibleSchema exception.

Consider the workarounds that I implemented locally:

@classmethod
   def schema(cls):
       schema = {
           'name': str(cls.__name__),
           'type': 'record',
           'fields': []
       }

       # Do NOT sort the keys!!
       for name in cls._fields.keys():
           field = cls._fields[name]
           field_type = field.schema() if field._required else (
               # HACK for default values
               [field.schema(), 'null'] if field._default != None else ['null', field.schema()]
           )
           schema['fields'].append({
               'name': name,
               'type': field_type
           } if field._default == None else {
               # HACK for default values
               'name': name,
               'type': field_type,
               'default': field._default
           })
       return schema

which overrides Record::schema() in definition.py.

First issue: Pulsar's schema comparison logic seems textual in nature, so if two fields are specified in reverse order, the schema comparison would return "incompatible", even though the Avro schemas are compatible. The workaround I put in removes the sorting of field names, so they appear in the same order in the schema as they are declared in code. This issue can either be fixed server-side or in the Python client library.

Second issue: there is no default value support in the schema generation code. I hacked it in. I'm new to Python and I'm sure this code can be written better.

Third issue:: "default": null is partially supported, and rarely logged. This one took me the longest to figure out because I saw identical schemas printed that resulted in an IncompatibleSchema exception. While this issue would be tricky to fix in the Pulsar client library, we can improve things greatly by logging "default": null when it is a part of the schema. This one is different from the first two issues, as it seems to require a sweep across multiple Pulsar projects, so I will file this issue separately.

So this ticket can be used to track First issue and Second issue, as they are both localized to a single method in the Pulsar Python client library.

@ta1meng
Copy link

ta1meng commented Apr 28, 2021

Filed Third issue as #10426

@ta1meng
Copy link

ta1meng commented Apr 29, 2021

The patch posted in #9571 (comment) is a superset of the patch posted in #10174.

@codelipenghui
Copy link
Contributor

The issue had no activity for 30 days, mark with Stale label.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
lifecycle/stale type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

No branches or pull requests

3 participants