Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for JSON-schema and Protobuf (solves #79) #93

Merged
merged 47 commits into from
Feb 24, 2021

Conversation

Malkiz
Copy link
Contributor

@Malkiz Malkiz commented Feb 16, 2021

solves #79

Changes in this PR:

  • refactor the Avro specific logic into a dedicated class
  • add logic for supporting JSON-schema and Protobuf

}

public async decode(buffer: Buffer): Promise<any> {
public async decode(buffer: Buffer, serdesOpts?: {}): Promise<any> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MAYBE: make the type of serdesOpts be an OR type of the different serdes types

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MAYBE: pass in a getter function schema => options

Copy link
Member

@Nevon Nevon Feb 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another option would be to have it be:

interface SerdesOptions {
  [SchemaType.AVRO]?: AvroSerdesOptions
  [SchemaType.JSON]?: JsonSerdesOptions
  [SchemaType.PROTO]?: ProtoSerdesOptions
}

That way the interface remains pretty simple while still allowing you to pass in options for both Protobuf and Avro, in case you are consuming both.

Having a union of all types is problematic because it means that you can only pass in options for a single schema type. Using an intersection type is problematic because the different schema types can have the same option names even though they mean different things or need different values.

return subject
}

public serialize(schema: ConfluentSchema, payload: any, opts: any): Buffer {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actual AVRO type for opts

}

// @ts-ignore
public getSubject(schema: ConfluentSchema, separator: string): ConfluentSubject {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

export default class JsonSerdes implements Serdes {
private getJsonSchema(schema: ConfluentSchema, opts?: any) {
const ajv = new Ajv(opts)
const validate = ajv.compile(JSON.parse(schema.schemaString))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we should cache the validate function somewhere, to save the compilation time

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the "Code Generation Performance" header for relevant context: https://openjsf.org/blog/2021/02/11/project-news-ajv-version-7-big-changes-and-improvements/

private validatePayload(schema: ConfluentSchema, payload: any, opts: any) {
const validate = this.getJsonSchema(schema, opts)
if (!validate(payload)) {
throw Error(validate.errors as any)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw a specific error (validation error)

if (!schema.namespace) {
throw new ConfluentSchemaRegistryArgumentError(`Invalid namespace: ${schema.namespace}`)
}
throw new ConfluentSchemaRegistryArgumentError(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this catch block, and throw specific messages from inside the serdes implementations

this.cache.setSchema(registeredSchema.id, schema)

return registeredSchema
}

public async getSchema(registryId: number): Promise<Schema> {
public async getSchema(registryId: number): Promise<ConfluentSchema> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a private function to return ConfluentSchema.
revert the return type of getSchema back to Schema which will be a union type of the different implementations

src/@types.ts Outdated
name: string
namespace?: string
type: 'record'
fields: any[]
}
export interface Schema extends RawSchema {

export interface AvroSchema extends RawAvroSchema {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

implement these 3 methods for the 2 other types

@@ -146,11 +163,14 @@ export default class SchemaRegistry {
return id
}

public async getRegistryIdBySchema(subject: string, schema: Schema): Promise<number> {
public async getRegistryIdBySchema(subject: string, schema: ConfluentSchema): Promise<number> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do the same thing with the union type to allow backwards compatibility

const DEFAULT_OFFSET = 0

// Based on https://github.com/mtth/avsc/issues/140
const collectInvalidPaths = (schema: Schema, jsonPayload: object) => {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

migrate this logic to AvroSerdes

src/@types.ts Outdated
}

export type AvroOptions = Partial<ForSchemaOptions>
export type JsonOptions = any // FIXME:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be ConstructorParameters<typeof Ajv>[0] (which I think is InstanceOptions from Ajv)

src/@types.ts Outdated

export type AvroOptions = Partial<ForSchemaOptions>
export type JsonOptions = any // FIXME:
export type ProtoOptions = any // FIXME:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is a little trickier. Currently we are expecting an object like { messageName: string }, which doesn't map to any options in Protobufjs itself.

Given that Protobufjs doesn't have any constructor options or anything that we could use here, I suggest that we make this just { messageName: string }, and then in the future if people have a need to pass IParseOptions or something we can extend the type like { messageName: string, parse: IParseOptions } etc.

src/@types.ts Show resolved Hide resolved
src/AvroHelper.ts Show resolved Hide resolved
schema: Schema,
separator: string,
): ConfluentSubject {
throw Error('not implemented yet')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you have a chance to look into what the Java client does for this and Protobuf?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If not, and we leave them unimplemented for now, let's at least throw a ConfluentSchemaRegistryError instead of a generic error, unless there's something higher up that catches it and re-throws.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no I didn't check how the Java implementation does it.
I'll change it to ConfluentSchemaRegistryError

@Nevon
Copy link
Member

Nevon commented Feb 24, 2021

Awesome job! There'll be a bit of follow-up work before we can release, such as updating the documentation, but I'll create a new issue for that.

@Nevon Nevon merged commit 40874b2 into kafkajs:master Feb 24, 2021
This was referenced Feb 24, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants