From 5d0a5804b8bc5b6c92d523c78f880eb09c36cf8b Mon Sep 17 00:00:00 2001 From: Justin Wang <35274216+Claimundefine@users.noreply.github.com> Date: Sun, 22 Sep 2024 19:10:54 -0400 Subject: [PATCH] Add JSON integration tests (#46) (#97) * Add JSON integration tests * remove random --- .../schemaregistry-json.spec.ts | 463 ++++++++++++++++++ examples/kafkajs/sr.js | 2 +- package-lock.json | 40 ++ schemaregistry/package.json | 1 + schemaregistry/run_docker_schemaregistry.sh | 3 + 5 files changed, 508 insertions(+), 1 deletion(-) create mode 100644 e2e/schemaregistry/schemaregistry-json.spec.ts diff --git a/e2e/schemaregistry/schemaregistry-json.spec.ts b/e2e/schemaregistry/schemaregistry-json.spec.ts new file mode 100644 index 00000000..b4751d64 --- /dev/null +++ b/e2e/schemaregistry/schemaregistry-json.spec.ts @@ -0,0 +1,463 @@ +import { KafkaJS } from '@confluentinc/kafka-javascript'; +import { + Metadata, + SchemaRegistryClient, + SchemaInfo, + Reference +} from '../../schemaregistry/schemaregistry-client'; +import { beforeEach, afterEach, describe, expect, it } from '@jest/globals'; +import { clientConfig } from '../../test/schemaregistry/test-constants'; +import { JsonSerializer, JsonSerializerConfig, JsonDeserializer } from '../../schemaregistry/serde/json'; +import { SerdeType } from "../../schemaregistry/serde/serde"; +import stringify from 'json-stringify-deterministic'; + +let schemaRegistryClient: SchemaRegistryClient; +let producer: any; + +const testServerConfigSubject = 'integ-test-server-config-subject'; + +const kafkaBrokerList = 'localhost:9092'; +const kafka = new KafkaJS.Kafka({ + kafkaJS: { + brokers: [kafkaBrokerList], + }, +}); +const testTopic = `test-topic`; +const testTopicValue = testTopic + '-value'; + +//Inspired by dotnet client +const schemaString: string = stringify({ + "$schema": "http://json-schema.org/draft-04/schema#", + "title": "Person", + "type": "object", + "additionalProperties": false, + "required": [ + "FirstName", + "LastName" + ], + "properties": { + "FirstName": { + "type": "string" + }, + "MiddleName": { + "type": [ + "null", + "string" + ] + }, + "LastName": { + "type": "string" + }, + "Gender": { + "oneOf": [ + { + "$ref": "#/definitions/Gender" + } + ] + }, + "NumberWithRange": { + "type": "integer", + "format": "int32", + "maximum": 5.0, + "minimum": 2.0 + }, + "Birthday": { + "type": "string", + "format": "date-time" + }, + "Company": { + "oneOf": [ + { + "$ref": "#/definitions/Company" + }, + { + "type": "null" + } + ] + }, + "Cars": { + "type": [ + "array", + "null" + ], + "items": { + "$ref": "#/definitions/Car" + } + } + }, + "definitions": { + "Gender": { + "type": "integer", + "description": "", + "x-enumNames": [ + "Male", + "Female" + ], + "enum": [ + 0, + 1 + ] + }, + "Company": { + "type": "object", + "additionalProperties": false, + "properties": { + "Name": { + "type": [ + "null", + "string" + ] + } + } + }, + "Car": { + "type": "object", + "additionalProperties": false, + "properties": { + "Name": { + "type": [ + "null", + "string" + ] + }, + "Manufacturer": { + "oneOf": [ + { + "$ref": "#/definitions/Company" + }, + { + "type": "null" + } + ] + } + } + } + } +}); + +const orderDetailsSchema: SchemaInfo = { + + schema: stringify({ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "http://example.com/order_details.schema.json", + "title": "OrderDetails", + "description": "Order Details", + "type": "object", + "properties": { + "id": { + "description": "Order Id", + "type": "integer" + }, + "customer": { + "description": "Customer", + "$ref": "http://example.com/customer.schema.json" + }, + "payment_id": { + "description": "Payment Id", + "type": "string" + } + }, + "required": [ "id", "customer"] +}), + schemaType: 'JSON', +}; + +const orderSchema: SchemaInfo = { + schema: stringify({ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "http://example.com/referencedproduct.schema.json", + "title": "Order", + "description": "Order", + "type": "object", + "properties": { + "order_details": { + "description": "Order Details", + "$ref": "http://example.com/order_details.schema.json" + }, + "order_date": { + "description": "Order Date", + "type": "string", + "format": "date-time" + } + }, + "required": ["order_details"] + }), + schemaType: 'JSON', +}; + +const customerSchema: SchemaInfo = { + schema: stringify({ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "http://example.com/customer.schema.json", + "title": "Customer", + "description": "Customer Data", + "type": "object", + "properties": { + "name": { + "Description": "Customer name", + "type": "string" + }, + "id": { + "description": "Customer id", + "type": "integer" + }, + "email": { + "description": "Customer email", + "type": "string" + } + }, + "required": ["name", "id"] + }), + schemaType: 'JSON', +}; + +const messageValue = { + "firstName": "Real", + "middleName": "Name", + "lastName": "LastName D. Roger", + "gender": "Male", + "numberWithRange": 3, + "birthday": 7671, + "company": { + "name": "WarpStream" + }, + "cars": [ + { + "name": "Flink", + "manufacturer": { + "name": "Immerok" + } + }, + { + "name": "Car", + "manufacturer": { + "name": "Car Maker" + } + } + ] +}; + + +const metadata: Metadata = { + properties: { + owner: 'Bob Jones', + email: 'bob@acme.com', + }, +}; + +const schemaInfo: SchemaInfo = { + schema: schemaString, + metadata: metadata, + schemaType: 'JSON' +}; + +const customerSubject = 'Customer'; +const orderSubject = 'Order'; +const orderDetailsSubject = 'OrderDetails'; + +const subjectList = [testTopic, testTopicValue, testServerConfigSubject, orderSubject, orderDetailsSubject, customerSubject]; + +describe('SchemaRegistryClient json Integration Test', () => { + + beforeEach(async () => { + schemaRegistryClient = new SchemaRegistryClient(clientConfig); + + const admin = kafka.admin(); + await admin.connect(); + try { + await admin.deleteTopics({ + topics: [testTopic], + timeout: 5000, + }); + } catch (error) { + // Topic may not exist; ignore error + } + await admin.disconnect(); + + producer = kafka.producer({ + kafkaJS: { + allowAutoTopicCreation: true, + acks: 1, + compression: KafkaJS.CompressionTypes.GZIP, + } + }); + await producer.connect(); + const subjects: string[] = await schemaRegistryClient.getAllSubjects(); + + for (const subject of subjectList) { + if (subjects && subjects.includes(subject)) { + await schemaRegistryClient.deleteSubject(subject); + await schemaRegistryClient.deleteSubject(subject, true); + } + } + }); + + afterEach(async () => { + await producer.disconnect(); + producer = null; + }); + + it("Should serialize and deserialize json", async () => { + + await schemaRegistryClient.register(testTopic, schemaInfo); + + const serializerConfig: JsonSerializerConfig = { autoRegisterSchemas: true }; + const serializer = new JsonSerializer(schemaRegistryClient, SerdeType.VALUE, serializerConfig); + const deserializer = new JsonDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); + + const outgoingMessage = { + key: 'key', + value: await serializer.serialize(testTopic, messageValue) + }; + + await producer.send({ + topic: testTopic, + messages: [outgoingMessage] + }); + + let consumer = kafka.consumer({ + kafkaJS: { + groupId: 'test-group', + fromBeginning: true, + partitionAssigners: [KafkaJS.PartitionAssigners.roundRobin], + }, + }); + + await consumer.connect(); + await consumer.subscribe({ topic: testTopic }); + let messageRcvd = false; + await consumer.run({ + eachMessage: async ({ message }) => { + const decodedMessage = { + ...message, + value: await deserializer.deserialize(testTopic, message.value as Buffer) + }; + messageRcvd = true; + + expect(decodedMessage.value).toMatchObject(messageValue); + }, + }); + + // Wait around until we get a message, and then disconnect. + while (!messageRcvd) { + await new Promise((resolve) => setTimeout(resolve, 100)); + } + + await consumer.disconnect(); + expect(1).toEqual(1); + }, 30000); + + it("Should serialize with UseLatestVersion enabled", async () => { + await schemaRegistryClient.register(testTopic, schemaInfo); + + const serializerConfig: JsonSerializerConfig = { autoRegisterSchemas: true, useLatestVersion: true }; + const serializer = new JsonSerializer(schemaRegistryClient, SerdeType.VALUE, serializerConfig); + + const outgoingMessage = { + key: 'key', + value: await serializer.serialize(testTopic, messageValue) + }; + + await producer.send({ + topic: testTopic, + messages: [outgoingMessage] + }); + + }); + + it('Should fail to serialize with UseLatestVersion enabled and autoRegisterSchemas disabled', async () => { + await schemaRegistryClient.register(testTopic, schemaInfo); + + const serializerConfig: JsonSerializerConfig = { autoRegisterSchemas: false, useLatestVersion: true }; + const serializer = new JsonSerializer(schemaRegistryClient, SerdeType.VALUE, serializerConfig); + + const messageValue = { "name": "Bob Jones", "age": 25 }; + + await expect(serializer.serialize(testTopic, messageValue)).rejects.toThrowError(); + }); + + it("Should serialize referenced schemas", async () => { + const serializerConfig: JsonSerializerConfig = { autoRegisterSchemas: true }; + const serializer = new JsonSerializer(schemaRegistryClient, SerdeType.VALUE, serializerConfig); + const deserializer = new JsonDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); + + await schemaRegistryClient.register(customerSubject, customerSchema); + const customerIdVersion: number = (await schemaRegistryClient.getLatestSchemaMetadata(customerSubject)).version!; + + const customerReference: Reference = { + name: "http://example.com/customer.schema.json", + subject: customerSubject, + version: customerIdVersion, + }; + orderDetailsSchema.references = [customerReference]; + + await schemaRegistryClient.register(orderDetailsSubject, orderDetailsSchema); + const orderDetailsIdVersion: number = (await schemaRegistryClient.getLatestSchemaMetadata(orderDetailsSubject)).version!; + + const orderDetailsReference: Reference = { + name: "http://example.com/order_details.schema.json", + subject: orderDetailsSubject, + version: orderDetailsIdVersion, + }; + orderSchema.references = [orderDetailsReference]; + + const orderId = await schemaRegistryClient.register(orderSubject, orderSchema); + await schemaRegistryClient.register(orderSubject, orderSchema); + console.log(`Order schema id: ${orderId}`); + + const order = { + order_details: { + id: 1, + customer: { + name: "Bob Jones", + id: 1, + email: "bob@jones.com" + }, + payment_id: "1234" + }, + order_date: "2021-07-15T12:00:00Z" + }; + + const outgoingMessage = { + key: 'key', + value: await serializer.serialize(orderSubject, order) + }; + + await producer.send({ + topic: testTopic, + messages: [outgoingMessage] + }); + + let consumer = kafka.consumer({ + kafkaJS: { + groupId: 'test-group', + fromBeginning: true, + partitionAssigners: [KafkaJS.PartitionAssigners.roundRobin], + }, + }); + + await consumer.connect(); + + await consumer.subscribe({ topic: testTopic }); + + let messageRcvd = false; + await consumer.run({ + eachMessage: async ({ message }) => { + const decodedMessage = { + ...message, + value: await deserializer.deserialize(orderSubject, message.value as Buffer) + }; + messageRcvd = true; + + expect(decodedMessage.value).toMatchObject(order); + }, + }); + + // Wait around until we get a message, and then disconnect. + while (!messageRcvd) { + await new Promise((resolve) => setTimeout(resolve, 100)); + } + + await consumer.disconnect(); + }, 30000); +}); \ No newline at end of file diff --git a/examples/kafkajs/sr.js b/examples/kafkajs/sr.js index a640f651..1206c7c2 100644 --- a/examples/kafkajs/sr.js +++ b/examples/kafkajs/sr.js @@ -96,7 +96,7 @@ const run = async () => { ...message, value: await registry.decode(message.value) }; - console.log("Consumer recieved message.\nBefore decoding: " + JSON.stringify(message) + "\nAfter decoding: " + JSON.stringify(decodedMessage)); + console.log("Consumer received message.\nBefore decoding: " + JSON.stringify(message) + "\nAfter decoding: " + JSON.stringify(decodedMessage)); messageRcvd = true; }, }); diff --git a/package-lock.json b/package-lock.json index 2840e12d..5e9e1e41 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1538,6 +1538,45 @@ "node": ">=14.17" } }, + "node_modules/@confluentinc/kafka-javascript": { + "version": "0.1.17-devel", + "resolved": "https://registry.npmjs.org/@confluentinc/kafka-javascript/-/kafka-javascript-0.1.17-devel.tgz", + "integrity": "sha512-u+7Rvzw1ljNSKC54OBt89oWkj98zOj4zWT5FZkAcSc5SDdZfYuKatMZYo0vNiV1V9otQee6fdJEl3qtpDd7/fQ==", + "hasInstallScript": true, + "license": "MIT", + "workspaces": [ + "schemaregistry" + ], + "dependencies": { + "@aws-sdk/client-kms": "^3.637.0", + "@azure/identity": "^4.4.1", + "@azure/keyvault-keys": "^4.8.0", + "@bufbuild/protobuf": "^2.0.0", + "@criteria/json-schema": "^0.10.0", + "@criteria/json-schema-validation": "^0.10.0", + "@google-cloud/kms": "^4.5.0", + "@hackbg/miscreant-esm": "^0.3.2-patch.3", + "@mapbox/node-pre-gyp": "^1.0.11", + "@smithy/types": "^3.3.0", + "@types/simple-oauth2": "^5.0.7", + "@types/validator": "^13.12.0", + "ajv": "^8.17.1", + "async-mutex": "^0.5.0", + "avsc": "^5.7.7", + "axios": "^1.7.3", + "bindings": "^1.3.1", + "json-stringify-deterministic": "^1.0.12", + "lru-cache": "^11.0.0", + "nan": "^2.17.0", + "node-vault": "^0.10.2", + "simple-oauth2": "^5.1.0", + "ts-jest": "^29.2.4", + "validator": "^13.12.0" + }, + "engines": { + "node": ">=18.0.0" + } + }, "node_modules/@confluentinc/schemaregistry": { "resolved": "schemaregistry", "link": true @@ -9279,6 +9318,7 @@ "@azure/identity": "^4.4.1", "@azure/keyvault-keys": "^4.8.0", "@bufbuild/protobuf": "^2.0.0", + "@confluentinc/kafka-javascript": "^0.1.17-devel", "@criteria/json-schema": "^0.10.0", "@criteria/json-schema-validation": "^0.10.0", "@google-cloud/kms": "^4.5.0", diff --git a/schemaregistry/package.json b/schemaregistry/package.json index 4841fe10..fb3705a7 100644 --- a/schemaregistry/package.json +++ b/schemaregistry/package.json @@ -30,6 +30,7 @@ "@azure/identity": "^4.4.1", "@azure/keyvault-keys": "^4.8.0", "@bufbuild/protobuf": "^2.0.0", + "@confluentinc/kafka-javascript": "^0.1.17-devel", "@criteria/json-schema": "^0.10.0", "@criteria/json-schema-validation": "^0.10.0", "@google-cloud/kms": "^4.5.0", diff --git a/schemaregistry/run_docker_schemaregistry.sh b/schemaregistry/run_docker_schemaregistry.sh index 63793ccf..d3338c07 100755 --- a/schemaregistry/run_docker_schemaregistry.sh +++ b/schemaregistry/run_docker_schemaregistry.sh @@ -17,4 +17,7 @@ fi echo "Running schema registry e2e tests" +# Waiting for Zookeeper and Kafka to start +sleep 10 + $JEST $INTEG_DIR