Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.1-RC to Main #259

Merged
merged 12 commits into from
Oct 22, 2024
137 changes: 91 additions & 46 deletions api-service/src/middlewares/RBAC_middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import { config } from "../configs/Config";
import _ from "lodash";
import userPermissions from "./userPermissions.json";
import httpStatus from "http-status";
import { userService } from "../services/UserService";
interface AccessControl {
apiGroups : {
apiGroups: {
[key: string]: string[];
},
roles : {
roles: {
[key: string]: string[];
}
}
Expand Down Expand Up @@ -42,58 +43,102 @@ const errorHandler = (statusCode: number, message: string, req: Request, res: Re
);
};


const checkAccess = (decoded: any, action: string, req: Request, res: Response) => {
if (decoded.roles) {
const hasAccess = decoded.roles.some((role: string) => {
const apiGroups = accessControl.roles[role];
return apiGroups?.some((apiGroup: string) =>
accessControl.apiGroups[apiGroup]?.includes(action)
);
});

if (hasAccess) {
return true;
} else {
const rolesWithAccess = Object.keys(accessControl.roles).filter(role => {
const apiGroups = accessControl.roles[role];
return apiGroups?.some(apiGroup => accessControl.apiGroups[apiGroup]?.includes(action));
});

const rolesMessage = rolesWithAccess.length > 0
? `The following roles have access to this action: ${rolesWithAccess.join(", ")}`
: "No roles have this action";

const errorMessage = `Access denied. User does not have permission to perform this action. ${rolesMessage}.`;
errorHandler(403, errorMessage, req, res);
return false;
}
}

errorHandler(403, "Access denied. User does not have permission to perform this action.", req, res);
return false;
};

const basicToken = (token: string, req: Request, res: Response, next: NextFunction) => {
try {
const decoded = jwt.verify(token, config.user_token_public_key);

if (!decoded || !_.isObject(decoded)) {
return errorHandler(401, "Token verification failed or invalid token", req, res);
}

(req as any).userID = decoded.id;
const action = (req as any).id;

if (checkAccess(decoded, action, req, res)) {
return next();
}
} catch (error) {
return errorHandler(401, "Token verification error", req, res);
}
};

const keycloakTokenVerify = async (token: string, req: Request, res: Response, next: NextFunction) => {
try {
const decoded = jwt.decode(token);
if (decoded && _.isObject(decoded)) {
(req as any).userID = decoded.sub;
const action = (req as any).id;
const userCondition = { id: decoded.sub };
const userDetails = ["roles", "user_name"];
const user = await userService.getUser(userCondition, userDetails);

if (!user) {
return errorHandler(404, "User not found", req, res);
}

if (checkAccess(user, action, req, res)) {
return next();
}
}
} catch (error) {
return errorHandler(401, "Token decode error", req, res);
}
};

export default {
name: "rbac:middleware",
handler: () => (req: Request, res: Response, next: NextFunction) => {
handler: () => async (req: Request, res: Response, next: NextFunction) => {
try {
if (_.lowerCase(config.is_RBAC_enabled) === "false") {
(req as any).userID = "SYSTEM";
next();
return next();
}

const token = req.get("x-user-token");
if (!token) {
return errorHandler(401, "No token provided", req, res);
}

const decoded = jwt.decode(token);
if (decoded && _.isObject(decoded) && decoded.roles) {
return basicToken(token, req, res, next);
} else {
const public_key = config.user_token_public_key;
const token = req.get("x-user-token");
if (!token) {
return errorHandler(401, "No token provided", req, res);
}
jwt.verify(token as string, public_key, (err, decoded) => {
if (err) {
return errorHandler(401, "Token verification failed", req, res);
}
if (decoded && _.isObject(decoded)) {
if (!decoded?.id) {
return errorHandler(401, "User ID is missing from the decoded token.", req, res);
}
(req as any).userID = decoded?.id;
const action = (req as any).id;
const hasAccess = decoded?.roles?.some((role: string) => {
const apiGroups = accessControl.roles[role];

if (!apiGroups) return false;

return apiGroups.some((apiGroup: string) =>
accessControl.apiGroups[apiGroup]?.includes(action)
);
});
if (!hasAccess) {
const rolesWithAccess = Object.keys(accessControl.roles).filter(role => {
const apiGroups = accessControl.roles[role];
if (!apiGroups) return false;
return apiGroups.some(apiGroup => accessControl.apiGroups[apiGroup]?.includes(action));
});
const rolesMessage = rolesWithAccess.length > 0
? `The following roles have access to this action: ${rolesWithAccess.join(", ")}`
: "No roles have this action";

const errorMessage = `Access denied. User does not have permission to perform this action. ${rolesMessage}.`;

return errorHandler(403, errorMessage, req, res);
}
next();
}
});
return await keycloakTokenVerify(token, req, res, next);
}
} catch (error) {
next(error);
return next(error);
}
},
};
49 changes: 49 additions & 0 deletions api-service/src/models/User.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { DataTypes } from "sequelize";
import { sequelize } from "../connections/databaseConnection";

export const User = sequelize.define("user", {
id: {
type: DataTypes.STRING,
primaryKey: true
},
user_name: {
type: DataTypes.STRING
},
password: {
type: DataTypes.STRING
},
first_name: {
type: DataTypes.STRING
},
last_name: {
type: DataTypes.STRING,
},
provider: {
type: DataTypes.STRING,
},
email_address: {
type: DataTypes.STRING,
},
mobile_number: {
type: DataTypes.STRING,
},
created_on: {
type: DataTypes.STRING,
},
last_updated_on: {
type: DataTypes.STRING,
},
roles: {
type: DataTypes.ARRAY(DataTypes.STRING),
defaultValue: ["viewer"]
},
status: {
type: DataTypes.STRING,
defaultValue: "active"
}
}, {
tableName: "oauth_users",
timestamps: true,
createdAt: "created_date",
updatedAt: "updated_date"
})
11 changes: 11 additions & 0 deletions api-service/src/services/UserService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { User } from "../models/User";

class UserService {

getUser = (where?: Record<string, any>, attributes?: string[]): Promise<any> => {
return User.findOne({ where, attributes, raw: true });
}

}

export const userService = new UserService();
13 changes: 8 additions & 5 deletions command-service/helm-charts/flink-connector/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,14 @@ serviceMonitor:
port: prom

flink_jobs:
"kafka-connector-1-0-0":
"enabled": "false"
"connector_id": "kafka-connector-1.0.0"
"source": "kafka-connector-1.0.0"
"main_program": "kafka-connector-1.0.0.jar"
kafka-connector-1-0-0:
registry: sanketikahub
repository: flink-connectors
tag: 1.1.0
enabled: "false"
connector_id: "kafka-connector-1.0.0"
source: "kafka-connector-1.0.0"
main_program: "kafka-connector-1.0.0.jar"

commonAnnotations:
reloader.stakater.com/auto: "true"
57 changes: 31 additions & 26 deletions command-service/src/command/kafka_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from config import Config
from model.data_models import Action, ActionResponse, CommandPayload
from service.http_service import HttpService
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka.admin import AdminClient, NewTopic, KafkaError, KafkaException
from command.dataset_command import DatasetCommand

class KafkaCommand(ICommand):
Expand All @@ -14,36 +14,41 @@ def __init__(self, config: Config, http_service: HttpService, dataset_command: D

def execute(self, command_payload: CommandPayload, action: Action):
result = None
if action == Action.CREATE_KAFKA_TOPIC.name:
print(
f"Invoking CREATE_KAFKA_TOPIC command for dataset_id {command_payload.dataset_id}..."
)
self.config_obj = Config()
dataset_id = command_payload.dataset_id
live_dataset, data_version = self.dataset_command._check_for_live_record(
dataset_id
)
topic = live_dataset.router_config['topic']
brokers = self.config_obj.find("kafka.brokers")
print(f"broker", brokers)
result = self.create_kafka_topic(topic, brokers, 1, 1)
return result
dataset_id = command_payload.dataset_id
live_dataset, data_version = self.dataset_command._check_for_live_record(dataset_id)
if live_dataset is None:
if action == Action.CREATE_KAFKA_TOPIC.name:
print(
f"Invoking CREATE_KAFKA_TOPIC command for dataset_id {dataset_id}..."
)
draft_dataset_record = self.dataset_command._get_draft_dataset(dataset_id)
topic = draft_dataset_record.get("router_config")["topic"]
broker = self.config.find("kafka.brokers")
num_partitions = self.config.find("kafka.no_of_partitions")
replication_factor = self.config.find("kafka.replication_factor")
print(f"topic is", topic)
result = self.create_kafka_topic(topic, broker, num_partitions, replication_factor)
return result
return ActionResponse(status="OK", status_code=200)


def create_kafka_topic(self, topic, broker, num_partitions, replication_factor):
admin_client = AdminClient({'bootstrap.servers': broker})
print(f"topic is",topic)
new_topic = [NewTopic(topic, num_partitions=num_partitions, replication_factor=replication_factor)]
errValue = ActionResponse(status="ERROR", status_code=500)

try:
admin_client = AdminClient({'bootstrap.servers': broker})
new_topic = [NewTopic(topic, num_partitions=num_partitions, replication_factor=replication_factor)]
fs = admin_client.create_topics(new_topic)
for topic, f in fs.items():
try:
f.result()
print(f"Topic '{topic}' created successfully")
except Exception as e:
print(f"Failed to create topic '{topic}': {e}")
f.result()
print(f"Topic '{topic}' created successfully")
return ActionResponse(status="OK", status_code=200)
except (KafkaError, KafkaException) as kafkaErr:
print(f"Kafka exception:", kafkaErr)
return errValue
except RuntimeError as e:
print(f"Runtime exception: {e}")
return errValue
except Exception as e:
print(f"Error creating topic: {e}")

return ActionResponse(status="OK", status_code=200)
print(f"Error creating topic: {e}")
return errValue
4 changes: 3 additions & 1 deletion command-service/src/config/service_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ flink:
commands:
PUBLISH_DATASET:
workflow:
- MAKE_DATASET_LIVE
- CREATE_KAFKA_TOPIC
- MAKE_DATASET_LIVE
- SUBMIT_INGESTION_TASKS
- START_PIPELINE_JOBS
- DEPLOY_CONNECTORS
Expand Down Expand Up @@ -167,6 +167,8 @@ kafka:
brokers: localhost:9092
telemetry:
topic: system.telemetry.events
replication_factor: 1
no_of_partitions: 1

connector_jobs:
spark:
Expand Down
Loading