diff --git a/schemaregistry/rest-service.ts b/schemaregistry/rest-service.ts index 2a2ac5d8..d1ac8907 100644 --- a/schemaregistry/rest-service.ts +++ b/schemaregistry/rest-service.ts @@ -24,7 +24,7 @@ export interface BearerAuthCredentials { //TODO: Consider retry policy, may need additional libraries on top of Axios export interface ClientConfig { baseURLs: string[], - cacheCapacity: number, + cacheCapacity?: number, cacheLatestTtlSecs?: number, isForward?: boolean, createAxiosDefaults?: CreateAxiosDefaults, @@ -37,7 +37,7 @@ export class RestService { private OAuthClient?: OAuthClient; private bearerAuth: boolean = false; - constructor(baseURLs: string[], isForward?: boolean, axiosDefaults?: CreateAxiosDefaults, + constructor(baseURLs: string[], isForward?: boolean, axiosDefaults?: CreateAxiosDefaults, bearerAuthCredentials?: BearerAuthCredentials) { this.client = axios.create(axiosDefaults); this.baseURLs = baseURLs; @@ -53,7 +53,7 @@ export class RestService { 'Confluent-Identity-Pool-Id': bearerAuthCredentials.identityPool, 'target-sr-cluster': bearerAuthCredentials.schemaRegistryLogicalCluster }); - this.OAuthClient = new OAuthClient(bearerAuthCredentials.clientId, bearerAuthCredentials.clientSecret, + this.OAuthClient = new OAuthClient(bearerAuthCredentials.clientId, bearerAuthCredentials.clientSecret, bearerAuthCredentials.tokenHost, bearerAuthCredentials.tokenPath, bearerAuthCredentials.scope); } } diff --git a/schemaregistry/rules/encryption/dekregistry/dekregistry-client.ts b/schemaregistry/rules/encryption/dekregistry/dekregistry-client.ts index fed55558..b6800c86 100644 --- a/schemaregistry/rules/encryption/dekregistry/dekregistry-client.ts +++ b/schemaregistry/rules/encryption/dekregistry/dekregistry-client.ts @@ -65,7 +65,7 @@ class DekRegistryClient implements Client { constructor(config: ClientConfig) { const cacheOptions = { - max: config.cacheCapacity, + max: config.cacheCapacity !== undefined ? config.cacheCapacity : 1000, ...(config.cacheLatestTtlSecs !== undefined && { maxAge: config.cacheLatestTtlSecs * 1000 }), }; diff --git a/schemaregistry/schemaregistry-client.ts b/schemaregistry/schemaregistry-client.ts index 393de240..d5b91044 100644 --- a/schemaregistry/schemaregistry-client.ts +++ b/schemaregistry/schemaregistry-client.ts @@ -161,11 +161,11 @@ export class SchemaRegistryClient implements Client { constructor(config: ClientConfig) { this.clientConfig = config const cacheOptions = { - max: config.cacheCapacity, + max: config.cacheCapacity !== undefined ? config.cacheCapacity : 1000, ...(config.cacheLatestTtlSecs !== undefined && { maxAge: config.cacheLatestTtlSecs * 1000 }) }; - this.restService = new RestService(config.baseURLs, config.isForward, config.createAxiosDefaults, + this.restService = new RestService(config.baseURLs, config.isForward, config.createAxiosDefaults, config.bearerAuthCredentials); this.schemaToIdCache = new LRUCache(cacheOptions); diff --git a/schemaregistry/serde/avro.ts b/schemaregistry/serde/avro.ts index ae5fe490..8f956e3a 100644 --- a/schemaregistry/serde/avro.ts +++ b/schemaregistry/serde/avro.ts @@ -245,9 +245,9 @@ async function transform(ctx: RuleContext, schema: Type, msg: any, fieldTransfor return await Promise.all(array.map(item => transform(ctx, arraySchema.itemsType, item, fieldTransform))) case 'map': const mapSchema = schema as MapType - const map = msg as Map + const map = msg as { [key: string]: any } for (const key of Object.keys(map)) { - map.set(key, await transform(ctx, mapSchema.valuesType, map.get(key), fieldTransform)) + map[key] = await transform(ctx, mapSchema.valuesType, map[key], fieldTransform) } return map case 'record': diff --git a/schemaregistry/serde/json.ts b/schemaregistry/serde/json.ts index 5189ccf7..7e177e89 100644 --- a/schemaregistry/serde/json.ts +++ b/schemaregistry/serde/json.ts @@ -262,7 +262,8 @@ async function toType( const json = JSON.parse(info.schema) const spec = json.$schema let schema - if (spec === 'http://json-schema.org/draft/2020-12/schema') { + if (spec === 'http://json-schema.org/draft/2020-12/schema' + || spec === 'https://json-schema.org/draft/2020-12/schema') { schema = await dereferenceJSONSchemaDraft2020_12(json, { retrieve }) } else { schema = await dereferenceJSONSchemaDraft07(json, { retrieve }) @@ -302,6 +303,7 @@ async function transform(ctx: RuleContext, schema: DereferencedJSONSchema, path: for (let i = 0; i < msg.length; i++) { msg[i] = await transform(ctx, schema.items, path, msg[i], fieldTransform) } + return msg } } if (schema.$ref != null) { @@ -355,7 +357,7 @@ async function transformField(ctx: RuleContext, path: string, propName: string, function validateSubschemas(subschemas: DereferencedJSONSchema[], msg: any): DereferencedJSONSchema | null { for (let subschema of subschemas) { try { - validateJSON(subschema, msg) + validateJSON(msg, subschema) return subschema } catch (error) { // ignore diff --git a/test/schemaregistry/serde/avro.spec.ts b/test/schemaregistry/serde/avro.spec.ts index a71f6693..755a8d8f 100644 --- a/test/schemaregistry/serde/avro.spec.ts +++ b/test/schemaregistry/serde/avro.spec.ts @@ -190,6 +190,60 @@ const demoSchemaWithUnion = ` ] } ` +const schemaEvolution1 = ` +{ + "name": "SchemaEvolution", + "type": "record", + "fields": [ + { + "name": "fieldToDelete", + "type": "string" + } + ] +} +` +const schemaEvolution2 = ` +{ + "name": "SchemaEvolution", + "type": "record", + "fields": [ + { + "name": "newOptionalField", + "type": ["string", "null"], + "default": "optional" + } + ] +} +` +const complexSchema = ` +{ + "name": "ComplexSchema", + "type": "record", + "fields": [ + { + "name": "arrayField", + "type": { + "type": "array", + "items": "string" + }, + "confluent:tags": [ "PII" ] + }, + { + "name": "mapField", + "type": { + "type": "map", + "values": "string" + }, + "confluent:tags": [ "PII" ] + }, + { + "name": "unionField", + "type": ["null", "string"], + "confluent:tags": [ "PII" ] + } + ] +} +` class FakeClock extends Clock { fixedNow: number = 0 @@ -320,6 +374,38 @@ describe('AvroSerializer', () => { expect(obj2.otherField.boolField).toEqual(nested.boolField); expect(obj2.otherField.bytesField).toEqual(nested.bytesField); }) + it('schema evolution', async () => { + let conf: ClientConfig = { + baseURLs: [baseURL], + cacheCapacity: 1000 + } + let client = SchemaRegistryClient.newClient(conf) + let ser = new AvroSerializer(client, SerdeType.VALUE, {useLatestVersion: true}) + + let obj = { + fieldToDelete: "bye", + } + let info: SchemaInfo = { + schemaType: 'AVRO', + schema: schemaEvolution1, + } + + await client.register(subject, info, false) + + let bytes = await ser.serialize(topic, obj) + + info = { + schemaType: 'AVRO', + schema: schemaEvolution2, + } + + await client.register(subject, info, false) + + let deser = new AvroDeserializer(client, SerdeType.VALUE, {useLatestVersion: true}) + let obj2 = await deser.deserialize(topic, bytes) + expect(obj2.fieldToDelete).toEqual(undefined); + expect(obj2.newOptionalField).toEqual("optional"); + }) it('basic encryption', async () => { let conf: ClientConfig = { baseURLs: [baseURL], @@ -876,6 +962,124 @@ describe('AvroSerializer', () => { expect(obj2.boolField).toEqual(obj.boolField); expect(obj2.bytesField).toEqual(obj.bytesField); }) + it('complex encryption', async () => { + let conf: ClientConfig = { + baseURLs: [baseURL], + cacheCapacity: 1000 + } + let client = SchemaRegistryClient.newClient(conf) + let serConfig: AvroSerializerConfig = { + useLatestVersion: true, + ruleConfig: { + secret: 'mysecret' + } + } + let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig) + let dekClient = fieldEncryptionExecutor.client! + + let encRule: Rule = { + name: 'test-encrypt', + kind: 'TRANSFORM', + mode: RuleMode.WRITEREAD, + type: 'ENCRYPT', + tags: ['PII'], + params: { + 'encrypt.kek.name': 'kek1', + 'encrypt.kms.type': 'local-kms', + 'encrypt.kms.key.id': 'mykey', + }, + onFailure: 'ERROR,NONE' + } + let ruleSet: RuleSet = { + domainRules: [encRule] + } + + let info: SchemaInfo = { + schemaType: 'AVRO', + schema: complexSchema, + ruleSet + } + + await client.register(subject, info, false) + + let obj = { + arrayField: [ 'hello' ], + mapField: { 'key': 'world' }, + unionField: 'bye', + } + let bytes = await ser.serialize(topic, obj) + + let deserConfig: AvroDeserializerConfig = { + ruleConfig: { + secret: 'mysecret' + } + } + let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig) + fieldEncryptionExecutor.client = dekClient + let obj2 = await deser.deserialize(topic, bytes) + expect(obj2.arrayField).toEqual([ 'hello' ]); + expect(obj2.mapField).toEqual({ 'key': 'world' }); + expect(obj2.unionField).toEqual('bye'); + }) + it('complex encryption with null', async () => { + let conf: ClientConfig = { + baseURLs: [baseURL], + cacheCapacity: 1000 + } + let client = SchemaRegistryClient.newClient(conf) + let serConfig: AvroSerializerConfig = { + useLatestVersion: true, + ruleConfig: { + secret: 'mysecret' + } + } + let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig) + let dekClient = fieldEncryptionExecutor.client! + + let encRule: Rule = { + name: 'test-encrypt', + kind: 'TRANSFORM', + mode: RuleMode.WRITEREAD, + type: 'ENCRYPT', + tags: ['PII'], + params: { + 'encrypt.kek.name': 'kek1', + 'encrypt.kms.type': 'local-kms', + 'encrypt.kms.key.id': 'mykey', + }, + onFailure: 'ERROR,NONE' + } + let ruleSet: RuleSet = { + domainRules: [encRule] + } + + let info: SchemaInfo = { + schemaType: 'AVRO', + schema: complexSchema, + ruleSet + } + + await client.register(subject, info, false) + + let obj = { + arrayField: [ 'hello' ], + mapField: { 'key': 'world' }, + unionField: null + } + let bytes = await ser.serialize(topic, obj) + + let deserConfig: AvroDeserializerConfig = { + ruleConfig: { + secret: 'mysecret' + } + } + let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig) + fieldEncryptionExecutor.client = dekClient + let obj2 = await deser.deserialize(topic, bytes) + expect(obj2.arrayField).toEqual([ 'hello' ]); + expect(obj2.mapField).toEqual({ 'key': 'world' }); + expect(obj2.unionField).toEqual(null); + }) it('jsonata fully compatible', async () => { let rule1To2 = "$merge([$sift($, function($v, $k) {$k != 'size'}), {'height': $.'size'}])" let rule2To1 = "$merge([$sift($, function($v, $k) {$k != 'height'}), {'size': $.'height'}])" diff --git a/test/schemaregistry/serde/json.spec.ts b/test/schemaregistry/serde/json.spec.ts index 69a4ae28..ce979fbc 100644 --- a/test/schemaregistry/serde/json.spec.ts +++ b/test/schemaregistry/serde/json.spec.ts @@ -105,6 +105,38 @@ const demoSchema2020_12 = ` } } ` +const complexSchema = ` +{ + "type": "object", + "properties": { + "arrayField": { + "type": "array", + "items": { + "type": "string" + }, + "confluent:tags": [ "PII" ] + }, + "objectField": { + "type": "object", + "properties": { + "stringField": { "type": "string" } + }, + "confluent:tags": [ "PII" ] + }, + "unionField": { + "oneOf": [ + { + "type": "null" + }, + { + "type": "string" + } + ], + "confluent:tags": [ "PII" ] + } + } +} +` describe('JsonSerializer', () => { afterEach(async () => { @@ -136,6 +168,37 @@ describe('JsonSerializer', () => { let obj2 = await deser.deserialize(topic, bytes) expect(obj2).toEqual(obj) }) + it('basic serialization 2020-12', async () => { + let conf: ClientConfig = { + baseURLs: [baseURL], + cacheCapacity: 1000 + } + let client = SchemaRegistryClient.newClient(conf) + let ser = new JsonSerializer(client, SerdeType.VALUE, { + useLatestVersion: true, + validate: true + }) + + let obj = { + intField: 123, + doubleField: 45.67, + stringField: 'hi', + boolField: true, + bytesField: Buffer.from([0, 0, 0, 1]).toString('base64') + } + let info: SchemaInfo = { + schemaType: 'JSON', + schema: demoSchema2020_12 + } + + await client.register(subject, info, false) + + let bytes = await ser.serialize(topic, obj) + + let deser = new JsonDeserializer(client, SerdeType.VALUE, {}) + let obj2 = await deser.deserialize(topic, bytes) + expect(obj2).toEqual(obj) + }) it('serialize nested', async () => { let conf: ClientConfig = { baseURLs: [baseURL], @@ -237,38 +300,77 @@ describe('JsonSerializer', () => { await expect(() => ser.serialize(topic, diffObj)).rejects.toThrow(SerializationError) }) - it('basic serialization 2020-12', async () => { + it('basic encryption', async () => { let conf: ClientConfig = { baseURLs: [baseURL], cacheCapacity: 1000 } let client = SchemaRegistryClient.newClient(conf) - let ser = new JsonSerializer(client, SerdeType.VALUE, { + let serConfig: JsonSerializerConfig = { useLatestVersion: true, - validate: true - }) + ruleConfig: { + secret: 'mysecret' + } + } + let ser = new JsonSerializer(client, SerdeType.VALUE, serConfig) + let dekClient = fieldEncryptionExecutor.client! - let obj = { - intField: 123, - doubleField: 45.67, - stringField: 'hi', - boolField: true, - bytesField: Buffer.from([0, 0, 0, 1]).toString('base64') + let encRule: Rule = { + name: 'test-encrypt', + kind: 'TRANSFORM', + mode: RuleMode.WRITEREAD, + type: 'ENCRYPT', + tags: ['PII'], + params: { + 'encrypt.kek.name': 'kek1', + 'encrypt.kms.type': 'local-kms', + 'encrypt.kms.key.id': 'mykey', + }, + onFailure: 'ERROR,NONE' } + let ruleSet: RuleSet = { + domainRules: [encRule] + } + let info: SchemaInfo = { schemaType: 'JSON', - schema: demoSchema2020_12 + schema: demoSchema, + ruleSet } await client.register(subject, info, false) + let obj = { + intField: 123, + doubleField: 45.67, + stringField: 'hi', + boolField: true, + bytesField: Buffer.from([0, 0, 0, 1]).toString('base64') + } let bytes = await ser.serialize(topic, obj) - let deser = new JsonDeserializer(client, SerdeType.VALUE, {}) + // reset encrypted field + obj.stringField = 'hi' + obj.bytesField = Buffer.from([0, 0, 0, 1]).toString('base64') + + let deserConfig: JsonDeserializerConfig = { + ruleConfig: { + secret: 'mysecret' + } + } + let deser = new JsonDeserializer(client, SerdeType.VALUE, deserConfig) + fieldEncryptionExecutor.client = dekClient let obj2 = await deser.deserialize(topic, bytes) expect(obj2).toEqual(obj) + + clearKmsClients() + let registry = new RuleRegistry() + registry.registerExecutor(new FieldEncryptionExecutor()) + deser = new JsonDeserializer(client, SerdeType.VALUE, {}, registry) + obj2 = await deser.deserialize(topic, bytes) + expect(obj2).not.toEqual(obj); }) - it('basic encryption', async () => { + it('basic encryption 2020-12', async () => { let conf: ClientConfig = { baseURLs: [baseURL], cacheCapacity: 1000 @@ -302,7 +404,7 @@ describe('JsonSerializer', () => { let info: SchemaInfo = { schemaType: 'JSON', - schema: demoSchema, + schema: demoSchema2020_12, ruleSet } @@ -477,6 +579,124 @@ describe('JsonSerializer', () => { let obj2 = await deser.deserialize(topic, bytes) expect(obj2).toEqual(obj) }) + it('complex encryption', async () => { + let conf: ClientConfig = { + baseURLs: [baseURL], + cacheCapacity: 1000 + } + let client = SchemaRegistryClient.newClient(conf) + let serConfig: JsonSerializerConfig = { + useLatestVersion: true, + ruleConfig: { + secret: 'mysecret' + } + } + let ser = new JsonSerializer(client, SerdeType.VALUE, serConfig) + let dekClient = fieldEncryptionExecutor.client! + + let encRule: Rule = { + name: 'test-encrypt', + kind: 'TRANSFORM', + mode: RuleMode.WRITEREAD, + type: 'ENCRYPT', + tags: ['PII'], + params: { + 'encrypt.kek.name': 'kek1', + 'encrypt.kms.type': 'local-kms', + 'encrypt.kms.key.id': 'mykey', + }, + onFailure: 'ERROR,NONE' + } + let ruleSet: RuleSet = { + domainRules: [encRule] + } + + let info: SchemaInfo = { + schemaType: 'JSON', + schema: complexSchema, + ruleSet + } + + await client.register(subject, info, false) + + let obj = { + arrayField: [ 'hello' ], + objectField: { 'stringField': 'world' }, + unionField: 'bye', + } + let bytes = await ser.serialize(topic, obj) + + let deserConfig: JsonDeserializerConfig = { + ruleConfig: { + secret: 'mysecret' + } + } + let deser = new JsonDeserializer(client, SerdeType.VALUE, deserConfig) + fieldEncryptionExecutor.client = dekClient + let obj2 = await deser.deserialize(topic, bytes) + expect(obj2.arrayField).toEqual([ 'hello' ]); + expect(obj2.objectField.stringField).toEqual('world'); + expect(obj2.unionField).toEqual('bye'); + }) + it('complex encryption with null', async () => { + let conf: ClientConfig = { + baseURLs: [baseURL], + cacheCapacity: 1000 + } + let client = SchemaRegistryClient.newClient(conf) + let serConfig: JsonSerializerConfig = { + useLatestVersion: true, + ruleConfig: { + secret: 'mysecret' + } + } + let ser = new JsonSerializer(client, SerdeType.VALUE, serConfig) + let dekClient = fieldEncryptionExecutor.client! + + let encRule: Rule = { + name: 'test-encrypt', + kind: 'TRANSFORM', + mode: RuleMode.WRITEREAD, + type: 'ENCRYPT', + tags: ['PII'], + params: { + 'encrypt.kek.name': 'kek1', + 'encrypt.kms.type': 'local-kms', + 'encrypt.kms.key.id': 'mykey', + }, + onFailure: 'ERROR,NONE' + } + let ruleSet: RuleSet = { + domainRules: [encRule] + } + + let info: SchemaInfo = { + schemaType: 'JSON', + schema: complexSchema, + ruleSet + } + + await client.register(subject, info, false) + + let obj = { + arrayField: [ 'hello' ], + objectField: { 'stringField': 'world' }, + unionField: null, + } + let bytes = await ser.serialize(topic, obj) + + let deserConfig: JsonDeserializerConfig = { + ruleConfig: { + secret: 'mysecret' + } + } + let deser = new JsonDeserializer(client, SerdeType.VALUE, deserConfig) + fieldEncryptionExecutor.client = dekClient + let obj2 = await deser.deserialize(topic, bytes) + expect(obj2.arrayField).toEqual([ 'hello' ]); + expect(obj2.objectField.stringField).toEqual('world'); + expect(obj2.unionField).toEqual(null); + }) it('jsonata fully compatible', async () => { let rule1To2 = "$merge([$sift($, function($v, $k) {$k != 'size'}), {'height': $.'size'}])" let rule2To1 = "$merge([$sift($, function($v, $k) {$k != 'height'}), {'size': $.'height'}])"