Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4,397 changes: 3,453 additions & 944 deletions package-lock.json

Large diffs are not rendered by default.

13 changes: 9 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@
},
"license": "MIT",
"devDependencies": {
"@bufbuild/buf": "^1.37.0",
"@bufbuild/protoc-gen-es": "^2.0.0",
"@eslint/js": "^9.9.0",
"@types/eslint__js": "^8.42.3",
"@types/node": "^20.4.5",
"@types/node": "^20.16.1",
"bluebird": "^3.5.3",
"eslint": "^8.57.0",
"eslint-plugin-jest": "^28.6.0",
Expand All @@ -46,13 +48,16 @@
"typescript-eslint": "^8.2.0"
},
"dependencies": {
"@bufbuild/buf": "^1.37.0",
"@aws-sdk/client-kms": "^3.637.0",
"@azure/identity": "^4.4.1",
"@azure/keyvault-keys": "^4.8.0",
"@bufbuild/protobuf": "^2.0.0",
"@bufbuild/protoc-gen-es": "^2.0.0",
"@criteria/json-schema": "^0.10.0",
"@criteria/json-schema-validation": "^0.10.0",
"@google-cloud/kms": "^4.5.0",
"@hackbg/miscreant-esm": "^0.3.2-patch.3",
"@mapbox/node-pre-gyp": "^1.0.11",
"@smithy/types": "^3.3.0",
"@types/validator": "^13.12.0",
"ajv": "^8.17.1",
"async-mutex": "^0.5.0",
Expand All @@ -61,8 +66,8 @@
"bindings": "^1.3.1",
"json-stringify-deterministic": "^1.0.12",
"lru-cache": "^11.0.0",
"miscreant": "^0.3.2",
"nan": "^2.17.0",
"node-vault": "^0.10.2",
"ts-jest": "^29.2.4",
"validator": "^13.12.0"
},
Expand Down
9 changes: 9 additions & 0 deletions schemaregistry/dekregistry/dekregistry-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { LRUCache } from 'lru-cache';
import { Mutex } from 'async-mutex';
import { ClientConfig, RestService } from '../rest-service';
import stringify from 'json-stringify-deterministic';
import {MockDekRegistryClient} from "./mock-dekregistry-client";

