Skip to content

Commit

Permalink
#333: Refactor outgoing call to KG and update elastic search delete a…
Browse files Browse the repository at this point in the history
…nd initialization
  • Loading branch information
tholulomo committed Mar 12, 2023
1 parent a21b2b2 commit 3d3eb01
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 60 deletions.
17 changes: 6 additions & 11 deletions resfulservice/script/getTiffImage.js
Original file line number Diff line number Diff line change
@@ -1,22 +1,17 @@
const mongoose = require('mongoose');
const storedFiles = require('../src/models/fsFiles');
// const storedFiles = require('../src/models/fsFiles');
const env = process.env;

/**
* Ensure the environment variable is set
*/

const db = mongoose.connection
const db = mongoose.connection;

async function getImagesFromMongo () {
try {
await mongoose.connect(`mongodb://${env.MM_MONGO_USER}:${env.MM_MONGO_PWD}@localhost:${env.MONGO_PORT}/${env.MM_DB}`, {keepAlive: true, keepAliveInitialDelay: 300000});
db.on('error', () => console.log('An error occurred'));
db.once('open', () => console.log('Open successfully'));
console.log('ran out')
} catch (err) {
throw err
}
await mongoose.connect(`mongodb://${env.MM_MONGO_USER}:${env.MM_MONGO_PWD}@localhost:${env.MONGO_PORT}/${env.MM_DB}`, { keepAlive: true, keepAliveInitialDelay: 300000 });
db.on('error', () => console.log('An error occurred'));
db.once('open', () => console.log('Open successfully'));
// mongoose
// .connect(`mongodb://${env.MM_MONGO_USER}:${env.MM_MONGO_PWD}@localhost:${env.MONGO_PORT}/${env.MM_DB}`, {
// useNewUrlParser: true, useUnifiedTopology: true
Expand All @@ -30,4 +25,4 @@ async function getImagesFromMongo () {
// })
}

getImagesFromMongo();
getImagesFromMongo();
20 changes: 18 additions & 2 deletions resfulservice/src/controllers/adminController.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,24 @@ const _loadBulkElasticSearch = async (req, res, next) => {
try {
const total = data.length;
let rejected = 0;

// Delete existing docs in this index type
log.info(`_loadBulkElasticSearch(): Deleting existing ${type} indices`);
if (total) await elasticSearch.deleteIndexDocs(type);
log.info(`_loadBulkElasticSearch(): Successfully deleted ${type} indices`);

for (const item of data) {
const response = await elasticSearch.indexDocument(req, type, item);

if (!response) {
log.debug(`_loadBulkElasticSearch()::error: rejected - ${response.statusText}`);
rejected = rejected + 1;
}
}

await elasticSearch.refreshIndices(req, type);
successWriter(req, 'success', '_loadBulkElasticSearch');

return res.status(200).json({
total,
rejected
Expand All @@ -76,19 +85,21 @@ const _loadBulkElasticSearch = async (req, res, next) => {
*/
exports.loadElasticSearch = async (req, res, next) => {
const log = req.logger;
log.info('loadElasticSearch(): Function entry');
const body = JSON.parse(req?.body);
const type = body?.type;
const doc = body?.doc;

log.info('loadElasticSearch(): Function entry');
if (!type || !doc) {
return next(errorWriter(req, 'Category type or doc is missing', 'loadElasticSearch', 422));
}

try {
const response = await elasticSearch.indexDocument(req, type, doc);

await elasticSearch.refreshIndices(req, type);
successWriter(req, 'success', 'loadElasticSearch');

return res.status(200).json({
response
});
Expand Down Expand Up @@ -119,7 +130,8 @@ exports.pingElasticSearch = async (req, res, next) => {
};

/**
* Data dump into ES
* Fetch data from knowledge graph and dump into ES
* NOTE: It overwrites the index
* @param {*} req
* @param {*} res
* @param {*} next
Expand All @@ -139,6 +151,10 @@ exports.dataDump = async (req, res, next) => {
}
};

/** This function allows for upload already fetched data
* into ES. It will NOT call the knowledge graph as it assumes
* user already have the data. NOTE: It overwrites the index
*/
exports.bulkElasticSearchImport = (req, res, next) => {
const log = req.logger;
log.info('bulkElasticSearchImport(): Function entry');
Expand Down
9 changes: 9 additions & 0 deletions resfulservice/src/controllers/fileController.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
const mongoose = require('mongoose');
const { PassThrough } = require('stream');
const fsFiles = require('../models/fsFiles');
const { errorWriter, successWriter } = require('../utils/logWriter');

const _createEmptyStream = () => new PassThrough('').end();

exports.imageMigration = async (req, res, next) => {
const { imageType } = req.params;

Expand Down Expand Up @@ -29,6 +33,11 @@ exports.fileContent = async (req, res, next) => {

try {
const _id = new mongoose.Types.ObjectId(fileId);
const exist = await fsFiles.findById(_id).limit(1);
if (!exist) {
res.setHeader('Content-Type', 'image/png');
return _createEmptyStream().pipe(res);
}
const downloadStream = bucket.openDownloadStream(_id);
downloadStream.pipe(res);
} catch (error) {
Expand Down
62 changes: 45 additions & 17 deletions resfulservice/src/controllers/kgWrapperController.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,47 @@ const _outboundRequest = async (req, next) => {
const log = req.logger;
log.info('_outboundRequest(): Function entry');

if (!req.env.KNOWLEDGE_ADDRESS) {
return next(errorWriter(req, 'Knowledge endpoint address missing', '_outboundRequest', 422));
}

const query = req?.query;
const type = query?.type;
const uri = query?.uri || constant[type];
let url = query?.uri;
let altMethod;

if (!url) {
url = `${req.env.KNOWLEDGE_ADDRESS}/${constant[type]}`;
altMethod = 'get';
}

if (!type) {
if (!query?.uri && !type) {
return next(errorWriter(req, 'Category type is missing', '_outboundRequest', 422));
}

if (!uri) {
if (!url) {
return next(errorWriter(req, 'URI is missing in the request body', '_outboundRequest', 422));
}

const response = await axios({
method: 'get',
url: uri,
httpsAgent: new https.Agent(httpsAgent)
});
const preparedRequest = {
method: altMethod ?? req.method,
httpsAgent: new https.Agent(httpsAgent),
url
};

if (query?.queryString) {
preparedRequest.params = {
query: query.queryString
};
}

if (req.isBackendCall || req.query?.responseType === 'json') {
preparedRequest.headers = {
accept: 'application/sparql-results+json'
};
}

const response = await axios(preparedRequest);

return {
type,
Expand Down Expand Up @@ -91,17 +115,21 @@ exports.getKnowledge = async (req, res, next) => {
*/
exports.getSparql = async (req, res, next) => {
try {
const query = req?.body?.query ?? req?.query?.query;
if (!req.env.KNOWLEDGE_ADDRESS) {
return next(errorWriter(req, 'Knowledge endpoint address missing', 'getSparql', 422));
}

req.query.queryString = req?.body?.query ?? req?.query?.query;
req.query.uri = `${req.env.KNOWLEDGE_ADDRESS}/${constant.sparql}`;

successWriter(req, { message: 'sending request' }, 'getSparql');
const response = await axios({
httpsAgent: new https.Agent(httpsAgent),
method: 'post',
url: constant.sparql,
params: {
query
}
});
const response = await _outboundRequest(req, next);
successWriter(req, { message: 'success' }, 'getSparql');

// Needed `isBackendCall` flag to enforce internal calls and return response
// through the function that triggers the call.
if (req.isBackendCall) return response?.data;

return res.status(200).json({ ...response?.data });
} catch (err) {
next(errorWriter(req, err, 'getSparql'));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ input materialsInput {

input materialQueryInput {
"Column field name"
field: String!
field: String

"""
Pagination option for the page of columns to display to return.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,13 @@ const errorFormater = require('../../../utils/errorFormater');

const materialMutation = {
createXlsxCurationList: async (_, { input }, { user, req, isAuthenticated }) => {
console.log('entered function');
req.logger?.info('[createMaterialColumn] Function Entry:');
if (!isAuthenticated) {
req.logger?.error('[createMaterialColumn]: User not authenticated to create Material column');
return errorFormater('not authenticated', 401);
}
const { columns } = input;
const curatedList = columns.map(column => ({ ...column, user: user._id }));
console.log('columns:', columns);
console.log('curatedList:', curatedList);
const result = await insertMany(curatedList);
if (result) return errorFormater(result, 409);
return { columns: curatedList };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const materialQuery = {
return errorFormater('not authenticated', 401);
}
const { field, pageSize, pageNumber } = input;
const filter = { field: { $regex: new RegExp(field.toString(), 'gi') } };
const filter = field ? { field: { $regex: new RegExp(field.toString(), 'gi') } } : {};
try {
const pagination = pageSize || pageNumber ? paginator(await MaterialTemplate.countDocuments(filter), pageNumber, pageSize) : paginator(await MaterialTemplate.countDocuments(filter));
const curatedList = await MaterialTemplate.find(filter, null, { lean: true, populate: { path: 'user', select: 'displayName' } });
Expand Down
5 changes: 1 addition & 4 deletions resfulservice/src/routes/admin.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ const { getInternal } = require('../middlewares/isInternal');

router
.route('/es/bulkinsert')
// .get(getInternal, AdminController.pingElasticSearch)
// .post(getInternal, AdminController.initializeElasticSearch);
.post(AdminController.bulkElasticSearchImport)
.put(AdminController.dataDump);

Expand All @@ -20,11 +18,10 @@ router.route('/populate-datasets')

router
.route('/es')
// .get(getInternal, AdminController.pingElasticSearch)
// .post(getInternal, AdminController.initializeElasticSearch);
.get(AdminController.pingElasticSearch)
.post(AdminController.initializeElasticSearch)
.put(getInternal, AdminController.loadElasticSearch);

// Note: Not in use. Deprecated for authService.js route.
router.route('/login').post(loginController.login);
module.exports = router;
3 changes: 2 additions & 1 deletion resfulservice/src/routes/kg-wrapper.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ router.route('/charts')
.get(getAllCharts);

router.route('/sparql')
.post(getSparql);
.post(getSparql)
.get(getSparql);

module.exports = router;
54 changes: 34 additions & 20 deletions resfulservice/src/utils/elasticSearch.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,20 @@ class ElasticSearch {
}

/**
* Deletes a type group and all its docs
* Deletes all documents of an index
* @param {String} type
* @returns {Object} response
* @returns response
*/
async deleteAType (type) {
const configResponse = await axios({
method: 'delete',
url: `http://${env.ESADDRESS}/${type}`,
headers: {
'Content-Type': 'application/json'
}
async deleteIndexDocs (type) {
return this.client.deleteByQuery({
index: type,
body: {
query: {
match_all: {}
}
},
timeout: '5m' // Todo: Increase when data becomes larger
});
return configResponse;
}

async _putMappings (type, schema) {
Expand All @@ -74,21 +75,34 @@ class ElasticSearch {
});
}

async _getExistingIndices () {
return await this.client.cat.indices({ format: 'json' });
}

async initES (req) {
const log = req.logger;
log.info('elasticsearch.initES(): Function entry');

try {
const allSchemas = {
articles: configPayload.articles,
samples: configPayload.samples,
charts: configPayload.charts,
images: configPayload.images
};
// Check and ignore existing indexes before create
const existingIndexes = await this._getExistingIndices();
if (existingIndexes.length >= Object.keys(configPayload).length) {
log.info('elasticsearch.initES(): All indexes exist in Elastic search');
return;
}

// Remove elastic search index config from list of keys
let preparedKeys = Object.keys(configPayload)?.filter(e => e !== 'config');
if (existingIndexes.length) {
preparedKeys = preparedKeys.filter(preppedKey => !existingIndexes.some(existingIndex => (existingIndex?.index === preppedKey)));
log.info(`elasticsearch.initES(): Adding the following missing index(es) ${preparedKeys.join(',')}`);
}

Object.entries(allSchemas).forEach(async ([key, value]) => {
await this._createConfig(key);
await this._putMappings(key, value);
try {
Object.entries(configPayload).forEach(async ([key, value]) => {
if (preparedKeys.includes(key)) {
await this._createConfig(key);
await this._putMappings(key, value);
}
});

return {
Expand Down
1 change: 1 addition & 0 deletions resfulservice/src/utils/iterator.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ exports.generateMongoUrl = (req) => {
* @returns {Promise}
*/
exports.iteration = (arr, iterationFn, batchSize) => new Promise((resolve, reject) => {
// const chunks: Buffer[] = arr;
let pendingPromises = [];
const pausePromises = async () => {
try {
Expand Down

0 comments on commit 3d3eb01

Please sign in to comment.