Skip to content

Commit

Permalink
feat: add auth for subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
MARCO MANCO s281564 authored and GabriFila committed May 27, 2021
1 parent 4e60791 commit b22da9c
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 31 deletions.
90 changes: 77 additions & 13 deletions qlkube/src/decorateSubscription.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
const { withFilter } = require('apollo-server');
const { gql } = require('apollo-server-core');
const { extendSchema } = require('graphql/utilities');
const { addResolversToSchema } = require('@graphql-tools/schema');
const { PubSub, withFilter } = require('apollo-server');
const { extendSchema } = require('graphql/utilities');
const { pubsubAsyncIterator } = require('./pubsub.js');
const { subscriptions } = require('./subscriptions.js');
const { capitalizeType } = require('./utils.js');
const { canWatchResource } = require('./watch.js');

const pubsub = new PubSub();
let cacheSubscriptions = {};
let apiServerUrl = '';

function decorateEnum(baseSchema, enumName, values) {
if (!baseSchema) throw 'Parameter baseSchema cannot be empty!';
Expand Down Expand Up @@ -62,12 +66,22 @@ function decorateSubscription(baseSchema, targetType, enumType) {
Subscription: {
[subscriptionField]: {
subscribe: withFilter(
() => pubsub.asyncIterator([label]),
() => pubsubAsyncIterator(label),
(payload, variables, info, context) => {
const resourceApi = subscriptions.filter(sub => {
return `${sub.type}Update` === context.fieldName;
})[0];
return (
payload.apiObj.metadata.namespace === variables.namespace &&
(variables.name === undefined ||
payload.apiObj.metadata.name === variables.name)
payload.apiObj.metadata.name === variables.name) &&
checkPermission(
info.token,
resourceApi.group,
resourceApi.resource,
variables.namespace,
variables.name
)
);
}
),
Expand All @@ -92,29 +106,79 @@ function decorateSubscription(baseSchema, targetType, enumType) {
return newSchema;
}

function setupSubscriptions(subscriptions, schema) {
async function checkPermission(
token,
group = '',
resource,
namespace,
name = ''
) {
if (!token) throw 'Parameter token cannot be empty!';
if (!resource) throw 'Parameter resource cannot be empty!';
if (!namespace) throw 'Parameter namespace cannot be empty!';

const keyCache = `${token}_${group}_${resource}_${namespace}_${name}`;
const lastSub = cacheSubscriptions[keyCache];
const canUserWatchResourceCached =
lastSub &&
!(
Date.now() - lastSub > 10 * 60 * 1000 &&
delete cacheSubscriptions[keyCache]
);

if (canUserWatchResourceCached) {
return true;
} else {
const canUserWatchResource = await canWatchResource(
apiServerUrl,
token,
resource,
group,
namespace,
name
);

if (canUserWatchResource) {
cacheSubscriptions[keyCache] = Date.now();
return true;
}
}
return false;
}

function clearCache() {
const courrentTimestamp = Date.now();
Object.keys(cacheSubscriptions).forEach(e => {
courrentTimestamp - cacheSubscriptions[e] > 10 * 60 * 1000 &&
delete cacheSubscriptions[e];
});
}

function setupSubscriptions(subscriptions, schema, kubeApiUrl) {
if (!subscriptions) throw 'Parameter subscriptions cannot be empty!';
if (!schema) throw 'Parameter schema cannot be empty!';
if (!kubeApiUrl) throw 'Parameter apiServerUrl cannot be empty!';

apiServerUrl = kubeApiUrl;

let newSchema = decorateEnum(schema, 'UpdateType', [
'ADDED',
'MODIFIED',
'DELETED',
]);

subscriptions.forEach(e => {
newSchema = decorateSubscription(newSchema, e.type, 'UpdateType');
subscriptions.forEach(sub => {
newSchema = decorateSubscription(newSchema, sub.type, 'UpdateType');
});

return newSchema;
}
setInterval(() => {
clearCache();
}, 1000 * 60 * 5);

function publishEvent(label, value) {
pubsub.publish(label, value);
return newSchema;
}

module.exports = {
decorateSubscription,
setupSubscriptions,
publishEvent,
};
34 changes: 21 additions & 13 deletions qlkube/src/index.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
const fs = require('fs').promises;
const { createServer } = require('http');
const express = require('express');
const { ApolloServer } = require('apollo-server-express');
const compression = require('compression');
const { createSchema } = require('./schema');
const { kwatch } = require('./watch.js');
const { setupSubscriptions } = require('./decorateSubscription.js');
const { subscriptions } = require('./subscriptions.js');
const getOpenApiSpec = require('./oas');
const dotenv = require('dotenv');
const express = require('express');
const fs = require('fs').promises;
const { printSchema } = require('graphql');
const { createServer } = require('http');
const logger = require('pino')({ useLevelLabels: true });
const dotenv = require('dotenv');
const { setupSubscriptions } = require('./decorateSubscription.js');
const getOpenApiSpec = require('./oas');
const { createSchema } = require('./schema');
const { subscriptions } = require('./subscriptions.js');
const { kwatch } = require('./watch.js');

dotenv.config();

Expand All @@ -30,12 +30,11 @@ async function main() {
'utf8'
)
: '';

const oas = await getOpenApiSpec(kubeApiUrl, token);
let schema = await createSchema(oas, kubeApiUrl, token);

try {
schema = setupSubscriptions(subscriptions, schema);
schema = setupSubscriptions(subscriptions, schema, kubeApiUrl);
} catch (e) {
console.error(e);
process.exit(1);
Expand All @@ -47,6 +46,11 @@ async function main() {
path: '/subscription',
onConnect: (connectionParams, webSocket, context) => {
console.log('Connected!');
const token =
(connectionParams.authorization &&
connectionParams.authorization.split(' ')[1]) ||
'';
return { token };
},
onDisconnect: (webSocket, context) => {
console.log('Disconnected!');
Expand All @@ -55,7 +59,8 @@ async function main() {

context: ({ req, connection }) => {
if (connection) {
return {};
const token = connection.context.token || '';
return { token };
} else {
if (req.headers.authorization && req.headers.authorization.length > 0) {
const strs = req.headers.authorization.split(' ');
Expand Down Expand Up @@ -96,7 +101,10 @@ async function main() {

try {
subscriptions.forEach(sub => {
kwatch(sub.resource, sub.type);
const resourceApi = `/${sub.api}${sub.group ? `/${sub.group}` : ''}/${
sub.version
}/${sub.resource}`;
kwatch(resourceApi, sub.type);
});
} catch (e) {
console.error(e);
Expand Down
13 changes: 13 additions & 0 deletions qlkube/src/pubsub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
const { PubSub } = require('apollo-server');

const pubsub = new PubSub();

function publishEvent(label, value) {
pubsub.publish(label, value);
}

function pubsubAsyncIterator(label) {
return pubsub.asyncIterator([label]);
}

module.exports = { publishEvent, pubsubAsyncIterator };
16 changes: 14 additions & 2 deletions qlkube/src/subscriptions.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
const subscriptions = [
{ resource: '/api/v1/pods', type: 'ioK8sApiCoreV1Pod' },
{ resource: '/api/v1/nodes', type: 'ioK8sApiCoreV1Node' },
{
api: 'apis',
group: 'crownlabs.polito.it',
version: 'v1alpha2',
resource: 'instances',
type: 'itPolitoCrownlabsV1alpha2Instance',
},
{
api: 'apis',
group: 'crownlabs.polito.it',
version: 'v1alpha2',
resource: 'templates',
type: 'itPolitoCrownlabsV1alpha2Template',
},
];

module.exports = { subscriptions };
55 changes: 52 additions & 3 deletions qlkube/src/watch.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,63 @@
const k8s = require('@kubernetes/client-node');
const { publishEvent } = require('./decorateSubscription.js');
const fetch = require('node-fetch');
const { publishEvent } = require('./pubsub.js');

const kc = new k8s.KubeConfig();
kc.loadFromDefault();
const watch = new k8s.Watch(kc);

// resource needs to be PLURAL
async function canWatchResource(
apiServerUrl,
token,
resource,
group,
namespace,
name
) {
return fetch(
`${apiServerUrl}/apis/authorization.k8s.io/v1/selfsubjectaccessreviews`,
{
method: 'POST',
body: JSON.stringify({
kind: 'SelfSubjectAccessReview',
apiVersion: 'authorization.k8s.io/v1',
spec: {
resourceAttributes: {
namespace,
verb: 'watch',
group,
resource,
name,
},
},
}),
headers: {
Authorization: `Bearer ${token}`,
'Content-Type': 'application/json',
},
}
)
.then(res => res.json())
.then(body => {
if (body.status) {
return body.status.allowed === true;
}
return false;
})
.catch(err => {
console.error('ERROR WHEN CHECKING IF USER CAN CHECK', err);
throw new Error(
'ERROR WHEN CHECKING IF USER CAN CHECK',
err.message,
err
);
});
}

function kwatch(api, label) {
if (!api) throw 'Parameter api cannot be empty!';
if (!label) throw 'Parameter label cannot be empty!';

watch
.watch(
api,
Expand All @@ -26,4 +75,4 @@ function kwatch(api, label) {
.then(req => {});
}

module.exports = { kwatch };
module.exports = { kwatch, canWatchResource };

0 comments on commit b22da9c

Please sign in to comment.