Skip to content

Commit

Permalink
[schema registry] Return undefined for not found (#15149)
Browse files Browse the repository at this point in the history
  • Loading branch information
nguerrera authored May 8, 2021
1 parent d286ddd commit 9e856ac
Show file tree
Hide file tree
Showing 16 changed files with 135 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ npm run build
4. Run whichever samples you like (note that some samples may require additional setup, see the table above):

```bash
node dist/schemaRegistryAvroSample.ts
node dist/schemaRegistryAvroSample.js
```

Alternatively, run a single sample with the correct environment variables set (setting up the `.env` file is not required if you do this), for example (cross-platform):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
{
"compilerOptions": {
"target": "ES6",
"target": "ES2018",
"module": "commonjs",
"moduleResolution": "node",
"resolveJsonModule": true,
"esModuleInterop": true,
"allowSyntheticDefaultImports": true,
"strict": true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import * as avro from "avsc";
// - [Remaining bytes: Avro payload (in general, format-specific payload)]
// - Avro Binary Encoding
// - NOT Avro Object Container File, which includes the schema and defeats
// the purpose of this serialzer to move the schema out of the message
// the purpose of this serializer to move the schema out of the message
// payload and into the schema registry.
//
const FORMAT_INDICATOR = 0;
Expand Down Expand Up @@ -137,7 +137,7 @@ export class SchemaRegistryAvroSerializer {

const format = buffer.readUInt32BE(0);
if (format !== FORMAT_INDICATOR) {
throw new TypeError(`Buffer has unknown format indicator: 0x${format.toString(16)}`);
throw new TypeError(`Buffer has unknown format indicator: 0x${format.toString(16)}.`);
}

const schemaIdBuffer = buffer.slice(SCHEMA_ID_OFFSET, PAYLOAD_OFFSET);
Expand Down Expand Up @@ -167,6 +167,10 @@ export class SchemaRegistryAvroSerializer {
}

const schemaResponse = await this.registry.getSchemaById(schemaId);
if (!schemaResponse) {
throw new Error(`Schema with ID '${schemaId}' not found.`);
}

if (!schemaResponse.serializationType.match(/^avro$/i)) {
throw new Error(
`Schema with ID '${schemaResponse.id}' has serialization type '${schemaResponse.serializationType}', not 'avro'.`
Expand Down Expand Up @@ -195,11 +199,20 @@ export class SchemaRegistryAvroSerializer {
content: schema
};

const schemaIdResponse = this.autoRegisterSchemas
? await this.registry.registerSchema(description)
: await this.registry.getSchemaId(description);
let id: string;
if (this.autoRegisterSchemas) {
id = (await this.registry.registerSchema(description)).id;
} else {
const response = await this.registry.getSchemaId(description);
if (!response) {
throw new Error(
`Schema '${description.name}' not found in registry group '${description.group}', or not found to have matching content.`
);
}
id = response.id;
}

return this.cache(schemaIdResponse.id, schema, avroType);
return this.cache(id, schema, avroType);
}

private cache(id: string, schema: string, type: avro.Type): CacheEntry {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,26 @@ describe("SchemaRegistryAvroSerializer", function() {
);
});

it("rejects serialization when schema is not found", async () => {
const serializer = await createTestSerializer(false);
const schema = JSON.stringify({
type: "record",
name: "NeverRegistered",
namespace: "my.example",
fields: [{ name: "count", type: "int" }]
});
await assert.isRejected(serializer.serialize({ count: 42 }, schema), /not found/);
});

it("rejects deserialization when schema is not found", async () => {
const serializer = await createTestSerializer(false);
const payload = testAvroType.toBuffer(testValue);
const buffer = Buffer.alloc(36 + payload.length);
buffer.write(testSchemaIds[1], 4, 32, "utf-8");
payload.copy(buffer, 36);
await assert.isRejected(serializer.deserialize(buffer), /not found/);
});

it("serializes to the expected format", async () => {
const registry = createTestRegistry();
const schemaId = await registerTestSchema(registry);
Expand Down Expand Up @@ -124,7 +144,7 @@ describe("SchemaRegistryAvroSerializer", function() {
serializer = await createTestSerializer(false);
assert.deepStrictEqual(await serializer.deserialize(buffer), testValue);

// thow away serializer again and cover getSchemaId instead of registerSchema
// throw away serializer again and cover getSchemaId instead of registerSchema
serializer = await createTestSerializer(false);
assert.deepStrictEqual(await serializer.serialize(testValue, testSchema), buffer);
});
Expand Down Expand Up @@ -221,19 +241,14 @@ function createTestRegistry(neverLive = false): SchemaRegistry {
async function getSchemaId(
schema: SchemaDescription,
_options?: GetSchemaIdOptions
): Promise<SchemaId> {
const result = mapByContent.get(schema.content);
if (!result) {
throw new Error("No such schema is registered.");
}
return result;
): Promise<SchemaId | undefined> {
return mapByContent.get(schema.content);
}

async function getSchemaById(id: string, _options?: GetSchemaByIdOptions): Promise<Schema> {
const result = mapById.get(id);
if (!result) {
throw new Error("No such schema is registered.");
}
return result;
async function getSchemaById(
id: string,
_options?: GetSchemaByIdOptions
): Promise<Schema | undefined> {
return mapById.get(id);
}
}
8 changes: 6 additions & 2 deletions sdk/schemaregistry/schema-registry/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ const description = {
}

const found = await client.getSchemaId(description);
console.log(`Got schema ID=${found.id}`);
if (found) {
console.log(`Got schema ID=${found.id}`);
}
```

### Get content of existing schema by ID
Expand All @@ -115,7 +117,9 @@ const { SchemaRegistryClient } = require("@azure/schema-registry");

const client = new SchemaRegistryClient("<endpoint>", new DefaultAzureCredential());
const foundSchema = await client.getSchemaById("<id>");
console.log(`Got schema content=${foundSchema.content}`);
if (foundSchema) {
console.log(`Got schema content=${foundSchema.content}`);
}
```

## Troubleshooting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,17 @@ export interface SchemaId {

// @public
export interface SchemaRegistry {
getSchemaById(id: string, options?: GetSchemaByIdOptions): Promise<Schema>;
getSchemaId(schema: SchemaDescription, options?: GetSchemaIdOptions): Promise<SchemaId>;
getSchemaById(id: string, options?: GetSchemaByIdOptions): Promise<Schema | undefined>;
getSchemaId(schema: SchemaDescription, options?: GetSchemaIdOptions): Promise<SchemaId | undefined>;
registerSchema(schema: SchemaDescription, options?: RegisterSchemaOptions): Promise<SchemaId>;
}

// @public
export class SchemaRegistryClient implements SchemaRegistry {
constructor(endpoint: string, credential: TokenCredential, options?: SchemaRegistryClientOptions);
readonly endpoint: string;
getSchemaById(id: string, options?: GetSchemaByIdOptions): Promise<Schema>;
getSchemaId(schema: SchemaDescription, options?: GetSchemaIdOptions): Promise<SchemaId>;
getSchemaById(id: string, options?: GetSchemaByIdOptions): Promise<Schema | undefined>;
getSchemaId(schema: SchemaDescription, options?: GetSchemaIdOptions): Promise<SchemaId | undefined>;
registerSchema(schema: SchemaDescription, options?: RegisterSchemaOptions): Promise<SchemaId>;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,18 @@ export async function main() {
const registered = await client.registerSchema(schemaDescription);
console.log(`Registered schema with ID=${registered.id}`);

// Get ID for exisiting schema by its description.
// Get ID for existing schema by its description.
// Note that this would throw if it had not been previously registered.
const found = await client.getSchemaId(schemaDescription);
console.log(`Got schema ID=${found.id}`);
if (found) {
console.log(`Got schema ID=${found.id}`);
}

// Get content of existing schema by its ID
const foundSchema = await client.getSchemaById(registered.id);
console.log(`Got schema content=${foundSchema.content}`);
if (foundSchema) {
console.log(`Got schema content=${foundSchema.content}`);
}
}

main().catch((err) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@
"dependencies": {
"@azure/schema-registry": "next",
"dotenv": "latest",
"@azure/identity": "^1.1.0"
"@azure/identity": "2.0.0-beta.3"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,18 @@ async function main() {
const registered = await client.registerSchema(schemaDescription);
console.log(`Registered schema with ID=${registered.id}`);

// Get ID for exisiting schema by its description.
// Get ID for existing schema by its description.
// Note that this would throw if it had not been previously registered.
const found = await client.getSchemaId(schemaDescription);
console.log(`Got schema ID=${found.id}`);
if (found) {
console.log(`Got schema ID=${found.id}`);
}

// Get content of existing schema by its ID
const foundSchema = await client.getSchemaById(registered.id);
console.log(`Got schema content=${foundSchema.content}`);
if (foundSchema) {
console.log(`Got schema content=${foundSchema.content}`);
}
}

main().catch((err) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ npm run build
4. Run whichever samples you like (note that some samples may require additional setup, see the table above):

```bash
node dist/schemaRegistrySample.ts
node dist/schemaRegistrySample.js
```

Alternatively, run a single sample with the correct environment variables set (setting up the `.env` file is not required if you do this), for example (cross-platform):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
"dependencies": {
"@azure/schema-registry": "next",
"dotenv": "latest",
"@azure/identity": "^1.1.0"
"@azure/identity": "2.0.0-beta.3"
},
"devDependencies": {
"typescript": "~4.2.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,18 @@ export async function main() {
const registered = await client.registerSchema(schemaDescription);
console.log(`Registered schema with ID=${registered.id}`);

// Get ID for exisiting schema by its description.
// Get ID for existing schema by its description.
// Note that this would throw if it had not been previously registered.
const found = await client.getSchemaId(schemaDescription);
console.log(`Got schema ID=${found.id}`);
if (found) {
console.log(`Got schema ID=${found.id}`);
}

// Get content of existing schema by its ID
const foundSchema = await client.getSchemaById(registered.id);
console.log(`Got schema content=${foundSchema.content}`);
if (foundSchema) {
console.log(`Got schema content=${foundSchema.content}`);
}
}

main().catch((err) => {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
{
"compilerOptions": {
"target": "ES6",
"target": "ES2018",
"module": "commonjs",
"moduleResolution": "node",
"resolveJsonModule": true,
"esModuleInterop": true,
"allowSyntheticDefaultImports": true,
"strict": true,
Expand Down
11 changes: 7 additions & 4 deletions sdk/schemaregistry/schema-registry/src/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,18 @@ export interface SchemaRegistry {
* content.
*
* @param schema - Schema to match.
* @returns Matched schema's ID.
* @returns Matched schema's ID or undefined if no matching schema was found.
*/
getSchemaId(schema: SchemaDescription, options?: GetSchemaIdOptions): Promise<SchemaId>;
getSchemaId(
schema: SchemaDescription,
options?: GetSchemaIdOptions
): Promise<SchemaId | undefined>;

/**
* Gets an existing schema by ID.
*
* @param id - Unique schema ID.
* @returns Schema with given ID.
* @returns Schema with given ID or undefined if no schema was found with the given ID.
*/
getSchemaById(id: string, options?: GetSchemaByIdOptions): Promise<Schema>;
getSchemaById(id: string, options?: GetSchemaByIdOptions): Promise<Schema | undefined>;
}
50 changes: 33 additions & 17 deletions sdk/schemaregistry/schema-registry/src/schemaRegistryClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,28 +74,44 @@ export class SchemaRegistryClient implements SchemaRegistry {
* content.
*
* @param schema - Schema to match.
* @returns Matched schema's ID.
* @returns Matched schema's ID or undefined if no matching schema was found.
*/
async getSchemaId(schema: SchemaDescription, options?: GetSchemaIdOptions): Promise<SchemaId> {
const response = await this.client.schema.queryIdByContent(
schema.group,
schema.name,
schema.serializationType,
schema.content,
options
);
return convertSchemaIdResponse(response);
async getSchemaId(
schema: SchemaDescription,
options?: GetSchemaIdOptions
): Promise<SchemaId | undefined> {
try {
const response = await this.client.schema.queryIdByContent(
schema.group,
schema.name,
schema.serializationType,
schema.content,
options
);
return convertSchemaIdResponse(response);
} catch (error) {
if (typeof error === "object" && error?.statusCode === 404) {
return undefined;
}
throw error;
}
}

/**
* Gets the ID of an existing schema with matching name, group, type, and
* content.
* Gets an existing schema by ID.
*
* @param schema - Schema to match.
* @returns Matched schema's ID.
* @param id - Unique schema ID.
* @returns Schema with given ID or undefined if no schema was found with the given ID.
*/
async getSchemaById(id: string, options?: GetSchemaByIdOptions): Promise<Schema> {
const response = await this.client.schema.getById(id, options);
return convertSchemaResponse(response);
async getSchemaById(id: string, options?: GetSchemaByIdOptions): Promise<Schema | undefined> {
try {
const response = await this.client.schema.getById(id, options);
return convertSchemaResponse(response);
} catch (error) {
if (typeof error === "object" && error?.statusCode === 404) {
return undefined;
}
throw error;
}
}
}
Loading

0 comments on commit 9e856ac

Please sign in to comment.