Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion schemaregistry/mock-schemaregistry-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,12 @@ class MockClient implements Client {
const parsedKey = JSON.parse(key);
if (parsedKey.subject === subject && (!value.softDeleted || deleted)) {
if (parsedKey.schema.metadata && this.isSubset(metadata, parsedKey.schema.metadata.properties)) {
results.push({ id: parsedKey.schema.id, version: value.version, subject, schema: parsedKey.schema.schema });
results.push({
id: parsedKey.schema.id,
version: value.version,
subject,
...parsedKey.schema
});
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion schemaregistry/rules/encryption/azurekms/azure-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export class AzureKmsClient implements KmsClient {
}

async encrypt(plaintext: Buffer): Promise<Buffer> {
const result = await this.kmsClient.encrypt(AzureKmsClient.ALGORITHM, plaintext)
const result = await this.kmsClient.encrypt(AzureKmsClient.ALGORITHM, plaintext)
return Buffer.from(result.result)
}

Expand Down
2 changes: 1 addition & 1 deletion schemaregistry/rules/encryption/azurekms/azure-driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {AzureKmsClient} from "./azure-client";
export class AzureKmsDriver implements KmsDriver {

static PREFIX = 'azure-kms://'
static TENANT_ID = 'tenant_id'
static TENANT_ID = 'tenant.id'
static CLIENT_ID = 'client.id'
static CLIENT_SECRET = 'client.secret'

Expand Down
20 changes: 18 additions & 2 deletions schemaregistry/rules/encryption/encrypt-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,31 @@ interface DekId {
deleted: boolean
}

export class Clock {
now(): number {
return Date.now()
}
}

export class FieldEncryptionExecutor extends FieldRuleExecutor {
client: Client | null = null
clock: Clock

static register(): FieldEncryptionExecutor {
const executor = new FieldEncryptionExecutor()
return this.registerWithClock(new Clock())
}

static registerWithClock(clock: Clock): FieldEncryptionExecutor {
const executor = new FieldEncryptionExecutor(clock)
RuleRegistry.registerRuleExecutor(executor)
return executor
}

constructor(clock: Clock = new Clock()) {
super()
this.clock = clock
}

override configure(clientConfig: ClientConfig, config: Map<string, string>) {
this.client = DekRegistryClient.newClient(clientConfig)
this.config = config
Expand Down Expand Up @@ -416,7 +432,7 @@ export class FieldEncryptionExecutorTransform implements FieldTransform {
}

isExpired(ctx: RuleContext, dek: Dek | null): boolean {
const now = Date.now()
const now = this.executor.clock.now()
return ctx.ruleMode !== RuleMode.READ &&
this.dekExpiryDays > 0 &&
dek != null &&
Expand Down
8 changes: 2 additions & 6 deletions schemaregistry/rules/encryption/gcpkms/gcp-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,9 @@ export class GcpKmsClient implements KmsClient {
throw new Error(`key uri must start with ${GcpKmsDriver.PREFIX}`)
}
this.keyId = keyUri.substring(GcpKmsDriver.PREFIX.length)
const tokens = this.keyId.split(':')
if (tokens.length < 4) {
throw new Error(`invalid key uri ${this.keyId}`)
}
this.kmsClient = creds != null
? new KeyManagementServiceClient()
: new KeyManagementServiceClient({credentials: creds})
? new KeyManagementServiceClient({credentials: creds})
: new KeyManagementServiceClient()
}

supported(keyUri: string): boolean {
Expand Down
10 changes: 6 additions & 4 deletions schemaregistry/rules/encryption/hcvault/hcvault-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ export class HcVaultClient implements KmsClient {
}

async encrypt(plaintext: Buffer): Promise<Buffer> {
const data = await this.kmsClient.encryptData({name: this.keyName, plainText: plaintext.toString('base64') })
return Buffer.from(data.ciphertext, 'base64')
const response = await this.kmsClient.encryptData({name: this.keyName, plaintext: plaintext.toString('base64') })
let data = response.data.ciphertext
return Buffer.from(data, 'utf8')
}

async decrypt(ciphertext: Buffer): Promise<Buffer> {
const data = await this.kmsClient.decryptData({name: this.keyName, cipherText: ciphertext.toString('base64') })
return Buffer.from(data.plaintext, 'base64')
const response = await this.kmsClient.decryptData({name: this.keyName, ciphertext: ciphertext.toString('utf8') })
let data = response.data.plaintext
return Buffer.from(data, 'base64');
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {HcVaultClient} from "./hcvault-client";

export class HcVaultDriver implements KmsDriver {

static PREFIX = 'hcvault-kms://'
static PREFIX = 'hcvault://'
static TOKEN_ID = 'token.id'
static NAMESPACE = 'namespace'

Expand Down
5 changes: 5 additions & 0 deletions schemaregistry/rules/encryption/kms-registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,8 @@ export function getKmsClient(keyUrl: string): KmsClient | null {
return null
}

export function clearKmsClients(): void {
kmsClients.length = 0
}


54 changes: 29 additions & 25 deletions schemaregistry/serde/avro.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,31 +53,7 @@ export class AvroSerializer extends Serializer implements AvroSerde {
throw new Error('message is empty')
}

let enumIndex = 1
let fixedIndex = 1
let recordIndex = 1

const namingHook: TypeHook = (
avroSchema: avro.Schema,
opts: ForSchemaOptions,
) => {
let schema = avroSchema as any
switch (schema.type) {
case 'enum':
schema.name = `Enum${enumIndex++}`;
break;
case 'fixed':
schema.name = `Fixed${fixedIndex++}`;
break;
case 'record':
schema.name = `Record${recordIndex++}`;
break;
default:
}
return undefined
}

let avroSchema = Type.forValue(msg, { typeHook: namingHook })
let avroSchema = AvroSerializer.messageToSchema(msg)
const schema: SchemaInfo = {
schemaType: 'AVRO',
schema: JSON.stringify(avroSchema),
Expand All @@ -104,6 +80,34 @@ export class AvroSerializer extends Serializer implements AvroSerde {
return deps
})
}

static messageToSchema(msg: any): avro.Type {
let enumIndex = 1
let fixedIndex = 1
let recordIndex = 1

const namingHook: TypeHook = (
avroSchema: avro.Schema,
opts: ForSchemaOptions,
) => {
let schema = avroSchema as any
switch (schema.type) {
case 'enum':
schema.name = `Enum${enumIndex++}`;
break;
case 'fixed':
schema.name = `Fixed${fixedIndex++}`;
break;
case 'record':
schema.name = `Record${recordIndex++}`;
break;
default:
}
return undefined
}

return Type.forValue(msg, { typeHook: namingHook })
}
}

export type AvroDeserializerConfig = DeserializerConfig & AvroSerdeConfig
Expand Down
17 changes: 12 additions & 5 deletions schemaregistry/serde/json.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ export class JsonSerializer extends Serializer implements JsonSerde {
throw new Error('message is empty')
}

const jsonSchema = generateSchema(msg)
const jsonSchema = JsonSerializer.messageToSchema(msg)
const schema: SchemaInfo = {
schemaType: 'JSON',
schema: JSON.stringify(jsonSchema),
Expand All @@ -92,14 +92,14 @@ export class JsonSerializer extends Serializer implements JsonSerde {
}

async fieldTransform(ctx: RuleContext, fieldTransform: FieldTransform, msg: any): Promise<any> {
const schema = this.toType(ctx.target)
const schema = await this.toType(ctx.target)
if (typeof schema === 'boolean') {
return msg
}
return await transform(ctx, schema, '$', msg, fieldTransform)
}

toType(info: SchemaInfo): DereferencedJSONSchema {
async toType(info: SchemaInfo): Promise<DereferencedJSONSchema> {
return toType(this.client, this.conf as JsonDeserializerConfig, this, info, async (client, info) => {
const deps = new Map<string, string>()
await this.resolveReferences(client, info, deps)
Expand All @@ -115,6 +115,10 @@ export class JsonSerializer extends Serializer implements JsonSerde {
},
)
}

static messageToSchema(msg: any): DereferencedJSONSchema {
return generateSchema(msg)
}
}

export type JsonDeserializerConfig = DeserializerConfig & JsonSerdeConfig
Expand Down Expand Up @@ -173,7 +177,7 @@ export class JsonDeserializer extends Deserializer implements JsonSerde {
}

async fieldTransform(ctx: RuleContext, fieldTransform: FieldTransform, msg: any): Promise<any> {
const schema = this.toType(ctx.target)
const schema = await this.toType(ctx.target)
return await transform(ctx, schema, '$', msg, fieldTransform)
}

Expand Down Expand Up @@ -211,14 +215,17 @@ async function toValidateFunction(

const json = JSON.parse(info.schema)
const spec = json.$schema
if (spec === 'http://json-schema.org/draft/2020-12/schema') {
if (spec === 'http://json-schema.org/draft/2020-12/schema'
|| spec === 'https://json-schema.org/draft/2020-12/schema') {
const ajv2020 = new Ajv2020(conf as JsonSerdeConfig)
ajv2020.addKeyword("confluent:tags")
deps.forEach((schema, name) => {
ajv2020.addSchema(JSON.parse(schema), name)
})
fn = ajv2020.compile(json)
} else {
const ajv = new Ajv2019(conf as JsonSerdeConfig)
ajv.addKeyword("confluent:tags")
ajv.addMetaSchema(draft6MetaSchema)
ajv.addMetaSchema(draft7MetaSchema)
deps.forEach((schema, name) => {
Expand Down
6 changes: 3 additions & 3 deletions schemaregistry/serde/serde.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ export abstract class Serde {
rules = target.ruleSet?.migrationRules
break
case RuleMode.DOWNGRADE:
rules = source?.ruleSet?.migrationRules?.reverse()
rules = source?.ruleSet?.migrationRules?.map(x => x).reverse()
break
default:
rules = target.ruleSet?.domainRules
if (ruleMode === RuleMode.READ) {
// Execute read rules in reverse order for symmetry
rules = rules?.reverse()
rules = rules?.map(x => x).reverse()
}
break
}
Expand Down Expand Up @@ -394,7 +394,7 @@ export abstract class Deserializer extends Serde {
previous = version
}
if (migrationMode === RuleMode.DOWNGRADE) {
migrations = migrations.reverse()
migrations = migrations.map(x => x).reverse()
}
return migrations
}
Expand Down
4 changes: 2 additions & 2 deletions schemaregistry/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
"lib": [
"es2021", "dom"
],
"module": "preserve",
"module": "nodenext",
"target": "es2021",
"strict": true,
"esModuleInterop": true,
"forceConsistentCasingInFileNames": true,
"moduleResolution": "bundler",
"moduleResolution": "nodenext",
"allowUnusedLabels": false,
"allowUnreachableCode": false,
"noFallthroughCasesInSwitch": true,
Expand Down
Loading