From 63e0909b7a7081346b669e5cf1f1455f8c671fcd Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Wed, 11 Sep 2024 17:28:32 -0700 Subject: [PATCH 1/2] Minor fixes for KMS clients --- schemaregistry/mock-schemaregistry-client.ts | 7 +- .../rules/encryption/azurekms/azure-client.ts | 2 +- .../rules/encryption/azurekms/azure-driver.ts | 2 +- .../rules/encryption/encrypt-executor.ts | 20 +- .../rules/encryption/gcpkms/gcp-client.ts | 8 +- .../encryption/hcvault/hcvault-client.ts | 10 +- .../encryption/hcvault/hcvault-driver.ts | 2 +- .../rules/encryption/kms-registry.ts | 5 + schemaregistry/serde/avro.ts | 54 +- schemaregistry/serde/json.ts | 12 +- schemaregistry/serde/serde.ts | 6 +- schemaregistry/tsconfig.json | 4 +- test/schemaregistry/serde/avro.spec.ts | 541 +++++++++++++++++- test/schemaregistry/serde/json.spec.ts | 392 ++++++++++++- test/schemaregistry/serde/protobuf.spec.ts | 33 +- 15 files changed, 1040 insertions(+), 58 deletions(-) diff --git a/schemaregistry/mock-schemaregistry-client.ts b/schemaregistry/mock-schemaregistry-client.ts index c2d754f5..57f7fa1e 100644 --- a/schemaregistry/mock-schemaregistry-client.ts +++ b/schemaregistry/mock-schemaregistry-client.ts @@ -201,7 +201,12 @@ class MockClient implements Client { const parsedKey = JSON.parse(key); if (parsedKey.subject === subject && (!value.softDeleted || deleted)) { if (parsedKey.schema.metadata && this.isSubset(metadata, parsedKey.schema.metadata.properties)) { - results.push({ id: parsedKey.schema.id, version: value.version, subject, schema: parsedKey.schema.schema }); + results.push({ + id: parsedKey.schema.id, + version: value.version, + subject, + ...parsedKey.schema + }); } } } diff --git a/schemaregistry/rules/encryption/azurekms/azure-client.ts b/schemaregistry/rules/encryption/azurekms/azure-client.ts index da4c3312..a0f33bb1 100644 --- a/schemaregistry/rules/encryption/azurekms/azure-client.ts +++ b/schemaregistry/rules/encryption/azurekms/azure-client.ts @@ -22,7 +22,7 @@ export class AzureKmsClient implements KmsClient { } async encrypt(plaintext: Buffer): Promise { - const result = await this.kmsClient.encrypt(AzureKmsClient.ALGORITHM, plaintext) + const result = await this.kmsClient.encrypt(AzureKmsClient.ALGORITHM, plaintext) return Buffer.from(result.result) } diff --git a/schemaregistry/rules/encryption/azurekms/azure-driver.ts b/schemaregistry/rules/encryption/azurekms/azure-driver.ts index 42d6cc84..221c38b8 100644 --- a/schemaregistry/rules/encryption/azurekms/azure-driver.ts +++ b/schemaregistry/rules/encryption/azurekms/azure-driver.ts @@ -5,7 +5,7 @@ import {AzureKmsClient} from "./azure-client"; export class AzureKmsDriver implements KmsDriver { static PREFIX = 'azure-kms://' - static TENANT_ID = 'tenant_id' + static TENANT_ID = 'tenant.id' static CLIENT_ID = 'client.id' static CLIENT_SECRET = 'client.secret' diff --git a/schemaregistry/rules/encryption/encrypt-executor.ts b/schemaregistry/rules/encryption/encrypt-executor.ts index cd5b7fb1..018db615 100644 --- a/schemaregistry/rules/encryption/encrypt-executor.ts +++ b/schemaregistry/rules/encryption/encrypt-executor.ts @@ -54,15 +54,31 @@ interface DekId { deleted: boolean } +export class Clock { + now(): number { + return Date.now() + } +} + export class FieldEncryptionExecutor extends FieldRuleExecutor { client: Client | null = null + clock: Clock static register(): FieldEncryptionExecutor { - const executor = new FieldEncryptionExecutor() + return this.registerWithClock(new Clock()) + } + + static registerWithClock(clock: Clock): FieldEncryptionExecutor { + const executor = new FieldEncryptionExecutor(clock) RuleRegistry.registerRuleExecutor(executor) return executor } + constructor(clock: Clock = new Clock()) { + super() + this.clock = clock + } + override configure(clientConfig: ClientConfig, config: Map) { this.client = DekRegistryClient.newClient(clientConfig) this.config = config @@ -416,7 +432,7 @@ export class FieldEncryptionExecutorTransform implements FieldTransform { } isExpired(ctx: RuleContext, dek: Dek | null): boolean { - const now = Date.now() + const now = this.executor.clock.now() return ctx.ruleMode !== RuleMode.READ && this.dekExpiryDays > 0 && dek != null && diff --git a/schemaregistry/rules/encryption/gcpkms/gcp-client.ts b/schemaregistry/rules/encryption/gcpkms/gcp-client.ts index 4b51f4f4..1ef561e8 100644 --- a/schemaregistry/rules/encryption/gcpkms/gcp-client.ts +++ b/schemaregistry/rules/encryption/gcpkms/gcp-client.ts @@ -12,13 +12,9 @@ export class GcpKmsClient implements KmsClient { throw new Error(`key uri must start with ${GcpKmsDriver.PREFIX}`) } this.keyId = keyUri.substring(GcpKmsDriver.PREFIX.length) - const tokens = this.keyId.split(':') - if (tokens.length < 4) { - throw new Error(`invalid key uri ${this.keyId}`) - } this.kmsClient = creds != null - ? new KeyManagementServiceClient() - : new KeyManagementServiceClient({credentials: creds}) + ? new KeyManagementServiceClient({credentials: creds}) + : new KeyManagementServiceClient() } supported(keyUri: string): boolean { diff --git a/schemaregistry/rules/encryption/hcvault/hcvault-client.ts b/schemaregistry/rules/encryption/hcvault/hcvault-client.ts index 53434b14..ea13aa09 100644 --- a/schemaregistry/rules/encryption/hcvault/hcvault-client.ts +++ b/schemaregistry/rules/encryption/hcvault/hcvault-client.ts @@ -36,12 +36,14 @@ export class HcVaultClient implements KmsClient { } async encrypt(plaintext: Buffer): Promise { - const data = await this.kmsClient.encryptData({name: this.keyName, plainText: plaintext.toString('base64') }) - return Buffer.from(data.ciphertext, 'base64') + const response = await this.kmsClient.encryptData({name: this.keyName, plaintext: plaintext.toString('base64') }) + let data = response.data.ciphertext + return Buffer.from(data, 'utf8') } async decrypt(ciphertext: Buffer): Promise { - const data = await this.kmsClient.decryptData({name: this.keyName, cipherText: ciphertext.toString('base64') }) - return Buffer.from(data.plaintext, 'base64') + const response = await this.kmsClient.decryptData({name: this.keyName, ciphertext: ciphertext.toString('utf8') }) + let data = response.data.plaintext + return Buffer.from(data, 'base64'); } } diff --git a/schemaregistry/rules/encryption/hcvault/hcvault-driver.ts b/schemaregistry/rules/encryption/hcvault/hcvault-driver.ts index ccdcdc90..9b8638d6 100644 --- a/schemaregistry/rules/encryption/hcvault/hcvault-driver.ts +++ b/schemaregistry/rules/encryption/hcvault/hcvault-driver.ts @@ -3,7 +3,7 @@ import {HcVaultClient} from "./hcvault-client"; export class HcVaultDriver implements KmsDriver { - static PREFIX = 'hcvault-kms://' + static PREFIX = 'hcvault://' static TOKEN_ID = 'token.id' static NAMESPACE = 'namespace' diff --git a/schemaregistry/rules/encryption/kms-registry.ts b/schemaregistry/rules/encryption/kms-registry.ts index e37b6f69..cef8129c 100644 --- a/schemaregistry/rules/encryption/kms-registry.ts +++ b/schemaregistry/rules/encryption/kms-registry.ts @@ -42,3 +42,8 @@ export function getKmsClient(keyUrl: string): KmsClient | null { return null } +export function clearKmsClients(): void { + kmsClients.length = 0 +} + + diff --git a/schemaregistry/serde/avro.ts b/schemaregistry/serde/avro.ts index 5444e938..ae5fe490 100644 --- a/schemaregistry/serde/avro.ts +++ b/schemaregistry/serde/avro.ts @@ -53,31 +53,7 @@ export class AvroSerializer extends Serializer implements AvroSerde { throw new Error('message is empty') } - let enumIndex = 1 - let fixedIndex = 1 - let recordIndex = 1 - - const namingHook: TypeHook = ( - avroSchema: avro.Schema, - opts: ForSchemaOptions, - ) => { - let schema = avroSchema as any - switch (schema.type) { - case 'enum': - schema.name = `Enum${enumIndex++}`; - break; - case 'fixed': - schema.name = `Fixed${fixedIndex++}`; - break; - case 'record': - schema.name = `Record${recordIndex++}`; - break; - default: - } - return undefined - } - - let avroSchema = Type.forValue(msg, { typeHook: namingHook }) + let avroSchema = AvroSerializer.messageToSchema(msg) const schema: SchemaInfo = { schemaType: 'AVRO', schema: JSON.stringify(avroSchema), @@ -104,6 +80,34 @@ export class AvroSerializer extends Serializer implements AvroSerde { return deps }) } + + static messageToSchema(msg: any): avro.Type { + let enumIndex = 1 + let fixedIndex = 1 + let recordIndex = 1 + + const namingHook: TypeHook = ( + avroSchema: avro.Schema, + opts: ForSchemaOptions, + ) => { + let schema = avroSchema as any + switch (schema.type) { + case 'enum': + schema.name = `Enum${enumIndex++}`; + break; + case 'fixed': + schema.name = `Fixed${fixedIndex++}`; + break; + case 'record': + schema.name = `Record${recordIndex++}`; + break; + default: + } + return undefined + } + + return Type.forValue(msg, { typeHook: namingHook }) + } } export type AvroDeserializerConfig = DeserializerConfig & AvroSerdeConfig diff --git a/schemaregistry/serde/json.ts b/schemaregistry/serde/json.ts index a40632d9..c76fc819 100644 --- a/schemaregistry/serde/json.ts +++ b/schemaregistry/serde/json.ts @@ -73,7 +73,7 @@ export class JsonSerializer extends Serializer implements JsonSerde { throw new Error('message is empty') } - const jsonSchema = generateSchema(msg) + const jsonSchema = JsonSerializer.messageToSchema(msg) const schema: SchemaInfo = { schemaType: 'JSON', schema: JSON.stringify(jsonSchema), @@ -92,14 +92,14 @@ export class JsonSerializer extends Serializer implements JsonSerde { } async fieldTransform(ctx: RuleContext, fieldTransform: FieldTransform, msg: any): Promise { - const schema = this.toType(ctx.target) + const schema = await this.toType(ctx.target) if (typeof schema === 'boolean') { return msg } return await transform(ctx, schema, '$', msg, fieldTransform) } - toType(info: SchemaInfo): DereferencedJSONSchema { + async toType(info: SchemaInfo): Promise { return toType(this.client, this.conf as JsonDeserializerConfig, this, info, async (client, info) => { const deps = new Map() await this.resolveReferences(client, info, deps) @@ -115,6 +115,10 @@ export class JsonSerializer extends Serializer implements JsonSerde { }, ) } + + static messageToSchema(msg: any): DereferencedJSONSchema { + return generateSchema(msg) + } } export type JsonDeserializerConfig = DeserializerConfig & JsonSerdeConfig @@ -173,7 +177,7 @@ export class JsonDeserializer extends Deserializer implements JsonSerde { } async fieldTransform(ctx: RuleContext, fieldTransform: FieldTransform, msg: any): Promise { - const schema = this.toType(ctx.target) + const schema = await this.toType(ctx.target) return await transform(ctx, schema, '$', msg, fieldTransform) } diff --git a/schemaregistry/serde/serde.ts b/schemaregistry/serde/serde.ts index 18a5321a..8b2a2ca1 100644 --- a/schemaregistry/serde/serde.ts +++ b/schemaregistry/serde/serde.ts @@ -90,13 +90,13 @@ export abstract class Serde { rules = target.ruleSet?.migrationRules break case RuleMode.DOWNGRADE: - rules = source?.ruleSet?.migrationRules?.reverse() + rules = source?.ruleSet?.migrationRules?.map(x => x).reverse() break default: rules = target.ruleSet?.domainRules if (ruleMode === RuleMode.READ) { // Execute read rules in reverse order for symmetry - rules = rules?.reverse() + rules = rules?.map(x => x).reverse() } break } @@ -394,7 +394,7 @@ export abstract class Deserializer extends Serde { previous = version } if (migrationMode === RuleMode.DOWNGRADE) { - migrations = migrations.reverse() + migrations = migrations.map(x => x).reverse() } return migrations } diff --git a/schemaregistry/tsconfig.json b/schemaregistry/tsconfig.json index 1a256bdd..5379dc3f 100644 --- a/schemaregistry/tsconfig.json +++ b/schemaregistry/tsconfig.json @@ -4,12 +4,12 @@ "lib": [ "es2021", "dom" ], - "module": "preserve", + "module": "nodenext", "target": "es2021", "strict": true, "esModuleInterop": true, "forceConsistentCasingInFileNames": true, - "moduleResolution": "bundler", + "moduleResolution": "nodenext", "allowUnusedLabels": false, "allowUnreachableCode": false, "noFallthroughCasesInSwitch": true, diff --git a/test/schemaregistry/serde/avro.spec.ts b/test/schemaregistry/serde/avro.spec.ts index ec7abec9..a71f6693 100644 --- a/test/schemaregistry/serde/avro.spec.ts +++ b/test/schemaregistry/serde/avro.spec.ts @@ -1,12 +1,14 @@ import {afterEach, describe, expect, it} from '@jest/globals'; import {ClientConfig} from "../../../schemaregistry/rest-service"; import { - AvroDeserializer, AvroDeserializerConfig, + AvroDeserializer, + AvroDeserializerConfig, AvroSerializer, AvroSerializerConfig } from "../../../schemaregistry/serde/avro"; -import {SerdeType} from "../../../schemaregistry/serde/serde"; +import {SerdeType, Serializer} from "../../../schemaregistry/serde/serde"; import { + Client, Rule, RuleMode, RuleSet, @@ -14,8 +16,33 @@ import { SchemaRegistryClient } from "../../../schemaregistry/schemaregistry-client"; import {LocalKmsDriver} from "../../../schemaregistry/rules/encryption/localkms/local-driver"; -import {FieldEncryptionExecutor} from "../../../schemaregistry/rules/encryption/encrypt-executor"; +import { + Clock, + FieldEncryptionExecutor +} from "../../../schemaregistry/rules/encryption/encrypt-executor"; +import {GcpKmsDriver} from "../../../schemaregistry/rules/encryption/gcpkms/gcp-driver"; +import {AwsKmsDriver} from "../../../schemaregistry/rules/encryption/awskms/aws-driver"; +import {AzureKmsDriver} from "../../../schemaregistry/rules/encryption/azurekms/azure-driver"; +import {HcVaultDriver} from "../../../schemaregistry/rules/encryption/hcvault/hcvault-driver"; +import {JsonataExecutor} from "@confluentinc/schemaregistry/rules/jsonata/jsonata-executor"; +import stringify from "json-stringify-deterministic"; +import {RuleRegistry} from "@confluentinc/schemaregistry/serde/rule-registry"; +import { + clearKmsClients +} from "@confluentinc/schemaregistry/rules/encryption/kms-registry"; +const rootSchema = ` +{ + "name": "NestedTestRecord", + "type": "record", + "fields": [ + { + "name": "otherField", + "type": "DemoSchema" + } + ] +} +` const demoSchema = ` { "name": "DemoSchema", @@ -46,6 +73,35 @@ const demoSchema = ` ] } ` +const demoSchemaSingleTag = ` +{ + "name": "DemoSchema", + "type": "record", + "fields": [ + { + "name": "intField", + "type": "int" + }, + { + "name": "doubleField", + "type": "double" + }, + { + "name": "stringField", + "type": "string", + "confluent:tags": [ "PII" ] + }, + { + "name": "boolField", + "type": "boolean" + }, + { + "name": "bytesField", + "type": "bytes" + } + ] +} +` const demoSchemaWithLogicalType = ` { "name": "DemoSchema", @@ -104,8 +160,51 @@ const f1Schema = ` ] } ` +const demoSchemaWithUnion = ` +{ + "name": "DemoSchemaWithUnion", + "type": "record", + "fields": [ + { + "name": "intField", + "type": "int" + }, + { + "name": "doubleField", + "type": "double" + }, + { + "name": "stringField", + "type": ["null", "string"], + "confluent:tags": [ "PII" ] + }, + { + "name": "boolField", + "type": "boolean" + }, + { + "name": "bytesField", + "type": ["null", "bytes"], + "confluent:tags": [ "PII" ] + } + ] +} +` + +class FakeClock extends Clock { + fixedNow: number = 0 + + override now() { + return this.fixedNow + } +} -const fieldEncryptionExecutor = FieldEncryptionExecutor.register() +const fieldEncryptionExecutor = FieldEncryptionExecutor.registerWithClock(new FakeClock()) +JsonataExecutor.register() +AwsKmsDriver.register() +AzureKmsDriver.register() +GcpKmsDriver.register() +HcVaultDriver.register() LocalKmsDriver.register() //const baseURL = 'http://localhost:8081' @@ -247,7 +346,7 @@ describe('AvroSerializer', () => { 'encrypt.kms.type': 'local-kms', 'encrypt.kms.key.id': 'mykey', }, - onFailure: 'ERROR,ERROR' + onFailure: 'ERROR,NONE' } let ruleSet: RuleSet = { domainRules: [encRule] @@ -287,6 +386,14 @@ describe('AvroSerializer', () => { expect(obj2.stringField).toEqual(obj.stringField); expect(obj2.boolField).toEqual(obj.boolField); expect(obj2.bytesField).toEqual(obj.bytesField); + + clearKmsClients() + let registry = new RuleRegistry() + registry.registerExecutor(new FieldEncryptionExecutor()) + deser = new AvroDeserializer(client, SerdeType.VALUE, {}, registry) + obj2 = await deser.deserialize(topic, bytes) + expect(obj2.stringField).not.toEqual(obj.stringField); + expect(obj2.bytesField).not.toEqual(obj.bytesField); }) it('basic encryption with logical type', async () => { let conf: ClientConfig = { @@ -355,6 +462,113 @@ describe('AvroSerializer', () => { expect(obj2.boolField).toEqual(obj.boolField); expect(obj2.bytesField).toEqual(obj.bytesField); }) + it('basic encryption with dek rotation', async () => { + (fieldEncryptionExecutor.clock as FakeClock).fixedNow = Date.now() + let conf: ClientConfig = { + baseURLs: [baseURL], + cacheCapacity: 1000 + } + let client = SchemaRegistryClient.newClient(conf) + let serConfig: AvroSerializerConfig = { + useLatestVersion: true, + ruleConfig: { + secret: 'mysecret' + } + } + let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig) + let dekClient = fieldEncryptionExecutor.client! + + let encRule: Rule = { + name: 'test-encrypt', + kind: 'TRANSFORM', + mode: RuleMode.WRITEREAD, + type: 'ENCRYPT', + tags: ['PII'], + params: { + 'encrypt.kek.name': 'kek1', + 'encrypt.kms.type': 'local-kms', + 'encrypt.kms.key.id': 'mykey', + 'encrypt.dek.expiry.days': '1', + }, + onFailure: 'ERROR,ERROR' + } + let ruleSet: RuleSet = { + domainRules: [encRule] + } + + let info: SchemaInfo = { + schemaType: 'AVRO', + schema: demoSchemaSingleTag, + ruleSet + } + + await client.register(subject, info, false) + + let obj = { + intField: 123, + doubleField: 45.67, + stringField: 'hi', + boolField: true, + bytesField: Buffer.from([1, 2]), + } + let bytes = await ser.serialize(topic, obj) + + // reset encrypted field + obj.stringField = 'hi' + + let deserConfig: AvroDeserializerConfig = { + ruleConfig: { + secret: 'mysecret' + } + } + let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig) + fieldEncryptionExecutor.client = dekClient + let obj2 = await deser.deserialize(topic, bytes) + expect(obj2.intField).toEqual(obj.intField); + expect(obj2.doubleField).toBeCloseTo(obj.doubleField, 0.001); + expect(obj2.stringField).toEqual(obj.stringField); + expect(obj2.boolField).toEqual(obj.boolField); + expect(obj2.bytesField).toEqual(obj.bytesField); + + let dek = await dekClient.getDek("kek1", subject, 'AES256_GCM', -1, false) + expect(1).toEqual(dek.version); + + // advance time by 2 days + (fieldEncryptionExecutor.clock as FakeClock).fixedNow += 2 * 24 * 60 * 60 * 1000 + + bytes = await ser.serialize(topic, obj) + + // reset encrypted field + obj.stringField = 'hi' + + obj2 = await deser.deserialize(topic, bytes) + expect(obj2.intField).toEqual(obj.intField); + expect(obj2.doubleField).toBeCloseTo(obj.doubleField, 0.001); + expect(obj2.stringField).toEqual(obj.stringField); + expect(obj2.boolField).toEqual(obj.boolField); + expect(obj2.bytesField).toEqual(obj.bytesField); + + dek = await dekClient.getDek("kek1", subject, 'AES256_GCM', -1, false) + expect(2).toEqual(dek.version); + + // advance time by 2 days + (fieldEncryptionExecutor.clock as FakeClock).fixedNow += 2 * 24 * 60 * 60 * 1000 + + bytes = await ser.serialize(topic, obj) + + // reset encrypted field + obj.stringField = 'hi' + + obj2 = await deser.deserialize(topic, bytes) + expect(obj2.intField).toEqual(obj.intField); + expect(obj2.doubleField).toBeCloseTo(obj.doubleField, 0.001); + expect(obj2.stringField).toEqual(obj.stringField); + expect(obj2.boolField).toEqual(obj.boolField); + expect(obj2.bytesField).toEqual(obj.bytesField); + + dek = await dekClient.getDek("kek1", subject, 'AES256_GCM', -1, false) + expect(3).toEqual(dek.version); + }) it('basic encryption with preserialized data', async () => { let conf: ClientConfig = { baseURLs: [baseURL], @@ -513,4 +727,321 @@ describe('AvroSerializer', () => { let obj2 = await deser.deserialize(topic, bytes) expect(obj2.f1).toEqual(obj.f1); }) + it('encryption with references', async () => { + let conf: ClientConfig = { + baseURLs: [baseURL], + cacheCapacity: 1000 + } + let client = SchemaRegistryClient.newClient(conf) + let serConfig: AvroSerializerConfig = { + useLatestVersion: true, + ruleConfig: { + secret: 'mysecret' + } + } + let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig) + let dekClient = fieldEncryptionExecutor.client! + + let info: SchemaInfo = { + schemaType: 'AVRO', + schema: demoSchema, + } + + await client.register('demo-value', info, false) + + let encRule: Rule = { + name: 'test-encrypt', + kind: 'TRANSFORM', + mode: RuleMode.WRITEREAD, + type: 'ENCRYPT', + tags: ['PII'], + params: { + 'encrypt.kek.name': 'kek1', + 'encrypt.kms.type': 'local-kms', + 'encrypt.kms.key.id': 'mykey', + }, + onFailure: 'ERROR,ERROR' + } + let ruleSet: RuleSet = { + domainRules: [encRule] + } + + info = { + schemaType: 'AVRO', + schema: rootSchema, + references: [{ + name: 'DemoSchema', + subject: 'demo-value', + version: 1 + }], + ruleSet + } + + await client.register(subject, info, false) + + let nested = { + intField: 123, + doubleField: 45.67, + stringField: 'hi', + boolField: true, + bytesField: Buffer.from([1, 2]), + } + let obj = { + otherField: nested + } + let bytes = await ser.serialize(topic, obj) + + // reset encrypted field + nested.stringField = 'hi' + nested.bytesField = Buffer.from([1, 2]) + + let deserConfig: AvroDeserializerConfig = { + ruleConfig: { + secret: 'mysecret' + } + } + let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig) + fieldEncryptionExecutor.client = dekClient + let obj2 = await deser.deserialize(topic, bytes) + expect(obj2.otherField.intField).toEqual(nested.intField); + expect(obj2.otherField.doubleField).toBeCloseTo(nested.doubleField, 0.001); + expect(obj2.otherField.stringField).toEqual(nested.stringField); + expect(obj2.otherField.boolField).toEqual(nested.boolField); + expect(obj2.otherField.bytesField).toEqual(nested.bytesField); + }) + it('encryption with union', async () => { + let conf: ClientConfig = { + baseURLs: [baseURL], + cacheCapacity: 1000 + } + let client = SchemaRegistryClient.newClient(conf) + let serConfig: AvroSerializerConfig = { + useLatestVersion: true, + ruleConfig: { + secret: 'mysecret' + } + } + let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig) + let dekClient = fieldEncryptionExecutor.client! + + let encRule: Rule = { + name: 'test-encrypt', + kind: 'TRANSFORM', + mode: RuleMode.WRITEREAD, + type: 'ENCRYPT', + tags: ['PII'], + params: { + 'encrypt.kek.name': 'kek1', + 'encrypt.kms.type': 'local-kms', + 'encrypt.kms.key.id': 'mykey', + }, + onFailure: 'ERROR,ERROR' + } + let ruleSet: RuleSet = { + domainRules: [encRule] + } + + let info = { + schemaType: 'AVRO', + schema: demoSchemaWithUnion, + ruleSet + } + + await client.register(subject, info, false) + + let obj = { + intField: 123, + doubleField: 45.67, + stringField: 'hi', + boolField: true, + bytesField: Buffer.from([1, 2]), + } + let bytes = await ser.serialize(topic, obj) + + // reset encrypted field + obj.stringField = 'hi' + obj.bytesField = Buffer.from([1, 2]) + + let deserConfig: AvroDeserializerConfig = { + ruleConfig: { + secret: 'mysecret' + } + } + let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig) + fieldEncryptionExecutor.client = dekClient + let obj2 = await deser.deserialize(topic, bytes) + expect(obj2.intField).toEqual(obj.intField); + expect(obj2.doubleField).toBeCloseTo(obj.doubleField, 0.001); + expect(obj2.stringField).toEqual(obj.stringField); + expect(obj2.boolField).toEqual(obj.boolField); + expect(obj2.bytesField).toEqual(obj.bytesField); + }) + it('jsonata fully compatible', async () => { + let rule1To2 = "$merge([$sift($, function($v, $k) {$k != 'size'}), {'height': $.'size'}])" + let rule2To1 = "$merge([$sift($, function($v, $k) {$k != 'height'}), {'size': $.'height'}])" + let rule2To3 = "$merge([$sift($, function($v, $k) {$k != 'height'}), {'length': $.'height'}])" + let rule3To2 = "$merge([$sift($, function($v, $k) {$k != 'length'}), {'height': $.'length'}])" + + let conf: ClientConfig = { + baseURLs: [baseURL], + cacheCapacity: 1000 + } + let client = SchemaRegistryClient.newClient(conf) + + client.updateConfig(subject, { + compatibilityGroup: 'application.version' + }) + + let widget = { + name: 'alice', + size: 123, + version: 1, + } + let avroSchema = AvroSerializer.messageToSchema(widget) + let info: SchemaInfo = { + schemaType: 'AVRO', + schema: JSON.stringify(avroSchema), + metadata: { + properties: { + "application.version": "v1" + } + } + } + + await client.register(subject, info, false) + + let newWidget = { + name: 'alice', + height: 123, + version: 1, + } + avroSchema = AvroSerializer.messageToSchema(newWidget) + info = { + schemaType: 'AVRO', + schema: JSON.stringify(avroSchema), + metadata: { + properties: { + "application.version": "v2" + } + }, + ruleSet: { + migrationRules: [ + { + name: 'myRule1', + kind: 'TRANSFORM', + mode: RuleMode.UPGRADE, + type: 'JSONATA', + expr: rule1To2, + }, + { + name: 'myRule2', + kind: 'TRANSFORM', + mode: RuleMode.DOWNGRADE, + type: 'JSONATA', + expr: rule2To1, + }, + ] + } + } + + await client.register(subject, info, false) + + let newerWidget = { + name: 'alice', + length: 123, + version: 1, + } + avroSchema = AvroSerializer.messageToSchema(newerWidget) + info = { + schemaType: 'AVRO', + schema: JSON.stringify(avroSchema), + metadata: { + properties: { + "application.version": "v3" + } + }, + ruleSet: { + migrationRules: [ + { + name: 'myRule1', + kind: 'TRANSFORM', + mode: RuleMode.UPGRADE, + type: 'JSONATA', + expr: rule2To3, + }, + { + name: 'myRule2', + kind: 'TRANSFORM', + mode: RuleMode.DOWNGRADE, + type: 'JSONATA', + expr: rule3To2, + }, + ] + } + } + + await client.register(subject, info, false) + + let serConfig1 = { + useLatestWithMetadata: { + "application.version": "v1" + } + } + let ser1 = new AvroSerializer(client, SerdeType.VALUE, serConfig1) + let bytes = await ser1.serialize(topic, widget) + + await deserializeWithAllVersions(client, ser1, bytes, widget, newWidget, newerWidget) + + let serConfig2 = { + useLatestWithMetadata: { + "application.version": "v2" + } + } + let ser2 = new AvroSerializer(client, SerdeType.VALUE, serConfig2) + bytes = await ser2.serialize(topic, newWidget) + + await deserializeWithAllVersions(client, ser2, bytes, widget, newWidget, newerWidget) + + let serConfig3 = { + useLatestWithMetadata: { + "application.version": "v3" + } + } + let ser3 = new AvroSerializer(client, SerdeType.VALUE, serConfig3) + bytes = await ser3.serialize(topic, newerWidget) + + await deserializeWithAllVersions(client, ser3, bytes, widget, newWidget, newerWidget) + }) + + async function deserializeWithAllVersions(client: Client, ser: Serializer, bytes: Buffer, + widget: any, newWidget: any, newerWidget: any) { + let deserConfig1: AvroDeserializerConfig = { + useLatestWithMetadata: { + "application.version": "v1" + } + } + let deser1 = new AvroDeserializer(client, SerdeType.VALUE, deserConfig1) + deser1.client = ser.client + + let newobj = await deser1.deserialize(topic, bytes) + expect(stringify(newobj)).toEqual(stringify(widget)); + + let deserConfig2 = { + useLatestWithMetadata: { + "application.version": "v2" + } + } + let deser2 = new AvroDeserializer(client, SerdeType.VALUE, deserConfig2) + newobj = await deser2.deserialize(topic, bytes) + expect(stringify(newobj)).toEqual(stringify(newWidget)); + + let deserConfig3 = { + useLatestWithMetadata: { + "application.version": "v3" + } + } + let deser3 = new AvroDeserializer(client, SerdeType.VALUE, deserConfig3) + newobj = await deser3.deserialize(topic, bytes) + expect(stringify(newobj)).toEqual(stringify(newerWidget)); + } }) diff --git a/test/schemaregistry/serde/json.spec.ts b/test/schemaregistry/serde/json.spec.ts index c28f6f9d..288f7e23 100644 --- a/test/schemaregistry/serde/json.spec.ts +++ b/test/schemaregistry/serde/json.spec.ts @@ -1,7 +1,8 @@ import {afterEach, describe, expect, it} from '@jest/globals'; import {ClientConfig} from "../../../schemaregistry/rest-service"; -import {SerdeType} from "../../../schemaregistry/serde/serde"; +import {SerdeType, SerializationError, Serializer} from "../../../schemaregistry/serde/serde"; import { + Client, Rule, RuleMode, RuleSet, @@ -15,8 +16,13 @@ import { JsonSerializer, JsonSerializerConfig } from "../../../schemaregistry/serde/json"; +import {RuleRegistry} from "@confluentinc/schemaregistry/serde/rule-registry"; +import stringify from "json-stringify-deterministic"; +import {JsonataExecutor} from "@confluentinc/schemaregistry/rules/jsonata/jsonata-executor"; +import {clearKmsClients} from "@confluentinc/schemaregistry/rules/encryption/kms-registry"; const fieldEncryptionExecutor = FieldEncryptionExecutor.register() +JsonataExecutor.register() LocalKmsDriver.register() //const baseURL = 'http://localhost:8081' @@ -53,6 +59,32 @@ const demoSchema = ` } } ` +const demoSchemaWithUnion = ` +{ + "type": "object", + "properties": { + "intField": { "type": "integer" }, + "doubleField": { "type": "number" }, + "stringField": { + "oneOf": [ + { + "type": "null" + }, + { + "type": "string" + } + ], + "confluent:tags": [ "PII" ] + }, + "boolField": { "type": "boolean" }, + "bytesField": { + "type": "string", + "contentEncoding": "base64", + "confluent:tags": [ "PII" ] + } + } +} +` describe('JsonSerializer', () => { afterEach(async () => { @@ -143,6 +175,48 @@ describe('JsonSerializer', () => { let obj2 = await deser.deserialize(topic, bytes) expect(obj2).toEqual(obj) }) + it('basic failing validation', async () => { + let conf: ClientConfig = { + baseURLs: [baseURL], + cacheCapacity: 1000 + } + let client = SchemaRegistryClient.newClient(conf) + let ser = new JsonSerializer(client, SerdeType.VALUE, { + useLatestVersion: true, + validate: true + }) + + let obj = { + intField: 123, + doubleField: 45.67, + stringField: 'hi', + boolField: true, + bytesField: Buffer.from([0, 0, 0, 1]).toString('base64') + } + let jsonSchema = JsonSerializer.messageToSchema(obj) + let info: SchemaInfo = { + schemaType: 'JSON', + schema: JSON.stringify(jsonSchema) + } + + await client.register(subject, info, false) + + let bytes = await ser.serialize(topic, obj) + + let deser = new JsonDeserializer(client, SerdeType.VALUE, {}) + let obj2 = await deser.deserialize(topic, bytes) + expect(obj2).toEqual(obj) + + let diffObj = { + intField: '123', + doubleField: 45.67, + stringField: 'hi', + boolField: true, + bytesField: Buffer.from([0, 0, 0, 1]).toString('base64') + } + + await expect(() => ser.serialize(topic, diffObj)).rejects.toThrow(SerializationError) + }) it('basic encryption', async () => { let conf: ClientConfig = { baseURLs: [baseURL], @@ -169,7 +243,7 @@ describe('JsonSerializer', () => { 'encrypt.kms.type': 'local-kms', 'encrypt.kms.key.id': 'mykey', }, - onFailure: 'ERROR,ERROR' + onFailure: 'ERROR,NONE' } let ruleSet: RuleSet = { domainRules: [encRule] @@ -205,5 +279,319 @@ describe('JsonSerializer', () => { fieldEncryptionExecutor.client = dekClient let obj2 = await deser.deserialize(topic, bytes) expect(obj2).toEqual(obj) + + clearKmsClients() + let registry = new RuleRegistry() + registry.registerExecutor(new FieldEncryptionExecutor()) + deser = new JsonDeserializer(client, SerdeType.VALUE, {}, registry) + obj2 = await deser.deserialize(topic, bytes) + expect(obj2).not.toEqual(obj); + }) + it('encryption with union', async () => { + let conf: ClientConfig = { + baseURLs: [baseURL], + cacheCapacity: 1000 + } + let client = SchemaRegistryClient.newClient(conf) + let serConfig: JsonSerializerConfig = { + useLatestVersion: true, + ruleConfig: { + secret: 'mysecret' + } + } + let ser = new JsonSerializer(client, SerdeType.VALUE, serConfig) + let dekClient = fieldEncryptionExecutor.client! + + let encRule: Rule = { + name: 'test-encrypt', + kind: 'TRANSFORM', + mode: RuleMode.WRITEREAD, + type: 'ENCRYPT', + tags: ['PII'], + params: { + 'encrypt.kek.name': 'kek1', + 'encrypt.kms.type': 'local-kms', + 'encrypt.kms.key.id': 'mykey', + }, + onFailure: 'ERROR,ERROR' + } + let ruleSet: RuleSet = { + domainRules: [encRule] + } + + let info: SchemaInfo = { + schemaType: 'JSON', + schema: demoSchemaWithUnion, + ruleSet + } + + await client.register(subject, info, false) + + let obj = { + intField: 123, + doubleField: 45.67, + stringField: 'hi', + boolField: true, + bytesField: Buffer.from([0, 0, 0, 1]).toString('base64') + } + let bytes = await ser.serialize(topic, obj) + + // reset encrypted field + obj.stringField = 'hi' + obj.bytesField = Buffer.from([0, 0, 0, 1]).toString('base64') + + let deserConfig: JsonDeserializerConfig = { + ruleConfig: { + secret: 'mysecret' + } + } + let deser = new JsonDeserializer(client, SerdeType.VALUE, deserConfig) + fieldEncryptionExecutor.client = dekClient + let obj2 = await deser.deserialize(topic, bytes) + expect(obj2).toEqual(obj) }) + it('encryption with reference', async () => { + let conf: ClientConfig = { + baseURLs: [baseURL], + cacheCapacity: 1000 + } + let client = SchemaRegistryClient.newClient(conf) + let serConfig: JsonSerializerConfig = { + useLatestVersion: true, + ruleConfig: { + secret: 'mysecret' + } + } + let ser = new JsonSerializer(client, SerdeType.VALUE, serConfig) + let dekClient = fieldEncryptionExecutor.client! + + let info: SchemaInfo = { + schemaType: 'JSON', + schema: demoSchema, + } + await client.register('demo-value', info, false) + + let encRule: Rule = { + name: 'test-encrypt', + kind: 'TRANSFORM', + mode: RuleMode.WRITEREAD, + type: 'ENCRYPT', + tags: ['PII'], + params: { + 'encrypt.kek.name': 'kek1', + 'encrypt.kms.type': 'local-kms', + 'encrypt.kms.key.id': 'mykey', + }, + onFailure: 'ERROR,ERROR' + } + let ruleSet: RuleSet = { + domainRules: [encRule] + } + + info = { + schemaType: 'JSON', + schema: rootSchema, + references: [{ + name: 'DemoSchema', + subject: 'demo-value', + version: 1 + }], + ruleSet + } + await client.register(subject, info, false) + + let nested = { + intField: 123, + doubleField: 45.67, + stringField: 'hi', + boolField: true, + bytesField: Buffer.from([0, 0, 0, 1]).toString('base64') + } + let obj = { + otherField: nested + } + let bytes = await ser.serialize(topic, obj) + + // reset encrypted field + nested.stringField = 'hi' + nested.bytesField = Buffer.from([0, 0, 0, 1]).toString('base64') + + let deserConfig: JsonDeserializerConfig = { + ruleConfig: { + secret: 'mysecret' + } + } + let deser = new JsonDeserializer(client, SerdeType.VALUE, deserConfig) + fieldEncryptionExecutor.client = dekClient + let obj2 = await deser.deserialize(topic, bytes) + expect(obj2).toEqual(obj) + }) + it('jsonata fully compatible', async () => { + let rule1To2 = "$merge([$sift($, function($v, $k) {$k != 'size'}), {'height': $.'size'}])" + let rule2To1 = "$merge([$sift($, function($v, $k) {$k != 'height'}), {'size': $.'height'}])" + let rule2To3 = "$merge([$sift($, function($v, $k) {$k != 'height'}), {'length': $.'height'}])" + let rule3To2 = "$merge([$sift($, function($v, $k) {$k != 'length'}), {'height': $.'length'}])" + + let conf: ClientConfig = { + baseURLs: [baseURL], + cacheCapacity: 1000 + } + let client = SchemaRegistryClient.newClient(conf) + + client.updateConfig(subject, { + compatibilityGroup: 'application.version' + }) + + let widget = { + name: 'alice', + size: 123, + version: 1, + } + let jsonSchema = JsonSerializer.messageToSchema(widget) + let info: SchemaInfo = { + schemaType: 'JSON', + schema: JSON.stringify(jsonSchema), + metadata: { + properties: { + "application.version": "v1" + } + } + } + + await client.register(subject, info, false) + + let newWidget = { + name: 'alice', + height: 123, + version: 1, + } + jsonSchema = JsonSerializer.messageToSchema(newWidget) + info = { + schemaType: 'JSON', + schema: JSON.stringify(jsonSchema), + metadata: { + properties: { + "application.version": "v2" + } + }, + ruleSet: { + migrationRules: [ + { + name: 'myRule1', + kind: 'TRANSFORM', + mode: RuleMode.UPGRADE, + type: 'JSONATA', + expr: rule1To2, + }, + { + name: 'myRule2', + kind: 'TRANSFORM', + mode: RuleMode.DOWNGRADE, + type: 'JSONATA', + expr: rule2To1, + }, + ] + } + } + + await client.register(subject, info, false) + + let newerWidget = { + name: 'alice', + length: 123, + version: 1, + } + jsonSchema = JsonSerializer.messageToSchema(newerWidget) + info = { + schemaType: 'JSON', + schema: JSON.stringify(jsonSchema), + metadata: { + properties: { + "application.version": "v3" + } + }, + ruleSet: { + migrationRules: [ + { + name: 'myRule1', + kind: 'TRANSFORM', + mode: RuleMode.UPGRADE, + type: 'JSONATA', + expr: rule2To3, + }, + { + name: 'myRule2', + kind: 'TRANSFORM', + mode: RuleMode.DOWNGRADE, + type: 'JSONATA', + expr: rule3To2, + }, + ] + } + } + + await client.register(subject, info, false) + + let serConfig1 = { + useLatestWithMetadata: { + "application.version": "v1" + } + } + let ser1 = new JsonSerializer(client, SerdeType.VALUE, serConfig1) + let bytes = await ser1.serialize(topic, widget) + + await deserializeWithAllVersions(client, ser1, bytes, widget, newWidget, newerWidget) + + let serConfig2 = { + useLatestWithMetadata: { + "application.version": "v2" + } + } + let ser2 = new JsonSerializer(client, SerdeType.VALUE, serConfig2) + bytes = await ser2.serialize(topic, newWidget) + + await deserializeWithAllVersions(client, ser2, bytes, widget, newWidget, newerWidget) + + let serConfig3 = { + useLatestWithMetadata: { + "application.version": "v3" + } + } + let ser3 = new JsonSerializer(client, SerdeType.VALUE, serConfig3) + bytes = await ser3.serialize(topic, newerWidget) + + await deserializeWithAllVersions(client, ser3, bytes, widget, newWidget, newerWidget) + }) + + async function deserializeWithAllVersions(client: Client, ser: Serializer, bytes: Buffer, + widget: any, newWidget: any, newerWidget: any) { + let deserConfig1: JsonDeserializerConfig = { + useLatestWithMetadata: { + "application.version": "v1" + } + } + let deser1 = new JsonDeserializer(client, SerdeType.VALUE, deserConfig1) + deser1.client = ser.client + + let newobj = await deser1.deserialize(topic, bytes) + expect(stringify(newobj)).toEqual(stringify(widget)); + + let deserConfig2 = { + useLatestWithMetadata: { + "application.version": "v2" + } + } + let deser2 = new JsonDeserializer(client, SerdeType.VALUE, deserConfig2) + newobj = await deser2.deserialize(topic, bytes) + expect(stringify(newobj)).toEqual(stringify(newWidget)); + + let deserConfig3 = { + useLatestWithMetadata: { + "application.version": "v3" + } + } + let deser3 = new JsonDeserializer(client, SerdeType.VALUE, deserConfig3) + newobj = await deser3.deserialize(topic, bytes) + expect(stringify(newobj)).toEqual(stringify(newerWidget)); + } }) diff --git a/test/schemaregistry/serde/protobuf.spec.ts b/test/schemaregistry/serde/protobuf.spec.ts index de238e35..4d7e1759 100644 --- a/test/schemaregistry/serde/protobuf.spec.ts +++ b/test/schemaregistry/serde/protobuf.spec.ts @@ -22,6 +22,9 @@ import { } from "./test/nested_pb"; import {TestMessageSchema} from "./test/test_pb"; import {DependencyMessageSchema} from "./test/dep_pb"; +import {RuleRegistry} from "@confluentinc/schemaregistry/serde/rule-registry"; +import {LinkedListSchema} from "./test/cycle_pb"; +import {clearKmsClients} from "@confluentinc/schemaregistry/rules/encryption/kms-registry"; const fieldEncryptionExecutor = FieldEncryptionExecutor.register() LocalKmsDriver.register() @@ -139,6 +142,27 @@ describe('ProtobufSerializer', () => { expect(obj2.testMesssage.testFixed32).toEqual(msg.testFixed32); expect(obj2.testMesssage.testFixed64).toEqual(msg.testFixed64); }) + it('serialize cycle', async () => { + let conf: ClientConfig = { + baseURLs: [baseURL], + cacheCapacity: 1000 + } + let client = SchemaRegistryClient.newClient(conf) + let ser = new ProtobufSerializer(client, SerdeType.VALUE, {autoRegisterSchemas: true}) + ser.registry.add(LinkedListSchema) + let inner = create(LinkedListSchema, { + value: 100, + }) + let obj = create(LinkedListSchema, { + value: 1, + next: inner + }) + let bytes = await ser.serialize(topic, obj) + + let deser = new ProtobufDeserializer(client, SerdeType.VALUE, {}) + let obj2 = await deser.deserialize(topic, bytes) + expect(obj2).toEqual(obj) + }) it('basic encryption', async () => { let conf: ClientConfig = { baseURLs: [baseURL], @@ -166,7 +190,7 @@ describe('ProtobufSerializer', () => { 'encrypt.kms.type': 'local-kms', 'encrypt.kms.key.id': 'mykey', }, - onFailure: 'ERROR,ERROR' + onFailure: 'ERROR,NONE' } let ruleSet: RuleSet = { domainRules: [encRule] @@ -201,5 +225,12 @@ describe('ProtobufSerializer', () => { fieldEncryptionExecutor.client = dekClient let obj2 = await deser.deserialize(topic, bytes) expect(obj2).toEqual(obj) + + clearKmsClients() + let registry = new RuleRegistry() + registry.registerExecutor(new FieldEncryptionExecutor()) + deser = new ProtobufDeserializer(client, SerdeType.VALUE, {}, registry) + obj2 = await deser.deserialize(topic, bytes) + expect(obj2).not.toEqual(obj); }) }) From 983f43e4c9f18fd137c51f89032b1c8b05ce2e03 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Mon, 16 Sep 2024 10:50:14 -0700 Subject: [PATCH 2/2] Add JSON 2020-12 test --- schemaregistry/serde/json.ts | 5 ++- test/schemaregistry/serde/json.spec.ts | 51 ++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/schemaregistry/serde/json.ts b/schemaregistry/serde/json.ts index c76fc819..5189ccf7 100644 --- a/schemaregistry/serde/json.ts +++ b/schemaregistry/serde/json.ts @@ -215,14 +215,17 @@ async function toValidateFunction( const json = JSON.parse(info.schema) const spec = json.$schema - if (spec === 'http://json-schema.org/draft/2020-12/schema') { + if (spec === 'http://json-schema.org/draft/2020-12/schema' + || spec === 'https://json-schema.org/draft/2020-12/schema') { const ajv2020 = new Ajv2020(conf as JsonSerdeConfig) + ajv2020.addKeyword("confluent:tags") deps.forEach((schema, name) => { ajv2020.addSchema(JSON.parse(schema), name) }) fn = ajv2020.compile(json) } else { const ajv = new Ajv2019(conf as JsonSerdeConfig) + ajv.addKeyword("confluent:tags") ajv.addMetaSchema(draft6MetaSchema) ajv.addMetaSchema(draft7MetaSchema) deps.forEach((schema, name) => { diff --git a/test/schemaregistry/serde/json.spec.ts b/test/schemaregistry/serde/json.spec.ts index 288f7e23..69a4ae28 100644 --- a/test/schemaregistry/serde/json.spec.ts +++ b/test/schemaregistry/serde/json.spec.ts @@ -85,6 +85,26 @@ const demoSchemaWithUnion = ` } } ` +const demoSchema2020_12 = ` +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "type": "object", + "properties": { + "intField": { "type": "integer" }, + "doubleField": { "type": "number" }, + "stringField": { + "type": "string", + "confluent:tags": [ "PII" ] + }, + "boolField": { "type": "boolean" }, + "bytesField": { + "type": "string", + "contentEncoding": "base64", + "confluent:tags": [ "PII" ] + } + } +} +` describe('JsonSerializer', () => { afterEach(async () => { @@ -217,6 +237,37 @@ describe('JsonSerializer', () => { await expect(() => ser.serialize(topic, diffObj)).rejects.toThrow(SerializationError) }) + it('basic serialization 2020-12', async () => { + let conf: ClientConfig = { + baseURLs: [baseURL], + cacheCapacity: 1000 + } + let client = SchemaRegistryClient.newClient(conf) + let ser = new JsonSerializer(client, SerdeType.VALUE, { + useLatestVersion: true, + validate: true + }) + + let obj = { + intField: 123, + doubleField: 45.67, + stringField: 'hi', + boolField: true, + bytesField: Buffer.from([0, 0, 0, 1]).toString('base64') + } + let info: SchemaInfo = { + schemaType: 'JSON', + schema: demoSchema2020_12 + } + + await client.register(subject, info, false) + + let bytes = await ser.serialize(topic, obj) + + let deser = new JsonDeserializer(client, SerdeType.VALUE, {}) + let obj2 = await deser.deserialize(topic, bytes) + expect(obj2).toEqual(obj) + }) it('basic encryption', async () => { let conf: ClientConfig = { baseURLs: [baseURL],