Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
yingwang-us committed Apr 29, 2020
1 parent 667d333 commit f259b0f
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 82 deletions.
4 changes: 2 additions & 2 deletions app/apollo/resolvers/resource.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const { withFilter } = require('apollo-server');

const buildSearchForResources = require('../utils');
const { ACTIONS, TYPES } = require('../models/const');
const { EVENTS, pubSubPlaceHolder, getStreamingTopic } = require('../subscription');
const { EVENTS, GraphqlPubSub, getStreamingTopic } = require('../subscription');
const { whoIs, validAuth } = require ('./common');
const ObjectId = require('mongoose').Types.ObjectId;

Expand Down Expand Up @@ -195,7 +195,7 @@ const resourceResolvers = {
const topic = getStreamingTopic(EVENTS.RESOURCE.UPDATED, args.org_id);
context.logger.debug({args, me: context.me, topic}, 'withFilter asyncIteratorFn');
// TODO: in future probably we should valid authorization here
return pubSubPlaceHolder.pubSub.asyncIterator(topic);
return GraphqlPubSub.getInstance().pubSub.asyncIterator(topic);
},
async (parent, args, context) => {
const queryName = 'subscribe: withFilter';
Expand Down
12 changes: 7 additions & 5 deletions app/apollo/resolvers/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ const { ACTIONS, TYPES } = require('../models/const');
const { whoIs, validAuth } = require ('./common');
const getSubscriptionUrls = require('../../utils/subscriptions.js').getSubscriptionUrls;
const tagsStrToArr = require('../../utils/subscriptions.js').tagsStrToArr;
const { EVENTS, pubSubPlaceHolder, getStreamingTopic, channelSubChangedFunc } = require('../subscription');
const { EVENTS, GraphqlPubSub, getStreamingTopic } = require('../subscription');
const { models } = require('../models');

const pubSub = GraphqlPubSub.getInstance();

const subscriptionResolvers = {
Query: {
subscriptionsByTag: async(parent, { org_id, tags }, context) => {
Expand Down Expand Up @@ -144,7 +146,7 @@ const subscriptionResolvers = {
channel: channel.name, channel_uuid, version: version.name, version_uuid
});

channelSubChangedFunc({org_id: org_id});
pubSub.channelSubChangedFunc({org_id: org_id});

return {
uuid,
Expand Down Expand Up @@ -187,7 +189,7 @@ const subscriptionResolvers = {
};
await models.Subscription.updateOne({ uuid, org_id, }, { $set: sets });

channelSubChangedFunc({org_id: org_id});
pubSub.channelSubChangedFunc({org_id: org_id});

return {
uuid,
Expand All @@ -213,7 +215,7 @@ const subscriptionResolvers = {
}
await subscription.deleteOne();

channelSubChangedFunc({org_id: org_id});
pubSub.channelSubChangedFunc({org_id: org_id});

success = true;
}catch(err){
Expand Down Expand Up @@ -248,7 +250,7 @@ const subscriptionResolvers = {
const { logger } = context;
logger.info('A client is connected with args:', args);
const topic = getStreamingTopic(EVENTS.CHANNEL.UPDATED, args.org_id);
return pubSubPlaceHolder.pubSub.asyncIterator(topic);
return GraphqlPubSub.getInstance().pubSub.asyncIterator(topic);
},
// eslint-disable-next-line no-unused-vars
async (parent, args, context) => {
Expand Down
146 changes: 84 additions & 62 deletions app/apollo/subscription/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,47 +34,10 @@ const EVENTS = {
},
};

const pubSubPlaceHolder = {
enabled: false,
pubSub: new PubSub(),
};

function obscureUrl(url) {
return url.replace(/:\/\/.*@/gi, '://xxxxxxx'.concat(':yyyyyyyy', '@'));
}

async function isRedisReachable(redisUrl) {
const url = new URL(redisUrl);
if (await isPortReachable(url.port, { host: url.hostname, timeout: 5000 })) {
const options = process.env.REDIS_CERTIFICATE_PATH
? { tls: { ca: [fs.readFileSync(process.env.REDIS_CERTIFICATE_PATH)] } }
: {};
pubSubPlaceHolder.pubSub = new RedisPubSub({
publisher: new Redis(redisUrl, options),
subscriber: new Redis(redisUrl, options),
});
pubSubPlaceHolder.enabled = true;
logger.info(
`Apollo streaming is enabled on redis endpoint ${url.hostname}:${url.port}`,
);
return true;
}
logger.warn(
`Apollo streaming is disabled because ${url.hostname}:${url.port} is unreachable.`,
);
return false;
}

const redisUrl = process.env.REDIS_PUBSUB_URL || 'redis://127.0.0.1:6379/0';
if (process.env.ENABLE_GRAPHQL) {
logger.info(
`Apollo streaming service is configured on redisUrl: ${obscureUrl(
redisUrl,
)}`,
);
isRedisReachable(redisUrl);
}

function getStreamingTopic(prefix, org_id) {
if (APOLLO_STREAM_SHARDING) {
if (org_id) {
Expand All @@ -86,36 +49,95 @@ function getStreamingTopic(prefix, org_id) {
return prefix;
}

async function channelSubChangedFunc(data) {
if (pubSubPlaceHolder.enabled) {
try {
const topic = getStreamingTopic(EVENTS.CHANNEL.UPDATED, data.org_id);
logger.debug({ data, topic }, 'Publishing channel subscription update');
await pubSubPlaceHolder.pubSub.publish(topic, { subscriptionUpdated: { data }, });
} catch (error) {
logger.error(error, 'Channel subscription publish error');
class PubSubImpl {

constructor(params) {
this.enabled = false;
this.pubSub = null;
this.redisUrl = params.redisUrl || process.env.REDIS_PUBSUB_URL || 'redis://127.0.0.1:6379/0';
if (process.env.ENABLE_GRAPHQL) {
logger.info(
`Apollo streaming service is configured on redisUrl: ${obscureUrl(
this.redisUrl,
)}`,
);
this.isRedisReachable();
}
}
return data;
}

async isRedisReachable() {
const url = new URL(this.redisUrl);
if (await isPortReachable(url.port, { host: url.hostname, timeout: 5000 })) {
const options = process.env.REDIS_CERTIFICATE_PATH
? { tls: { ca: [fs.readFileSync(process.env.REDIS_CERTIFICATE_PATH)] } }
: {};
this.pubSub = new RedisPubSub({
publisher: new Redis(this.redisUrl, options),
subscriber: new Redis(this.redisUrl, options),
});
this.enabled = true;
logger.info(
`Apollo streaming is enabled on redis endpoint ${url.hostname}:${url.port}`,
);
return true;
}
logger.warn(
`Apollo streaming is disabled because ${url.hostname}:${url.port} is unreachable.`,
);
this.enabled = false;
this.pubSub = new PubSub();
return false;
}

async function resourceChangedFunc(resource) {
if (pubSubPlaceHolder.enabled) {
let op = 'upsert';
if (resource.deleted) {
op = 'delete';
async channelSubChangedFunc(data) {
if (this.enabled) {
try {
const topic = getStreamingTopic(EVENTS.CHANNEL.UPDATED, data.org_id);
logger.debug({ data, topic }, 'Publishing channel subscription update');
await this.pubSub.publish(topic, { subscriptionUpdated: { data }, });
} catch (error) {
logger.error(error, 'Channel subscription publish error');
}
}
try {
const topic = getStreamingTopic(EVENTS.RESOURCE.UPDATED, resource.org_id);
logger.debug({ op, resource, topic }, 'Publishing resource updates');
await pubSubPlaceHolder.pubSub.publish(topic, {
resourceUpdated: { resource, op },
});
} catch (error) {
logger.error(error, 'Resource publish error');
return data;
}

async resourceChangedFunc(resource) {
if (this.enabled) {
let op = 'upsert';
if (resource.deleted) {
op = 'delete';
}
try {
const topic = getStreamingTopic(EVENTS.RESOURCE.UPDATED, resource.org_id);
logger.debug({ op, resource, topic }, 'Publishing resource updates');
await this.pubSub.publish(topic, {
resourceUpdated: { resource, op },
});
} catch (error) {
logger.error(error, 'Resource publish error');
}
}
return resource;
}
return resource;
}

module.exports = { EVENTS, pubSubPlaceHolder, resourceChangedFunc, getStreamingTopic, channelSubChangedFunc };
var GraphqlPubSub = (function() {
var singleton;
return {
getInstance: function () {
if (!singleton) {
singleton = new PubSubImpl({});
}
return singleton;
},
deleteInstance: function () {
if (singleton && singleton.enabled) {
singleton.pubSub.close();
singleton = undefined;
}
}
};
})();

module.exports = { EVENTS, GraphqlPubSub, getStreamingTopic };
16 changes: 6 additions & 10 deletions app/apollo/test/resource.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ const {
} = require(`./testHelper.${AUTH_MODEL}`);

const SubClient = require('./subClient');
const { resourceChangedFunc, pubSubPlaceHolder } = require('../subscription');
const { GraphqlPubSub } = require('../subscription');

let mongoServer;
let myApollo;
const graphqlPort = 18004;
const graphqlUrl = `http://localhost:${graphqlPort}/graphql`;
const subscriptionUrl = `ws://localhost:${graphqlPort}/graphql`;
const api = apiFunc(graphqlUrl);
const pubSub = GraphqlPubSub.getInstance();

let org01Data;
let org02Data;
Expand Down Expand Up @@ -161,9 +162,7 @@ describe('resource graphql test suite', () => {

after(async () => {
await myApollo.stop(myApollo);
if (pubSubPlaceHolder.enabled) {
await pubSubPlaceHolder.pubSub.close();
}
GraphqlPubSub.deleteInstance();
await mongoServer.stop();
});

Expand Down Expand Up @@ -315,7 +314,7 @@ describe('resource graphql test suite', () => {

describe('resourceUpdated (org_id: String!, filter: String): ResourceUpdated!', () => {
before(function() {
if (pubSubPlaceHolder.enabled === false) {
if (pubSub.enabled === false) {
this.skip();
}
});
Expand All @@ -340,9 +339,6 @@ describe('resource graphql test suite', () => {

it('a user subscribe an org and filter should be able to get notification is a new/updated resource matches', async () => {
try {
if (pubSubPlaceHolder.enabled === false) {
return this.skip();
}
let dataReceivedFromSub;

token = await signInUser(models, api, user02Data);
Expand Down Expand Up @@ -389,7 +385,7 @@ describe('resource graphql test suite', () => {
await sleep(200);
aResource.org_id = org_02._id;
// const result = await api.resourceChanged({r: aResource});
resourceChangedFunc(aResource);
pubSub.resourceChangedFunc(aResource);
// expect(result.data.data.resourceChanged._id).to.equal('some_fake_id');

// sleep another 0.1 second and verify if sub received the event
Expand All @@ -399,7 +395,7 @@ describe('resource graphql test suite', () => {
// sleep 0.1 second and send a resourceChanged event
await sleep(100);
// const result1 = await api.resourceChanged({r: anotherResource});
resourceChangedFunc(anotherResource);
pubSub.resourceChangedFunc(anotherResource);
// expect(result1.data.data.resourceChanged._id).to.equal('anther_fake_id');

await unsub.unsubscribe();
Expand Down
2 changes: 2 additions & 0 deletions app/apollo/test/subscriptions.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const subscriptionsFunc = require('./subscriptionsApi');

const apollo = require('../index');
const { AUTH_MODEL } = require('../models/const');
const { GraphqlPubSub } = require('../subscription');

const { prepareUser, prepareOrganization, signInUser } = require(`./testHelper.${AUTH_MODEL}`);
let mongoServer;
Expand Down Expand Up @@ -227,6 +228,7 @@ describe('subscriptions graphql test suite', () => {

after(async () => {
await myApollo.stop(myApollo);
GraphqlPubSub.deleteInstance();
await mongoServer.stop();
}); // after

Expand Down
9 changes: 6 additions & 3 deletions app/routes/v2/clusters.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ const buildSearchableDataForResource = require('../../utils/cluster.js').buildSe
const buildSearchableDataObjHash = require('../../utils/cluster.js').buildSearchableDataObjHash;
const buildPushObj = require('../../utils/cluster.js').buildPushObj;
const buildHashForResource = require('../../utils/cluster.js').buildHashForResource;
const resourceChangedFunc = require('../../apollo/subscription/index.js').resourceChangedFunc;
const { GraphqlPubSub } = require('../../apollo/subscription');

const pubSub = GraphqlPubSub.getInstance();



const addUpdateCluster = async (req, res, next) => {
Expand Down Expand Up @@ -251,7 +254,7 @@ const updateClusterResources = async (req, res, next) => {
resourceCreated = currentResource.created;
}
if (resourceId) {
resourceChangedFunc(
pubSub.resourceChangedFunc(
{_id: resourceId, data: dataStr, created: resourceCreated,
deleted: false, org_id: req.org._id, cluster_id: req.params.cluster_id, selfLink: selfLink,
hash: resourceHash, searchableData: searchableDataObj, searchableDataHash: searchableDataHash});
Expand Down Expand Up @@ -289,7 +292,7 @@ const updateClusterResources = async (req, res, next) => {
);
await addResourceYamlHistObj(req, req.org._id, clusterId, selfLink, '');
if (process.env.ENABLE_GRAPHQL === 'true') {
resourceChangedFunc({ _id: currentResource._id, created: currentResource.created, deleted: true, org_id: req.org._id, cluster_id: req.params.cluster_id, selfLink: selfLink, searchableData: searchableDataObj, searchableDataHash: searchableDataHash});
pubSub.resourceChangedFunc({ _id: currentResource._id, created: currentResource.created, deleted: true, org_id: req.org._id, cluster_id: req.params.cluster_id, selfLink: selfLink, searchableData: searchableDataObj, searchableDataHash: searchableDataHash});
}
}
break;
Expand Down

0 comments on commit f259b0f

Please sign in to comment.