Commit: Develop to Release-1.0.0-GA (Sanketika-Obsrv#72) (Sanketika-Obsrv#73)
* Issue Sanketika-Obsrv#4 feat: enhance the ingest event to add obsrv and source meta

* issue#223: feat: Updated API endpoints with verbs

* issue#223: fix: Test case modification

* issue Sanketika-Obsrv#84 : fix: API swagger doc update

* Sanketika-Obsrv#99 fix: upgrade packages to fix vulnerabilities

* Command-service Vulnerabilities fixes (Sanketika-Obsrv#56)

* Release 1.3.0 into Main (Sanketika-Obsrv#53)


---------

* command-service vulnerabilities fixes

---------
* Sanketika-Obsrv#90 fix: Resolve API Issues (Sanketika-Obsrv#52)

* Sanketika-Obsrv#90 fix: format error messages, code cleanup

* Sanketika-Obsrv#90 fix: validate extraction config during ingest

* Sanketika-Obsrv#90 fix: add test cases for extraction key validation

* Sanketika-Obsrv#90 fix: move error handler to helpers for standard handling

* Sanketika-Obsrv#90 fix: update telemetry audit event set function (Sanketika-Obsrv#58)

* Build and deployment (Sanketika-Obsrv#57)

* build api image

* Update build_and_deploy.yaml

* Update build_and_deploy.yaml

* Update build_and_deploy.yaml

* Update build_and_deploy.yaml

* build api image

* Update build_and_deploy.yaml

* build api image

* build api image

* build api image

* build and deploy api image

* build and deploy api image

* build and deploy api image

* build and deploy of api service

* build and deploy of api service

* modify docker file

* modify docker file

* modify docker file

* update build and deployment

* update build and deployment

* Update build_and_deploy.yaml

* api service build and deployment

* api service build and deployment

* feat: obsrv api service build and deployment github actions configuration

* #0 fix: update the tag condition in actions

---------

* #0 fix: add azure exhaust support (Sanketika-Obsrv#44)

* #0 fix: add azure exhaust support

* #0 fix: update azure exhaust service without async

* develop into release 1.3.0 (Sanketika-Obsrv#59)

---------

* Sanketika-Obsrv#126 Feat: Add Querying on Aggregated datasources (Sanketika-Obsrv#60)

* Sanketika-Obsrv#126 feat: enable querying on aggregated datasources

* Sanketika-Obsrv#126 feat: Fix validation issues and update routes for aggregate queries

* Sanketika-Obsrv#126 fix: remove unused methods

* Sanketika-Obsrv#126 fix: remove aggregates from endpoint

* Sanketika-Obsrv#126 fix: add validation for granularity options

* Sanketika-Obsrv#126 fix: Remove unused routes and validators

* Sanketika-Obsrv#126 feat: add new property to datasources structure and update rollup querying

* #305 Feat: Add and Update dataset status (Sanketika-Obsrv#62)

* #305 feat: add retire apis and update dataset status

* #305 fix: remove unused import

* #305 fix: add command service api call to restart jobs

* #305 fix: fix command service payload

* #305 fix: update api doc

* #305 fix: clear unused code

* #305 fix: clear unused code

* #305 fix: undo change submit ingestion

* #305 fix: update test case to use enums

* #305 fix: add test env file for test cases

* #305 fix: add enum for string in test case

* Issue #305 feat: exclude system-events data source in validation (Sanketika-Obsrv#63)

* Issue Sanketika-Obsrv#165 feat: generate AUDIT events (Sanketika-Obsrv#71)

---------

Co-authored-by: Manjunath Davanam <manjunath@sanketika.in>
Co-authored-by: shiva-rakshith <rakshiths@sanketika.in>
Co-authored-by: Jerald <jeraldj@sanketika.in>
Co-authored-by: harishkumar gangula <harish@sanketika.in>
Co-authored-by: Sowmya N Dixit <sowmyadixit7@gmail.com>
Co-authored-by: Manoj Krishna <92361832+ManojKrishnaChintaluri@users.noreply.github.com>
Co-authored-by: ManojCKrishna <Manoj.Chintaluri@syngenta.com>
Co-authored-by: Manoj Krishna <92361832+ManojKrishnaChintauri@users.noreply.github.com>
Co-authored-by: Ravi Mula <ravismula@users.noreply.github.com>
Co-authored-by: Ravinder Kumar <yravinderkumar33@gmail.com>
11 people authored Dec 26, 2023
1 parent 319821f commit b5243a8
Showing 10 changed files with 154 additions and 89 deletions.
4 changes: 2 additions & 2 deletions api-service/src/app.ts
@@ -4,7 +4,7 @@ import { ResponseHandler } from "./helpers/ResponseHandler";
import { loadExtensions } from "./managers/Extensions";
import { router } from "./routes/Router";
import bodyParser from "body-parser";
import { processTelemetryAuditEvent } from "./services/telemetry";
import { interceptAuditEvents } from "./services/telemetry";
import { queryService } from "./routes/Router";
import { routesConfig } from "./configs/RoutesConfig";
import { QueryValidator } from "./validators/QueryValidator";
@@ -25,7 +25,7 @@ app.set("queryServices", services);

loadExtensions(app)
.finally(() => {
// app.use(processTelemetryAuditEvent()) // uncomment if above extension is not loaded
app.use(interceptAuditEvents())
app.use("/", router);
app.use("*", ResponseHandler.routeNotFound);
app.use(ResponseHandler.errorResponse);
7 changes: 5 additions & 2 deletions api-service/src/configs/Config.ts
@@ -1,6 +1,9 @@
// These configurations provide settings and values for various aspects of dataset management, data ingestion, and table configurations in a system.

const env = process.env.system_env || "local"

export const config = {
"env": process.env.system_env || "local",
"env": env,
"api_port": process.env.api_port || 3000,
"body_parser_limit": process.env.body_parser_limit || "100mb",
"version": "1.0",
@@ -60,7 +63,7 @@ export const config = {
"redis_port": process.env.redis_port || 6379
},
"exclude_datasource_validation": process.env.exclude_datasource_validation ? process.env.exclude_datasource_validation.split(",") : ["system-stats", "failed-events-summary", "masterdata-system-stats", "system-events"], // list of datasource names to skip validation while calling query API
"telemetry_dataset": process.env.telemetry_dataset || "telemetry",
"telemetry_dataset": process.env.telemetry_dataset || `${env}.system.telemetry.events`,
"table_names": { // Names of all tables available for CRUD operations
"datasets": "datasets",
"datasources": "datasources",
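The Config.ts hunk above hoists the environment lookup into a local so that the new `telemetry_dataset` default can be derived from it. A minimal, self-contained sketch of that pattern (not the actual Config.ts — only `system_env`, `telemetry_dataset`, and the dataset naming come from the diff; the rest is illustrative):

```typescript
// Sketch of the Config.ts change: hoist the env lookup into a local so
// other defaults can reuse it instead of repeating process.env reads.
const env = process.env.system_env || "local";

const config = {
  env,
  // New default: derive the telemetry dataset name from the environment,
  // e.g. "local.system.telemetry.events" when system_env is unset,
  // replacing the previous fixed default of "telemetry".
  telemetry_dataset: process.env.telemetry_dataset || `${env}.system.telemetry.events`,
};
```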
15 changes: 12 additions & 3 deletions api-service/src/data/telemetryActions.ts
@@ -1,12 +1,21 @@
export default {
"createDataset": "dataset:create",
"updateDataset": "dataset:update",
"readDataset": "dataset:read",
"listDatasets": "dataset:list",
"createDatasource": "datasource:create",
"updateDatasource": "datasource:update",
"getDatasource": "datasource:get",
"listDatasource": "datasource:list",
"createTransformation": "transformation:create",
"updateTransformation": "transformation:update",
"createSystemSetting": "systemSetting: create",
"updateSystemSetting": "systemSetting:update",
"createDatasetSourceConfig": "datasetSourceConfig:create",
"updateDatasetSourceConfig": "datasetSourceConfig:update"
"updateDatasetSourceConfig": "datasetSourceConfig:update",
"getatasetSourceConfig": "datasetSourceConfig:get",
"listDatasetSourceConfig": "datasetSourceConfig:list",
"nativeQuery": "dataset:query:native",
"sqlQuery": "dataset:query:sql",
"ingestEvents": "dataset:events:ingest",
"submitIngestionSpec": "datasource:ingestion:submit",
"datasetExhaust": "dataset:exhaust:get"
}
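The expanded action map above is what the `telemetryAuditStart` middleware in Router.ts consumes. A rough sketch of that relationship, using a toy stand-in for the middleware (the real signature lives in the telemetry service and is not shown in this diff):

```typescript
// Subset of the action map from the diff above.
const telemetryActions = {
  createDataset: "dataset:create",
  nativeQuery: "dataset:query:native",
  datasetExhaust: "dataset:exhaust:get",
} as const;

// Hypothetical stand-in for telemetryAuditStart's options object; the real
// middleware attaches an AUDIT event to the Express request instead of
// returning a string.
interface AuditOptions { action: string; operationType: string }

const describeAudit = ({ action, operationType }: AuditOptions): string =>
  `AUDIT ${operationType} ${action}`;

const line = describeAudit({ action: telemetryActions.nativeQuery, operationType: "GET" });
```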
43 changes: 20 additions & 23 deletions api-service/src/routes/Router.ts
@@ -20,13 +20,9 @@ import promEntities from "../helpers/prometheus/entities";
import { metricsScrapeHandler } from "../helpers/prometheus";

export const validationService = new ValidationService();

export const queryService = new QueryService(new HTTPConnector(`${config.query_api.druid.host}:${config.query_api.druid.port}`));

export const kafkaConnector = new KafkaConnector()

export const dbConnector = new DbConnector(config.db_connector_config);

export const datasourceService = new DataSourceService(dbConnector, config.table_names.datasources);
export const datasetService = new DatasetService(dbConnector, config.table_names.datasets);
export const datasetSourceConfigService = new DatasetSourceConfigService(dbConnector, config.table_names.datasetSourceConfig);
@@ -37,37 +33,38 @@ export const globalCache: any = new Map()
export const router = express.Router()
dbConnector.init()
/** Query API(s) */
router.post([`${routesConfig.query.native_query.path}`, `${routesConfig.query.native_query_with_params.path}`,], ResponseHandler.setApiId(routesConfig.query.native_query.api_id), onRequest({ entity: promEntities.data_out }), validationService.validateRequestBody, validationService.validateQuery, queryService.executeNativeQuery);
router.post([`${routesConfig.query.sql_query.path}`, `${routesConfig.query.sql_query_with_params.path}`,], ResponseHandler.setApiId(routesConfig.query.sql_query.api_id), onRequest({ entity: promEntities.data_out }), validationService.validateRequestBody, validationService.validateQuery, queryService.executeSqlQuery);

router.post([`${routesConfig.query.native_query.path}`, `${routesConfig.query.native_query_with_params.path}`,], ResponseHandler.setApiId(routesConfig.query.native_query.api_id), telemetryAuditStart({ action: telemetryActions.nativeQuery, operationType: OperationType.GET }), onRequest({ entity: promEntities.data_out }), validationService.validateRequestBody, validationService.validateQuery, queryService.executeNativeQuery);
router.post([`${routesConfig.query.sql_query.path}`, `${routesConfig.query.sql_query_with_params.path}`,], ResponseHandler.setApiId(routesConfig.query.sql_query.api_id), telemetryAuditStart({ action: telemetryActions.sqlQuery, operationType: OperationType.GET }), onRequest({ entity: promEntities.data_out }), validationService.validateRequestBody, validationService.validateQuery, queryService.executeSqlQuery);

/** Ingestor API */
router.post(`${routesConfig.data_ingest.path}`, ResponseHandler.setApiId(routesConfig.data_ingest.api_id), onRequest({ entity: promEntities.data_in }), validationService.validateRequestBody, ingestorService.create);
router.post(`${routesConfig.data_ingest.path}`, ResponseHandler.setApiId(routesConfig.data_ingest.api_id), telemetryAuditStart({ action: telemetryActions.ingestEvents, operationType: OperationType.CREATE }), onRequest({ entity: promEntities.data_in }), validationService.validateRequestBody, ingestorService.create);

/** Dataset APIs */
router.post(`${routesConfig.config.dataset.save.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.save.api_id), validationService.validateRequestBody, telemetryAuditStart({ action: telemetryActions.createDataset, operationType: OperationType.CREATE }), datasetService.save);
router.patch(`${routesConfig.config.dataset.update.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.update.api_id), validationService.validateRequestBody, telemetryAuditStart({ action: telemetryActions.updateDataset, operationType: OperationType.UPDATE }), datasetService.update);
router.get(`${routesConfig.config.dataset.read.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.read.api_id), datasetService.read);
router.post(`${routesConfig.config.dataset.list.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.list.api_id), validationService.validateRequestBody, datasetService.list);
router.post(`${routesConfig.config.dataset.save.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.save.api_id), telemetryAuditStart({ action: telemetryActions.createDataset, operationType: OperationType.CREATE }), validationService.validateRequestBody, datasetService.save);
router.patch(`${routesConfig.config.dataset.update.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.update.api_id), telemetryAuditStart({ action: telemetryActions.updateDataset, operationType: OperationType.UPDATE }), validationService.validateRequestBody, datasetService.update);
router.get(`${routesConfig.config.dataset.read.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.read.api_id), telemetryAuditStart({ action: telemetryActions.readDataset, operationType: OperationType.GET }), datasetService.read);
router.post(`${routesConfig.config.dataset.list.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.list.api_id), telemetryAuditStart({ action: telemetryActions.listDatasets, operationType: OperationType.LIST }), validationService.validateRequestBody, datasetService.list);

/** Dataset Source Config APIs */
router.post(`${routesConfig.config.dataset_source_config.save.path}`, ResponseHandler.setApiId(routesConfig.config.dataset_source_config.save.api_id), validationService.validateRequestBody, telemetryAuditStart({ action: telemetryActions.createDatasetSourceConfig, operationType: OperationType.CREATE }), datasetSourceConfigService.save);
router.patch(`${routesConfig.config.dataset_source_config.update.path}`, ResponseHandler.setApiId(routesConfig.config.dataset_source_config.update.api_id), validationService.validateRequestBody, telemetryAuditStart({ action: telemetryActions.updateDatasetSourceConfig, operationType: OperationType.UPDATE }), datasetSourceConfigService.update);
router.get(`${routesConfig.config.dataset_source_config.read.path}`, ResponseHandler.setApiId(routesConfig.config.dataset_source_config.read.api_id), datasetSourceConfigService.read);
router.post(`${routesConfig.config.dataset_source_config.list.path}`, ResponseHandler.setApiId(routesConfig.config.dataset_source_config.list.api_id), validationService.validateRequestBody, datasetSourceConfigService.list);
router.post(`${routesConfig.config.dataset_source_config.save.path}`, ResponseHandler.setApiId(routesConfig.config.dataset_source_config.save.api_id), telemetryAuditStart({ action: telemetryActions.createDatasetSourceConfig, operationType: OperationType.CREATE }), validationService.validateRequestBody, datasetSourceConfigService.save);
router.patch(`${routesConfig.config.dataset_source_config.update.path}`, ResponseHandler.setApiId(routesConfig.config.dataset_source_config.update.api_id), telemetryAuditStart({ action: telemetryActions.updateDatasetSourceConfig, operationType: OperationType.UPDATE }), validationService.validateRequestBody, datasetSourceConfigService.update);
router.get(`${routesConfig.config.dataset_source_config.read.path}`, ResponseHandler.setApiId(routesConfig.config.dataset_source_config.read.api_id), telemetryAuditStart({ action: telemetryActions.getatasetSourceConfig, operationType: OperationType.GET }), datasetSourceConfigService.read);
router.post(`${routesConfig.config.dataset_source_config.list.path}`, ResponseHandler.setApiId(routesConfig.config.dataset_source_config.list.api_id), telemetryAuditStart({ action: telemetryActions.listDatasetSourceConfig, operationType: OperationType.LIST }), validationService.validateRequestBody, datasetSourceConfigService.list);

/** DataSource API(s) */
router.post(`${routesConfig.config.datasource.save.path}`, ResponseHandler.setApiId(routesConfig.config.datasource.save.api_id), validationService.validateRequestBody, telemetryAuditStart({ action: telemetryActions.createDatasource, operationType: OperationType.CREATE }), datasourceService.save);
router.patch(`${routesConfig.config.datasource.update.path}`, ResponseHandler.setApiId(routesConfig.config.datasource.update.api_id), validationService.validateRequestBody, telemetryAuditStart({ action: telemetryActions.updateDatasource, operationType: OperationType.UPDATE }), datasourceService.update);
router.get(`${routesConfig.config.datasource.read.path}`, ResponseHandler.setApiId(routesConfig.config.datasource.read.api_id), datasourceService.read);
router.post(`${routesConfig.config.datasource.list.path}`, ResponseHandler.setApiId(routesConfig.config.datasource.list.api_id), validationService.validateRequestBody, datasourceService.list);
router.post(`${routesConfig.config.datasource.save.path}`, ResponseHandler.setApiId(routesConfig.config.datasource.save.api_id), telemetryAuditStart({ action: telemetryActions.createDatasource, operationType: OperationType.CREATE }), validationService.validateRequestBody, datasourceService.save);
router.patch(`${routesConfig.config.datasource.update.path}`, ResponseHandler.setApiId(routesConfig.config.datasource.update.api_id), telemetryAuditStart({ action: telemetryActions.updateDatasource, operationType: OperationType.UPDATE }), validationService.validateRequestBody, datasourceService.update);
router.get(`${routesConfig.config.datasource.read.path}`, ResponseHandler.setApiId(routesConfig.config.datasource.read.api_id), telemetryAuditStart({ action: telemetryActions.getDatasource, operationType: OperationType.GET }), datasourceService.read);
router.post(`${routesConfig.config.datasource.list.path}`, ResponseHandler.setApiId(routesConfig.config.datasource.list.api_id), telemetryAuditStart({ action: telemetryActions.listDatasource, operationType: OperationType.LIST }), validationService.validateRequestBody, datasourceService.list);

/** Exhaust API(s) */
router.get(`${routesConfig.exhaust.path}`, ResponseHandler.setApiId(routesConfig.exhaust.api_id), validationService.validateRequestParams, exhaustService.getData);
router.get(`${routesConfig.exhaust.path}`, ResponseHandler.setApiId(routesConfig.exhaust.api_id), telemetryAuditStart({ action: telemetryActions.datasetExhaust, operationType: OperationType.GET }), validationService.validateRequestParams, exhaustService.getData);
router.get(`${routesConfig.prometheus.path}`, metricsScrapeHandler);

/*** Submit Ingestion API(s) */
router.post(`${routesConfig.submit_ingestion.path}`, ResponseHandler.setApiId(routesConfig.submit_ingestion.api_id), validationService.validateRequestBody, ingestorService.submitIngestion)
/*** Submit Ingestion API(s) */
router.post(`${routesConfig.submit_ingestion.path}`, ResponseHandler.setApiId(routesConfig.submit_ingestion.api_id), telemetryAuditStart({ action: telemetryActions.submitIngestionSpec, operationType: OperationType.CREATE }), validationService.validateRequestBody, ingestorService.submitIngestion)

/** Query Wrapper API(s) */
router.post(routesConfig.query_wrapper.sql_wrapper.path, ResponseHandler.setApiId(routesConfig.query_wrapper.sql_wrapper.api_id), onRequest({ entity: promEntities.data_out }), wrapperService.forwardSql)
router.post(routesConfig.query_wrapper.native_post.path, ResponseHandler.setApiId(routesConfig.query_wrapper.native_post.api_id), onRequest({ entity: promEntities.data_out }), wrapperService.forwardNative)
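The recurring change in the Router.ts hunks above is ordering: `telemetryAuditStart` now runs before `validateRequestBody`, so an AUDIT event is started even when validation later rejects the request. A dependency-free sketch of that ordering (names mirror the router, but these are toy stand-ins, not the real Express handlers):

```typescript
// Model a middleware chain as functions applied in sequence over a context.
type Ctx = { log: string[] };
type Middleware = (ctx: Ctx) => void;

const setApiId = (id: string): Middleware => (ctx) => ctx.log.push(`api:${id}`);
const telemetryAuditStart = (action: string): Middleware => (ctx) => ctx.log.push(`audit:${action}`);
const validateRequestBody: Middleware = (ctx) => ctx.log.push("validate");

const run = (...mws: Middleware[]): string[] => {
  const ctx: Ctx = { log: [] };
  for (const mw of mws) mw(ctx);
  return ctx.log;
};

// Post-change order: audit is recorded before validation can short-circuit.
const order = run(setApiId("api.dataset.create"), telemetryAuditStart("dataset:create"), validateRequestBody);
```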
2 changes: 2 additions & 0 deletions api-service/src/services/ClientCloudService.ts
@@ -8,6 +8,7 @@ import { config as globalConfig } from '../configs/Config';
import CloudService from '../lib/client-cloud-services';
import moment from "moment";
import { DateRange } from '../models/ExhaustModels';
import { updateTelemetryAuditEvent } from './telemetry';

export class ClientCloudService {
private cloudProvider: string
@@ -40,6 +41,7 @@ export class ClientCloudService {
const { params } = req;
const { datasetId } = params;
const { type } = req.query;
updateTelemetryAuditEvent({ request: req, object: { id: datasetId, type: "dataset", ver: "1.0.0" } });
// Validations
if(type && globalConfig.exhaust_config.exclude_exhaust_types.includes(type.toString())) {
next({statusCode: 404, message: constants.RECORD_NOT_FOUND, errCode: httpStatus["404_NAME"],})
14 changes: 10 additions & 4 deletions api-service/src/services/DataSourceService.ts
@@ -1,17 +1,21 @@
import { Request, Response, NextFunction } from "express";
import _ from 'lodash'
import { Datasources } from "../helpers/Datasources";
import { findAndSetExistingRecord } from "./telemetry";
import { findAndSetExistingRecord, updateTelemetryAuditEvent } from "./telemetry";
import { DbUtil } from "../helpers/DbUtil";
import constants from "../resources/Constants.json";
import { ingestorService } from "../routes/Router";
import { ErrorResponseHandler } from "../helpers/ErrorResponseHandler";
import { DatasetStatus, IConnector } from "../models/DatasetModels";

const telemetryObject = { id: null, type: "datasource", ver: "1.0.0" };

export class DataSourceService {
private table: string
private dbConnector: IConnector;
private dbUtil: DbUtil
private errorHandler: ErrorResponseHandler;

constructor(dbConnector: IConnector, table: string) {
this.dbConnector = dbConnector
this.table = table
@@ -23,6 +27,7 @@ export class DataSourceService {
try {
const datasources = new Datasources(req.body)
const payload: any = datasources.setValues()
updateTelemetryAuditEvent({ request: req, object: { ...telemetryObject, id: _.get(payload, 'id'), } });
await this.validateDatasource(payload)
await this.dbUtil.save(req, res, next, payload)
} catch (error: any) { this.errorHandler.handleError(req, res, next, error) }
@@ -32,14 +37,15 @@ export class DataSourceService {
const datasources = new Datasources(req.body)
const payload: Record<string, any> = datasources.setValues()
await this.validateDatasource(payload)
await findAndSetExistingRecord({ dbConnector: this.dbConnector, table: this.table, request: req, filters: { "id": payload.id }, object: { id: payload.id, type: "datasource" } });
await findAndSetExistingRecord({ dbConnector: this.dbConnector, table: this.table, request: req, filters: { "id": _.get(payload, 'id') }, object: { ...telemetryObject, id: _.get(payload, 'id') } });
await this.dbUtil.upsert(req, res, next, payload)
} catch (error: any) { this.errorHandler.handleError(req, res, next, error) }
}
public read = async (req: Request, res: Response, next: NextFunction) => {
try {
let status: any = req.query.status || DatasetStatus.Live
const id = req.params.datasourceId
updateTelemetryAuditEvent({ request: req, object: { ...telemetryObject, id } });
await this.dbUtil.read(req, res, next, { id, status })
} catch (error: any) { this.errorHandler.handleError(req, res, next, error, false) }
}
@@ -54,13 +60,13 @@ export class DataSourceService {
if (_.isEmpty(datasetRecord)) {
throw constants.DATASET_NOT_FOUND;
}
if (!_.isUndefined(payload.datasource_ref) &&!_.isUndefined(payload.ingestion_spec.spec.dataSchema.dataSource) && payload.datasource_ref !== payload.ingestion_spec.spec.dataSchema.dataSource) {
if (!_.isUndefined(payload.datasource_ref) && !_.isUndefined(payload.ingestion_spec.spec.dataSchema.dataSource) && payload.datasource_ref !== payload.ingestion_spec.spec.dataSchema.dataSource) {
throw constants.INVALID_DATASOURCE_REF;
}
if (
!_.isUndefined(payload.dataset_id) && !_.isUndefined(payload.ingestion_spec) && datasetRecord.router_config.topic !== payload.ingestion_spec.spec.ioConfig.topic) {
throw constants.INVALID_TOPIC;
}
}

}
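The DataSourceService hunks above introduce a shared `telemetryObject` template whose `id` is filled per request via object spread. A small sketch of the pattern, showing that the spread fills the id without mutating the template (`buildAuditObject` is a hypothetical helper for illustration; the service passes the spread result straight to `updateTelemetryAuditEvent`):

```typescript
// Shared template, as in the diff: id is filled in per request.
const telemetryObject = { id: null as string | null, type: "datasource", ver: "1.0.0" };

// Hypothetical helper: spread copies the template and overrides id.
const buildAuditObject = (id: string) => ({ ...telemetryObject, id });

const audited = buildAuditObject("sample-datasource-id");
```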