Merge pull request #1 from recursethis/fork/main
Added Protobuf Support
recursethis authored Feb 26, 2024
2 parents f4a1a74 + c01b697 commit cf2e95d
Showing 11 changed files with 200 additions and 10 deletions.
10 changes: 7 additions & 3 deletions README.md
@@ -1,6 +1,6 @@
# Datagen CLI

This command line interface application allows you to take schemas defined in JSON (`.json`), Avro (`.avsc`), or SQL (`.sql`) and produce believable fake data to Kafka in JSON or Avro format or to Postgres.
This command line interface application allows you to take schemas defined in JSON (`.json`), Avro (`.avsc`), or SQL (`.sql`) and produce believable fake data to Kafka in JSON, Avro, or Protobuf format, or to Postgres.

The benefits of using this datagen tool are:
- You can specify what values are generated using the expansive [FakerJS API](https://fakerjs.dev/api/) to craft data that more faithfully imitates your use case. This allows you to more easily apply business logic downstream.
@@ -85,7 +85,7 @@ Fake Data Generator
Options:
-V, --version output the version number
-s, --schema <char> Schema file to use
-f, --format <char> The format of the produced data (choices: "json", "avro", "postgres", "webhook", default: "json")
-f, --format <char> The format of the produced data (choices: "json", "avro", "postgres", "webhook", "proto", default: "json")
-n, --number <char> Number of records to generate. For infinite records, use -1 (default: "10")
-c, --clean Clean (delete) Kafka topics and schema subjects previously created
-dr, --dry-run Dry run (no data will be produced to Kafka)
@@ -213,7 +213,11 @@ Here is the general syntax for a JSON input schema:
{
"_meta": {
"topic": "<my kafka topic>",
"key": "<field to be used for kafka record key>" ,
"key": "<field to be used for kafka record key>" ,
"proto": {
"dir": "<directory with protobuf schemas>",
"schema": "<protobfuf message schema name>"
},
"relationships": [
{
"topic": "<topic for dependent dataset>",
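A concrete `_meta` block using the new `proto` option, modeled on the test fixtures in this PR (the topic, directory, and field generators below are illustrative):

{
    "_meta": {
        "topic": "mz_datagen_users",
        "key": "id",
        "proto": {
            "dir": "./tests",
            "schema": "datagen.User"
        }
    },
    "id": "iteration.index",
    "name": "faker.internet.userName()"
}

With this in place, a run along the lines of `datagen -s schema.json -f proto -n 10` looks up the `datagen.User` message in any `.proto` file found recursively under `./tests` and produces Protobuf-encoded records to the `mz_datagen_users` topic.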
2 changes: 1 addition & 1 deletion datagen.ts
@@ -23,7 +23,7 @@ program
.requiredOption('-s, --schema <char>', 'Schema file to use')
.addOption(
new Option('-f, --format <char>', 'The format of the produced data')
.choices(['json', 'avro', 'postgres', 'webhook'])
.choices(['json', 'avro', 'postgres', 'webhook', 'proto'])
.default('json')
)
.addOption(
6 changes: 4 additions & 2 deletions package.json
@@ -39,7 +39,7 @@
},
"dependencies": {
"@avro/types": "^1.0.25",
"@faker-js/faker": "^7.6.0",
"@faker-js/faker": "^8.0.0",
"@kafkajs/confluent-schema-registry": "^3.3.0",
"@types/node": "^18.14.6",
"arg": "^5.0.2",
@@ -53,7 +53,9 @@
"dotenv": "^16.0.2",
"kafkajs": "^2.2.3",
"node-sql-parser": "^4.6.1",
"pg": "^8.11.0"
"pg": "^8.11.0",
"protobufjs": "^6.11.4",
"glob": "10.3.10"
},
"devDependencies": {
"@types/jest": "^29.4.0",
87 changes: 87 additions & 0 deletions src/formats/protoFormat.ts
@@ -0,0 +1,87 @@
import { OutputFormat } from "./outputFormat";
import protobuf from "protobufjs";
import alert from "cli-alerts";
import { globSync } from "glob";

export class ProtoFormat implements OutputFormat {
private schemas: any = {};
private schemaFiles: Set<string>;

static async getProtoSchemas(megaRecord: any, protoSchemaFiles: string[]) {

if (!protoSchemaFiles || protoSchemaFiles.length === 0) {
protoSchemaFiles = [];
protoSchemaFiles.push(...(await ProtoFormat.getProtoSchemaFiles(megaRecord)));
}

const protoSchemas = {};
const protoRoot = protobuf.loadSync(protoSchemaFiles);
for (const topic in megaRecord) {

const protoSchema = {};
try {
protoSchema["messageType"] = protoRoot.lookupType(megaRecord[topic].schema);
protoSchema["name"] = topic
protoSchema["namespace"] = megaRecord[topic].schema

if (global.debug) {
alert({
type: `success`,
name: `Proto Schema for topic ${topic}:`,
msg: `\n ${JSON.stringify(protoSchema, null, 2)}`
});
}

protoSchemas[topic] = protoSchema;
} catch (error) {
alert({
type: `error`,
name: `protobuf lookup type error for schema ${megaRecord[topic].schema}`,
msg: `${error}`
});
process.exit(1);

}
}

return protoSchemas;
}

static async getProtoSchemaFiles(megaRecord: any) {
const protoFiles = new Set<string>();
for (const topic in megaRecord) {
(await ProtoFormat.getProtoSchemaFilesSync(megaRecord[topic].schemaDir)).forEach(file => protoFiles.add(file));
}
return protoFiles;
}

static async getProtoSchemaFilesSync(directory: string) {
if (!directory) {
return [];
}
return globSync(directory + (directory.endsWith("/") ? "" : "/") + "**/*.proto");
}

async register(megaRecord: any): Promise<void> {
this.schemaFiles = await ProtoFormat.getProtoSchemaFiles(megaRecord);
this.schemas = await ProtoFormat.getProtoSchemas(megaRecord, Array.from(this.schemaFiles));
}

async encode(record: any, topic: string): Promise<Buffer> {
const messageType = this.schemas[topic]['messageType'];

// check if the message is valid
const error = messageType.verify(record);
if (global.debug && error) {
alert({
type: `warning`,
name: `${JSON.stringify(record)} with ${this.schemas[topic]['namespace']} is not valid`,
msg: `${error}`
});
}
// if verification fails, fromObject() converts the plain object; otherwise create() wraps it
const message = error ? messageType.fromObject(record) : messageType.create(record);

return messageType.encode(message).finish();
}
}
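For reference, a minimal sketch of how ProtoFormat is exercised end to end; the megaRecord shape mirrors what generateMegaRecord produces below, while the topic, paths, and record values are assumptions based on the tests in this PR:

import { ProtoFormat } from "./src/formats/protoFormat.js";

const megaRecord = {
    mz_datagen_users: {
        schema: "datagen.User",   // fully qualified message name, from _meta.proto.schema
        schemaDir: "./tests",     // searched recursively for *.proto files
        records: []
    }
};

const format = new ProtoFormat();
await format.register(megaRecord); // loads ./tests/**/*.proto and resolves datagen.User

// encode() verifies the record against the message type, falls back to
// fromObject() when verification fails, and returns the serialized bytes.
const bytes = await format.encode({ id: 1, name: "jane" }, "mz_datagen_users");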
15 changes: 12 additions & 3 deletions src/kafkaDataGenerator.ts
@@ -6,6 +6,9 @@ import { AvroFormat } from './formats/avroFormat.js';
import { JsonFormat } from './formats/jsonFormat.js';
import sleep from './utils/sleep.js';
import asyncGenerator from './utils/asyncGenerator.js';
import { accessRecordKey } from './utils/recordKey.js';
import { ProtoFormat } from "./formats/protoFormat.js";


export default async function kafkaDataGenerator({
format,
@@ -26,6 +29,8 @@ export default async function kafkaDataGenerator({
outputFormat = await AvroFormat.create();
} else if (format === 'json') {
outputFormat = new JsonFormat();
} else if (format === 'proto') {
outputFormat = new ProtoFormat();
}

producer = await KafkaProducer.create(outputFormat);
@@ -37,14 +42,18 @@

if (iteration === 0) {
await producer?.prepare(megaRecord);
if (global.debug && global.dryRun && format === 'avro') {
await AvroFormat.getAvroSchemas(megaRecord);
if (global.debug && global.dryRun) {
if (format === 'avro') {
await AvroFormat.getAvroSchemas(megaRecord);
} else if (format === 'proto') {
await ProtoFormat.getProtoSchemas(megaRecord, []);
}
}
}

for (const topic in megaRecord) {
for await (const record of megaRecord[topic].records) {
let key = null;
let key = accessRecordKey(megaRecord[topic].key, record);
if (record[megaRecord[topic].key]) {
key = record[megaRecord[topic].key];
}
6 changes: 6 additions & 0 deletions src/schemas/generateMegaRecord.ts
@@ -82,6 +82,12 @@ export async function generateMegaRecord(schema: any) {
});
}

// copy the protobuf schema name and directory for the topic from _meta.proto
if ("proto" in _meta) {
megaRecord[topic]["schemaDir"] = _meta.proto.dir;
megaRecord[topic]["schema"] = _meta.proto.schema;
}

// for records that already exist, generate values
// for every field that doesn't already have a value.
megaRecord[topic]["key"] = _meta.key
3 changes: 3 additions & 0 deletions src/utils/recordKey.ts
@@ -0,0 +1,3 @@
export function accessRecordKey(path: string, record: any): any {
return path?.split('.').reduce((level, key) => level && level[key], record);
}
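accessRecordKey walks a dot-separated path into a record, which is what lets kafkaDataGenerator.ts resolve nested key fields; a quick illustration (the record shape is hypothetical):

import { accessRecordKey } from "./src/utils/recordKey.js";

const record = { id: 7, nested: { phone: "555-0100" } };

accessRecordKey("id", record);           // 7
accessRecordKey("nested.phone", record); // "555-0100"
accessRecordKey("missing.path", record); // undefined, thanks to the && guard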
25 changes: 25 additions & 0 deletions tests/datagen.test.ts
@@ -41,6 +41,13 @@ describe('Schema Parsing Tests', () => {
expect(output).toContain('Dry run: Skipping record production...');
expect(output).toContain('Stopping the data generator');
});
test('should parse json schema with proto definitions', () => {
const schema = './tests/schema.json';
const output = datagen(`-s ${schema} -n 2 -f proto`);
expect(output).toContain('Parsing JSON schema...');
expect(output).toContain('Dry run: Skipping record production...');
expect(output).toContain('Stopping the data generator');
});
});


@@ -54,6 +61,24 @@ describe('Test missing schema file', () => {
expect(error.status).toBe(1);
}
});
test('should return error if the proto message type cannot be found', () => {
const schema = './tests/iterationIndex.json'
try {
const output = datagen(`-s ${schema} -n 2 -f proto`);
} catch (error) {
expect(error.stdout.toString()).toContain(`Error: no such type`);
expect(error.status).toBe(1);
}
});
});

describe('Test record size', () => {
6 changes: 5 additions & 1 deletion tests/iterationIndex.json
@@ -1,7 +1,11 @@
{
"_meta": {
"topic": "air_quality",
"key": "id"
"key": "id",
"proto": {
"dir": "./tests",
"schema": "datagen.dne"
}
},
"id": "iteration.index",
"timestamp": "faker.date.between('2020-01-01T00:00:00.000Z', '2030-01-01T00:00:00.000Z')",
12 changes: 12 additions & 0 deletions tests/schema.json
@@ -3,6 +3,10 @@
"_meta": {
"topic": "mz_datagen_users",
"key": "id",
"proto": {
"dir": "./tests",
"schema": "datagen.User"
},
"relationships": [
{
"topic": "mz_datagen_posts",
@@ -28,6 +32,10 @@
"_meta": {
"topic": "mz_datagen_posts",
"key": "id",
"proto": {
"dir": "./tests",
"schema": "datagen.Post"
},
"relationships": [
{
"topic": "mz_datagen_comments",
@@ -46,6 +54,10 @@
"_meta": {
"topic": "mz_datagen_comments",
"key": "id",
"proto": {
"dir": "./tests",
"schema": "datagen.Comment"
},
"relationships": [
{
"topic": "mz_datagen_users",
38 changes: 38 additions & 0 deletions tests/schema.proto
@@ -0,0 +1,38 @@
syntax = "proto3";

package datagen;

// Definition for the User data
message User {
message Nested {
string phone = 1;
string website = 2;
}

int32 id = 1; // Assuming IDs are integers
string name = 2;
string email = 3;
string phone = 4;
string website = 5;
string city = 6;
string company = 7;
Nested nested = 8; // Nested message for phone and website
}

// Definition for the Post data
message Post {
int32 id = 1;
int32 user_id = 2; // Assuming this is a reference to a User ID
string title = 3;
string body = 4;
}

// Definition for the Comment data
message Comment {
int32 id = 1;
int32 user_id = 2; // Assuming this is a reference to a User ID
int32 post_id = 3; // Assuming this is a reference to a Post ID
string body = 4;
int32 views = 5; // Assuming views is an integer
int32 status = 6; // Assuming status is an integer representing some enum
}
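As a sanity check of the fixture above, the message types resolve with protobufjs exactly as ProtoFormat.getProtoSchemas does (the path is assumed relative to the repo root):

import protobuf from "protobufjs";

const root = protobuf.loadSync("tests/schema.proto");
const User = root.lookupType("datagen.User"); // package name + message name

// verify() returns null for a valid payload, or an error string otherwise
console.log(User.verify({ id: 1, name: "jane" })); // null
console.log(User.verify({ id: "oops" }));          // "id: integer expected"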
