diff --git a/package-lock.json b/package-lock.json index c69d17e0..7a43a244 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9322,7 +9322,7 @@ }, "schemaregistry": { "name": "@confluentinc/schemaregistry", - "version": "v0.1.17.6-devel", + "version": "v0.2.1", "license": "MIT", "dependencies": { "@aws-sdk/client-kms": "^3.637.0", @@ -9373,7 +9373,8 @@ "license": "ISC", "devDependencies": { "@confluentinc/kafka-javascript": "^0.2.0", - "@confluentinc/schemaregistry": "^v0.1.17.6-devel", + "@confluentinc/schemaregistry": "^v0.2.1", + "axios": "^1.7.7", "uuid": "^10.0.0" } }, diff --git a/schemaregistry-examples/package.json b/schemaregistry-examples/package.json index 29e4d4f4..0b6e46c2 100644 --- a/schemaregistry-examples/package.json +++ b/schemaregistry-examples/package.json @@ -9,7 +9,8 @@ "description": "", "devDependencies": { "@confluentinc/kafka-javascript": "^0.2.0", - "@confluentinc/schemaregistry": "^v0.1.17.6-devel", + "@confluentinc/schemaregistry": "^v0.2.1", + "axios": "^1.7.7", "uuid": "^10.0.0" } } diff --git a/schemaregistry-examples/src/constants.ts b/schemaregistry-examples/src/constants.ts index 05065da6..a30f5881 100644 --- a/schemaregistry-examples/src/constants.ts +++ b/schemaregistry-examples/src/constants.ts @@ -1,11 +1,12 @@ import { BasicAuthCredentials } from '@confluentinc/schemaregistry'; const issuerEndpointUrl = ''; // e.g. 'https://dev-123456.okta.com/oauth2/default/v1/token'; -const clientId = ''; -const clientSecret = ''; +const oauthClientId = ''; +const oauthClientSecret = ''; const scope = ''; // e.g. 'schemaregistry'; const identityPoolId = ''; // e.g. pool-Gx30 -const logicalCluster = ''; //e.g. lsrc-a6m5op +const kafkaLogicalCluster = ''; // e.g. lkc-12345 +const schemaRegistryLogicalCluster = ''; //e.g. lsrc-a6m5op const baseUrl = ''; // e.g. 'https://psrc-3amt5nj.us-east-1.aws.confluent.cloud' const clusterBootstrapUrl = ''; // e.g. "pkc-p34xa.us-east-1.aws.confluent.cloud:9092" const clusterApiKey = ''; @@ -22,6 +23,6 @@ const basicAuthCredentials: BasicAuthCredentials = { }; export { - issuerEndpointUrl, clientId, clientSecret, scope, identityPoolId, logicalCluster, baseUrl, - clusterBootstrapUrl, clusterApiKey, clusterApiSecret, basicAuthCredentials, localAuthCredentials + issuerEndpointUrl, oauthClientId, oauthClientSecret, scope, identityPoolId, kafkaLogicalCluster, schemaRegistryLogicalCluster, + baseUrl, clusterBootstrapUrl, clusterApiKey, clusterApiSecret, basicAuthCredentials, localAuthCredentials }; \ No newline at end of file diff --git a/schemaregistry-examples/src/kafka-oauth.ts b/schemaregistry-examples/src/kafka-oauth.ts new file mode 100644 index 00000000..0e044d87 --- /dev/null +++ b/schemaregistry-examples/src/kafka-oauth.ts @@ -0,0 +1,137 @@ +import { + AvroSerializer, AvroSerializerConfig, SerdeType, + ClientConfig, SchemaRegistryClient, SchemaInfo, BearerAuthCredentials +} from "@confluentinc/schemaregistry"; +import { CreateAxiosDefaults } from "axios"; +import { KafkaJS } from '@confluentinc/kafka-javascript'; +import { + clusterBootstrapUrl, + baseUrl, + issuerEndpointUrl, oauthClientId, oauthClientSecret, scope, + identityPoolId, schemaRegistryLogicalCluster, kafkaLogicalCluster +} from "./constants"; +import axios from 'axios'; + +// Only showing the producer, will be the same implementation for the consumer + +async function token_refresh() { + try { + // Make a POST request to get the access token + const response = await axios.post(issuerEndpointUrl, new URLSearchParams({ + grant_type: 'client_credentials', + client_id: oauthClientId, + client_secret: oauthClientSecret, + scope: scope + }), { + headers: { + 'Content-Type': 'application/x-www-form-urlencoded' + } + }); + + // Extract the token and expiration time from the response + const token = response.data.access_token; + const exp_seconds = Math.floor(Date.now() / 1000) + response.data.expires_in; + const exp_ms = exp_seconds * 1000; + + const principal = 'admin'; // You can adjust this based on your needs + const extensions = { + traceId: '123', + logicalCluster: kafkaLogicalCluster, + identityPoolId: identityPoolId + }; + + return { value: token, lifetime: exp_ms, principal, extensions }; + } catch (error) { + console.error('Failed to retrieve OAuth token:', error); + throw new Error('Failed to retrieve OAuth token'); + } +} + +async function kafkaProducerAvro() { + + const createAxiosDefaults: CreateAxiosDefaults = { + timeout: 10000 + }; + + const bearerAuthCredentials: BearerAuthCredentials = { + credentialsSource: 'OAUTHBEARER', + issuerEndpointUrl: issuerEndpointUrl, + clientId: oauthClientId, + clientSecret: oauthClientSecret, + scope: scope, + identityPoolId: identityPoolId, + logicalCluster: schemaRegistryLogicalCluster + } + + const clientConfig: ClientConfig = { + baseURLs: [baseUrl], + createAxiosDefaults: createAxiosDefaults, + cacheCapacity: 512, + cacheLatestTtlSecs: 60, + bearerAuthCredentials: bearerAuthCredentials + }; + + const schemaRegistryClient = new SchemaRegistryClient(clientConfig); + + const kafka: KafkaJS.Kafka = new KafkaJS.Kafka({ + kafkaJS: { + brokers: [clusterBootstrapUrl], + ssl: true, + sasl: { + mechanism: 'oauthbearer', + oauthBearerProvider: token_refresh + }, + }, + }); + + const producer: KafkaJS.Producer = kafka.producer({ + kafkaJS: { + allowAutoTopicCreation: true, + acks: 1, + compression: KafkaJS.CompressionTypes.GZIP, + } + }); + + console.log("Producer created"); + + const schemaString: string = JSON.stringify({ + type: 'record', + name: 'User', + fields: [ + { name: 'name', type: 'string' }, + { name: 'age', type: 'int' }, + ], + }); + + const schemaInfo: SchemaInfo = { + schemaType: 'AVRO', + schema: schemaString, + }; + + const userTopic = 'example-user-topic'; + await schemaRegistryClient.register(userTopic + "-value", schemaInfo); + + const userInfo = { name: 'Alice N Bob', age: 30 }; + + const avroSerializerConfig: AvroSerializerConfig = { useLatestVersion: true }; + + const serializer: AvroSerializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, avroSerializerConfig); + + const outgoingMessage = { + key: "1", + value: await serializer.serialize(userTopic, userInfo) + }; + + console.log("Outgoing message: ", outgoingMessage); + + await producer.connect(); + + await producer.send({ + topic: userTopic, + messages: [outgoingMessage] + }); + + await producer.disconnect(); +} + +kafkaProducerAvro(); \ No newline at end of file diff --git a/schemaregistry-examples/src/oauth-schemaregistry.ts b/schemaregistry-examples/src/oauth-schemaregistry.ts index 3481abd6..10be00bd 100644 --- a/schemaregistry-examples/src/oauth-schemaregistry.ts +++ b/schemaregistry-examples/src/oauth-schemaregistry.ts @@ -1,8 +1,8 @@ import { SchemaRegistryClient, BearerAuthCredentials, ClientConfig } from '@confluentinc/schemaregistry'; import { CreateAxiosDefaults } from 'axios'; import { - issuerEndpointUrl, clientId, clientSecret, scope, - identityPoolId, logicalCluster, baseUrl + issuerEndpointUrl, oauthClientId, oauthClientSecret, scope, + identityPoolId, schemaRegistryLogicalCluster, baseUrl } from './constants'; async function oauthSchemaRegistry() { @@ -10,11 +10,11 @@ async function oauthSchemaRegistry() { const bearerAuthCredentials: BearerAuthCredentials = { credentialsSource: 'OAUTHBEARER', issuerEndpointUrl: issuerEndpointUrl, - clientId: clientId, - clientSecret: clientSecret, + clientId: oauthClientId, + clientSecret: oauthClientSecret, scope: scope, identityPoolId: identityPoolId, - logicalCluster: logicalCluster + logicalCluster: schemaRegistryLogicalCluster } const createAxiosDefaults: CreateAxiosDefaults = {