This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

[FEATURE] Implement SchemaRegistry #771

Open
eolivelli opened this issue Sep 30, 2021 · 6 comments
Labels
type/feature Indicates new functionality

Comments

@eolivelli
Contributor

Is your feature request related to a problem? Please describe.
When running KOP, you are missing a Schema Registry.

Using a third-party schema registry is possible, but it won't support two critical features:

  • security: authentication/authorization
  • multi-tenancy: you cannot isolate data between your tenants

Describe the solution you'd like
I would like to see KOP support a SchemaRegistry compatible with the most common Kafka Schema Registry, especially to support Avro users.

Integrating this with the Pulsar Schema Registry would be super helpful, but I am not sure it is possible, and I am not sure it is worth it.

@eolivelli eolivelli added the type/feature Indicates new functionality label Sep 30, 2021
@BewareMyPower
Collaborator

I've tried it a long time ago. It's nearly impossible, because Kafka uses a globally unique schema id (an integer) while Pulsar doesn't support that.

@eolivelli
Contributor Author

> I've tried it a long time ago.

Do you mean using the Pulsar Schema Registry?
I guess so.

But we can implement a REST endpoint that mimics the API and stores data in some Pulsar topic under the tenant's __kafka namespace.
I haven't looked at the Registry API yet.

I will be happy to work on this topic in the short term.
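For context, a minimal sketch of the route surface such a mimicking endpoint would have to serve. The paths follow the public Confluent Schema Registry REST API; the handler names here are purely illustrative, not from any existing KOP code:

```java
// Hypothetical routing sketch for a KOP-embedded Schema Registry endpoint.
// Paths are from the public Confluent Schema Registry REST API; the handler
// names returned here are made up for illustration.
public class SchemaRegistryRoutes {
    public static String route(String method, String path) {
        if (method.equals("GET") && path.matches("/schemas/ids/\\d+")) {
            return "getSchemaById";      // called by KafkaAvroDeserializer
        }
        if (method.equals("POST") && path.matches("/subjects/[^/]+/versions")) {
            return "registerSchema";     // called by KafkaAvroSerializer
        }
        if (method.equals("GET") && path.matches("/subjects/[^/]+/versions/(\\d+|latest)")) {
            return "getSchemaByVersion";
        }
        if (method.equals("GET") && path.equals("/subjects")) {
            return "listSubjects";
        }
        return "notFound";
    }

    public static void main(String[] args) {
        System.out.println(route("GET", "/schemas/ids/42"));  // getSchemaById
    }
}
```

The `GET /schemas/ids/{id}` route is the critical one for deserialization, as the next comment points out.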

@BewareMyPower
Collaborator

You can take a look at KafkaAvroDeserializer#deserialize; there's an important step in which a REST request is sent to fetch the schema string. See RestService#getId in the Confluent Schema Registry project:

  public SchemaString getId(Map<String, String> requestProperties,
                            int id) throws IOException, RestClientException {
    String path = String.format("/schemas/ids/%d", id);

    SchemaString response = httpRequest(path, "GET", null, requestProperties,
                                        GET_SCHEMA_BY_ID_RESPONSE_TYPE);
    return response;
  }

However, it provides an integer schema id while there's no way to find a schema string by an integer id in Pulsar.
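The integer id comes straight out of the record bytes: Confluent's wire format prefixes every serialized value with a magic byte (0x0) and a 4-byte big-endian schema id, which the deserializer then resolves via the REST call above. A minimal sketch of extracting that id:

```java
import java.nio.ByteBuffer;

// Confluent wire format: 1 magic byte (0x0) + 4-byte big-endian schema id,
// followed by the Avro-encoded payload.
public class ConfluentWireFormat {
    public static int extractSchemaId(byte[] record) {
        ByteBuffer buf = ByteBuffer.wrap(record);
        byte magic = buf.get();
        if (magic != 0) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        return buf.getInt(); // the globally unique integer schema id
    }

    public static void main(String[] args) {
        // magic byte, then id 42, then (dummy) payload bytes
        byte[] record = {0, 0, 0, 0, 42, 1, 2, 3};
        System.out.println(extractSchemaId(record)); // prints 42
    }
}
```

This is why a compatible registry must be able to resolve a schema from a bare integer, which Pulsar's registry (keyed by topic and version) cannot do directly.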

@eolivelli
Contributor Author

We can implement such an API in KOP as a separate endpoint (a new ChannelInitializer).

For multi-tenancy and security, we can leverage the username as we are doing now. The username is the name of the tenant.

@BewareMyPower
Collaborator

Oh, you're right. In this case, the Kafka client should be configured with our provided serializer/deserializer.

The background of my previous attempt at schema support was to reuse Confluent's provided serializer/deserializer, so it's a little different.

@eolivelli
Contributor Author

> Kafka client should configure our provided serializer/deserializer

Why?
Doesn't the Confluent SerDe support authentication?
Then we can tell users that they must pass the tenant name as the username, as they do with username/password auth.

Then we can map the requests correctly to a topic inside the tenant's __kafka namespace.

If there is no auth, then we go to the default tenant.
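A minimal sketch of the mapping described above: the username is taken as the Pulsar tenant, and schema data lands in a topic under that tenant's `__kafka` namespace. The exact topic name (`__schema-registry-store`) and the `public` default tenant are assumptions for illustration; only the `__kafka` namespace convention comes from the discussion:

```java
// Hypothetical tenant-to-topic mapping for the schema store.
// "__schema-registry-store" is an illustrative topic name, not a real
// KOP constant; "public" is Pulsar's default tenant.
public class SchemaStoreTopic {
    private static final String DEFAULT_TENANT = "public";

    public static String schemaTopicFor(String username) {
        String tenant = (username == null || username.isEmpty())
                ? DEFAULT_TENANT  // no auth: fall back to the default tenant
                : username;       // authenticated: username is the tenant name
        return "persistent://" + tenant + "/__kafka/__schema-registry-store";
    }

    public static void main(String[] args) {
        System.out.println(schemaTopicFor("my-tenant"));
        // persistent://my-tenant/__kafka/__schema-registry-store
        System.out.println(schemaTopicFor(null));
        // persistent://public/__kafka/__schema-registry-store
    }
}
```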

@Demogorgon314 Demogorgon314 linked a pull request Oct 13, 2021 that will close this issue
@eolivelli eolivelli mentioned this issue Oct 27, 2021
9 tasks