Skip to content

Commit

Permalink
Add clientConfig, baseUrl retry, RestError, encodeURIComponent (#12) (#…
Browse files Browse the repository at this point in the history
…68)

* Add clientConfig, baseUrl retry, RestError

* refactor such that RestService takes in necessary dependencies
  • Loading branch information
Claimundefine authored Aug 21, 2024
1 parent 4aee89f commit d73496f
Show file tree
Hide file tree
Showing 8 changed files with 315 additions and 219 deletions.
28 changes: 19 additions & 9 deletions dekregistry/dekregistry-client.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
import { LRUCache } from 'lru-cache';
import { Mutex } from 'async-mutex';
import { RestService } from '../schemaregistry/rest-service';
import { ClientConfig, RestService } from '../schemaregistry/rest-service';
import stringify from 'json-stringify-deterministic';

/*
* Confluent-Schema-Registry-TypeScript - Node.js wrapper for Confluent Schema Registry
*
* Copyright (c) 2024 Confluent, Inc.
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/

interface Kek {
name?: string;
kmsType?: string;
Expand Down Expand Up @@ -51,13 +60,14 @@ class DekRegistryClient implements Client {
private kekMutex: Mutex;
private dekMutex: Mutex;

constructor(restService: RestService, cacheSize: number = 512, cacheTTL?: number) {
constructor(config: ClientConfig) {
const cacheOptions = {
max: cacheSize,
...(cacheTTL !== undefined && { maxAge: cacheTTL })
max: config.cacheCapacity,
...(config.cacheLatestTtlSecs !== undefined && { maxAge: config.cacheLatestTtlSecs * 1000 }),
};

this.restService = restService;

this.restService = new RestService(config.createAxiosDefaults, config.baseURLs, config.isForward);
this.kekCache = new LRUCache<string, Kek>(cacheOptions);
this.dekCache = new LRUCache<string, Dek>(cacheOptions);
this.kekMutex = new Mutex();
Expand Down Expand Up @@ -124,7 +134,7 @@ class DekRegistryClient implements Client {
shared,
};

const response = await this.restService.sendHttpRequest<Kek>(
const response = await this.restService.handleRequest<Kek>(
'/dek-registry/v1/keks',
'POST',
request);
Expand All @@ -143,7 +153,7 @@ class DekRegistryClient implements Client {
}
name = encodeURIComponent(name);

const response = await this.restService.sendHttpRequest<Kek>(
const response = await this.restService.handleRequest<Kek>(
`/dek-registry/v1/keks/${name}?deleted=${deleted}`,
'GET');
this.kekCache.set(cacheKey, response.data);
Expand All @@ -169,7 +179,7 @@ class DekRegistryClient implements Client {
};
kekName = encodeURIComponent(kekName);

const response = await this.restService.sendHttpRequest<Dek>(
const response = await this.restService.handleRequest<Dek>(
`/dek-registry/v1/keks/${kekName}/deks`,
'POST',
request);
Expand All @@ -194,7 +204,7 @@ class DekRegistryClient implements Client {
kekName = encodeURIComponent(kekName);
subject = encodeURIComponent(subject);

const response = await this.restService.sendHttpRequest<Dek>(
const response = await this.restService.handleRequest<Dek>(
`/dek-registry/v1/keks/${kekName}/deks/${subject}/versions/${version}?deleted=${deleted}`,
'GET');
this.dekCache.set(cacheKey, response.data);
Expand Down
20 changes: 7 additions & 13 deletions e2e/schemaregistry/schemaregistry-client.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { RestService } from '../../schemaregistry/rest-service';
import {
Compatibility,
SchemaRegistryClient,
Expand All @@ -8,19 +7,10 @@ import {
Metadata
} from '../../schemaregistry/schemaregistry-client';
import { beforeEach, describe, expect, it } from '@jest/globals';
import { clientConfig } from '../../test/schemaregistry/test-constants';

/* eslint-disable @typescript-eslint/no-non-null-asserted-optional-chain */

const baseUrls = ['http://localhost:8081'];
const headers = { 'Content-Type': 'application/vnd.schemaregistry.v1+json' };
const restService = new RestService(baseUrls, false);
restService.setHeaders(headers);

const basicAuth = Buffer.from('RBACAllowedUser-lsrc1:nohash').toString('base64');
restService.setAuth(basicAuth);

restService.setTimeout(10000);