/*
* Confluent-Schema-Registry-TypeScript - Node.js wrapper for Confluent Schema Registry
Expand Down Expand Up @@ -76,6 +77,14 @@ class DekRegistryClient implements Client {
this.dekMutex = new Mutex();
}

static newClient(config: ClientConfig): Client {
let url = config.baseURLs[0]
if (url.startsWith("mock://")) {
return new MockDekRegistryClient()
}
return new DekRegistryClient(config)
}

static getEncryptedKeyMaterialBytes(dek: Dek): Buffer | null {
if (!dek.encryptedKeyMaterial) {
return null;
Expand Down
9 changes: 5 additions & 4 deletions schemaregistry/dekregistry/mock-dekregistry-client.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Client, Dek, Kek } from "./dekregistry-client";
import { MOCK_TS } from "./constants";
import stringify from "json-stringify-deterministic";
import {RestError} from "../rest-error";

class MockDekRegistryClient implements Client {
private kekCache: Map<string, Kek>;
Expand Down Expand Up @@ -39,7 +40,7 @@ class MockDekRegistryClient implements Client {
return cachedKek;
}

throw new Error(`Kek not found: ${name}`);
throw new RestError(`Kek not found: ${name}`, 404, 40400);
}

async registerDek(kekName: string, subject: string, algorithm: string,
Expand Down Expand Up @@ -75,18 +76,18 @@ class MockDekRegistryClient implements Client {
}
}
if (latestVersion === 0) {
throw new Error(`Dek not found: ${subject}`);
throw new RestError(`Dek not found: ${subject}`, 404, 40400);
}
version = latestVersion;
}

const cacheKey = stringify({ kekName, subject, version, algorithm, deleted });
const cacheKey = stringify({ kekName, subject, version, algorithm, deleted: false });
const cachedDek = this.dekCache.get(cacheKey);
if (cachedDek) {
return cachedDek;
}

throw new Error(`Dek not found: ${subject}`);
throw new RestError(`Dek not found: ${subject}`, 404, 40400);
}

async close() {
Expand Down
50 changes: 29 additions & 21 deletions schemaregistry/mock-schemaregistry-client.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@

import { Client, Compatibility, SchemaInfo, SchemaMetadata, ServerConfig } from './schemaregistry-client';
import {
Client,
Compatibility,
minimize,
SchemaInfo,
SchemaMetadata,
ServerConfig
} from './schemaregistry-client';
import stringify from "json-stringify-deterministic";
import {ClientConfig} from "./rest-service";
import {RestError} from "./rest-error";

interface VersionCacheEntry {
version: number;
Expand Down Expand Up @@ -57,13 +65,13 @@ class MockClient implements Client {
async register(subject: string, schema: SchemaInfo, normalize: boolean = false): Promise<number> {
const metadata = await this.registerFullResponse(subject, schema, normalize);
if (!metadata) {
throw new Error("Failed to register schema");
throw new RestError("Failed to register schema", 422, 42200);
}
return metadata.id;
}

async registerFullResponse(subject: string, schema: SchemaInfo, normalize: boolean = false): Promise<SchemaMetadata> {
const cacheKey = stringify({ subject, schema });
const cacheKey = stringify({ subject, schema: minimize(schema) });

const cacheEntry = this.infoToSchemaCache.get(cacheKey);
if (cacheEntry && !cacheEntry.softDeleted) {
Expand All @@ -72,7 +80,7 @@ class MockClient implements Client {

const id = await this.getIDFromRegistry(subject, schema);
if (id === -1) {
throw new Error("Failed to retrieve schema ID from registry");
throw new RestError("Failed to retrieve schema ID from registry", 422, 42200);
}

const metadata: SchemaMetadata = { ...schema, id };
Expand Down Expand Up @@ -112,7 +120,7 @@ class MockClient implements Client {
newVersion = versions[versions.length - 1] + 1;
}

const cacheKey = stringify({ subject, schema: schema });
const cacheKey = stringify({ subject, schema: minimize(schema) });
this.schemaToVersionCache.set(cacheKey, { version: newVersion, softDeleted: false });
}

Expand All @@ -121,24 +129,24 @@ class MockClient implements Client {
const cacheEntry = this.idToSchemaCache.get(cacheKey);

if (!cacheEntry || cacheEntry.softDeleted) {
throw new Error("Schema not found");
throw new RestError("Schema not found", 404, 40400);
}
return cacheEntry.info;
}

async getId(subject: string, schema: SchemaInfo): Promise<number> {
const cacheKey = stringify({ subject, schema });
const cacheKey = stringify({ subject, schema: minimize(schema) });
const cacheEntry = this.infoToSchemaCache.get(cacheKey);
if (!cacheEntry || cacheEntry.softDeleted) {
throw new Error("Schema not found");
throw new RestError("Schema not found", 404, 40400);
}
return cacheEntry.metadata.id;
}

async getLatestSchemaMetadata(subject: string): Promise<SchemaMetadata> {
const version = await this.latestVersion(subject);
if (version === -1) {
throw new Error("No versions found for subject");
throw new RestError("No versions found for subject", 404, 40400);
}

return this.getSchemaMetadata(subject, version);
Expand All @@ -154,7 +162,7 @@ class MockClient implements Client {
}

if (!json) {
throw new Error("Schema not found");
throw new RestError("Schema not found", 404, 40400);
}

let id: number = -1;
Expand All @@ -165,15 +173,15 @@ class MockClient implements Client {
}
}
if (id === -1) {
throw new Error("Schema not found");
throw new RestError("Schema not found", 404, 40400);
}


return {
id,
version,
subject,
schema: json.schema.schema
...json.schema,
};
}

Expand All @@ -198,7 +206,7 @@ class MockClient implements Client {
}

if (results.length === 0) {
throw new Error("Schema not found");
throw new RestError("Schema not found", 404, 40400);
}

let latest: SchemaMetadata = results[0];
Expand All @@ -225,7 +233,7 @@ class MockClient implements Client {
const results = await this.allVersions(subject);

if (results.length === 0) {
throw new Error("No versions found for subject");
throw new RestError("No versions found for subject", 404, 40400);
}
return results;
}
Expand Down Expand Up @@ -275,11 +283,11 @@ class MockClient implements Client {
}

async getVersion(subject: string, schema: SchemaInfo, normalize: boolean = false): Promise<number> {
const cacheKey = stringify({ subject, schema });
const cacheKey = stringify({ subject, schema: minimize(schema) });
const cacheEntry = this.schemaToVersionCache.get(cacheKey);

if (!cacheEntry || cacheEntry.softDeleted) {
throw new Error("Schema not found");
throw new RestError("Schema not found", 404, 40400);
}

return cacheEntry.version;
Expand Down Expand Up @@ -333,7 +341,7 @@ class MockClient implements Client {
if (parsedKey.subject === subject && value.version === version) {
await this.deleteVersion(key, version, permanent);

const cacheKeySchema = stringify({ subject, schema: parsedKey.schema });
const cacheKeySchema = stringify({ subject, schema: minimize(parsedKey.schema) });
const cacheEntry = this.infoToSchemaCache.get(cacheKeySchema);
if (cacheEntry) {
await this.deleteMetadata(cacheKeySchema, cacheEntry.metadata, permanent);
Expand Down Expand Up @@ -363,7 +371,7 @@ class MockClient implements Client {
async getCompatibility(subject: string): Promise<Compatibility> {
const cacheEntry = this.configCache.get(subject);
if (!cacheEntry) {
throw new Error("Subject not found");
throw new RestError("Subject not found", 404, 40400);
}
return cacheEntry.compatibilityLevel as Compatibility;
}
Expand All @@ -376,7 +384,7 @@ class MockClient implements Client {
async getDefaultCompatibility(): Promise<Compatibility> {
const cacheEntry = this.configCache.get(noSubject);
if (!cacheEntry) {
throw new Error("Default compatibility not found");
throw new RestError("Default compatibility not found", 404, 40400);
}
return cacheEntry.compatibilityLevel as Compatibility;
}
Expand All @@ -389,7 +397,7 @@ class MockClient implements Client {
async getConfig(subject: string): Promise<ServerConfig> {
const cacheEntry = this.configCache.get(subject);
if (!cacheEntry) {
throw new Error("Subject not found");
throw new RestError("Subject not found", 404, 40400);
}
return cacheEntry;
}
Expand All @@ -402,7 +410,7 @@ class MockClient implements Client {
async getDefaultConfig(): Promise<ServerConfig> {
const cacheEntry = this.configCache.get(noSubject);
if (!cacheEntry) {
throw new Error("Default config not found");
throw new RestError("Default config not found", 404, 40400);
}
return cacheEntry;
}
Expand Down
46 changes: 46 additions & 0 deletions schemaregistry/rules/encryption/awskms/aws-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import {KmsClient} from "../kms-registry";
import {AwsKmsDriver} from "./aws-driver";
import {
DecryptCommand,
EncryptCommand,
KMSClient
} from '@aws-sdk/client-kms'
import {AwsCredentialIdentity} from "@smithy/types";

export class AwsKmsClient implements KmsClient {

private kmsClient: KMSClient
private keyId: string

constructor(keyUri: string, creds?: AwsCredentialIdentity) {
if (!keyUri.startsWith(AwsKmsDriver.PREFIX)) {
throw new Error(`key uri must start with ${AwsKmsDriver.PREFIX}`)
}
this.keyId = keyUri.substring(AwsKmsDriver.PREFIX.length)
const tokens = this.keyId.split(':')
if (tokens.length < 4) {
throw new Error(`invalid key uri ${this.keyId}`)
}
const regionName = tokens[3]
this.kmsClient = new KMSClient({
region: regionName,
...creds && {credentials: creds}
})
}

supported(keyUri: string): boolean {
return keyUri.startsWith(AwsKmsDriver.PREFIX)
}

async encrypt(plaintext: Buffer): Promise<Buffer> {
const encryptCommand = new EncryptCommand({KeyId: this.keyId, Plaintext: plaintext});
const data = await this.kmsClient.send(encryptCommand)
return Buffer.from(data.CiphertextBlob!);
}

async decrypt(ciphertext: Buffer): Promise<Buffer> {
const decryptCommand = new DecryptCommand({KeyId: this.keyId, CiphertextBlob: ciphertext});
const data = await this.kmsClient.send(decryptCommand);
return Buffer.from(data.Plaintext!)
}
}
29 changes: 29 additions & 0 deletions schemaregistry/rules/encryption/awskms/aws-driver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import {KmsClient, KmsDriver, registerKmsDriver} from "../kms-registry";
import {AwsKmsClient} from "./aws-client";
import {AwsCredentialIdentity} from "@smithy/types";

export class AwsKmsDriver implements KmsDriver {

static PREFIX = 'aws-kms://'
static ACCESS_KEY_ID = 'access.key.id'
static SECRET_ACCESS_KEY = 'secret.access.key'

static register(): void {
registerKmsDriver(new AwsKmsDriver())
}

getKeyUrlPrefix(): string {
return AwsKmsDriver.PREFIX
}

newKmsClient(config: Map<string, string>, keyUrl?: string): KmsClient {
const uriPrefix = keyUrl != null ? keyUrl : AwsKmsDriver.PREFIX
const key = config.get(AwsKmsDriver.ACCESS_KEY_ID)
const secret = config.get(AwsKmsDriver.SECRET_ACCESS_KEY)
let creds: AwsCredentialIdentity | undefined
if (key != null && secret != null) {
creds = {accessKeyId: key, secretAccessKey: secret}
}
return new AwsKmsClient(uriPrefix, creds)
}
}
33 changes: 33 additions & 0 deletions schemaregistry/rules/encryption/azurekms/azure-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import {KmsClient} from "../kms-registry";
import {AzureKmsDriver} from "./azure-driver";
import {TokenCredential} from "@azure/identity";
import {CryptographyClient, EncryptionAlgorithm} from "@azure/keyvault-keys";

export class AzureKmsClient implements KmsClient {
private static ALGORITHM: EncryptionAlgorithm = 'RSA-OAEP-256'

private kmsClient: CryptographyClient
private keyId: string

constructor(keyUri: string, creds: TokenCredential) {
if (!keyUri.startsWith(AzureKmsDriver.PREFIX)) {
throw new Error(`key uri must start with ${AzureKmsDriver.PREFIX}`)
}
this.keyId = keyUri.substring(AzureKmsDriver.PREFIX.length)
this.kmsClient = new CryptographyClient(this.keyId, creds)
}

supported(keyUri: string): boolean {
return keyUri.startsWith(AzureKmsDriver.PREFIX)
}

async encrypt(plaintext: Buffer): Promise<Buffer> {
const result = await this.kmsClient.encrypt(AzureKmsClient.ALGORITHM, plaintext)
return Buffer.from(result.result)
}

async decrypt(ciphertext: Buffer): Promise<Buffer> {
const result = await this.kmsClient.decrypt(AzureKmsClient.ALGORITHM, ciphertext)
return Buffer.from(result.result)
}
}
Loading