let schemaRegistryClient: SchemaRegistryClient;
const testSubject = 'integ-test-subject';
const testServerConfigSubject = 'integ-test-server-config-subject';
Expand Down Expand Up @@ -72,7 +62,7 @@ const backwardCompatibleSchemaInfo: SchemaInfo = {
describe('SchemaRegistryClient Integration Test', () => {

beforeEach(async () => {
schemaRegistryClient = new SchemaRegistryClient(restService);
schemaRegistryClient = new SchemaRegistryClient(clientConfig);
const subjects: string[] = await schemaRegistryClient.getAllSubjects();

if (subjects && subjects.includes(testSubject)) {
Expand All @@ -86,7 +76,11 @@ describe('SchemaRegistryClient Integration Test', () => {
}
});

it('should register, retrieve, and delete a schema', async () => {
it("Should return RestError when retrieving non-existent schema", async () => {
await expect(schemaRegistryClient.getBySubjectAndId(testSubject, 1)).rejects.toThrow();
});

it('Should register, retrieve, and delete a schema', async () => {
// Register a schema
const registerResponse: SchemaMetadata = await schemaRegistryClient.registerFullResponse(testSubject, schemaInfo);
expect(registerResponse).toBeDefined();
Expand Down
10 changes: 10 additions & 0 deletions schemaregistry/rest-error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
export class RestError extends Error {
status: number;
errorCode: number;

constructor(message: string, status: number, errorCode: number) {
super(message + "; Error code: " + errorCode);
this.status = status;
this.errorCode = errorCode;
}
}
64 changes: 41 additions & 23 deletions schemaregistry/rest-service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import axios, { AxiosInstance, AxiosRequestConfig, AxiosResponse } from 'axios';
import axios, { AxiosInstance, AxiosRequestConfig, AxiosResponse, CreateAxiosDefaults } from 'axios';
import { RestError } from './rest-error';

/*
* Confluent-Schema-Registry-TypeScript - Node.js wrapper for Confluent Schema Registry
Expand All @@ -9,43 +10,60 @@ import axios, { AxiosInstance, AxiosRequestConfig, AxiosResponse } from 'axios';
* of the MIT license. See the LICENSE.txt file for details.
*/

export type ClientConfig = {
createAxiosDefaults: CreateAxiosDefaults,
baseURLs: string[],
cacheCapacity: number,
cacheLatestTtlSecs?: number,
isForward?: boolean
}

export class RestService {
private client: AxiosInstance
private client: AxiosInstance;
private baseURLs: string[];

constructor(baseUrls: string[], isForward = false) {
this.client = axios.create({
baseURL: baseUrls[0], // Use the first base URL as the default
timeout: 5000, // Default timeout
headers: { 'Content-Type': 'application/vnd.schemaregistry.v1+json' },
})
constructor(axiosDefaults: CreateAxiosDefaults, baseURLs: string[], isForward?: boolean) {
this.client = axios.create(axiosDefaults);
this.baseURLs = baseURLs;

if (isForward) {
this.client.defaults.headers.common['X-Forward'] = 'true'
}
}

public async sendHttpRequest<T>(
public async handleRequest<T>(
url: string,
method: 'GET' | 'POST' | 'PUT' | 'DELETE',
data?: any, // eslint-disable-line @typescript-eslint/no-explicit-any
config?: AxiosRequestConfig,
): Promise<AxiosResponse<T>> {
try {
const response = await this.client.request<T>({
url,
method,
data,
...config,
})
return response
} catch (error) {
if (axios.isAxiosError(error) && error.response) {
throw new Error(`HTTP error: ${error.response.status} - ${error.response.data}`)
} else {
const err = error as Error;
throw new Error(`Unknown error: ${err.message}`)

for (let i = 0; i < this.baseURLs.length; i++) {
try {
this.setBaseURL(this.baseURLs[i]);
const response = await this.client.request<T>({
url,
method,
data,
...config,
})
return response;
} catch (error) {
if (axios.isAxiosError(error) && error.response && (error.response.status < 200 || error.response.status > 299)) {
const data = error.response.data;
if (data.error_code && data.message) {
error = new RestError(data.message, error.response.status, data.error_code);
} else {
error = new Error(`Unknown error: ${error.message}`)
}
}
if (i === this.baseURLs.length - 1) {
throw error;
}
}
}

throw new Error('Internal HTTP retry error'); // Should never reach here
}

public setHeaders(headers: Record<string, string>): void {
Expand Down
Loading

0 comments on commit d73496f

Please sign in to comment.