diff --git a/CHANGELOG.md b/CHANGELOG.md index a17db9f820c..e08043eaa04 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -92,6 +92,11 @@ External tooling making use of `searchContext` in the `GET` `/granules/` endpoin - Updated `@cumulus/api/bin/serveUtils` to no longer add records to ElasticSearch - Removed ElasticSearch from local API server code - Updated CollectionSearch to filter granule fields in addition to time frame for active collections +- **CUMULUS-3847** + - Removed remaining ES indexing from code and tests + - Changed ES-related values in asyncOperations test data to other valid options + - Removed the ES-based code from `@cumulus/api/lambdas/cleanExecutions`, leaving a dummy handler; the lambda will be rewritten under CUMULUS-3982 + - Removed `@cumulus/api/endpoints/elasticsearch`, `@cumulus/api/lambdas/bootstrap`, and `@cumulus/api/lambdas/index-from-database` ## [Unreleased] diff --git a/example/deployments/cumulus/sandbox.tfvars b/example/deployments/cumulus/sandbox.tfvars index 106ee0f8c1c..680caba85e5 100644 --- a/example/deployments/cumulus/sandbox.tfvars +++ b/example/deployments/cumulus/sandbox.tfvars @@ -45,11 +45,6 @@ csdap_host_url = "https://auth.csdap.uat.earthdatacloud.nasa.gov" default_s3_multipart_chunksize_mb = 128 -elasticsearch_client_config = { - create_reconciliation_report_es_scroll_duration = "8m" - create_reconciliation_report_es_scroll_size = 1500 -} - launchpad_api = "https://api.launchpad.nasa.gov/icam/api/sm/v1" launchpad_certificate = "launchpad.pfx" diff --git a/example/spec/serial/AsyncOperationRunnerFailingLambdaSpec.js b/example/spec/serial/AsyncOperationRunnerFailingLambdaSpec.js index adcea147676..29f5e5bfd63 100644 --- a/example/spec/serial/AsyncOperationRunnerFailingLambdaSpec.js +++ b/example/spec/serial/AsyncOperationRunnerFailingLambdaSpec.js @@ -49,7 +49,7 @@ describe('The AsyncOperation task runner executing a failing lambda function', ( id: asyncOperationId, taskArn: randomString(), description: 'Some description', - operationType: 'ES Index', + operationType: 'Bulk Granules', status: 'RUNNING', }; diff --git a/example/spec/serial/AsyncOperationRunnerNonExistentLambdaSpec.js b/example/spec/serial/AsyncOperationRunnerNonExistentLambdaSpec.js index 7306851040c..10e98ff91fe 100644 --- a/example/spec/serial/AsyncOperationRunnerNonExistentLambdaSpec.js +++ b/example/spec/serial/AsyncOperationRunnerNonExistentLambdaSpec.js @@ -43,7 +43,7 @@ describe('The AsyncOperation task runner running a non-existent lambda function' id: asyncOperationId, taskArn: randomString(), description: 'Some description', - operationType: 'ES Index', + operationType: 'Bulk Granules', status: 'RUNNING', }; diff --git a/example/spec/serial/AsyncOperationRunnerNonExistentPayloadSpec.js b/example/spec/serial/AsyncOperationRunnerNonExistentPayloadSpec.js index b12dc77c5aa..c9676b7449a 100644 --- a/example/spec/serial/AsyncOperationRunnerNonExistentPayloadSpec.js +++ b/example/spec/serial/AsyncOperationRunnerNonExistentPayloadSpec.js @@ -43,7 +43,7 @@ describe('The AsyncOperation task runner with a non-existent payload', () => { id: asyncOperationId, taskArn: randomString(), description: 'Some description', - operationType: 'ES Index', + operationType: 'Bulk Granules', status: 'RUNNING', }; diff --git a/example/spec/serial/AsyncOperationRunnerNonJsonPayloadSpec.js b/example/spec/serial/AsyncOperationRunnerNonJsonPayloadSpec.js index 77403068625..c05dbfc9ab9 100644 --- a/example/spec/serial/AsyncOperationRunnerNonJsonPayloadSpec.js +++ 
b/example/spec/serial/AsyncOperationRunnerNonJsonPayloadSpec.js @@ -52,7 +52,7 @@ describe('The AsyncOperation task runner with a non-JSON payload', () => { id: asyncOperationId, taskArn: randomString(), description: 'Some description', - operationType: 'ES Index', + operationType: 'Kinesis Replay', status: 'RUNNING', }; diff --git a/example/spec/serial/AsyncOperationRunnerSuccessfulLambdaSpec.js b/example/spec/serial/AsyncOperationRunnerSuccessfulLambdaSpec.js index 198dc8b8a9a..c497b91dbfc 100644 --- a/example/spec/serial/AsyncOperationRunnerSuccessfulLambdaSpec.js +++ b/example/spec/serial/AsyncOperationRunnerSuccessfulLambdaSpec.js @@ -50,7 +50,7 @@ describe('The AsyncOperation task runner executing a successful lambda function' const asyncOperationObject = { description: 'Some description', - operationType: 'ES Index', + operationType: 'Bulk Granules', id: asyncOperationId, taskArn: randomString(), status: 'RUNNING', diff --git a/packages/api/app/routes.js b/packages/api/app/routes.js index cf655198c86..25a8f76de78 100644 --- a/packages/api/app/routes.js +++ b/packages/api/app/routes.js @@ -23,7 +23,6 @@ const stats = require('../endpoints/stats'); const version = require('../endpoints/version'); const workflows = require('../endpoints/workflows'); const dashboard = require('../endpoints/dashboard'); -const elasticsearch = require('../endpoints/elasticsearch'); const deadLetterArchive = require('../endpoints/dead-letter-archive'); const { launchpadProtectedAuth } = require('./launchpadAuth'); const launchpadSaml = require('../endpoints/launchpadSaml'); @@ -110,8 +109,6 @@ router.delete('/tokenDelete/:token', token.deleteTokenEndpoint); router.use('/dashboard', dashboard); -router.use('/elasticsearch', ensureAuthorized, elasticsearch.router); - // Catch and send the error message down (instead of just 500: internal server error) router.use(defaultErrorHandler); diff --git a/packages/api/ecs/async-operation/tests/test-index.js b/packages/api/ecs/async-operation/tests/test-index.js index 97649c89bda..4475bef1e9c 100644 --- a/packages/api/ecs/async-operation/tests/test-index.js +++ b/packages/api/ecs/async-operation/tests/test-index.js @@ -32,7 +32,7 @@ test.beforeEach(async (t) => { t.context.testAsyncOperation = { id: t.context.asyncOperationId, description: 'test description', - operationType: 'ES Index', + operationType: 'Reconciliation Report', status: 'RUNNING', createdAt: Date.now(), }; diff --git a/packages/api/endpoints/elasticsearch.js b/packages/api/endpoints/elasticsearch.js deleted file mode 100644 index b09bb81572e..00000000000 --- a/packages/api/endpoints/elasticsearch.js +++ /dev/null @@ -1,233 +0,0 @@ -'use strict'; - -const router = require('express-promise-router')(); -const { v4: uuidv4 } = require('uuid'); - -const log = require('@cumulus/common/log'); -const { IndexExistsError } = require('@cumulus/errors'); -const { defaultIndexAlias, getEsClient } = require('@cumulus/es-client/search'); -const { createIndex } = require('@cumulus/es-client/indexer'); - -const { asyncOperationEndpointErrorHandler } = require('../app/middleware'); -const { getFunctionNameFromRequestContext } = require('../lib/request'); -const startAsyncOperation = require('../lib/startAsyncOperation'); - -// const snapshotRepoName = 'cumulus-es-snapshots'; - -function timestampedIndexName() { - const date = new Date(); - return `cumulus-${date.getFullYear()}-${date.getMonth() + 1}-${date.getDate()}`; -} - -function createEsSnapshot(req, res) { - return res.boom.badRequest('Functionality not yet 
implemented'); -} - -async function reindex(req, res) { - let sourceIndex = req.body.sourceIndex; - let destIndex = req.body.destIndex; - const aliasName = req.body.aliasName || defaultIndexAlias; - - const esClient = await getEsClient(); - - if (!sourceIndex) { - const alias = await esClient.client.indices.getAlias({ - name: aliasName, - }).then((response) => response.body); - - // alias keys = index name - const indices = Object.keys(alias); - - if (indices.length > 1) { - // We don't know which index to use as the source, throw error - return res.boom.badRequest(`Multiple indices found for alias ${aliasName}. Specify source index as one of [${indices.sort().join(', ')}].`); - } - - sourceIndex = indices[0]; - } else { - const sourceExists = await esClient.client.indices.exists({ index: sourceIndex }) - .then((response) => response.body); - - if (!sourceExists) { - return res.boom.badRequest(`Source index ${sourceIndex} does not exist.`); - } - } - - if (!destIndex) { - destIndex = timestampedIndexName(); - } - - if (sourceIndex === destIndex) { - return res.boom.badRequest(`source index(${sourceIndex}) and destination index(${destIndex}) must be different.`); - } - - const destExists = await esClient.client.indices.exists({ index: destIndex }) - .then((response) => response.body); - - if (!destExists) { - try { - await createIndex(esClient, destIndex); - log.info(`Created destination index ${destIndex}.`); - } catch (error) { - return res.boom.badRequest(`Error creating index ${destIndex}: ${error.message}`); - } - } - - // reindex - esClient.client.reindex({ - body: { - source: { index: sourceIndex }, - dest: { index: destIndex }, - }, - }); - - const message = `Reindexing to ${destIndex} from ${sourceIndex}. Check the reindex-status endpoint for status.`; - - return res.status(200).send({ message }); -} - -async function reindexStatus(req, res) { - const esClient = await getEsClient(); - - const reindexTaskStatus = await esClient.client.tasks.list({ actions: ['*reindex'] }) - .then((response) => response.body); - - await esClient.client.indices.refresh(); - - const indexStatus = await esClient.client.indices.stats({ - metric: 'docs', - }).then((response) => response.body); - - const status = { - reindexStatus: reindexTaskStatus, - indexStatus, - }; - - return res.send(status); -} - -async function changeIndex(req, res) { - const deleteSource = req.body.deleteSource; - const aliasName = req.body.aliasName || defaultIndexAlias; - const currentIndex = req.body.currentIndex; - const newIndex = req.body.newIndex; - - const esClient = await getEsClient(); - - if (!currentIndex || !newIndex) { - return res.boom.badRequest('Please explicity specify a current and new index.'); - } - - if (currentIndex === newIndex) { - return res.boom.badRequest('The current index cannot be the same as the new index.'); - } - - const currentExists = await esClient.client.indices.exists({ index: currentIndex }) - .then((response) => response.body); - - if (!currentExists) { - return res.boom.badRequest(`Current index ${currentIndex} does not exist.`); - } - - const destExists = await esClient.client.indices.exists({ index: newIndex }) - .then((response) => response.body); - - if (!destExists) { - try { - await createIndex(esClient, newIndex); - log.info(`Created destination index ${newIndex}.`); - } catch (error) { - return res.boom.badRequest(`Error creating index ${newIndex}: ${error.message}`); - } - } - - try { - await esClient.client.indices.updateAliases({ - body: { - actions: [ - { remove: { index: 
currentIndex, alias: aliasName } }, - { add: { index: newIndex, alias: aliasName } }, - ], - }, - }); - - log.info(`Removed alias ${aliasName} from index ${currentIndex} and added alias to ${newIndex}`); - } catch (error) { - return res.boom.badRequest( - `Error removing alias ${aliasName} from index ${currentIndex} and adding alias to ${newIndex}: ${error}` - ); - } - - let message = `Change index success - alias ${aliasName} now pointing to ${newIndex}`; - - if (deleteSource) { - await esClient.client.indices.delete({ index: currentIndex }); - log.info(`Deleted index ${currentIndex}`); - message = `${message} and index ${currentIndex} deleted`; - } - - return res.send({ message }); -} - -async function indicesStatus(req, res) { - const esClient = await getEsClient(); - - return res.send(await esClient.client.cat.indices({})); -} - -async function indexFromDatabase(req, res) { - const esClient = await getEsClient(); - const indexName = req.body.indexName || timestampedIndexName(); - const { postgresResultPageSize, postgresConnectionPoolSize, esRequestConcurrency } = req.body; - - await createIndex(esClient, indexName) - .catch((error) => { - if (!(error instanceof IndexExistsError)) throw error; - }); - - const asyncOperationId = uuidv4(); - const asyncOperationEvent = { - asyncOperationId, - callerLambdaName: getFunctionNameFromRequestContext(req), - lambdaName: process.env.IndexFromDatabaseLambda, - description: 'Elasticsearch index from database', - operationType: 'ES Index', - payload: { - indexName, - reconciliationReportsTable: process.env.ReconciliationReportsTable, - esHost: process.env.ES_HOST, - esRequestConcurrency: esRequestConcurrency || process.env.ES_CONCURRENCY, - postgresResultPageSize, - postgresConnectionPoolSize, - }, - }; - - log.debug(`About to invoke lambda to start async operation ${asyncOperationId}`); - await startAsyncOperation.invokeStartAsyncOperationLambda(asyncOperationEvent); - return res.send({ message: `Indexing database to ${indexName}. Operation id: ${asyncOperationId}` }); -} - -async function getCurrentIndex(req, res) { - const esClient = await getEsClient(); - const alias = req.params.alias || defaultIndexAlias; - - const aliasIndices = await esClient.client.indices.getAlias({ name: alias }) - .then((response) => response.body); - - return res.send(Object.keys(aliasIndices)); -} - -// express routes -router.put('/create-snapshot', createEsSnapshot); -router.post('/reindex', reindex); -router.get('/reindex-status', reindexStatus); -router.post('/change-index', changeIndex); -router.post('/index-from-database', indexFromDatabase, asyncOperationEndpointErrorHandler); -router.get('/indices-status', indicesStatus); -router.get('/current-index/:alias', getCurrentIndex); -router.get('/current-index', getCurrentIndex); - -module.exports = { - indexFromDatabase, - router, -}; diff --git a/packages/api/lambdas/bootstrap.js b/packages/api/lambdas/bootstrap.js deleted file mode 100644 index 5a038058070..00000000000 --- a/packages/api/lambdas/bootstrap.js +++ /dev/null @@ -1,35 +0,0 @@ -/* this module is intended to be used for bootstraping - * the cloudformation deployment of a DAAC. 
- * - * It helps: - * - adding ElasticSearch index mapping when a new index is created - */ - -'use strict'; - -const log = require('@cumulus/common/log'); -const { bootstrapElasticSearch } = require('@cumulus/es-client/bootstrap'); - -/** - * Bootstrap Elasticsearch indexes - * - * @param {Object} event - AWS Lambda event input - * @returns {Promise} a Terraform Lambda invocation response - */ -const handler = async ({ elasticsearchHostname, removeAliasConflict, testContext = {} }) => { - const bootstrapFunction = testContext.bootstrapFunction || bootstrapElasticSearch; - try { - await bootstrapFunction({ - host: elasticsearchHostname, - removeAliasConflict, - }); - return { Status: 'SUCCESS', Data: {} }; - } catch (error) { - log.error(error); - throw error; - } -}; - -module.exports = { - handler, -}; diff --git a/packages/api/lambdas/bulk-operation.js b/packages/api/lambdas/bulk-operation.js index 2b19194a705..763c97e685a 100644 --- a/packages/api/lambdas/bulk-operation.js +++ b/packages/api/lambdas/bulk-operation.js @@ -93,8 +93,8 @@ async function applyWorkflowToGranules({ * Defaults to `concurrency` * @param {number} [payload.concurrency] * granule concurrency for the bulk deletion operation. Defaults to 10 - * @param {Object} [payload.query] - Optional parameter of query to send to ES - * @param {string} [payload.index] - Optional parameter of ES index to query. + * @param {Object} [payload.query] - Optional parameter of query to send to ES (Cloud Metrics) + * @param {string} [payload.index] - Optional parameter of ES index to query (Cloud Metrics). * Must exist if payload.query exists. * @param {Object} [payload.granules] - Optional list of granule unique IDs to bulk operate on * e.g. { granuleId: xxx, collectionID: xxx } @@ -178,8 +178,8 @@ async function bulkGranuleDelete( * @param {string} payload.workflowName - name of the workflow that will be applied to each granule. * @param {Object} [payload.meta] - Optional meta to add to workflow input * @param {string} [payload.queueUrl] - Optional name of queue that will be used to start workflows - * @param {Object} [payload.query] - Optional parameter of query to send to ES - * @param {string} [payload.index] - Optional parameter of ES index to query. + * @param {Object} [payload.query] - Optional parameter of query to send to ES (Cloud Metrics) + * @param {string} [payload.index] - Optional parameter of ES index to query (Cloud Metrics). * Must exist if payload.query exists. * @param {Object} [payload.granules] - Optional list of granule unique IDs to bulk operate on * e.g. { granuleId: xxx, collectionID: xxx } diff --git a/packages/api/lambdas/cleanExecutions.js b/packages/api/lambdas/cleanExecutions.js index 9699b0ce9d7..01051970ff6 100644 --- a/packages/api/lambdas/cleanExecutions.js +++ b/packages/api/lambdas/cleanExecutions.js @@ -2,107 +2,17 @@ 'use strict'; -const { getEsClient, esConfig } = require('@cumulus/es-client/search'); -const moment = require('moment'); +/** + * This lambda currently has a dummy handler: the previous implementation worked against + * ElasticSearch and will be rewritten against PostgreSQL in CUMULUS-3982. + * When it is rewritten, the test file should be redone as well. 
+ */ + const Logger = require('@cumulus/logger'); -const { sleep } = require('@cumulus/common'); const log = new Logger({ sender: '@cumulus/api/lambdas/cleanExecutions', }); -/** - * @typedef {import('@cumulus/db').PostgresExecutionRecord} PostgresExecutionRecord - * @typedef {import('knex').Knex} Knex - */ - -/** - * Extract expiration dates and identify greater and lesser bounds - * - * @param {number} payloadTimeout - Maximum number of days a record should be held onto - * @returns {Date} - */ -const getExpirationDate = ( - payloadTimeout -) => moment().subtract(payloadTimeout, 'days').toDate(); - -/** - * Clean up Elasticsearch executions that have expired - * - * @param {number} payloadTimeout - Maximum number of days a record should be held onto - * @param {boolean} cleanupRunning - Enable removal of running execution - * payloads - * @param {boolean} cleanupNonRunning - Enable removal of execution payloads for - * statuses other than 'running' - * @param {number} updateLimit - maximum number of records to update - * @param {string} index - Elasticsearch index to cleanup - * @returns {Promise} -*/ -const cleanupExpiredESExecutionPayloads = async ( - payloadTimeout, - cleanupRunning, - cleanupNonRunning, - updateLimit, - index -) => { - const _expiration = getExpirationDate(payloadTimeout); - const expiration = _expiration.getTime(); - - const must = [ - { range: { updatedAt: { lte: expiration } } }, - { - bool: { - should: [ - { exists: { field: 'finalPayload' } }, - { exists: { field: 'originalPayload' } }, - ], - }, - }, - ]; - const mustNot = []; - - if (cleanupRunning && !cleanupNonRunning) { - must.push({ term: { status: 'running' } }); - } else if (!cleanupRunning && cleanupNonRunning) { - mustNot.push({ term: { status: 'running' } }); - } - const removePayloadScript = "ctx._source.remove('finalPayload'); ctx._source.remove('originalPayload')"; - - const script = { inline: removePayloadScript }; - const body = { - query: { - bool: { - must, - mustNot, - }, - }, - script: script, - }; - const esClient = await getEsClient(); - const [{ node }] = await esConfig(); - // this launches the job for ES to perform, asynchronously - const updateTask = await esClient._client.updateByQuery({ - index, - type: 'execution', - size: updateLimit, - body, - conflicts: 'proceed', - wait_for_completion: false, - refresh: true, - }); - let taskStatus; - // this async and poll method allows us to avoid http timeouts - // and persist in case of lambda timeout - log.info(`launched async elasticsearch task id ${updateTask.body.task} - to check on this task outside this lambda, or to stop this task run the following`); - log.info(` > curl --request GET ${node}/_tasks/${updateTask.body.task}`); - log.info(` > curl --request POST ${node}/_tasks/${updateTask.body.task}/_cancel`); - do { - sleep(10000); - // eslint-disable-next-line no-await-in-loop - taskStatus = await esClient._client?.tasks.get({ task_id: updateTask.body.task }); - } while (taskStatus?.body.completed === false); - log.info(`elasticsearch task completed with status ${JSON.stringify(taskStatus?.body.task.status)}`); -}; /** * parse out environment variable configuration * @returns {{ @@ -135,33 +45,9 @@ const parseEnvironment = () => { }; }; -/** - * parse environment variables to extract configuration and run cleanup of ES executions - * - * @returns {Promise} - */ -async function cleanExecutionPayloads() { +function handler(_event) { const envConfig = parseEnvironment(); - log.info(`running cleanExecutions with configuration 
${JSON.stringify(envConfig)}`); - const { - updateLimit, - cleanupRunning, - cleanupNonRunning, - payloadTimeout, - esIndex, - } = envConfig; - - await cleanupExpiredESExecutionPayloads( - payloadTimeout, - cleanupRunning, - cleanupNonRunning, - updateLimit, - esIndex - ); -} - -async function handler(_event) { - return await cleanExecutionPayloads(); + log.info(`running stubbed cleanExecutions (to be reimplemented in CUMULUS-3982) with configuration ${JSON.stringify(envConfig)}`); } if (require.main === module) { @@ -176,7 +62,4 @@ if (require.main === module) { module.exports = { handler, - cleanExecutionPayloads, - getExpirationDate, - cleanupExpiredESExecutionPayloads, }; diff --git a/packages/api/lambdas/create-reconciliation-report-types.js b/packages/api/lambdas/create-reconciliation-report-types.js index 2ba35b18596..eac05e81296 100644 --- a/packages/api/lambdas/create-reconciliation-report-types.js +++ b/packages/api/lambdas/create-reconciliation-report-types.js @@ -5,7 +5,6 @@ /** * @typedef {Object} Env * @property {string} [CONCURRENCY] - The concurrency level for processing. - * @property {string} [ES_INDEX] - The Elasticsearch index. * @property {string} [AWS_REGION] - The AWS region. * @property {string} [AWS_ACCESS_KEY_ID] - The AWS access key ID. * @property {string} [AWS_SECRET_ACCESS_KEY] - The AWS secret access key. diff --git a/packages/api/lambdas/create-reconciliation-report.js b/packages/api/lambdas/create-reconciliation-report.js index 3822420a9fe..0269898bd06 100644 --- a/packages/api/lambdas/create-reconciliation-report.js +++ b/packages/api/lambdas/create-reconciliation-report.js @@ -29,8 +29,6 @@ const { translatePostgresFileToApiFile, } = require('@cumulus/db'); const Logger = require('@cumulus/logger'); -const { getEsClient } = require('@cumulus/es-client/search'); -const { indexReconciliationReport } = require('@cumulus/es-client/indexer'); const { ReconciliationReportPgModel, @@ -54,7 +52,6 @@ const isDataBucket = (bucketConfig) => ['private', 'public', 'protected'].includ /** * @typedef {typeof process.env } ProcessEnv * @typedef {import('knex').Knex} Knex - * @typedef {import('@cumulus/es-client/search').EsClient} EsClient * @typedef {import('../lib/types').NormalizedRecReportParams } NormalizedRecReportParams * @typedef {import('../lib/types').EnhancedNormalizedRecReportParams} * EnhancedNormalizedRecReportParams @@ -870,7 +867,6 @@ async function createReconciliationReport(recReportParams) { * @param {string} params.reportName - the name of the report * @param {Env} params.env - the environment variables * @param {Knex} params.knex - Optional Instance of a Knex client for testing - * @param {EsClient} params.esClient - Optional Instance of an Elasticsearch client for testing * @returns {Promise} report record saved to the database */ async function processRequest(params) { @@ -882,7 +878,6 @@ async function processRequest(params) { systemBucket, stackName, knex = await getKnexClient({ env }), - esClient = await getEsClient(), } = params; const createStartTime = moment.utc(); const reportRecordName = reportName @@ -900,9 +895,9 @@ async function processRequest(params) { location: buildS3Uri(systemBucket, reportKey), }; let [reportPgRecord] = await reconciliationReportPgModel.create(knex, builtReportRecord); + // The API format was logged prior to ES removal, so keep that format for consistency let reportApiRecord = translatePostgresReconReportToApiReconReport(reportPgRecord); - await indexReconciliationReport(esClient, reportApiRecord, process.env.ES_INDEX); - log.info(`Report 
added to database as pending: ${JSON.stringify(reportApiRecord)}.`); + log.info(`Report added to database as Pending: ${JSON.stringify(reportApiRecord)}.`); const concurrency = env.CONCURRENCY || '3'; @@ -936,8 +931,6 @@ async function processRequest(params) { status: 'Generated', }; [reportPgRecord] = await reconciliationReportPgModel.upsert(knex, generatedRecord); - reportApiRecord = translatePostgresReconReportToApiReconReport(reportPgRecord); - await indexReconciliationReport(esClient, reportApiRecord, process.env.ES_INDEX); } catch (error) { log.error(`Error caught in createReconciliationReport creating ${reportType} report ${reportRecordName}. ${error}`); // eslint-disable-line max-len const erroredRecord = { @@ -951,16 +944,14 @@ async function processRequest(params) { }; [reportPgRecord] = await reconciliationReportPgModel.upsert(knex, erroredRecord); reportApiRecord = translatePostgresReconReportToApiReconReport(reportPgRecord); - await indexReconciliationReport( - esClient, - reportApiRecord, - process.env.ES_INDEX - ); + log.error(`Report updated in database as Failed including error: ${JSON.stringify(reportApiRecord)}`); throw error; } reportPgRecord = await reconciliationReportPgModel.get(knex, { name: builtReportRecord.name }); - return translatePostgresReconReportToApiReconReport(reportPgRecord); + reportApiRecord = translatePostgresReconReportToApiReconReport(reportPgRecord); + log.info(`Report updated in database as Generated: ${JSON.stringify(reportApiRecord)}.`); + return reportApiRecord; } async function handler(event) { @@ -968,10 +959,9 @@ async function handler(event) { process.env.CMR_LIMIT = process.env.CMR_LIMIT || '5000'; process.env.CMR_PAGE_SIZE = process.env.CMR_PAGE_SIZE || '200'; - //TODO: Remove irrelevant env vars from terraform after ES reports are removed - const varsToLog = ['CMR_LIMIT', 'CMR_PAGE_SIZE', 'ES_SCROLL', 'ES_SCROLL_SIZE']; + const varsToLog = ['CMR_LIMIT', 'CMR_PAGE_SIZE']; const envsToLog = pickBy(process.env, (value, key) => varsToLog.includes(key)); - log.info(`CMR and ES Environment variables: ${JSON.stringify(envsToLog)}`); + log.info(`CMR Environment variables: ${JSON.stringify(envsToLog)}`); return await processRequest(event); } diff --git a/packages/api/lambdas/index-from-database.js b/packages/api/lambdas/index-from-database.js deleted file mode 100644 index e0666992a18..00000000000 --- a/packages/api/lambdas/index-from-database.js +++ /dev/null @@ -1,324 +0,0 @@ -'use strict'; - -const isNil = require('lodash/isNil'); -const pLimit = require('p-limit'); - -const DynamoDbSearchQueue = require('@cumulus/aws-client/DynamoDbSearchQueue'); -const log = require('@cumulus/common/log'); - -const { getEsClient } = require('@cumulus/es-client/search'); -const { - CollectionPgModel, - ExecutionPgModel, - AsyncOperationPgModel, - GranulePgModel, - ProviderPgModel, - RulePgModel, - PdrPgModel, - getKnexClient, - translatePostgresCollectionToApiCollection, - translatePostgresExecutionToApiExecution, - translatePostgresAsyncOperationToApiAsyncOperation, - translatePostgresGranuleToApiGranule, - translatePostgresProviderToApiProvider, - translatePostgresPdrToApiPdr, - translatePostgresRuleToApiRule, -} = require('@cumulus/db'); -const indexer = require('@cumulus/es-client/indexer'); - -/** - * Return specified concurrency for ES requests. - * - * Returned value is used with [p-limit](https://github.com/sindresorhus/p-limit), which - * does not accept 0. 
- * - * @param {Object} event - Incoming Lambda event - * @returns {number} - Specified request concurrency. Defaults to 10. - * @throws {TypeError} - */ -const getEsRequestConcurrency = (event) => { - if (!isNil(event.esRequestConcurrency)) { - const parsedValue = Number.parseInt(event.esRequestConcurrency, 10); - - if (Number.isInteger(parsedValue) && parsedValue > 0) { - return parsedValue; - } - - throw new TypeError('event.esRequestConcurrency must be an integer greater than 0'); - } - - if (!isNil(process.env.ES_CONCURRENCY)) { - const parsedValue = Number.parseInt(process.env.ES_CONCURRENCY, 10); - - if (Number.isInteger(parsedValue) && parsedValue > 0) { - return parsedValue; - } - - throw new TypeError('The ES_CONCURRENCY environment variable must be an integer greater than 0'); - } - - return 10; -}; - -// Legacy method used for indexing Reconciliation Reports only -async function indexReconciliationReports({ - esClient, - tableName, - esIndex, - indexFn, - limitEsRequests, -}) { - const scanQueue = new DynamoDbSearchQueue({ - TableName: tableName, - }); - - let itemsComplete = false; - let totalItemsIndexed = 0; - - /* eslint-disable no-await-in-loop */ - while (itemsComplete === false) { - await scanQueue.fetchItems(); - - itemsComplete = scanQueue.items[scanQueue.items.length - 1] === null; - - if (itemsComplete) { - // pop the null item off - scanQueue.items.pop(); - } - - if (scanQueue.items.length === 0) { - log.info(`No records to index for ${tableName}`); - return true; - } - - log.info(`Attempting to index ${scanQueue.items.length} records from ${tableName}`); - - const input = scanQueue.items.map( - (item) => limitEsRequests( - async () => { - try { - return await indexFn(esClient, item, esIndex); - } catch (error) { - log.error(`Error indexing record ${JSON.stringify(item)}, error: ${error}`); - return false; - } - } - ) - ); - const results = await Promise.all(input); - const successfulResults = results.filter((result) => result !== false); - totalItemsIndexed += successfulResults; - - log.info(`Completed index of ${successfulResults.length} records from ${tableName}`); - } - /* eslint-enable no-await-in-loop */ - - return totalItemsIndexed; -} - -/** -* indexModel - Index a postgres RDS table's contents to ElasticSearch -* -* @param {Object} params -- parameters -* @param {any} params.esClient -- ElasticSearch client -* @param {any} params.postgresModel -- @cumulus/db model -* @param {string} params.esIndex -- esIndex to write records to -* @param {any} params.indexFn -- Indexer function that maps to the database model -* @param {any} params.limitEsRequests -- limitEsRequests method (used for testing) -* @param {Knex} params.knex -- configured knex instance -* @param {any} params.translationFunction -- function to translate postgres record -* to API record for ES -* @param {number} params.pageSize -- Page size for postgres pagination -* @returns {number} -- number of items indexed -*/ -async function indexModel({ - esClient, - postgresModel, - esIndex, - indexFn, - limitEsRequests, - knex, - translationFunction, - pageSize, -}) { - let startId = 1; - let totalItemsIndexed = 0; - let done; - let maxIndex = await postgresModel.getMaxCumulusId(knex); - let failCount = 0; - - log.info(`Starting index of ${postgresModel.tableName} with max cumulus_id of ${maxIndex}`); - /* eslint-disable no-await-in-loop */ - while (done !== true && maxIndex > 0) { - const pageResults = await postgresModel.paginateByCumulusId(knex, startId, pageSize); - log.info( - `Attempting to index 
${pageResults.length} records from ${postgresModel.tableName}` - ); - - const indexPromises = pageResults.map((pageResult) => limitEsRequests(async () => { - let translationResult; - try { - translationResult = await translationFunction(pageResult); - await esClient.refreshClient(); - return await indexFn(esClient, translationResult, esIndex); - } catch (error) { - log.error( - `Error indexing record ${JSON.stringify(translationResult)}, error: ${error.message}` - ); - return false; - } - })); - - const results = await Promise.all(indexPromises); - const successfulResults = results.filter((result) => result !== false); - failCount += (results.length - successfulResults.length); - - totalItemsIndexed += successfulResults.length; - - log.info(`Completed index of ${successfulResults.length} records from ${postgresModel.tableName}`); - startId += pageSize; - if (startId > maxIndex) { - startId = maxIndex; - log.info(`Continuing indexing from cumulus_id ${startId} to account for new rows from ${postgresModel.tableName}`); - const oldMaxIndex = maxIndex; - maxIndex = await postgresModel.getMaxCumulusId(knex); - if (maxIndex <= oldMaxIndex) { - done = true; - } - } - } - /* eslint-enable no-await-in-loop */ - log.info(`Completed successful index of ${totalItemsIndexed} records from ${postgresModel.tableName}`); - if (failCount) { - log.warn(`${failCount} records failed indexing from ${postgresModel.tableName}`); - } - return totalItemsIndexed; -} - -async function indexFromDatabase(event) { - const { - indexName: esIndex, - esHost = process.env.ES_HOST, - reconciliationReportsTable = process.env.ReconciliationReportsTable, - postgresResultPageSize, - postgresConnectionPoolSize, - } = event; - const esClient = await getEsClient(esHost); - const knex = event.knex || (await getKnexClient({ - env: { - dbMaxPool: Number.parseInt(postgresConnectionPoolSize, 10) || 10, - ...process.env, - }, - })); - - const pageSize = Number.parseInt(postgresResultPageSize, 10) || 1000; - const esRequestConcurrency = getEsRequestConcurrency(event); - log.info( - `Tuning configuration: esRequestConcurrency: ${esRequestConcurrency}, postgresResultPageSize: ${pageSize}, postgresConnectionPoolSize: ${postgresConnectionPoolSize}` - ); - - const limitEsRequests = pLimit(esRequestConcurrency); - - await Promise.all([ - indexModel({ - esClient, - esIndex, - indexFn: indexer.indexCollection, - limitEsRequests, - postgresModel: new CollectionPgModel(), - translationFunction: translatePostgresCollectionToApiCollection, - knex, - pageSize, - }), - indexModel({ - esClient, - esIndex, - indexFn: indexer.indexExecution, - limitEsRequests, - postgresModel: new ExecutionPgModel(), - translationFunction: (record) => - translatePostgresExecutionToApiExecution(record, knex), - knex, - pageSize, - }), - indexModel({ - esClient, - esIndex, - indexFn: indexer.indexAsyncOperation, - limitEsRequests, - postgresModel: new AsyncOperationPgModel(), - translationFunction: translatePostgresAsyncOperationToApiAsyncOperation, - knex, - pageSize, - }), - indexModel({ - esClient, - esIndex, - indexFn: indexer.indexGranule, - limitEsRequests, - postgresModel: new GranulePgModel(), - translationFunction: (record) => - translatePostgresGranuleToApiGranule({ - granulePgRecord: record, - knexOrTransaction: knex, - }), - knex, - pageSize, - }), - indexModel({ - esClient, - esIndex, - indexFn: indexer.indexPdr, - limitEsRequests, - postgresModel: new PdrPgModel(), - translationFunction: (record) => - translatePostgresPdrToApiPdr(record, knex), - knex, - 
pageSize, - }), - indexModel({ - esClient, - esIndex, - indexFn: indexer.indexProvider, - limitEsRequests, - postgresModel: new ProviderPgModel(), - translationFunction: translatePostgresProviderToApiProvider, - knex, - pageSize, - }), - indexReconciliationReports({ - esClient, - tableName: reconciliationReportsTable, - esIndex, - indexFn: indexer.indexReconciliationReport, - limitEsRequests, - }), - indexModel({ - esClient, - esIndex, - indexFn: indexer.indexRule, - limitEsRequests, - postgresModel: new RulePgModel(), - translationFunction: (record) => - translatePostgresRuleToApiRule(record, knex), - knex, - pageSize, - }), - ]); -} - -async function handler(event) { - log.info(`Starting index from database for index ${event.indexName}`); - - await indexFromDatabase(event); - - log.info('Index from database complete'); - - return 'Index from database complete'; -} - -module.exports = { - handler, - indexFromDatabase, - getEsRequestConcurrency, -}; diff --git a/packages/api/lambdas/process-s3-dead-letter-archive.js b/packages/api/lambdas/process-s3-dead-letter-archive.js index ea0dc3d5541..fedec5b4cf9 100644 --- a/packages/api/lambdas/process-s3-dead-letter-archive.js +++ b/packages/api/lambdas/process-s3-dead-letter-archive.js @@ -4,9 +4,6 @@ const pSettle = require('p-settle'); const log = require('@cumulus/common/log'); -const { - getEsClient, -} = require('@cumulus/es-client/search'); const S3 = require('@cumulus/aws-client/S3'); const { s3 } = require('@cumulus/aws-client/services'); const { getJsonS3Object, deleteS3Object } = require('@cumulus/aws-client/S3'); @@ -101,13 +98,10 @@ async function processDeadLetterArchive({ let continuationToken; let allSuccessKeys = []; const allFailedKeys = []; - const esClient = await getEsClient(); let batchNumber = 1; /* eslint-disable no-await-in-loop */ do { log.info(`Processing batch ${batchNumber}`); - // Refresh ES client to avoid credentials timeout for long running processes - esClient.refreshClient(); listObjectsResponse = await s3().listObjectsV2({ Bucket: bucket, Prefix: path, @@ -120,7 +114,7 @@ async function processDeadLetterArchive({ const deadLetterMessage = await getJsonS3Object(bucket, deadLetterObject.Key); const cumulusMessage = await unwrapDeadLetterCumulusMessage(deadLetterMessage); try { - await writeRecordsFunction({ cumulusMessage, knex, esClient }); + await writeRecordsFunction({ cumulusMessage, knex }); return deadLetterObject.Key; } catch (error) { log.error(`Failed to write records from cumulusMessage for dead letter ${deadLetterObject.Key} due to '${error}'`); diff --git a/packages/api/lambdas/sf-event-sqs-to-db-records/index.js b/packages/api/lambdas/sf-event-sqs-to-db-records/index.js index e1609c90bc9..411079ae937 100644 --- a/packages/api/lambdas/sf-event-sqs-to-db-records/index.js +++ b/packages/api/lambdas/sf-event-sqs-to-db-records/index.js @@ -52,14 +52,12 @@ const log = new Logger({ sender: '@cumulus/api/lambdas/sf-event-sqs-to-db-record * @param {Object} params * @param {Object} params.cumulusMessage - Cumulus workflow message * @param {Knex} params.knex - Knex client - * @param {EsClient} params.esClient - Elasticsearch client * @param {Object} [params.testOverrides] * Optional override/mock object used for testing */ const writeRecords = async ({ cumulusMessage, knex, - esClient, testOverrides = {}, }) => { const messageCollectionNameVersion = getCollectionNameAndVersionFromMessage(cumulusMessage); @@ -103,7 +101,6 @@ const writeRecords = async ({ asyncOperationCumulusId, parentExecutionCumulusId, knex, - 
esClient, }); const providerCumulusId = await getMessageProviderCumulusId(cumulusMessage, knex); @@ -114,13 +111,11 @@ const writeRecords = async ({ providerCumulusId, knex, executionCumulusId, - esClient, }); return writeGranulesFromMessage({ cumulusMessage, executionCumulusId, - esClient, knex, testOverrides, }); diff --git a/packages/api/lambdas/sf-event-sqs-to-db-records/write-pdr.js b/packages/api/lambdas/sf-event-sqs-to-db-records/write-pdr.js index 26fb565ac0e..262b7377a5d 100644 --- a/packages/api/lambdas/sf-event-sqs-to-db-records/write-pdr.js +++ b/packages/api/lambdas/sf-event-sqs-to-db-records/write-pdr.js @@ -5,8 +5,6 @@ const { PdrPgModel, translatePostgresPdrToApiPdr, } = require('@cumulus/db'); -const { upsertPdr } = require('@cumulus/es-client/indexer'); -const { getEsClient } = require('@cumulus/es-client/search'); const { getMessagePdrName, messageHasPdr, @@ -14,7 +12,6 @@ const { getMessagePdrPANSent, getMessagePdrPANMessage, getPdrPercentCompletion, - generatePdrApiRecordFromMessage, } = require('@cumulus/message/PDRs'); const { getMetaStatus, @@ -87,23 +84,6 @@ const writePdrViaTransaction = async ({ return pdr; }; -const writePdrToEs = async (params) => { - const { - cumulusMessage, - updatedAt = Date.now(), - esClient = await getEsClient(), - } = params; - const pdrApiRecord = generatePdrApiRecordFromMessage(cumulusMessage, updatedAt); - if (!pdrApiRecord) { - return; - } - await upsertPdr({ - esClient, - updates: pdrApiRecord, - index: process.env.ES_INDEX, - }); -}; - const writePdr = async ({ cumulusMessage, collectionCumulusId, @@ -111,7 +91,6 @@ const writePdr = async ({ executionCumulusId, knex, updatedAt = Date.now(), - esClient, }) => { let pgPdr; // If there is no PDR in the message, then there's nothing to do here, which is fine @@ -133,11 +112,6 @@ const writePdr = async ({ executionCumulusId, updatedAt, }); - await writePdrToEs({ - cumulusMessage, - updatedAt, - esClient, - }); return pgPdr.cumulus_id; }); const pdrToPublish = await translatePostgresPdrToApiPdr(pgPdr, knex); @@ -149,5 +123,4 @@ module.exports = { generatePdrRecord, writePdrViaTransaction, writePdr, - writePdrToEs, }; diff --git a/packages/api/lib/granules.js b/packages/api/lib/granules.js index 1a9fbfc4e1f..ee0b908d0e8 100644 --- a/packages/api/lib/granules.js +++ b/packages/api/lib/granules.js @@ -234,10 +234,10 @@ function getTotalHits(bodyHits) { } /** - * Returns an array of granules from ElasticSearch query + * Returns an array of granules from an ElasticSearch query * * @param {Object} payload - * @param {string} [payload.index] - ES index to query + * @param {string} [payload.index] - ES index to query (Cloud Metrics) * @param {string} [payload.query] - ES query * @param {Object} [payload.source] - List of IDs to operate on * @param {Object} [payload.testBodyHits] - Optional body.hits for testing. @@ -284,12 +284,12 @@ async function granuleEsQuery({ index, query, source, testBodyHits }) { /** * Return a unique list of granules based on the provided list or the response from the - * query to ES using the provided query and index. + * query to ES (Cloud Metrics) using the provided query and index. * * @param {Object} payload * @param {Object} [payload.granules] - Optional list of granules with granuleId and collectionId - * @param {Object} [payload.query] - Optional parameter of query to send to ES - * @param {string} [payload.index] - Optional parameter of ES index to query. 
+ * @param {Object} [payload.query] - Optional parameter of query to send to ES (Cloud Metrics) + * @param {string} [payload.index] - Optional parameter of ES index to query (Cloud Metrics). * Must exist if payload.query exists. * @returns {Promise>} */ @@ -297,7 +297,7 @@ async function getGranulesForPayload(payload) { const { granules, index, query } = payload; const queryGranules = granules || []; - // query ElasticSearch if needed + // query ElasticSearch (Cloud Metrics) if needed if (queryGranules.length === 0 && query) { log.info('No granules detected. Searching for granules in ElasticSearch.'); diff --git a/packages/api/lib/mmt.js b/packages/api/lib/mmt.js index d37ddc4db77..68581bcc2f0 100644 --- a/packages/api/lib/mmt.js +++ b/packages/api/lib/mmt.js @@ -42,16 +42,16 @@ const buildMMTLink = (conceptId, cmrEnv = process.env.CMR_ENVIRONMENT) => { }; /** - * Updates the Collection query results from ES with an MMTLink when the + * Updates the Collection query results with a MMTLink when the * matching CMR entry contains a collection_id. * - * @param {Array} esResults - collection query results from Cumulus' elasticsearch + * @param {Array} queryResults - collection query results from Cumulus DB * @param {Array} cmrEntries - cmr response feed entry that should match the * results collections - * @returns {Array} - Array of shallow clones of esResults objects with + * @returns {Array} - Array of shallow clones of queryResults objects with * MMTLinks added to them */ -const updateResponseWithMMT = (esResults, cmrEntries) => esResults.map((res) => { +const updateResponseWithMMT = (queryResults, cmrEntries) => queryResults.map((res) => { const matchedCmr = cmrEntries.filter( (entry) => entry.short_name === res.name && entry.version_id === res.version ); @@ -61,7 +61,7 @@ const updateResponseWithMMT = (esResults, cmrEntries) => esResults.map((res) => }); /** - * Simplifies and transforms The returned ES results from a collection query + * Simplifies and transforms the results from a collection query * into a list of objects suitable for a compound call to CMR to retrieve * collection_id information. * Transforms each object in the results array into an new object. @@ -69,7 +69,7 @@ const updateResponseWithMMT = (esResults, cmrEntries) => esResults.map((res) => * inputObject.version => outputObject.version * all other input object keys are dropped. * - * @param {Object} results - The elasticsearch results array returned from either + * @param {Object} results - The results array returned from either * Collection.query() or Collection.queryCollectionsWithActiveGranules() * @returns {Arary} - list of Objects with two keys (short_name and version). */ @@ -80,10 +80,10 @@ const parseResults = (results) => })); /** - * parses the elasticsearch collection lists and for each result inserts a "MMTLink" + * parses the query collection lists and for each result inserts a "MMTLink" * into the collection object. 
+ * - * @param {Object} inputResponse - an elasticsearch reponse returned from either + * @param {Object} inputResponse - a response returned from either * Collection.query() or Collection.queryCollectionsWithActiveGranules() * @returns {Object} a copy of input response object where each collection * has been updated to include a link to the Metadata Management Tool diff --git a/packages/api/lib/orca.js b/packages/api/lib/orca.js index dcc22aa148f..ea075c0cc7b 100644 --- a/packages/api/lib/orca.js +++ b/packages/api/lib/orca.js @@ -85,7 +85,7 @@ const getOrcaRecoveryStatusByGranuleIdAndCollection = async (granuleId, collecti /** * add recovery status for each granule in the granule list response * - * @param {Object} inputResponse - an elasticsearch response returned from granules query + * @param {Object} inputResponse - a response returned from a granules query * @returns {Object} a copy of input response object where each granule * has been updated to include orca recovery status */ diff --git a/packages/api/lib/reconciliationReport.js b/packages/api/lib/reconciliationReport.js index fdacf692709..f063d3112ad 100644 --- a/packages/api/lib/reconciliationReport.js +++ b/packages/api/lib/reconciliationReport.js @@ -2,9 +2,6 @@ 'use strict'; -const isEqual = require('lodash/isEqual'); -const omit = require('lodash/omit'); - const { removeNilProperties } = require('@cumulus/common/util'); const { constructCollectionId, deconstructCollectionId } = require('@cumulus/message/Collections'); const Logger = require('@cumulus/logger'); @@ -183,49 +180,6 @@ function filterDBCollections(collections, recReportParams) { return collections; } -/** - * Compare granules from Elasticsearch and API for deep equality. - * - * @param {Object} esGranule - Granule from Elasticsearch - * @param {Object} apiGranule - API Granule (translated from PostgreSQL) - * @returns {boolean} - */ -function compareEsGranuleAndApiGranule(esGranule, apiGranule) { - // Ignore files in initial comparison so we can ignore file order - // in comparison - const fieldsIgnored = ['timestamp', 'updatedAt', 'files']; - // "dataType" and "version" fields do not exist in the PostgreSQL database - // granules table which is now the source of truth - const esFieldsIgnored = [...fieldsIgnored, 'dataType', 'version']; - const granulesAreEqual = isEqual( - omit(esGranule, esFieldsIgnored), - omit(apiGranule, fieldsIgnored) - ); - - if (granulesAreEqual === false) return granulesAreEqual; - - const esGranulesHasFiles = esGranule.files !== undefined; - const apiGranuleHasFiles = apiGranule.files.length !== 0; - - // If neither granule has files, then return the previous equality result - if (!esGranulesHasFiles && !apiGranuleHasFiles) return granulesAreEqual; - // If either ES or PG granule does not have files, but the other granule does - // have files, then the granules don't match, so return false - if ((esGranulesHasFiles && !apiGranuleHasFiles) - || (!esGranulesHasFiles && apiGranuleHasFiles)) { - return false; - } - - // Compare files one-by-one to ignore sort order for comparison - return esGranule.files.every((esFile) => { - const matchingFile = apiGranule.files.find( - (apiFile) => apiFile.bucket === esFile.bucket && apiFile.key === esFile.key - ); - if (!matchingFile) return false; - return isEqual(esFile, matchingFile); - }); -} - module.exports = { cmrGranuleSearchParams, convertToDBCollectionSearchObject, @@ -233,5 +187,4 @@ convertToOrcaGranuleSearchParams, filterDBCollections, initialReportHeader, - 
compareEsGranuleAndApiGranule, }; diff --git a/packages/api/lib/testUtils.js b/packages/api/lib/testUtils.js index e13ba2132ea..506948315ac 100644 --- a/packages/api/lib/testUtils.js +++ b/packages/api/lib/testUtils.js @@ -21,9 +21,6 @@ const { translateApiRuleToPostgresRuleRaw, translatePostgresRuleToApiRule, } = require('@cumulus/db'); -const { - deleteExecution, -} = require('@cumulus/es-client/indexer'); const { constructCollectionId, } = require('@cumulus/message/Collections'); @@ -233,7 +230,7 @@ function fakeAsyncOperationFactory(params = {}) { taskArn: randomId('arn'), id: uuidv4(), description: randomId('description'), - operationType: 'ES Index', + operationType: 'Reconciliation Report', status: 'SUCCEEDED', createdAt: Date.now() - 180.5 * 1000, updatedAt: Date.now(), @@ -620,22 +617,6 @@ const createAsyncOperationTestRecords = async (context) => { }; }; -const cleanupExecutionTestRecords = async (context, { arn }) => { - const { - knex, - executionPgModel, - esClient, - esIndex, - } = context; - - await executionPgModel.delete(knex, { arn }); - await deleteExecution({ - esClient, - arn, - index: esIndex, - }); -}; - module.exports = { createFakeJwtAuthToken, createSqsQueues, @@ -666,6 +647,5 @@ module.exports = { createRuleTestRecords, createPdrTestRecords, createExecutionTestRecords, - cleanupExecutionTestRecords, createAsyncOperationTestRecords, }; diff --git a/packages/api/tests/endpoints/providers/delete-provider.js b/packages/api/tests/endpoints/providers/delete-provider.js index 70e9f06e34f..ae3dcdd88a2 100644 --- a/packages/api/tests/endpoints/providers/delete-provider.js +++ b/packages/api/tests/endpoints/providers/delete-provider.js @@ -133,28 +133,6 @@ test('Deleting a provider removes the provider from postgres', async (t) => { t.false(await providerPgModel.exists(t.context.testKnex, { name })); }); -test('Deleting a provider that exists in PostgreSQL and not Elasticsearch succeeds', async (t) => { - const testPgProvider = fakeProviderRecordFactory(); - await t.context.providerPgModel - .create( - t.context.testKnex, - testPgProvider - ); - - await request(app) - .delete(`/providers/${testPgProvider.name}`) - .set('Accept', 'application/json') - .set('Authorization', `Bearer ${jwtAuthToken}`) - .expect(200); - - t.false( - await t.context.providerPgModel.exists( - t.context.testKnex, - { name: testPgProvider.name } - ) - ); -}); - test('Deleting a provider that does not exist in PostgreSQL returns a 404', async (t) => { const { status } = await request(app) .delete(`/providers/${randomString}`) diff --git a/packages/api/tests/endpoints/test-elasticsearch.js b/packages/api/tests/endpoints/test-elasticsearch.js deleted file mode 100644 index 811587e93c3..00000000000 --- a/packages/api/tests/endpoints/test-elasticsearch.js +++ /dev/null @@ -1,698 +0,0 @@ -'use strict'; - -const request = require('supertest'); -const test = require('ava'); -const get = require('lodash/get'); -const sinon = require('sinon'); - -const { - localStackConnectionEnv, - generateLocalTestDb, - destroyLocalTestDb, - migrationDir, -} = require('@cumulus/db'); -const awsServices = require('@cumulus/aws-client/services'); -const { - recursivelyDeleteS3Bucket, -} = require('@cumulus/aws-client/S3'); -const { randomString, randomId } = require('@cumulus/common/test-utils'); -const { IndexExistsError } = require('@cumulus/errors'); -const { bootstrapElasticSearch } = require('@cumulus/es-client/bootstrap'); -const { getEsClient, defaultIndexAlias } = require('@cumulus/es-client/search'); -const mappings 
= require('@cumulus/es-client/config/mappings.json'); -const startAsyncOperation = require('../../lib/startAsyncOperation'); - -const models = require('../../models'); -const assertions = require('../../lib/assertions'); -const { - createFakeJwtAuthToken, - setAuthorizedOAuthUsers, -} = require('../../lib/testUtils'); - -const esIndex = randomId('esindex'); - -process.env.AccessTokensTable = randomString(); -process.env.TOKEN_SECRET = randomString(); -process.env.stackName = randomString(); -process.env.system_bucket = randomString(); - -// import the express app after setting the env variables -const { app } = require('../../app'); -const { indexFromDatabase } = require('../../endpoints/elasticsearch'); - -let jwtAuthToken; -let accessTokenModel; -let esClient; - -/** - * Index fake data - * - * @returns {undefined} - none - */ -async function indexData() { - const rules = [ - { name: 'Rule1' }, - { name: 'Rule2' }, - { name: 'Rule3' }, - ]; - - await Promise.all(rules.map(async (rule) => { - await esClient.client.index({ - index: esIndex, - type: 'rule', - id: rule.name, - body: rule, - }); - })); - - await esClient.client.indices.refresh(); -} - -/** - * Create and alias index by going through ES bootstrap - * - * @param {string} indexName - index name - * @param {string} aliasName - alias name - * @returns {undefined} - none - */ -async function createIndex(indexName, aliasName) { - await bootstrapElasticSearch({ - host: 'fakehost', - index: indexName, - alias: aliasName, - }); - esClient = await getEsClient(); -} - -const testDbName = randomId('elasticsearch'); - -test.before(async (t) => { - await awsServices.s3().createBucket({ Bucket: process.env.system_bucket }); - - const username = randomString(); - await setAuthorizedOAuthUsers([username]); - - accessTokenModel = new models.AccessToken(); - await accessTokenModel.createTable(); - - jwtAuthToken = await createFakeJwtAuthToken({ accessTokenModel, username }); - - t.context.esAlias = randomString(); - process.env.ES_INDEX = t.context.esAlias; - process.env = { - ...process.env, - ...localStackConnectionEnv, - PG_DATABASE: testDbName, - }; - - const { knex, knexAdmin } = await generateLocalTestDb(testDbName, migrationDir); - t.context.testKnex = knex; - t.context.testKnexAdmin = knexAdmin; - - // create the elasticsearch index and add mapping - await createIndex(esIndex, t.context.esAlias); - - await indexData(); -}); - -test.after.always(async (t) => { - await accessTokenModel.deleteTable(); - await esClient.client.indices.delete({ index: esIndex }); - await destroyLocalTestDb({ - knex: t.context.testKnex, - knexAdmin: t.context.testKnexAdmin, - testDbName, - }); - await recursivelyDeleteS3Bucket(process.env.system_bucket); -}); - -test('PUT snapshot without an Authorization header returns an Authorization Missing response', async (t) => { - const response = await request(app) - .post('/elasticsearch/create-snapshot') - .set('Accept', 'application/json') - .expect(401); - - assertions.isAuthorizationMissingResponse(t, response); -}); - -test('PUT snapshot with an invalid access token returns an unauthorized response', async (t) => { - const response = await request(app) - .post('/elasticsearch/create-snapshot') - .set('Accept', 'application/json') - .set('Authorization', 'Bearer ThisIsAnInvalidAuthorizationToken') - .expect(401); - - assertions.isInvalidAccessTokenResponse(t, response); -}); - -test.serial('Reindex - multiple aliases found', async (t) => { - // Prefixes for error message predictability - const indexName = 
`z-${randomString()}`; - const otherIndexName = `a-${randomString()}`; - - const aliasName = randomString(); - - await esClient.client.indices.create({ - index: indexName, - body: { mappings }, - }); - - await esClient.client.indices.putAlias({ - index: indexName, - name: aliasName, - }); - - await esClient.client.indices.create({ - index: otherIndexName, - body: { mappings }, - }); - - await esClient.client.indices.putAlias({ - index: otherIndexName, - name: aliasName, - }); - - const response = await request(app) - .post('/elasticsearch/reindex') - .send({ aliasName }) - .set('Accept', 'application/json') - .set('Authorization', `Bearer ${jwtAuthToken}`) - .expect(400); - - t.is(response.body.message, `Multiple indices found for alias ${aliasName}. Specify source index as one of [${otherIndexName}, ${indexName}].`); - - await esClient.client.indices.delete({ index: indexName }); - await esClient.client.indices.delete({ index: otherIndexName }); -}); - -test.serial('Reindex - specify a source index that does not exist', async (t) => { - const { esAlias } = t.context; - - const response = await request(app) - .post('/elasticsearch/reindex') - .send({ aliasName: esAlias, sourceIndex: 'source-index' }) - .set('Accept', 'application/json') - .set('Authorization', `Bearer ${jwtAuthToken}`) - .expect(400); - - t.is(response.body.message, 'Source index source-index does not exist.'); -}); - -test.serial('Reindex - specify a source index that is not aliased', async (t) => { - const { esAlias } = t.context; - const indexName = 'source-index'; - const destIndex = randomString(); - - await esClient.client.indices.create({ - index: indexName, - body: { mappings }, - }); - - const response = await request(app) - .post('/elasticsearch/reindex') - .send({ - aliasName: esAlias, - sourceIndex: indexName, - destIndex, - }) - .set('Accept', 'application/json') - .set('Authorization', `Bearer ${jwtAuthToken}`) - .expect(200); - - t.is(response.body.message, `Reindexing to ${destIndex} from ${indexName}. 
Check the reindex-status endpoint for status.`); - - // Check the reindex status endpoint to see if the operation has completed - let statusResponse = await request(app) - .get('/elasticsearch/reindex-status') - .set('Authorization', `Bearer ${jwtAuthToken}`) - .expect(200); - - /* eslint-disable no-await-in-loop */ - while (Object.keys(statusResponse.body.reindexStatus.nodes).length > 0) { - statusResponse = await request(app) - .get('/elasticsearch/reindex-status') - .set('Authorization', `Bearer ${jwtAuthToken}`) - .expect(200); - } - /* eslint-enable no-await-in-loop */ - - await esClient.client.indices.delete({ index: indexName }); - await esClient.client.indices.delete({ index: destIndex }); -}); - -test.serial('Reindex request returns 400 with the expected message when source index matches destination index.', async (t) => { - const indexName = randomId('index'); - await esClient.client.indices.create({ - index: indexName, - body: { mappings }, - }); - - const response = await request(app) - .post('/elasticsearch/reindex') - .send({ destIndex: indexName, sourceIndex: indexName }) - .set('Accept', 'application/json') - .set('Authorization', `Bearer ${jwtAuthToken}`) - .expect(400); - - t.is(response.body.message, `source index(${indexName}) and destination index(${indexName}) must be different.`); - await esClient.client.indices.delete({ index: indexName }); -}); - -test.serial('Reindex request returns 400 with the expected message when source index matches the default destination index.', async (t) => { - const date = new Date(); - const defaultIndexName = `cumulus-${date.getFullYear()}-${date.getMonth() + 1}-${date.getDate()}`; - - try { - await createIndex(defaultIndexName); - } catch (error) { - if (!(error instanceof IndexExistsError)) throw error; - } - - t.teardown(async () => { - await esClient.client.indices.delete({ index: defaultIndexName }); - }); - - const response = await request(app) - .post('/elasticsearch/reindex') - .send({ sourceIndex: defaultIndexName }) - .set('Accept', 'application/json') - .set('Authorization', `Bearer ${jwtAuthToken}`) - .expect(400); - - t.is(response.body.message, `source index(${defaultIndexName}) and destination index(${defaultIndexName}) must be different.`); -}); - -test.serial('Reindex success', async (t) => { - const { esAlias } = t.context; - const destIndex = randomString(); - - const response = await request(app) - .post('/elasticsearch/reindex') - .send({ - aliasName: esAlias, - destIndex, - sourceIndex: esIndex, - }) - .set('Accept', 'application/json') - .set('Authorization', `Bearer ${jwtAuthToken}`) - .expect(200); - - t.is(response.body.message, `Reindexing to ${destIndex} from ${esIndex}. 
Check the reindex-status endpoint for status.`); - - // Check the reindex status endpoint to see if the operation has completed - let statusResponse = await request(app) - .get('/elasticsearch/reindex-status') - .set('Authorization', `Bearer ${jwtAuthToken}`) - .expect(200); - - /* eslint-disable no-await-in-loop */ - while (Object.keys(statusResponse.body.reindexStatus.nodes).length > 0) { - statusResponse = await request(app) - .get('/elasticsearch/reindex-status') - .set('Authorization', `Bearer ${jwtAuthToken}`) - .expect(200); - } - /* eslint-enable no-await-in-loop */ - - const indexStatus = statusResponse.body.indexStatus.indices[destIndex]; - - t.is(3, indexStatus.primaries.docs.count); - - // Validate destination index mappings are correct - const fieldMappings = await esClient.client.indices.getMapping() - .then((mappingsResponse) => mappingsResponse.body); - - const sourceMapping = get(fieldMappings, esIndex); - const destMapping = get(fieldMappings, destIndex); - - t.deepEqual(sourceMapping.mappings, destMapping.mappings); - - await esClient.client.indices.delete({ index: destIndex }); -}); - -test.serial('Reindex - destination index exists', async (t) => { - const { esAlias } = t.context; - const destIndex = randomString(); - const newAlias = randomString(); - - await createIndex(destIndex, newAlias); - - const response = await request(app) - .post('/elasticsearch/reindex') - .send({ - aliasName: esAlias, - destIndex, - sourceIndex: esIndex, - }) - .set('Accept', 'application/json') - .set('Authorization', `Bearer ${jwtAuthToken}`) - .expect(200); - - t.is(response.body.message, `Reindexing to ${destIndex} from ${esIndex}. Check the reindex-status endpoint for status.`); - - // Check the reindex status endpoint to see if the operation has completed - let statusResponse = await request(app) - .get('/elasticsearch/reindex-status') - .set('Authorization', `Bearer ${jwtAuthToken}`) - .expect(200); - - /* eslint-disable no-await-in-loop */ - while (Object.keys(statusResponse.body.reindexStatus.nodes).length > 0) { - statusResponse = await request(app) - .get('/elasticsearch/reindex-status') - .set('Authorization', `Bearer ${jwtAuthToken}`) - .expect(200); - } - /* eslint-enable no-await-in-loop */ - - await esClient.client.indices.delete({ index: destIndex }); -}); - -test.serial('Reindex status, no task running', async (t) => { - const response = await request(app) - .get('/elasticsearch/reindex-status') - .set('Authorization', `Bearer ${jwtAuthToken}`) - .expect(200); - - t.deepEqual(response.body.reindexStatus, { nodes: {} }); -}); - -test.serial('Change index - no current', async (t) => { - const { esAlias } = t.context; - - const response = await request(app) - .post('/elasticsearch/change-index') - .send({ - aliasName: esAlias, - newIndex: 'dest-index', - }) - .set('Accept', 'application/json') - .set('Authorization', `Bearer ${jwtAuthToken}`) - .expect(400); - - t.is(response.body.message, 'Please explicitly specify a current and new index.'); -}); - -test.serial('Change index - no new', async (t) => { - const { esAlias } = t.context; - - const response = await request(app) - .post('/elasticsearch/change-index') - .send({ - aliasName: esAlias, - currentIndex: 'source-index', - }) - .set('Accept', 'application/json') - .set('Authorization', `Bearer ${jwtAuthToken}`) - .expect(400); - - t.is(response.body.message, 'Please explicitly specify a current and new index.'); -}); - -test.serial('Change index - current index does not exist', async (t) => { - const { esAlias }
= t.context; - - const currentIndex = 'source-index'; - - const response = await request(app) - .post('/elasticsearch/change-index') - .send({ - aliasName: esAlias, - currentIndex, - newIndex: 'dest-index', - }) - .set('Accept', 'application/json') - .set('Authorization', `Bearer ${jwtAuthToken}`) - .expect(400); - - t.is(response.body.message, `Current index ${currentIndex} does not exist.`); -}); - -test.serial('Change index - new index does not exist', async (t) => { - const { esAlias } = t.context; - - const newIndex = 'dest-index'; - - const response = await request(app) - .post('/elasticsearch/change-index') - .send({ - aliasName: esAlias, - currentIndex: esIndex, - newIndex, - }) - .set('Accept', 'application/json') - .set('Authorization', `Bearer ${jwtAuthToken}`) - .expect(200); - - t.is(response.body.message, `Change index success - alias ${esAlias} now pointing to ${newIndex}`); - - await esClient.client.indices.delete({ index: newIndex }); -}); - -test.serial('Change index - current index same as new index', async (t) => { - const { esAlias } = t.context; - - const response = await request(app) - .post('/elasticsearch/change-index') - .send({ - aliasName: esAlias, - currentIndex: 'source', - newIndex: 'source', - }) - .set('Accept', 'application/json') - .set('Authorization', `Bearer ${jwtAuthToken}`) - .expect(400); - - t.is(response.body.message, 'The current index cannot be the same as the new index.'); -}); - -test.serial('Change index', async (t) => { - const sourceIndex = randomString(); - const aliasName = randomString(); - const destIndex = randomString(); - - await createIndex(sourceIndex, aliasName); - - await request(app) - .post('/elasticsearch/reindex') - .send({ - aliasName, - sourceIndex, - destIndex, - }) - .set('Accept', 'application/json') - .set('Authorization', `Bearer ${jwtAuthToken}`) - .expect(200); - - const response = await request(app) - .post('/elasticsearch/change-index') - .send({ - aliasName, - currentIndex: sourceIndex, - newIndex: destIndex, - }) - .set('Accept', 'application/json') - .set('Authorization', `Bearer ${jwtAuthToken}`) - .expect(200); - - t.is(response.body.message, - `Change index success - alias ${aliasName} now pointing to ${destIndex}`); - - const alias = await esClient.client.indices.getAlias({ name: aliasName }) - .then((aliasResponse) => aliasResponse.body); - - // Test that the only index connected to the alias is the destination index - t.deepEqual(Object.keys(alias), [destIndex]); - - t.is((await esClient.client.indices.exists({ index: sourceIndex })).body, true); - - await esClient.client.indices.delete({ index: destIndex }); -}); - -test.serial('Change index and delete source index', async (t) => { - const sourceIndex = randomString(); - const aliasName = randomString(); - const destIndex = randomString(); - - await createIndex(sourceIndex, aliasName); - - await request(app) - .post('/elasticsearch/reindex') - .send({ - aliasName, - sourceIndex, - destIndex, - }) - .set('Accept', 'application/json') - .set('Authorization', `Bearer ${jwtAuthToken}`) - .expect(200); - - const response = await request(app) - .post('/elasticsearch/change-index') - .send({ - aliasName, - currentIndex: sourceIndex, - newIndex: destIndex, - deleteSource: true, - }) - .set('Accept', 'application/json') - .set('Authorization', `Bearer ${jwtAuthToken}`) - .expect(200); - - t.is(response.body.message, - `Change index success - alias ${aliasName} now pointing to ${destIndex} and index ${sourceIndex} deleted`); - t.is((await 
esClient.client.indices.exists({ index: sourceIndex })).body, false); - - await esClient.client.indices.delete({ index: destIndex }); -}); - -test.serial('Reindex from database - startAsyncOperation is called with expected payload', async (t) => { - const indexName = randomString(); - const processEnv = { ...process.env }; - process.env.ES_HOST = 'fakeEsHost'; - process.env.ReconciliationReportsTable = 'fakeReportsTable'; - - const asyncOperationsStub = sinon.stub(startAsyncOperation, 'invokeStartAsyncOperationLambda'); - const payload = { - indexName, - esRequestConcurrency: 'fakeEsRequestConcurrency', - postgresResultPageSize: 'fakePostgresResultPageSize', - postgresConnectionPoolSize: 'fakePostgresConnectionPoolSize', - }; - - try { - await request(app) - .post('/elasticsearch/index-from-database') - .send( - payload - ) - .set('Accept', 'application/json') - .set('Authorization', `Bearer ${jwtAuthToken}`) - .expect(200); - t.deepEqual(asyncOperationsStub.getCall(0).args[0].payload, { - ...payload, - esHost: process.env.ES_HOST, - reconciliationReportsTable: process.env.ReconciliationReportsTable, - }); - } finally { - process.env = processEnv; - await esClient.client.indices.delete({ index: indexName }); - asyncOperationsStub.restore(); - } -}); - -test.serial('Indices status', async (t) => { - const indexName = `z-${randomString()}`; - const otherIndexName = `a-${randomString()}`; - - await esClient.client.indices.create({ - index: indexName, - body: { mappings }, - }); - - await esClient.client.indices.create({ - index: otherIndexName, - body: { mappings }, - }); - - const response = await request(app) - .get('/elasticsearch/indices-status') - .set('Authorization', `Bearer ${jwtAuthToken}`) - .expect(200); - - t.true(response.text.includes(indexName)); - t.true(response.text.includes(otherIndexName)); - - await esClient.client.indices.delete({ index: indexName }); - await esClient.client.indices.delete({ index: otherIndexName }); -}); - -test.serial('Current index - default alias', async (t) => { - const indexName = randomString(); - await createIndex(indexName, defaultIndexAlias); - t.teardown(() => esClient.client.indices.delete({ index: indexName })); - - const response = await request(app) - .get('/elasticsearch/current-index') - .set('Authorization', `Bearer ${jwtAuthToken}`) - .expect(200); - - t.true(response.body.includes(indexName)); -}); - -test.serial('Current index - custom alias', async (t) => { - const indexName = randomString(); - const customAlias = randomString(); - await createIndex(indexName, customAlias); - - const response = await request(app) - .get(`/elasticsearch/current-index/${customAlias}`) - .set('Authorization', `Bearer ${jwtAuthToken}`) - .expect(200); - - t.deepEqual(response.body, [indexName]); - - await esClient.client.indices.delete({ index: indexName }); -}); - -test.serial('request to /elasticsearch/index-from-database endpoint returns 500 if invoking StartAsyncOperation lambda throws unexpected error', async (t) => { - const asyncOperationStartStub = sinon.stub(startAsyncOperation, 'invokeStartAsyncOperationLambda').throws( - new Error('failed to start') - ); - - try { - const response = await request(app) - .post('/elasticsearch/index-from-database') - .set('Accept', 'application/json') - .set('Authorization', `Bearer ${jwtAuthToken}`) - .send({}); - t.is(response.status, 500); - } finally { - asyncOperationStartStub.restore(); - } -}); - -test.serial('indexFromDatabase request completes successfully', async (t) => { - const stub = 
sinon.stub(startAsyncOperation, 'invokeStartAsyncOperationLambda'); - const functionName = randomId('lambda'); - const fakeRequest = { - apiGateway: { - context: { - functionName, - }, - }, - body: { - indexName: t.context.esAlias, - }, - }; - - const fakeResponse = { - send: sinon.stub(), - }; - - await t.notThrowsAsync(indexFromDatabase(fakeRequest, fakeResponse)); - t.true(fakeResponse.send.called); - stub.restore(); -}); - -test.serial('indexFromDatabase uses correct caller lambda function name', async (t) => { - const stub = sinon.stub(startAsyncOperation, 'invokeStartAsyncOperationLambda'); - const functionName = randomId('lambda'); - const fakeRequest = { - apiGateway: { - context: { - functionName, - }, - }, - body: { - indexName: randomId('index'), - }, - }; - const fakeResponse = { - send: sinon.stub(), - }; - - await indexFromDatabase(fakeRequest, fakeResponse); - t.is(stub.getCall(0).firstArg.callerLambdaName, functionName); - stub.restore(); -}); diff --git a/packages/api/tests/endpoints/test-executions.js b/packages/api/tests/endpoints/test-executions.js index 96439c58b26..fac59d3f4d5 100644 --- a/packages/api/tests/endpoints/test-executions.js +++ b/packages/api/tests/endpoints/test-executions.js @@ -233,7 +233,7 @@ test.beforeEach(async (t) => { ]; // create fake Postgres granule records - // es records are for Metrics search + // es records are for Cloud Metrics search t.context.fakePGGranules = await Promise.all(t.context.fakeGranules.map(async (fakeGranule) => { await indexer.indexGranule(esClient, fakeGranule, esIndex); const granulePgRecord = await translateApiGranuleToPostgresGranule({ diff --git a/packages/api/tests/lambdas/sf-event-sqs-to-db-records/test-index.js b/packages/api/tests/lambdas/sf-event-sqs-to-db-records/test-index.js index dfa6d152e0b..15fafbe37ad 100644 --- a/packages/api/tests/lambdas/sf-event-sqs-to-db-records/test-index.js +++ b/packages/api/tests/lambdas/sf-event-sqs-to-db-records/test-index.js @@ -29,13 +29,6 @@ const { const { UnmetRequirementsError, } = require('@cumulus/errors'); -const { - Search, -} = require('@cumulus/es-client/search'); -const { - createTestIndex, - cleanupTestIndex, -} = require('@cumulus/es-client/testUtils'); const { constructCollectionId, } = require('@cumulus/message/Collections'); @@ -140,26 +133,6 @@ test.before(async (t) => { t.context.testKnex = knex; t.context.testKnexAdmin = knexAdmin; - const { esIndex, esClient } = await createTestIndex(); - t.context.esIndex = esIndex; - t.context.esClient = esClient; - - t.context.esExecutionsClient = new Search( - {}, - 'execution', - t.context.esIndex - ); - t.context.esPdrsClient = new Search( - {}, - 'pdr', - t.context.esIndex - ); - t.context.esGranulesClient = new Search( - {}, - 'granule', - t.context.esIndex - ); - t.context.collectionPgModel = new CollectionPgModel(); t.context.executionPgModel = new ExecutionPgModel(); t.context.granulePgModel = new GranulePgModel(); @@ -283,7 +256,6 @@ test.after.always(async (t) => { knexAdmin: t.context.testKnexAdmin, testDbName: t.context.testDbName, }); - await cleanupTestIndex(t.context); await sns().send(new DeleteTopicCommand({ TopicArn: ExecutionsTopicArn })); await sns().send(new DeleteTopicCommand({ TopicArn: PdrsTopicArn })); }); diff --git a/packages/api/tests/lambdas/sf-event-sqs-to-db-records/test-write-pdr.js b/packages/api/tests/lambdas/sf-event-sqs-to-db-records/test-write-pdr.js index 0dbe9e54272..8a44fcf3e4d 100644 --- a/packages/api/tests/lambdas/sf-event-sqs-to-db-records/test-write-pdr.js +++ 
b/packages/api/tests/lambdas/sf-event-sqs-to-db-records/test-write-pdr.js @@ -17,7 +17,6 @@ const { translatePostgresPdrToApiPdr, migrationDir, } = require('@cumulus/db'); -const { Search } = require('@cumulus/es-client/search'); const { createSnsTopic } = require('@cumulus/aws-client/SNS'); const { sns, sqs } = require('@cumulus/aws-client/services'); const { @@ -25,10 +24,6 @@ const { DeleteTopicCommand, } = require('@aws-sdk/client-sns'); const { ReceiveMessageCommand } = require('@aws-sdk/client-sqs'); -const { - createTestIndex, - cleanupTestIndex, -} = require('@cumulus/es-client/testUtils'); const { generatePdrRecord, @@ -45,15 +40,6 @@ test.before(async (t) => { ); t.context.knexAdmin = knexAdmin; t.context.knex = knex; - - const { esIndex, esClient } = await createTestIndex(); - t.context.esIndex = esIndex; - t.context.esClient = esClient; - t.context.esPdrClient = new Search( - {}, - 'pdr', - t.context.esIndex - ); }); test.beforeEach(async (t) => { @@ -166,7 +152,6 @@ test.after.always(async (t) => { await destroyLocalTestDb({ ...t.context, }); - await cleanupTestIndex(t.context); }); test('generatePdrRecord() generates correct PDR record', (t) => { @@ -320,7 +305,6 @@ test.serial('writePdr() does not update PDR record if update is from an older ex }); const pgRecord = await pdrPgModel.get(knex, { name: pdr.name }); - const esRecord = await t.context.esPdrClient.get(pdr.name); const stats = { processing: 0, @@ -330,10 +314,6 @@ test.serial('writePdr() does not update PDR record if update is from an older ex status: 'completed', stats, }); - t.like(esRecord, { - status: 'completed', - stats, - }); cumulusMessage.meta.status = 'running'; cumulusMessage.payload.running = ['arn2']; @@ -349,18 +329,13 @@ test.serial('writePdr() does not update PDR record if update is from an older ex }); const updatedPgRecord = await pdrPgModel.get(knex, { name: pdr.name }); - const updatedEsRecord = await t.context.esPdrClient.get(pdr.name); t.like(updatedPgRecord, { status: 'completed', stats, }); - t.like(updatedEsRecord, { - status: 'completed', - stats, - }); }); -test.serial('writePdr() saves a PDR record to PostgreSQL/Elasticsearch if PostgreSQL write is enabled', async (t) => { +test.serial('writePdr() saves a PDR record to PostgreSQL', async (t) => { const { cumulusMessage, knex, @@ -380,35 +355,9 @@ test.serial('writePdr() saves a PDR record to PostgreSQL/Elasticsearch if Postgr }); t.true(await pdrPgModel.exists(knex, { name: pdr.name })); - t.true(await t.context.esPdrClient.exists(pdr.name)); }); -test.serial('writePdr() saves a PDR record to PostgreSQL/Elasticsearch with same timestamps', async (t) => { - const { - cumulusMessage, - knex, - collectionCumulusId, - providerCumulusId, - executionCumulusId, - pdr, - pdrPgModel, - } = t.context; - - await writePdr({ - cumulusMessage, - collectionCumulusId, - providerCumulusId, - executionCumulusId: executionCumulusId, - knex, - }); - - const pgRecord = await pdrPgModel.get(knex, { name: pdr.name }); - const esRecord = await t.context.esPdrClient.get(pdr.name); - t.is(pgRecord.created_at.getTime(), esRecord.createdAt); - t.is(pgRecord.updated_at.getTime(), esRecord.updatedAt); -}); - -test.serial('writePdr() does not write to PostgreSQL/Elasticsearch if PostgreSQL write fails', async (t) => { +test.serial('writePdr() does not write to PostgreSQL if PostgreSQL write fails', async (t) => { const { cumulusMessage, knex, @@ -450,51 +399,6 @@ test.serial('writePdr() does not write to PostgreSQL/Elasticsearch if PostgreSQL ); t.false(await 
pdrPgModel.exists(knex, { name: pdr.name })); - t.false(await t.context.esPdrClient.exists(pdr.name)); -}); - -test.serial('writePdr() does not write to PostgreSQL/Elasticsearch if Elasticsearch write fails', async (t) => { - const { - cumulusMessage, - knex, - collectionCumulusId, - providerCumulusId, - pdrPgModel, - } = t.context; - - const pdr = { - name: cryptoRandomString({ length: 5 }), - PANSent: false, - PANmessage: 'test', - }; - cumulusMessage.payload = { - pdr, - }; - - cumulusMessage.meta.status = 'completed'; - - const fakeEsClient = { - initializeEsClient: () => Promise.resolve(), - client: { - update: () => { - throw new Error('PDR ES error'); - }, - }, - }; - - await t.throwsAsync( - writePdr({ - cumulusMessage, - collectionCumulusId, - providerCumulusId, - knex, - esClient: fakeEsClient, - }), - { message: 'PDR ES error' } - ); - - t.false(await pdrPgModel.exists(knex, { name: pdr.name })); - t.false(await t.context.esPdrClient.exists(pdr.name)); }); test.serial('writePdr() successfully publishes an SNS message', async (t) => { diff --git a/packages/api/tests/lambdas/test-bootstrap.js b/packages/api/tests/lambdas/test-bootstrap.js deleted file mode 100644 index c0797c6496b..00000000000 --- a/packages/api/tests/lambdas/test-bootstrap.js +++ /dev/null @@ -1,43 +0,0 @@ -const test = require('ava'); -const sinon = require('sinon'); - -const { handler } = require('../../lambdas/bootstrap'); - -test('handler calls bootstrapFunction with expected values', async (t) => { - const bootstrapFunctionStub = sinon.stub(); - const testContext = { - bootstrapFunction: bootstrapFunctionStub, - }; - - const hostName = 'fakehost'; - - const actual = await handler({ - testContext, - removeAliasConflict: true, - elasticsearchHostname: hostName, - }); - - t.deepEqual(actual, { Data: {}, Status: 'SUCCESS' }); - t.true(bootstrapFunctionStub.calledWith({ - host: hostName, - removeAliasConflict: true, - })); -}); - -test('handler throws with error/status on bootstrap function failure', async (t) => { - const errorMessage = 'Fake Error'; - const bootstrapFunctionStub = () => { - throw new Error(errorMessage); - }; - const testContext = { - bootstrapFunction: bootstrapFunctionStub, - }; - - const hostName = 'fakehost'; - - await t.throwsAsync(handler({ - testContext, - removeAliasConflict: true, - elasticsearchHostname: hostName, - }), { message: errorMessage }); -}); diff --git a/packages/api/tests/lambdas/test-bulk-granule-delete.js b/packages/api/tests/lambdas/test-bulk-granule-delete.js index c1f16faaad8..e42afe304dd 100644 --- a/packages/api/tests/lambdas/test-bulk-granule-delete.js +++ b/packages/api/tests/lambdas/test-bulk-granule-delete.js @@ -13,11 +13,7 @@ const { } = require('@cumulus/db'); const { createBucket, deleteS3Buckets } = require('@cumulus/aws-client/S3'); const { randomId, randomString } = require('@cumulus/common/test-utils'); -const { Search } = require('@cumulus/es-client/search'); -const { - createTestIndex, - cleanupTestIndex, -} = require('@cumulus/es-client/testUtils'); + const { sns, sqs } = require('@cumulus/aws-client/services'); const { SubscribeCommand, @@ -44,11 +40,6 @@ test.before(async (t) => { const { knex, knexAdmin } = await generateLocalTestDb(testDbName, migrationDir); t.context.knex = knex; t.context.knexAdmin = knexAdmin; - - const { esIndex, esClient } = await createTestIndex(); - t.context.esIndex = esIndex; - t.context.esClient = esClient; - t.context.esGranulesClient = new Search({}, 'granule', t.context.esIndex); }); test.beforeEach(async (t) => { @@ 
-87,7 +78,6 @@ test.after.always(async (t) => { knexAdmin: t.context.knexAdmin, testDbName, }); - await cleanupTestIndex(t.context); }); test('bulkGranuleDelete does not fail on published granules if payload.forceRemoveFromCmr is true', async (t) => { @@ -164,17 +154,6 @@ test('bulkGranuleDelete does not fail on published granules if payload.forceRemo { granule_id: pgGranuleId2, collection_cumulus_id: pgCollectionCumulusId2 } )); - t.false( - await t.context.esGranulesClient.exists( - pgGranuleId1 - ) - ); - t.false( - await t.context.esGranulesClient.exists( - pgGranuleId2 - ) - ); - const s3Buckets = granules[0].s3Buckets; t.teardown(() => deleteS3Buckets([ s3Buckets.protected.name, diff --git a/packages/api/tests/lambdas/test-bulk-operation.js b/packages/api/tests/lambdas/test-bulk-operation.js index 3b28d46c994..f858da57e74 100644 --- a/packages/api/tests/lambdas/test-bulk-operation.js +++ b/packages/api/tests/lambdas/test-bulk-operation.js @@ -49,7 +49,6 @@ const esSearchStub = sandbox.stub(); const esScrollStub = sandbox.stub(); FakeEsClient.prototype.scroll = esScrollStub; FakeEsClient.prototype.search = esSearchStub; - const bulkOperation = proxyquire('../../lambdas/bulk-operation', { '../lib/granules': proxyquire('../../lib/granules', { '@cumulus/es-client/search': { @@ -392,6 +391,7 @@ test.serial('bulk operation BULK_GRANULE applies workflow to granules returned b }); await verifyGranulesQueuedStatus(t); }); + test.serial('applyWorkflowToGranules sets the granules status to queued', async (t) => { await setUpExistingDatabaseRecords(t); const workflowName = 'test-workflow'; diff --git a/packages/api/tests/lambdas/test-cleanExecutions.js b/packages/api/tests/lambdas/test-cleanExecutions.js deleted file mode 100644 index e19e251b058..00000000000 --- a/packages/api/tests/lambdas/test-cleanExecutions.js +++ /dev/null @@ -1,553 +0,0 @@ -/* eslint-disable no-await-in-loop */ -const test = require('ava'); -const moment = require('moment'); -const clone = require('lodash/clone'); -const { - translatePostgresExecutionToApiExecution, - fakeExecutionRecordFactory, - localStackConnectionEnv, -} = require('@cumulus/db'); -const { cleanupTestIndex, createTestIndex } = require('@cumulus/es-client/testUtils'); -const { handler, getExpirationDate, cleanupExpiredESExecutionPayloads } = require('../../lambdas/cleanExecutions'); -test.beforeEach(async (t) => { - const { esIndex, esClient, searchClient } = await createTestIndex(); - t.context.esIndex = esIndex; - t.context.esClient = esClient; - t.context.searchClient = searchClient; - - const records = []; - for (let i = 0; i < 20; i += 2) { - records.push(await translatePostgresExecutionToApiExecution(fakeExecutionRecordFactory({ - updated_at: moment().subtract(i, 'days').toDate(), - final_payload: '{"a": "b"}', - original_payload: '{"b": "c"}', - status: 'completed', - cumulus_id: i, - }))); - records.push(await translatePostgresExecutionToApiExecution(fakeExecutionRecordFactory({ - updated_at: moment().subtract(i, 'days').toDate(), - final_payload: '{"a": "b"}', - original_payload: '{"b": "c"}', - status: 'running', - cumulus_id: i + 1, - }))); - } - for (const record of records) { - await t.context.esClient.client.index({ - body: record, - id: record.cumulusId, - index: t.context.esIndex, - type: 'execution', - refresh: true, - }); - } -}); - -test.afterEach.always(async (t) => { - await cleanupTestIndex(t.context); -}); - -const esPayloadsEmpty = (entry) => !entry.finalPayload && !entry.originalPayload; - -test.serial('handler() handles running
expiration', async (t) => { - const env = clone(process.env); - process.env = localStackConnectionEnv; - process.env.PG_DATABASE = t.context.testDbName; - process.env.ES_INDEX = t.context.esIndex; - process.env.LOCAL_ES_HOST = 'localhost'; - let expirationDays = 4; - let expirationDate = getExpirationDate(expirationDays); - process.env.CLEANUP_NON_RUNNING = 'false'; - process.env.CLEANUP_RUNNING = 'true'; - process.env.PAYLOAD_TIMEOUT = expirationDays; - - await handler(); - - let massagedEsExecutions = await t.context.searchClient.query({ - index: t.context.esIndex, - type: 'execution', - body: {}, - size: 30, - }); - massagedEsExecutions.results.forEach((massagedExecution) => { - if (massagedExecution.updatedAt <= expirationDate && massagedExecution.status === 'running') { - t.true(esPayloadsEmpty(massagedExecution)); - } else { - t.false(esPayloadsEmpty(massagedExecution)); - } - }); - - expirationDays = 2; - expirationDate = getExpirationDate(expirationDays); - process.env.PAYLOAD_TIMEOUT = expirationDays; - - await handler(); - - massagedEsExecutions = await t.context.searchClient.query({ - index: t.context.esIndex, - type: 'execution', - body: {}, - size: 30, - }); - massagedEsExecutions.results.forEach((massagedExecution) => { - if (massagedExecution.updatedAt <= expirationDate.getTime() && massagedExecution.status === 'running') { - t.true(esPayloadsEmpty(massagedExecution)); - } else { - t.false(esPayloadsEmpty(massagedExecution)); - } - }); - process.env = env; -}); - -test.serial('handler() handles non running expiration', async (t) => { - const env = clone(process.env); - process.env = localStackConnectionEnv; - process.env.PG_DATABASE = t.context.testDbName; - process.env.ES_INDEX = t.context.esIndex; - let expirationDays = 5; - let expirationDate = getExpirationDate(expirationDays); - process.env.CLEANUP_NON_RUNNING = 'true'; - process.env.CLEANUP_RUNNING = 'false'; - process.env.PAYLOAD_TIMEOUT = expirationDays; - await handler(); - - let massagedEsExecutions = await t.context.searchClient.query({ - index: t.context.esIndex, - type: 'execution', - body: {}, - size: 30, - }); - - massagedEsExecutions.results.forEach((massagedExecution) => { - if (massagedExecution.updatedAt <= expirationDate && massagedExecution.status !== 'running') { - t.true(esPayloadsEmpty(massagedExecution)); - } else { - t.false(esPayloadsEmpty(massagedExecution)); - } - }); - - expirationDays = 3; - expirationDate = getExpirationDate(expirationDays); - process.env.PAYLOAD_TIMEOUT = expirationDays; - - await handler(); - - massagedEsExecutions = await t.context.searchClient.query({ - index: t.context.esIndex, - type: 'execution', - body: {}, - size: 30, - }); - massagedEsExecutions.results.forEach((massagedExecution) => { - if (massagedExecution.updatedAt <= expirationDate.getTime() && massagedExecution.status !== 'running') { - t.true(esPayloadsEmpty(massagedExecution)); - } else { - t.false(esPayloadsEmpty(massagedExecution)); - } - }); - process.env = env; -}); - -test.serial('handler() handles both expirations', async (t) => { - const env = clone(process.env); - process.env = localStackConnectionEnv; - process.env.PG_DATABASE = t.context.testDbName; - process.env.ES_INDEX = t.context.esIndex; - process.env.LOCAL_ES_HOST = 'localhost'; - let payloadTimeout = 9; - let payloadExpiration = getExpirationDate(payloadTimeout); - - process.env.CLEANUP_RUNNING = 'true'; - process.env.CLEANUP_NON_RUNNING = 'true'; - process.env.PAYLOAD_TIMEOUT = payloadTimeout; - - await handler(); - - let 
massagedEsExecutions = await t.context.searchClient.query({ - index: t.context.esIndex, - type: 'execution', - body: {}, - size: 30, - }); - massagedEsExecutions.results.forEach((massagedExecution) => { - if (massagedExecution.updatedAt <= payloadExpiration.getTime()) { - t.true(esPayloadsEmpty(massagedExecution)); - } else { - t.false(esPayloadsEmpty(massagedExecution)); - } - }); - payloadTimeout = 8; - - payloadExpiration = getExpirationDate(payloadTimeout); - process.env.PAYLOAD_TIMEOUT = payloadTimeout; - - await handler(); - - massagedEsExecutions = await t.context.searchClient.query({ - index: t.context.esIndex, - type: 'execution', - body: {}, - size: 30, - }); - massagedEsExecutions.results.forEach((massagedExecution) => { - if (massagedExecution.updatedAt <= payloadExpiration.getTime()) { - t.true(esPayloadsEmpty(massagedExecution)); - } else { - t.false(esPayloadsEmpty(massagedExecution)); - } - }); - process.env = env; -}); - -test.serial('handler() throws errors when misconfigured', async (t) => { - const env = clone(process.env); - process.env.CLEANUP_RUNNING = 'false'; - process.env.CLEANUP_NON_RUNNING = 'false'; - - await t.throwsAsync(handler(), { - message: 'running and non-running executions configured to be skipped, nothing to do', - }); - - process.env.CLEANUP_RUNNING = 'false'; - process.env.CLEANUP_NON_RUNNING = 'true'; - process.env.PAYLOAD_TIMEOUT = 'frogs'; - await t.throwsAsync(handler(), { - message: 'Invalid number of days specified in configuration for payloadTimeout: frogs', - }); - process.env = env; -}); - -test.serial('handler() iterates through data in batches when updateLimit is set low', async (t) => { - const env = clone(process.env); - - process.env = localStackConnectionEnv; - process.env.PG_DATABASE = t.context.testDbName; - process.env.ES_INDEX = t.context.esIndex; - process.env.LOCAL_ES_HOST = 'localhost'; - - process.env.CLEANUP_RUNNING = 'true'; - process.env.CLEANUP_NON_RUNNING = 'true'; - process.env.PAYLOAD_TIMEOUT = 2; - - process.env.UPDATE_LIMIT = 2; - - await handler(); - - let massagedEsExecutions = await t.context.searchClient.query({ - index: t.context.esIndex, - type: 'execution', - body: {}, - size: 30, - }); - let esCleanedCount = 0; - massagedEsExecutions.results.forEach((massagedExecution) => { - if (esPayloadsEmpty(massagedExecution)) esCleanedCount += 1; - }); - t.is(esCleanedCount, 2); - - await handler(); - - massagedEsExecutions = await t.context.searchClient.query({ - index: t.context.esIndex, - type: 'execution', - body: {}, - size: 30, - }); - esCleanedCount = 0; - massagedEsExecutions.results.forEach((massagedExecution) => { - if (esPayloadsEmpty(massagedExecution)) esCleanedCount += 1; - }); - t.is(esCleanedCount, 4); - - process.env.UPDATE_LIMIT = 12; - - await handler(); - - massagedEsExecutions = await t.context.searchClient.query({ - index: t.context.esIndex, - type: 'execution', - body: {}, - size: 30, - }); - esCleanedCount = 0; - massagedEsExecutions.results.forEach((massagedExecution) => { - if (esPayloadsEmpty(massagedExecution)) esCleanedCount += 1; - }); - t.is(esCleanedCount, 16); - - process.env = env; -}); - -test('cleanupExpiredEsExecutionPayloads() for just running removes expired running executions', async (t) => { - let timeoutDays = 6; - await cleanupExpiredESExecutionPayloads( - timeoutDays, - true, - false, - 100, - t.context.esIndex - ); - // await es refresh - - let expiration = moment().subtract(timeoutDays, 'days').toDate().getTime(); - let relevantExecutions = await t.context.searchClient.query( 
- { - index: t.context.esIndex, - type: 'execution', - body: { - query: { - range: { - updatedAt: { - lte: expiration, - }, - }, - }, - }, - } - ); - for (const execution of relevantExecutions.results) { - if (execution.status === 'running') { - t.true(execution.finalPayload === undefined); - t.true(execution.originalPayload === undefined); - } else { - t.false(execution.finalPayload === undefined); - t.false(execution.originalPayload === undefined); - } - } - let irrelevantExecutions = await t.context.searchClient.query( - { - index: t.context.esIndex, - type: 'execution', - body: { - query: { - range: { - updatedAt: { - gt: expiration, - }, - }, - }, - }, - } - ); - for (const execution of irrelevantExecutions.results) { - t.false(execution.finalPayload === undefined); - t.false(execution.originalPayload === undefined); - } - - timeoutDays = 2; - await cleanupExpiredESExecutionPayloads( - timeoutDays, - true, - false, - 100, - t.context.esIndex - ); - - expiration = moment().subtract(timeoutDays, 'days').toDate().getTime(); - relevantExecutions = await t.context.searchClient.query( - { - index: t.context.esIndex, - type: 'execution', - body: { - query: { - range: { - updatedAt: { - lte: expiration, - }, - }, - }, - }, - } - ); - for (const execution of relevantExecutions.results) { - if (execution.status === 'running') { - t.true(execution.finalPayload === undefined); - t.true(execution.originalPayload === undefined); - } else { - t.false(execution.finalPayload === undefined); - t.false(execution.originalPayload === undefined); - } - } - irrelevantExecutions = await t.context.searchClient.query( - { - index: t.context.esIndex, - type: 'execution', - body: { - query: { - range: { - updatedAt: { - gt: expiration, - }, - }, - }, - }, - } - ); - for (const execution of irrelevantExecutions.results) { - t.false(execution.finalPayload === undefined); - t.false(execution.originalPayload === undefined); - } -}); - -test('cleanupExpiredEsExecutionPayloads() for just nonRunning removes expired non running executions', async (t) => { - let timeoutDays = 6; - await cleanupExpiredESExecutionPayloads( - timeoutDays, - false, - true, - 100, - t.context.esIndex - ); - - let expiration = moment().subtract(timeoutDays, 'days').toDate().getTime(); - - let relevantExecutions = await t.context.searchClient.query( - { - index: t.context.esIndex, - type: 'execution', - body: { - query: { - range: { - updatedAt: { - lte: expiration, - }, - }, - }, - }, - } - ); - for (const execution of relevantExecutions.results) { - if (execution.status !== 'running') { - t.true(execution.finalPayload === undefined); - t.true(execution.originalPayload === undefined); - } else { - t.false(execution.finalPayload === undefined); - t.false(execution.originalPayload === undefined); - } - } - let irrelevantExecutions = await t.context.searchClient.query( - { - index: t.context.esIndex, - type: 'execution', - body: { - query: { - range: { - updatedAt: { - gt: expiration, - }, - }, - }, - }, - } - ); - for (const execution of irrelevantExecutions.results) { - t.false(execution.finalPayload === undefined); - t.false(execution.originalPayload === undefined); - } - - timeoutDays = 2; - await cleanupExpiredESExecutionPayloads( - timeoutDays, - false, - true, - 100, - t.context.esIndex - ); - - expiration = moment().subtract(timeoutDays, 'days').toDate().getTime(); - relevantExecutions = await t.context.searchClient.query( - { - index: t.context.esIndex, - type: 'execution', - body: { - query: { - range: { - updatedAt: { - lte: 
expiration, - }, - }, - }, - }, - } - ); - for (const execution of relevantExecutions.results) { - if (execution.status !== 'running') { - t.true(execution.finalPayload === undefined); - t.true(execution.originalPayload === undefined); - } else { - t.false(execution.finalPayload === undefined); - t.false(execution.originalPayload === undefined); - } - } - irrelevantExecutions = await t.context.searchClient.query( - { - index: t.context.esIndex, - type: 'execution', - body: { - query: { - range: { - updatedAt: { - gt: expiration, - }, - }, - }, - }, - } - ); - for (const execution of irrelevantExecutions.results) { - t.false(execution.finalPayload === undefined); - t.false(execution.originalPayload === undefined); - } -}); - -test('cleanupExpiredEsExecutionPayloads() for running and nonRunning executions', async (t) => { - const timeoutDays = 5; - await cleanupExpiredESExecutionPayloads( - timeoutDays, - true, - true, - 100, - t.context.esIndex - ); - - const expiration = moment().subtract(timeoutDays, 'days').toDate().getTime(); - - const relevant = await t.context.searchClient.query( - { - index: t.context.esIndex, - type: 'execution', - body: { - query: { - range: { - updatedAt: { - lte: expiration, - }, - }, - }, - }, - } - ); - for (const execution of relevant.results) { - t.true(execution.finalPayload === undefined); - t.true(execution.originalPayload === undefined); - } - const irrelevantExecutions = await t.context.searchClient.query( - { - index: t.context.esIndex, - type: 'execution', - body: { - query: { - range: { - updatedAt: { - gt: expiration, - }, - }, - }, - }, - } - ); - for (const execution of irrelevantExecutions.results) { - t.false(execution.finalPayload === undefined); - t.false(execution.originalPayload === undefined); - } -}); diff --git a/packages/api/tests/lambdas/test-create-reconciliation-report.js b/packages/api/tests/lambdas/test-create-reconciliation-report.js index fead1d4b13b..25b988d21bd 100644 --- a/packages/api/tests/lambdas/test-create-reconciliation-report.js +++ b/packages/api/tests/lambdas/test-create-reconciliation-report.js @@ -46,8 +46,6 @@ const { translatePostgresReconReportToApiReconReport, } = require('@cumulus/db'); const { getDistributionBucketMapKey } = require('@cumulus/distribution-utils'); -const { Search } = require('@cumulus/es-client/search'); -const { bootstrapElasticSearch } = require('@cumulus/es-client/bootstrap'); const { fakeGranuleFactoryV2, @@ -62,9 +60,6 @@ const ORCASearchCatalogQueue = require('../../lib/ORCASearchCatalogQueue'); // Call normalize event on all input events before calling the handler. const handler = (event) => unwrappedHandler(normalizeEvent(event)); -let esAlias; -let esIndex; - const createBucket = (Bucket) => awsServices.s3().createBucket({ Bucket }); const requiredStaticCollectionFields = { granuleIdExtraction: randomString(), @@ -253,16 +248,16 @@ const randomBetween = (a, b) => Math.floor(Math.random() * (b - a + 1) + a); const randomTimeBetween = (t1, t2) => randomBetween(t1, t2); /** - * Prepares localstack with a number of active granules. Sets up ES with + * Prepares localstack with a number of active granules. Sets up pg with * random collections where some fall within the start and end timestamps. - * Also creates a number that are only in ES, as well as some that are only + * Also creates a number that are only in pg, as well as some that are only * "returned by CMR" (as a stubbed function) * * @param t.t * @param {object} t - AVA test context. 
* @param t.params * @returns {object} setupVars - Object with information about the current - * state of elasticsearch and CMR mock. + * state of pg and CMR mock. * The object returned has: * + startTimestamp - beginning of matching timerange * + endTimestamp - end of matching timerange @@ -270,11 +265,11 @@ const randomTimeBetween = (t1, t2) => randomBetween(t1, t2); * timestamps and included in the CMR mock * + matchingCollectionsOutsiderange - active collections dated not between the * start and end timestamps and included in the CMR mock - * + extraESCollections - collections within the timestamp range, but excluded - * from CMR mock. (only in ES) - * + extraESCollectionsOutOfRange - collections outside the timestamp range and - * excluded from CMR mock. (only in ES out of range) - * + extraCmrCollections - collections not in ES but returned by the CMR mock. + * + extraPgCollections - collections within the timestamp range, but excluded + * from CMR mock + * + extraPgCollectionsOutOfRange - collections outside the timestamp range and + * excluded from CMR mock + * + extraCmrCollections - collections not in pg but returned by the CMR mock */ const setupDatabaseAndCMRForTests = async ({ t, params = {} }) => { const dataBuckets = range(2).map(() => randomId('bucket')); @@ -294,8 +289,8 @@ const setupDatabaseAndCMRForTests = async ({ t, params = {} }) => { const { numMatchingCollections = randomBetween(10, 15), numMatchingCollectionsOutOfRange = randomBetween(5, 10), - numExtraESCollections = randomBetween(5, 10), - numExtraESCollectionsOutOfRange = randomBetween(5, 10), + numExtraPgCollections = randomBetween(5, 10), + numExtraPgCollectionsOutOfRange = randomBetween(5, 10), numExtraCmrCollections = randomBetween(5, 10), } = params; @@ -304,31 +299,31 @@ const setupDatabaseAndCMRForTests = async ({ t, params = {} }) => { const endTimestamp = new Date('2020-07-01T00:00:00.000Z').getTime(); const monthLater = moment(endTimestamp).add(1, 'month').valueOf(); - // Create collections that are in sync ES/CMR during the time period + // Create collections that are in sync pg/CMR during the time period const matchingCollections = range(numMatchingCollections).map((r) => ({ ...requiredStaticCollectionFields, name: randomId(`name${r}-`), version: randomId('vers'), updatedAt: randomTimeBetween(startTimestamp, endTimestamp), })); - // Create collections in sync ES/CMR outside of the timestamps range + // Create collections in sync pg/CMR outside of the timestamps range const matchingCollectionsOutsideRange = range(numMatchingCollectionsOutOfRange).map((r) => ({ ...requiredStaticCollectionFields, name: randomId(`name${r}-`), version: randomId('vers'), updatedAt: randomTimeBetween(monthEarlier, startTimestamp - 1), })); - // Create collections in ES only within the timestamp range - const extraESCollections = range(numExtraESCollections).map((r) => ({ + // Create collections in pg only within the timestamp range + const extraPgCollections = range(numExtraPgCollections).map((r) => ({ ...requiredStaticCollectionFields, - name: randomId(`extraES${r}-`), + name: randomId(`extraPg${r}-`), version: randomId('vers'), updatedAt: randomTimeBetween(startTimestamp, endTimestamp), })); - // Create collections in ES only outside of the timestamp range - const extraESCollectionsOutOfRange = range(numExtraESCollectionsOutOfRange).map((r) => ({ + // Create collections in pg only outside of the timestamp range + const extraPgCollectionsOutOfRange = range(numExtraPgCollectionsOutOfRange).map((r) => ({ 
...requiredStaticCollectionFields, - name: randomId(`extraES${r}-`), + name: randomId(`extraPg${r}-`), version: randomId('vers'), updatedAt: randomTimeBetween(endTimestamp + 1, monthLater), })); @@ -360,8 +355,8 @@ const setupDatabaseAndCMRForTests = async ({ t, params = {} }) => { await storeCollectionsWithGranuleToPostgres( matchingCollections .concat(matchingCollectionsOutsideRange) - .concat(extraESCollections) - .concat(extraESCollectionsOutOfRange), + .concat(extraPgCollections) + .concat(extraPgCollectionsOutOfRange), t.context ); @@ -376,8 +371,8 @@ const setupDatabaseAndCMRForTests = async ({ t, params = {} }) => { endTimestamp, matchingCollections, matchingCollectionsOutsideRange, - extraESCollections, - extraESCollectionsOutOfRange, + extraPgCollections, + extraPgCollectionsOutOfRange, extraCmrCollections, collectionGranules, mappedProviders, @@ -421,20 +416,6 @@ test.beforeEach(async (t) => { cmrSearchStub.withArgs('collections').resolves([]); cmrSearchStub.withArgs('granules').resolves([]); - esAlias = randomId('esalias'); - esIndex = randomId('esindex'); - process.env.ES_INDEX = esAlias; - await bootstrapElasticSearch({ - host: 'fakehost', - index: esIndex, - alias: esAlias, - }); - t.context.esReportClient = new Search( - {}, - 'reconciliationReport', - process.env.ES_INDEX - ); - // write 4 providers to the database t.context.providers = await Promise.all(new Array(4).fill().map(async () => { const [pgProvider] = await t.context.providerPgModel.create( @@ -514,12 +495,8 @@ test.serial('Generates valid reconciliation report for no buckets', async (t) => t.true(createStartTime <= createEndTime); t.is(report.reportStartTime, (new Date(startTimestamp)).toISOString()); t.is(report.reportEndTime, (new Date(endTimestamp)).toISOString()); - - const esRecord = await t.context.esReportClient.get(reportRecord.name); - t.like(esRecord, reportRecord); }); -// TODO - use this to make generic the data to PG test.serial('Generates valid GNF reconciliation report when everything is in sync', async (t) => { const { files, matchingColls } = await generateRandomGranules(t); const event = { @@ -554,9 +531,6 @@ test.serial('Generates valid GNF reconciliation report when everything is in syn const createStartTime = moment(report.createStartTime); const createEndTime = moment(report.createEndTime); t.true(createStartTime <= createEndTime); - - const esRecord = await t.context.esReportClient.get(reportRecord.name); - t.like(esRecord, reportRecord); }); test.serial('Generates a valid Inventory reconciliation report when everything is in sync', async (t) => { @@ -787,7 +761,7 @@ test.serial('Generates valid reconciliation report when internally, there are bo test.serial('Generates valid reconciliation report when there are both extra postGres and CMR collections', async (t) => { const params = { numMatchingCollectionsOutOfRange: 0, - numExtraESCollectionsOutOfRange: 0, + numExtraPgCollectionsOutOfRange: 0, }; const setupVars = await setupDatabaseAndCMRForTests({ t, params }); @@ -807,8 +781,8 @@ test.serial('Generates valid reconciliation report when there are both extra pos t.is(report.error, undefined); t.is(collectionsInCumulusCmr.okCount, setupVars.matchingCollections.length); - t.is(collectionsInCumulusCmr.onlyInCumulus.length, setupVars.extraESCollections.length); - setupVars.extraESCollections.map((collection) => + t.is(collectionsInCumulusCmr.onlyInCumulus.length, setupVars.extraPgCollections.length); + setupVars.extraPgCollections.map((collection) => 
t.true(collectionsInCumulusCmr.onlyInCumulus .includes(constructCollectionId(collection.name, collection.version)))); @@ -843,14 +817,14 @@ test.serial( t.is(report.error, undefined); t.is(collectionsInCumulusCmr.okCount, setupVars.matchingCollections.length); - t.is(collectionsInCumulusCmr.onlyInCumulus.length, setupVars.extraESCollections.length); + t.is(collectionsInCumulusCmr.onlyInCumulus.length, setupVars.extraPgCollections.length); // Each extra collection in timerange is included - setupVars.extraESCollections.map((collection) => + setupVars.extraPgCollections.map((collection) => t.true(collectionsInCumulusCmr.onlyInCumulus .includes(constructCollectionId(collection.name, collection.version)))); // No collections that were out of timestamp are included - setupVars.extraESCollectionsOutOfRange.map((collection) => + setupVars.extraPgCollectionsOutOfRange.map((collection) => t.false(collectionsInCumulusCmr.onlyInCumulus .includes(constructCollectionId(collection.name, collection.version)))); @@ -949,7 +923,7 @@ test.serial( async (t) => { const params = { numMatchingCollectionsOutOfRange: 0, - numExtraESCollectionsOutOfRange: 0, + numExtraPgCollectionsOutOfRange: 0, }; const setupVars = await setupDatabaseAndCMRForTests({ t, params }); @@ -970,8 +944,8 @@ test.serial( t.is(collectionsInCumulusCmr.okCount, setupVars.matchingCollections.length); t.is(report.filesInCumulus.okCount, 0); - t.is(collectionsInCumulusCmr.onlyInCumulus.length, setupVars.extraESCollections.length); - setupVars.extraESCollections.map((collection) => + t.is(collectionsInCumulusCmr.onlyInCumulus.length, setupVars.extraPgCollections.length); + setupVars.extraPgCollections.map((collection) => t.true(collectionsInCumulusCmr.onlyInCumulus .includes(constructCollectionId(collection.name, collection.version)))); @@ -1009,12 +983,12 @@ test.serial( // all extra DB collections are found t.is( collectionsInCumulusCmr.onlyInCumulus.length, - setupVars.extraESCollections.length + setupVars.extraESCollectionsOutOfRange.length + setupVars.extraPgCollections.length + setupVars.extraPgCollectionsOutOfRange.length ); - setupVars.extraESCollections.map((collection) => + setupVars.extraPgCollections.map((collection) => t.true(collectionsInCumulusCmr.onlyInCumulus .includes(constructCollectionId(collection.name, collection.version)))); - setupVars.extraESCollectionsOutOfRange.map((collection) => + setupVars.extraPgCollectionsOutOfRange.map((collection) => t.true(collectionsInCumulusCmr.onlyInCumulus .includes(constructCollectionId(collection.name, collection.version)))); @@ -1037,8 +1011,8 @@ test.serial( const testCollection = [ setupVars.matchingCollections[3], setupVars.extraCmrCollections[1], - setupVars.extraESCollections[1], - setupVars.extraESCollectionsOutOfRange[0], + setupVars.extraPgCollections[1], + setupVars.extraPgCollectionsOutOfRange[0], ]; const collectionId = testCollection.map((c) => constructCollectionId(c.name, c.version)); @@ -1120,7 +1094,7 @@ test.serial( const testCollection = [ setupVars.extraCmrCollections[3], setupVars.matchingCollections[2], - setupVars.extraESCollections[1], + setupVars.extraPgCollections[1], ]; const collectionId = testCollection.map((c) => constructCollectionId(c.name, c.version)); console.log(`testCollection: ${JSON.stringify(collectionId)}`); @@ -1155,7 +1129,7 @@ test.serial( async (t) => { const setupVars = await setupDatabaseAndCMRForTests({ t }); - const testCollection = setupVars.extraESCollections[3]; + const testCollection = setupVars.extraPgCollections[3]; 
console.log(`testCollection: ${JSON.stringify(testCollection)}`); const event = { @@ -1187,15 +1161,15 @@ test.serial( ); test.serial( - 'Generates valid ONE WAY reconciliation report with time params and filters by granuleIds when there are extra cumulus/ES and CMR collections', + 'Generates valid ONE WAY reconciliation report with time params and filters by granuleIds when there are extra cumulus/pg and CMR collections', async (t) => { const { startTimestamp, endTimestamp, ...setupVars } = await setupDatabaseAndCMRForTests({ t }); const testCollection = [ setupVars.matchingCollections[3], setupVars.extraCmrCollections[1], - setupVars.extraESCollections[1], - setupVars.extraESCollectionsOutOfRange[0], + setupVars.extraPgCollections[1], + setupVars.extraPgCollectionsOutOfRange[0], ]; const testCollectionIds = testCollection.map((c) => constructCollectionId(c.name, c.version)); @@ -1251,7 +1225,7 @@ test.serial( const testCollection = [ setupVars.extraCmrCollections[3], setupVars.matchingCollections[2], - setupVars.extraESCollections[1], + setupVars.extraPgCollections[1], ]; const testCollectionIds = testCollection.map((c) => constructCollectionId(c.name, c.version)); @@ -1296,7 +1270,7 @@ test.serial( const testCollection = [ setupVars.extraCmrCollections[3], setupVars.matchingCollections[2], - setupVars.extraESCollections[1], + setupVars.extraPgCollections[1], ]; const testCollectionIds = testCollection.map((c) => constructCollectionId(c.name, c.version)); @@ -1765,7 +1739,7 @@ test.serial('When report creation fails, reconciliation report status is set to t.context.knex, { name: reportName } ); // reconciliation report lambda outputs the translated API version, not the PG version, so - // it should be translated for comparison, at least for the comparison with the ES (API) version + // it should be translated for comparison const reportApiRecord = translatePostgresReconReportToApiReconReport(reportPgRecord); t.is(reportApiRecord.status, 'Failed'); t.is(reportApiRecord.type, 'Inventory'); @@ -1774,9 +1748,6 @@ test.serial('When report creation fails, reconciliation report status is set to const report = await getJsonS3Object(t.context.systemBucket, reportKey); t.is(report.status, 'Failed'); t.truthy(report.error); - - const esRecord = await t.context.esReportClient.get(reportName); - t.like(esRecord, reportApiRecord); }); test.serial('Creates a valid Granule Inventory report', async (t) => { @@ -1824,9 +1795,6 @@ test.serial('Creates a valid Granule Inventory report', async (t) => { const header = '"granuleUr","collectionId","createdAt","startDateTime","endDateTime","status","updatedAt","published","provider"'; t.is(reportHeader, header); t.is(reportRows.length, 10); - - const esRecord = await t.context.esReportClient.get(reportRecord.name); - t.like(esRecord, reportRecord); }); test.serial('A valid ORCA Backup reconciliation report is generated', async (t) => { @@ -1920,9 +1888,6 @@ test.serial('A valid ORCA Backup reconciliation report is generated', async (t) t.is(report.granules.onlyInCumulus.length, 0); t.is(report.granules.onlyInOrca.length, 0); t.is(report.granules.withConflicts.length, 0); - - const esRecord = await t.context.esReportClient.get(reportRecord.name); - t.like(esRecord, reportRecord); }); test.serial('Inventory reconciliation report JSON is formatted', async (t) => { @@ -2022,7 +1987,7 @@ test.serial('When there is an error for an ORCA backup report, it throws', async t.context.knex, { name: reportName } ); // reconciliation report lambda outputs the translated API 
version, not the PG version, so - // it should be translated for comparison, at least for the comparison with the ES (API) version + // it should be translated for comparison const reportApiRecord = translatePostgresReconReportToApiReconReport(reportPgRecord); t.is(reportApiRecord.status, 'Failed'); t.is(reportApiRecord.type, event.reportType); @@ -2031,9 +1996,6 @@ test.serial('When there is an error for an ORCA backup report, it throws', async const report = await getJsonS3Object(t.context.systemBucket, reportKey); t.is(report.status, 'Failed'); t.is(report.reportType, event.reportType); - - const esRecord = await t.context.esReportClient.get(reportName); - t.like(esRecord, reportApiRecord); }); test.serial('Internal reconciliation report type throws an error', async (t) => { diff --git a/packages/api/tests/lambdas/test-index-from-database.js b/packages/api/tests/lambdas/test-index-from-database.js deleted file mode 100644 index 8e647736154..00000000000 --- a/packages/api/tests/lambdas/test-index-from-database.js +++ /dev/null @@ -1,512 +0,0 @@ -'use strict'; - -const cryptoRandomString = require('crypto-random-string'); -const sinon = require('sinon'); -const test = require('ava'); -const omit = require('lodash/omit'); - -const awsServices = require('@cumulus/aws-client/services'); -const { - promiseS3Upload, - recursivelyDeleteS3Bucket, -} = require('@cumulus/aws-client/S3'); -const { randomString } = require('@cumulus/common/test-utils'); -const { bootstrapElasticSearch } = require('@cumulus/es-client/bootstrap'); -const indexer = require('@cumulus/es-client/indexer'); -const { EsClient, Search } = require('@cumulus/es-client/search'); -const { - CollectionPgModel, - destroyLocalTestDb, - ExecutionPgModel, - fakeCollectionRecordFactory, - fakeExecutionRecordFactory, - fakeGranuleRecordFactory, - fakePdrRecordFactory, - fakeProviderRecordFactory, - generateLocalTestDb, - GranulePgModel, - migrationDir, - PdrPgModel, - ProviderPgModel, - translatePostgresCollectionToApiCollection, - translatePostgresExecutionToApiExecution, - translatePostgresGranuleToApiGranule, - translatePostgresPdrToApiPdr, - translatePostgresProviderToApiProvider, -} = require('@cumulus/db'); - -const { - fakeReconciliationReportFactory, -} = require('../../lib/testUtils'); - -const models = require('../../models'); -const indexFromDatabase = require('../../lambdas/index-from-database'); -const { - getWorkflowList, -} = require('../../lib/testUtils'); - -const workflowList = getWorkflowList(); -process.env.ReconciliationReportsTable = randomString(); -const reconciliationReportModel = new models.ReconciliationReport(); - -// create all the variables needed across this test -process.env.system_bucket = randomString(); -process.env.stackName = randomString(); - -const reconciliationReportsTable = process.env.ReconciliationReportsTable; - -function sortAndFilter(input, omitList, sortKey) { - return input.map((r) => omit(r, omitList)) - .sort((a, b) => (a[sortKey] > b[sortKey] ? 
1 : -1)); -} - -async function addFakeDynamoData(numItems, factory, model, factoryParams = {}) { - const items = []; - - /* eslint-disable no-await-in-loop */ - for (let i = 0; i < numItems; i += 1) { - const item = factory(factoryParams); - items.push(item); - await model.create(item); - } - /* eslint-enable no-await-in-loop */ - - return items; -} - -async function addFakeData(knex, numItems, factory, model, factoryParams = {}) { - const items = []; - for (let i = 0; i < numItems; i += 1) { - const item = factory(factoryParams); - items.push(model.create(knex, item, '*')); - } - return (await Promise.all(items)).map((result) => result[0]); -} - -function searchEs(type, index, limit = 10) { - const executionQuery = new Search({ queryStringParameters: { limit } }, type, index); - return executionQuery.query(); -} - -test.before(async (t) => { - t.context.esIndices = []; - - await awsServices.s3().createBucket({ Bucket: process.env.system_bucket }); - await reconciliationReportModel.createTable(); - - const wKey = `${process.env.stackName}/workflows/${workflowList[0].name}.json`; - const tKey = `${process.env.stackName}/workflow_template.json`; - await Promise.all([ - promiseS3Upload({ - params: { - Bucket: process.env.system_bucket, - Key: wKey, - Body: JSON.stringify(workflowList[0]), - }, - }), - promiseS3Upload({ - params: { - Bucket: process.env.system_bucket, - Key: tKey, - Body: JSON.stringify({}), - }, - }), - ]); -}); - -test.beforeEach(async (t) => { - t.context.testDbName = `test_index_${cryptoRandomString({ length: 10 })}`; - const { knex, knexAdmin } = await generateLocalTestDb(t.context.testDbName, migrationDir); - t.context.knex = knex; - t.context.knexAdmin = knexAdmin; - t.context.esIndex = randomString(); - t.context.esAlias = randomString(); - await bootstrapElasticSearch({ - host: 'fakehost', - index: t.context.esIndex, - alias: t.context.esAlias, - }); - - t.context.esClient = new EsClient('fakehost'); - await t.context.esClient.initializeEsClient(); -}); - -test.afterEach.always(async (t) => { - const { esClient, esIndex, testDbName } = t.context; - await esClient.client.indices.delete({ index: esIndex }); - await destroyLocalTestDb({ - knex: t.context.knex, - knexAdmin: t.context.knexAdmin, - testDbName, - }); -}); - -test.after.always(async () => { - await recursivelyDeleteS3Bucket(process.env.system_bucket); -}); - -test('getEsRequestConcurrency respects concurrency value in payload', (t) => { - t.is(indexFromDatabase.getEsRequestConcurrency({ - esRequestConcurrency: 5, - }), 5); -}); - -test.serial('getEsRequestConcurrency respects ES_CONCURRENCY environment variable', (t) => { - process.env.ES_CONCURRENCY = 35; - t.is(indexFromDatabase.getEsRequestConcurrency({}), 35); - delete process.env.ES_CONCURRENCY; -}); - -test('getEsRequestConcurrency correctly returns 10 when nothing is specified', (t) => { - t.is(indexFromDatabase.getEsRequestConcurrency({}), 10); -}); - -test.serial('getEsRequestConcurrency throws an error when -1 is specified', (t) => { - t.throws( - () => indexFromDatabase.getEsRequestConcurrency({ - esRequestConcurrency: -1, - }), - { instanceOf: TypeError } - ); - - process.env.ES_CONCURRENCY = -1; - t.teardown(() => { - delete process.env.ES_CONCURRENCY; - }); - t.throws( - () => indexFromDatabase.getEsRequestConcurrency({}), - { instanceOf: TypeError } - ); -}); - -test.serial('getEsRequestConcurrency throws an error when "asdf" is specified', (t) => { - t.throws( - () => indexFromDatabase.getEsRequestConcurrency({ - esRequestConcurrency: 'asdf', 
- }), - { instanceOf: TypeError } - ); - - process.env.ES_CONCURRENCY = 'asdf'; - t.teardown(() => { - delete process.env.ES_CONCURRENCY; - }); - t.throws( - () => indexFromDatabase.getEsRequestConcurrency({}), - { instanceOf: TypeError } - ); -}); - -test.serial('getEsRequestConcurrency throws an error when 0 is specified', (t) => { - t.throws( - () => indexFromDatabase.getEsRequestConcurrency({ - esRequestConcurrency: 0, - }), - { instanceOf: TypeError } - ); - - process.env.ES_CONCURRENCY = 0; - t.teardown(() => { - delete process.env.ES_CONCURRENCY; - }); - t.throws( - () => indexFromDatabase.getEsRequestConcurrency({}), - { instanceOf: TypeError } - ); -}); - -test('No error is thrown if nothing is in the database', async (t) => { - const { esAlias, knex } = t.context; - - await t.notThrowsAsync(() => indexFromDatabase.indexFromDatabase({ - indexName: esAlias, - reconciliationReportsTable, - knex, - })); -}); - -test.serial('Lambda successfully indexes records of all types', async (t) => { - const knex = t.context.knex; - const { esAlias } = t.context; - - const numItems = 20; - - const fakeData = []; - const dateObject = { created_at: new Date(), updated_at: new Date() }; - const fakeCollectionRecords = await addFakeData( - knex, - numItems, - fakeCollectionRecordFactory, - new CollectionPgModel(), - dateObject - ); - fakeData.push(fakeCollectionRecords); - - const fakeExecutionRecords = await addFakeData( - knex, - numItems, - fakeExecutionRecordFactory, - new ExecutionPgModel(), - { ...dateObject } - ); - - const fakeGranuleRecords = await addFakeData( - knex, - numItems, - fakeGranuleRecordFactory, - new GranulePgModel(), - { collection_cumulus_id: fakeCollectionRecords[0].cumulus_id, ...dateObject } - ); - - const fakeProviderRecords = await addFakeData( - knex, - numItems, - fakeProviderRecordFactory, - new ProviderPgModel(), - dateObject - ); - - const fakePdrRecords = await addFakeData(knex, numItems, fakePdrRecordFactory, new PdrPgModel(), { - collection_cumulus_id: fakeCollectionRecords[0].cumulus_id, - provider_cumulus_id: fakeProviderRecords[0].cumulus_id, - ...dateObject, - }); - - const fakeReconciliationReportRecords = await addFakeDynamoData( - numItems, - fakeReconciliationReportFactory, - reconciliationReportModel - ); - - await indexFromDatabase.handler({ - indexName: esAlias, - pageSize: 6, - knex, - }); - - const searchResults = await Promise.all([ - searchEs('collection', esAlias, '20'), - searchEs('execution', esAlias, '20'), - searchEs('granule', esAlias, '20'), - searchEs('pdr', esAlias, '20'), - searchEs('provider', esAlias, '20'), - searchEs('reconciliationReport', esAlias, '20'), - ]); - - searchResults.map((res) => t.is(res.meta.count, numItems)); - - const collectionResults = await Promise.all( - fakeCollectionRecords.map((r) => - translatePostgresCollectionToApiCollection(r)) - ); - const executionResults = await Promise.all( - fakeExecutionRecords.map((r) => translatePostgresExecutionToApiExecution(r)) - ); - const granuleResults = await Promise.all( - fakeGranuleRecords.map((r) => - translatePostgresGranuleToApiGranule({ - granulePgRecord: r, - knexOrTransaction: knex, - })) - ); - const pdrResults = await Promise.all( - fakePdrRecords.map((r) => translatePostgresPdrToApiPdr(r, knex)) - ); - const providerResults = await Promise.all( - fakeProviderRecords.map((r) => translatePostgresProviderToApiProvider(r)) - ); - - t.deepEqual( - searchResults[0].results - .map((r) => omit(r, ['timestamp'])) - .sort((a, b) => (a.name > b.name ? 
1 : -1)), - collectionResults - .sort((a, b) => (a.name > b.name ? 1 : -1)) - ); - - t.deepEqual( - sortAndFilter(searchResults[1].results, ['timestamp'], 'name'), - sortAndFilter(executionResults, ['timestamp'], 'name') - ); - - t.deepEqual( - sortAndFilter(searchResults[2].results, ['timestamp'], 'granuleId'), - sortAndFilter(granuleResults, ['timestamp'], 'granuleId') - ); - - t.deepEqual( - sortAndFilter(searchResults[3].results, ['timestamp'], 'pdrName'), - sortAndFilter(pdrResults, ['timestamp'], 'pdrName') - ); - - t.deepEqual( - sortAndFilter(searchResults[4].results, ['timestamp'], 'id'), - sortAndFilter(providerResults, ['timestamp'], 'id') - ); - - t.deepEqual( - sortAndFilter(searchResults[5].results, ['timestamp'], 'name'), - sortAndFilter(fakeReconciliationReportRecords, ['timestamp'], 'name') - ); -}); - -test.serial('failure in indexing record of specific type should not prevent indexing of other records with same type', async (t) => { - const { esAlias, esClient, knex } = t.context; - const granulePgModel = new GranulePgModel(); - const numItems = 7; - const collectionRecord = await addFakeData( - knex, - 1, - fakeCollectionRecordFactory, - new CollectionPgModel() - ); - const fakeData = await addFakeData(knex, numItems, fakeGranuleRecordFactory, granulePgModel, { - collection_cumulus_id: collectionRecord[0].cumulus_id, - created_at: new Date(), - updated_at: new Date(), - }); - - let numCalls = 0; - const originalIndexGranule = indexer.indexGranule; - const successCount = 4; - const indexGranuleStub = sinon.stub(indexer, 'indexGranule') - .callsFake(( - esClientArg, - payload, - index - ) => { - numCalls += 1; - if (numCalls <= successCount) { - return originalIndexGranule(esClientArg, payload, index); - } - throw new Error('fake error'); - }); - - let searchResults; - try { - await indexFromDatabase.handler({ - indexName: esAlias, - reconciliationReportsTable, - knex, - }); - - searchResults = await searchEs('granule', esAlias); - - t.is(searchResults.meta.count, successCount); - - searchResults.results.forEach((result) => { - const sourceData = fakeData.find((data) => data.granule_id === result.granuleId); - const expected = { - collectionId: `${collectionRecord[0].name}___${collectionRecord[0].version}`, - granuleId: sourceData.granule_id, - status: sourceData.status, - }; - const actual = { - collectionId: result.collectionId, - granuleId: result.granuleId, - status: result.status, - }; - - t.deepEqual(expected, actual); - }); - } finally { - indexGranuleStub.restore(); - await Promise.all(fakeData.map( - // eslint-disable-next-line camelcase - ({ granule_id }) => granulePgModel.delete(knex, { granule_id }) - )); - await Promise.all(searchResults.results.map( - (result) => - esClient.client.delete({ - index: esAlias, - type: 'granule', - id: result.granuleId, - parent: result.collectionId, - refresh: true, - }) - )); - } -}); - -test.serial( - 'failure in indexing record of one type should not prevent indexing of other records with different type', - async (t) => { - const { esAlias, esClient, knex } = t.context; - const numItems = 2; - const collectionRecord = await addFakeData( - knex, - 1, - fakeCollectionRecordFactory, - new CollectionPgModel() - ); - const [fakeProviderData, fakeGranuleData] = await Promise.all([ - addFakeData( - knex, - numItems, - fakeProviderRecordFactory, - new ProviderPgModel() - ), - addFakeData( - knex, - numItems, - fakeGranuleRecordFactory, - new GranulePgModel(), - { collection_cumulus_id: collectionRecord[0].cumulus_id } - ), - ]); - - 
const indexGranuleStub = sinon - .stub(indexer, 'indexGranule') - .throws(new Error('error')); - - let searchResults; - try { - await indexFromDatabase.handler({ - indexName: esAlias, - reconciliationReportsTable, - knex, - }); - - searchResults = await searchEs('provider', esAlias); - - t.is(searchResults.meta.count, numItems); - - searchResults.results.forEach((result) => { - const sourceData = fakeProviderData.find( - (data) => data.name === result.id - ); - t.deepEqual( - { host: result.host, id: result.id, protocol: result.protocol }, - { - host: sourceData.host, - id: sourceData.name, - protocol: sourceData.protocol, - } - ); - }); - } finally { - indexGranuleStub.restore(); - await Promise.all( - fakeProviderData.map(({ name }) => { - const pgModel = new ProviderPgModel(); - return pgModel.delete(knex, { name }); - }) - ); - await Promise.all( - fakeGranuleData.map( - // eslint-disable-next-line camelcase - ({ granule_id }) => new GranulePgModel().delete(knex, { granule_id }) - ) - ); - await Promise.all( - searchResults.results.map((result) => - esClient.client.delete({ - index: esAlias, - type: 'provider', - id: result.id, - refresh: true, - })) - ); - } - } -); diff --git a/packages/api/tests/lib/test-ingest.js b/packages/api/tests/lib/test-ingest.js index 521948ab899..2673f3f4756 100644 --- a/packages/api/tests/lib/test-ingest.js +++ b/packages/api/tests/lib/test-ingest.js @@ -22,9 +22,6 @@ const { fakeCollectionRecordFactory, getUniqueGranuleByGranuleId, } = require('@cumulus/db'); -const { - createTestIndex, -} = require('@cumulus/es-client/testUtils'); const { fakeGranuleFactoryV2, fakeCollectionFactory, @@ -37,11 +34,6 @@ const { const testDbName = randomString(12); const sandbox = sinon.createSandbox(); -const FakeEsClient = sandbox.stub(); -const esSearchStub = sandbox.stub(); -const esScrollStub = sandbox.stub(); -FakeEsClient.prototype.scroll = esScrollStub; -FakeEsClient.prototype.search = esSearchStub; let fakeExecution; let testCumulusMessage; @@ -64,10 +56,6 @@ test.before(async (t) => { t.context.knexAdmin = knexAdmin; t.context.granuleId = randomString(); - const { esIndex, esClient } = await createTestIndex(); - t.context.esIndex = esIndex; - t.context.esClient = esClient; - const { TopicArn } = await createSnsTopic(randomString()); t.context.granules_sns_topic_arn = TopicArn; process.env.granule_sns_topic_arn = t.context.granules_sns_topic_arn; diff --git a/packages/api/tests/lib/test-reconciliationReport.js b/packages/api/tests/lib/test-reconciliationReport.js index 2c7255475c7..e707df916de 100644 --- a/packages/api/tests/lib/test-reconciliationReport.js +++ b/packages/api/tests/lib/test-reconciliationReport.js @@ -1,5 +1,4 @@ const test = require('ava'); -const cryptoRandomString = require('crypto-random-string'); const rewire = require('rewire'); const range = require('lodash/range'); @@ -10,7 +9,6 @@ const { convertToDBCollectionSearchObject, convertToOrcaGranuleSearchParams, filterDBCollections, - compareEsGranuleAndApiGranule, } = require('../../lib/reconciliationReport'); const { fakeCollectionFactory } = require('../../lib/testUtils'); @@ -146,90 +144,3 @@ test("filterDBCollections filters collections by recReportParams's collectionIds t.deepEqual(actual, expected); }); - -test('compareEsGranuleAndApiGranule returns true for matching granules', (t) => { - const granule = { - granuleId: cryptoRandomString({ length: 5 }), - }; - const granule2 = { ...granule, files: [] }; - t.true(compareEsGranuleAndApiGranule(granule, granule2)); -}); - 
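// [Editor's note] A minimal sketch of the comparison contract that the deleted
// compareEsGranuleAndApiGranule tests around this point encode (a hypothetical
// reconstruction, not the removed implementation): two granules match when they
// deep-equal once a missing `files` array and an empty one are treated as the same;
// any extra key, missing file, or differing file property is a mismatch.
const isEqual = require('lodash/isEqual');
const omit = require('lodash/omit');

function granulesMatch(granuleA, granuleB) {
  // All non-file fields must deep-equal, including keys present on only one side.
  if (!isEqual(omit(granuleA, ['files']), omit(granuleB, ['files']))) return false;
  // Treat `files: []` and an absent `files` key as equivalent.
  const filesA = granuleA.files || [];
  const filesB = granuleB.files || [];
  if (filesA.length !== filesB.length) return false;
  // Every file on one side needs an exactly-matching file on the other.
  return filesA.every((fileA) => filesB.some((fileB) => isEqual(fileA, fileB)));
}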
-test('compareEsGranuleAndApiGranule returns false for granules with different values', (t) => { - const granule = { - granuleId: cryptoRandomString({ length: 5 }), - }; - const granule2 = { ...granule, foo: 'bar' }; - t.false(compareEsGranuleAndApiGranule(granule, granule2)); -}); - -test('compareEsGranuleAndApiGranule returns false if one granule has files and other does not', (t) => { - const granule = { - granuleId: cryptoRandomString({ length: 5 }), - }; - const granule2 = { - ...granule, - files: [{ - bucket: 'bucket', - key: 'key', - }], - }; - t.false(compareEsGranuleAndApiGranule(granule, granule2)); -}); - -test('compareEsGranuleAndApiGranule returns false if granule file is missing from second granule', (t) => { - const granule = { - granuleId: cryptoRandomString({ length: 5 }), - files: [{ - bucket: 'bucket', - key: 'key', - }], - }; - const granule2 = { - ...granule, - files: [{ - bucket: 'bucket', - key: 'key2', - }], - }; - t.false(compareEsGranuleAndApiGranule(granule, granule2)); -}); - -test('compareEsGranuleAndApiGranule returns false if granule files have different properties', (t) => { - const granule = { - granuleId: cryptoRandomString({ length: 5 }), - files: [{ - bucket: 'bucket', - key: 'key', - }], - }; - const granule2 = { - ...granule, - files: [{ - bucket: 'bucket', - key: 'key', - size: 5, - }], - }; - t.false(compareEsGranuleAndApiGranule(granule, granule2)); -}); - -test('compareEsGranuleAndApiGranule returns false if granule files have different values for same property', (t) => { - const granule = { - granuleId: cryptoRandomString({ length: 5 }), - files: [{ - bucket: 'bucket', - key: 'key', - size: 1, - }], - }; - const granule2 = { - ...granule, - files: [{ - bucket: 'bucket', - key: 'key', - size: 5, - }], - }; - t.false(compareEsGranuleAndApiGranule(granule, granule2)); -}); diff --git a/packages/api/tests/performance/lib/test-write-granules.js b/packages/api/tests/performance/lib/test-write-granules.js index fb6aced84bf..2a123c7ef27 100644 --- a/packages/api/tests/performance/lib/test-write-granules.js +++ b/packages/api/tests/performance/lib/test-write-granules.js @@ -6,10 +6,6 @@ const pSettle = require('p-settle'); const cryptoRandomString = require('crypto-random-string'); const cloneDeep = require('lodash/cloneDeep'); -const { - getEsClient, - Search, -} = require('@cumulus/es-client/search'); const { createSnsTopic } = require('@cumulus/aws-client/SNS'); const StepFunctions = require('@cumulus/aws-client/StepFunctions'); @@ -39,10 +35,6 @@ const { const { getExecutionUrlFromArn, } = require('@cumulus/message/Executions'); -const { - createTestIndex, - cleanupTestIndex, -} = require('@cumulus/es-client/testUtils'); const { writeGranulesFromMessage, @@ -71,15 +63,6 @@ test.before(async (t) => { t.context.knex = knex; console.log(`Test DB max connection pool: ${t.context.knex.client.pool.max}`); - - const { esIndex, esClient } = await createTestIndex(); - t.context.esIndex = esIndex; - t.context.esClient = esClient; - t.context.esGranulesClient = new Search( - {}, - 'granule', - t.context.esIndex - ); }); test.beforeEach(async (t) => { @@ -220,13 +203,12 @@ test.after.always(async (t) => { await destroyLocalTestDb({ ...t.context, }); - await cleanupTestIndex(t.context); }); // This test is a performance test designed to run with a large number of messages // in a memory constrained test environment, it is not intended to run as part of // the normal unit test suite. 
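// [Editor's note] The bounded-concurrency pattern this performance test exercises,
// pulled out as an illustrative sketch (`writeOneMessage` is a hypothetical stand-in
// for the `writeGranulesFromMessage` call in the test below): p-settle takes an array
// of thunks and runs at most `concurrency` of them at once, which keeps memory and
// database connection use flat regardless of how many messages are queued.
const pSettle = require('p-settle');

async function writeAllMessages(cumulusMessages, writeOneMessage, concurrency = 60) {
  // Map each message to a thunk; p-settle invokes them `concurrency` at a time.
  const results = await pSettle(
    cumulusMessages.map((cumulusMessage) => () => writeOneMessage(cumulusMessage)),
    { concurrency }
  );
  // p-settle never short-circuits on failure; collect per-message errors afterwards.
  return results.filter((result) => result.isRejected).map((result) => result.reason);
}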
-test('writeGranulesFromMessage operates on 2k granules with 10 files each within 1GB of ram when an instance of EsClient is passed in and concurrency is set to 60 and db connections are set to 60', async (t) => { +test('writeGranulesFromMessage operates on 2k granules with 10 files each within 1GB of ram when concurrency is set to 60 and db connections are set to 60', async (t) => { const { cumulusMessages, knex, @@ -237,13 +219,11 @@ test('writeGranulesFromMessage operates on 2k granules with 10 files each within // Message must be completed or files will not update - const esClient = await getEsClient(); await pSettle(cumulusMessages.map((cumulusMessage) => () => writeGranulesFromMessage({ cumulusMessage, executionCumulusId, providerCumulusId, knex, - esClient, testOverrides: { stepFunctionUtils }, })), { concurrency: t.context.concurrency }); diff --git a/packages/api/webpack.config.js b/packages/api/webpack.config.js index b33246ed82a..ee2f8d27ad7 100644 --- a/packages/api/webpack.config.js +++ b/packages/api/webpack.config.js @@ -24,12 +24,10 @@ module.exports = { mode: process.env.PRODUCTION ? 'production' : 'development', entry: { app: './app/index.js', - bootstrap: './lambdas/bootstrap.js', bulkOperation: './lambdas/bulk-operation.js', cleanExecutions: './lambdas/cleanExecutions.js', createReconciliationReport: './lambdas/create-reconciliation-report.js', distribution: './app/distribution.js', - indexFromDatabase: './lambdas/index-from-database.js', manualConsumer: './lambdas/manual-consumer.js', messageConsumer: './lambdas/message-consumer.js', payloadLogger: './lambdas/payload-logger.js', diff --git a/packages/async-operations/tests/test-async_operations.js b/packages/async-operations/tests/test-async_operations.js index 3aceaa599af..a04caac38c5 100644 --- a/packages/async-operations/tests/test-async_operations.js +++ b/packages/async-operations/tests/test-async_operations.js @@ -61,7 +61,7 @@ test.before(async (t) => { t.context.functionConfig = { Environment: { Variables: { - ES_HOST: 'es-host', + Timeout: 300, }, }, }; @@ -80,7 +80,7 @@ test.beforeEach((t) => { status: 'RUNNING', taskArn: cryptoRandomString({ length: 5 }), description: 'testing', - operationType: 'ES Index', + operationType: 'Reconciliation Report', createdAt: Date.now(), updatedAt: Date.now(), }; @@ -110,7 +110,7 @@ test.serial('startAsyncOperation uploads the payload to S3', async (t) => { callerLambdaName: randomString(), lambdaName: randomString(), description: randomString(), - operationType: 'ES Index', + operationType: 'Reconciliation Report', payload, stackName, knexConfig: knexConfig, @@ -142,7 +142,7 @@ test.serial('The AsyncOperation start method starts an ECS task with the correct lambdaName, callerLambdaName, description: randomString(), - operationType: 'ES Index', + operationType: 'Reconciliation Report', payload, stackName, knexConfig: knexConfig, @@ -188,7 +188,7 @@ test.serial('The AsyncOperation start method starts an ECS task with the asyncOp lambdaName, callerLambdaName, description: randomString(), - operationType: 'ES Index', + operationType: 'Reconciliation Report', payload, stackName, knexConfig: knexConfig, @@ -225,7 +225,7 @@ test.serial('The startAsyncOperation method throws error and calls createAsyncOp callerLambdaName: randomString(), lambdaName: randomString(), description: randomString(), - operationType: 'ES Index', + operationType: 'Reconciliation Report', payload: {}, stackName: randomString(), knexConfig: knexConfig, @@ -263,7 +263,7 @@ test.serial('The startAsyncOperation 
method throws error and calls createAsyncOp test('The startAsyncOperation writes records to the database', async (t) => { const description = randomString(); const stackName = randomString(); - const operationType = 'ES Index'; + const operationType = 'Reconciliation Report'; const taskArn = randomString(); stubbedEcsRunTaskResult = { @@ -291,7 +291,7 @@ test('The startAsyncOperation writes records to the database', async (t) => { const expected = { description, id, - operationType: 'ES Index', + operationType: 'Reconciliation Report', status: 'RUNNING', taskArn, }; @@ -317,7 +317,7 @@ test.serial('The startAsyncOperation method returns the newly-generated record', callerLambdaName: randomString(), lambdaName: randomString(), description: randomString(), - operationType: 'ES Index', + operationType: 'Reconciliation Report', payload: {}, stackName, knexConfig: knexConfig, @@ -340,7 +340,7 @@ test.serial('The startAsyncOperation method throws error if callerLambdaName par cluster: randomString, lambdaName: randomString, description: randomString(), - operationType: 'ES Index', + operationType: 'Reconciliation Report', payload: { x: randomString() }, stackName: randomString, knexConfig: knexConfig, @@ -360,7 +360,7 @@ test('getLambdaEnvironmentVariables returns expected environment variables', (t) const vars = getLambdaEnvironmentVariables(t.context.functionConfig); t.deepEqual(new Set(vars), new Set([ - { name: 'ES_HOST', value: 'es-host' }, + { name: 'Timeout', value: 300 }, ])); }); @@ -378,7 +378,7 @@ test.serial('ECS task params contain lambda environment variables when useLambda callerLambdaName: randomString(), lambdaName: randomString(), description: randomString(), - operationType: 'ES Index', + operationType: 'Reconciliation Report', payload: {}, useLambdaEnvironmentVariables: true, stackName, @@ -391,7 +391,7 @@ test.serial('ECS task params contain lambda environment variables when useLambda environmentOverrides[env.name] = env.value; }); - t.is(environmentOverrides.ES_HOST, 'es-host'); + t.is(environmentOverrides.Timeout, 300); }); test.serial('createAsyncOperation throws if stackName is not provided', async (t) => { diff --git a/packages/aws-client/src/services.ts b/packages/aws-client/src/services.ts index ff6bfd5572c..04f97ee8ea2 100644 --- a/packages/aws-client/src/services.ts +++ b/packages/aws-client/src/services.ts @@ -15,7 +15,6 @@ import { SNS } from '@aws-sdk/client-sns'; import { STS } from '@aws-sdk/client-sts'; import { ECS } from '@aws-sdk/client-ecs'; import { EC2 } from '@aws-sdk/client-ec2'; -import { ElasticsearchService } from '@aws-sdk/client-elasticsearch-service'; import awsClient from './client'; @@ -31,7 +30,6 @@ export const dynamodbDocClient = (docClientOptions?: TranslateConfig, dynamoOpti docClientOptions ); export const cf = awsClient(CloudFormation, '2010-05-15'); -export const es = awsClient(ElasticsearchService, '2015-01-01'); export const kinesis = awsClient(Kinesis, '2013-12-02'); export const kms = awsClient(KMS, '2014-11-01'); export const lambda = awsClient(Lambda, '2015-03-31'); diff --git a/packages/aws-client/tests/test-services.js b/packages/aws-client/tests/test-services.js index 994d27a68a0..9677f99ed58 100644 --- a/packages/aws-client/tests/test-services.js +++ b/packages/aws-client/tests/test-services.js @@ -6,7 +6,6 @@ const { CloudFormation } = require('@aws-sdk/client-cloudformation'); const { DynamoDB } = require('@aws-sdk/client-dynamodb'); const { ECS } = require('@aws-sdk/client-ecs'); const { EC2 } = 
require('@aws-sdk/client-ec2'); -const { ElasticsearchService } = require('@aws-sdk/client-elasticsearch-service'); const { Kinesis } = require('@aws-sdk/client-kinesis'); const { Lambda } = require('@aws-sdk/client-lambda'); const { S3 } = require('@aws-sdk/client-s3'); @@ -188,27 +187,6 @@ test('ec2() service defaults to localstack in test mode', async (t) => { ); }); -test('es() service defaults to localstack in test mode', async (t) => { - const es = services.es(); - const { - credentials, - endpoint, - } = localStackAwsClientOptions(ElasticsearchService); - t.like( - await es.config.credentials(), - credentials - ); - const esEndpoint = await es.config.endpoint(); - const localSatckEndpoint = new URL(endpoint); - t.like( - esEndpoint, - { - hostname: localSatckEndpoint.hostname, - port: Number.parseInt(localSatckEndpoint.port, 10), - } - ); -}); - test('kinesis() service defaults to localstack in test mode', async (t) => { const kinesis = services.kinesis(); const { diff --git a/packages/db/src/models/reconciliation_report.ts b/packages/db/src/models/reconciliation_report.ts index a8ff040d943..b9cf548f8ca 100644 --- a/packages/db/src/models/reconciliation_report.ts +++ b/packages/db/src/models/reconciliation_report.ts @@ -18,7 +18,7 @@ class ReconciliationReportPgModel extends BasePgModel { return super.create(knexOrTransaction, item, '*') as Promise; } diff --git a/packages/db/src/test-utils.ts b/packages/db/src/test-utils.ts index 6870a80d9c5..ae57ccd6451 100644 --- a/packages/db/src/test-utils.ts +++ b/packages/db/src/test-utils.ts @@ -138,7 +138,7 @@ export const fakeAsyncOperationRecordFactory = ( ): PostgresAsyncOperation => ({ id: uuidv4(), description: cryptoRandomString({ length: 10 }), - operation_type: 'ES Index', + operation_type: 'Reconciliation Report', status: 'RUNNING', output: { test: 'output' }, task_arn: cryptoRandomString({ length: 3 }), diff --git a/packages/db/tests/translate/test-async-operations.js b/packages/db/tests/translate/test-async-operations.js index d4030b2f7f1..f824497cc72 100644 --- a/packages/db/tests/translate/test-async-operations.js +++ b/packages/db/tests/translate/test-async-operations.js @@ -14,7 +14,7 @@ test('translateApiAsyncOperationToPostgresAsyncOperation converts a camelCase re status: 'RUNNING', taskArn: 'aws:arn:ecs:task:someTask', description: 'dummy operation', - operationType: 'ES Index', + operationType: 'Reconciliation Report', }; const expected = { @@ -38,7 +38,7 @@ test('translateApiAsyncOperationToPostgresAsyncOperation parses output from JSON status: 'SUCCEEDED', taskArn: 'aws:arn:ecs:task:someTask', description: 'dummy operation', - operationType: 'ES Index', + operationType: 'Reconciliation Report', output: JSON.stringify(operationOutput), }; @@ -60,7 +60,7 @@ test('translateApiAsyncOperationToPostgresAsyncOperation parses output from JSON status: 'SUCCEEDED', taskArn: 'aws:arn:ecs:task:someTask', description: 'dummy operation', - operationType: 'ES Index', + operationType: 'Reconciliation Report', output: operationOutput, }; @@ -83,7 +83,7 @@ test('translateApiAsyncOperationToPostgresAsyncOperation parses output from stri status: 'SUCCEEDED', taskArn: 'aws:arn:ecs:task:someTask', description: 'dummy operation', - operationType: 'ES Index', + operationType: 'Reconciliation Report', output: operationOutput, }; @@ -106,7 +106,7 @@ test('translateApiAsyncOperationToPostgresAsyncOperation parses output from JSON status: 'SUCCEEDED', taskArn: 'aws:arn:ecs:task:someTask', description: 'dummy operation', - operationType: 'ES 
Index', + operationType: 'Reconciliation Report', output: operationOutput, }; @@ -127,7 +127,7 @@ test('translateApiAsyncOperationToPostgresAsyncOperation discards \'none\' outpu status: 'SUCCEEDED', taskArn: 'aws:arn:ecs:task:someTask', description: 'dummy operation', - operationType: 'ES Index', + operationType: 'Reconciliation Report', output: 'none', }; @@ -162,7 +162,7 @@ test('translatePostgresAsyncOperationToApiAsyncOperation translates PostgreSQL r status: 'RUNNING', taskArn, description, - operationType: 'ES Index', + operationType: 'Reconciliation Report', output: JSON.stringify({ test: 'output' }), createdAt: createdAt.getTime(), updatedAt: updatedAt.getTime(), diff --git a/packages/tf-inventory/src/inventory.js b/packages/tf-inventory/src/inventory.js index da596087d63..b1764938880 100644 --- a/packages/tf-inventory/src/inventory.js +++ b/packages/tf-inventory/src/inventory.js @@ -1,6 +1,6 @@ 'use strict'; -const { ecs, ec2, es } = require('@cumulus/aws-client/services'); +const { ecs, ec2 } = require('@cumulus/aws-client/services'); const mergeWith = require('lodash/mergeWith'); const difference = require('lodash/difference'); @@ -80,13 +80,9 @@ async function listAwsResources() { ec2Instances = [].concat(...ec2Instances.Reservations.map((e) => e.Instances)); ec2Instances = ec2Instances.map((inst) => inst.InstanceId); - let esDomainNames = await es().listDomainNames(); - esDomainNames = esDomainNames.DomainNames.map((e) => e.DomainName); - return { ecsClusters: ecsClusters.clusterArns, ec2Instances, - esDomainNames, }; } diff --git a/packages/tf-inventory/tests/inventory.js b/packages/tf-inventory/tests/inventory.js index 3ef54f23341..960406631c8 100644 --- a/packages/tf-inventory/tests/inventory.js +++ b/packages/tf-inventory/tests/inventory.js @@ -3,7 +3,7 @@ const test = require('ava'); const rewire = require('rewire'); const sinon = require('sinon'); -const { ecs, ec2, es } = require('@cumulus/aws-client/services'); +const { ecs, ec2 } = require('@cumulus/aws-client/services'); const inventory = rewire('../src/inventory'); const mergeResourceLists = inventory.__get__('mergeResourceLists'); const resourceDiff = inventory.__get__('resourceDiff'); @@ -15,7 +15,6 @@ let listResourcesForFileStub; let listTfStateFilesStub; let ecsStub; let ec2Stub; -let esStub; /** * @@ -28,7 +27,6 @@ function resourcesForStateFile(sf) { return { ecsClusters: ['clusterArn1', 'clusterArn2'], ec2Instances: ['i-000'], - esDomainNames: ['cumulus-1-es5vpc'], }; } @@ -36,7 +34,6 @@ function resourcesForStateFile(sf) { return { ecsClusters: ['clusterArn3'], ec2Instances: ['i-111', 'i-222'], - esDomainNames: ['cumulus-2-es5vpc'], }; } @@ -80,17 +77,6 @@ test.before(() => { ], }), }); - - esStub = sinon.stub(es(), 'listDomainNames') - .returns( - Promise.resolve({ - DomainNames: [ - { DomainName: 'cumulus-es5vpc' }, - { DomainName: 'cumulus-1-es5vpc' }, - { DomainName: 'cumulus-2-es5vpc' }, - ], - }) - ); }); test.after.always(() => { @@ -98,7 +84,6 @@ test.after.always(() => { listTfStateFilesStub.restore(); ecsStub.restore(); ec2Stub.restore(); - esStub.restore(); }); test('mergeResourceLists merges resource object by key', (t) => { @@ -235,7 +220,6 @@ test('listTfResources merges resources correctly', async (t) => { t.deepEqual(tfResources, { ecsClusters: ['clusterArn1', 'clusterArn2', 'clusterArn3'], ec2Instances: ['i-000', 'i-111', 'i-222'], - esDomainNames: ['cumulus-1-es5vpc', 'cumulus-2-es5vpc'], }); }); @@ -246,7 +230,6 @@ test('listAwsResources properly combines ec2 intsances', async (t) => { { 
ecsClusters: ['clusterArn1', 'clusterArn2', 'clusterArn3', 'clusterArn4'], ec2Instances: ['i-000', 'i-111', 'i-222', 'i-333'], - esDomainNames: ['cumulus-es5vpc', 'cumulus-1-es5vpc', 'cumulus-2-es5vpc'], }); }); @@ -257,6 +240,5 @@ test('reconcileResources returns only resources not specified in TF files', asyn { ecsClusters: ['clusterArn4'], ec2Instances: ['i-333'], - esDomainNames: ['cumulus-es5vpc'], }); }); diff --git a/tf-modules/archive/api.tf b/tf-modules/archive/api.tf index a1f12066b19..47044ca26ee 100644 --- a/tf-modules/archive/api.tf +++ b/tf-modules/archive/api.tf @@ -56,7 +56,6 @@ locals { execution_sns_topic_arn = aws_sns_topic.report_executions_topic.arn idleTimeoutMillis = var.rds_connection_timing_configuration.idleTimeoutMillis IDP_LOGIN = var.saml_idp_login - IndexFromDatabaseLambda = aws_lambda_function.index_from_database.arn invoke = var.schedule_sf_function_arn invokeArn = var.schedule_sf_function_arn invokeReconcileLambda = aws_lambda_function.create_reconciliation_report.arn diff --git a/tf-modules/archive/bootstrap.tf b/tf-modules/archive/bootstrap.tf deleted file mode 100644 index 00468fe6d19..00000000000 --- a/tf-modules/archive/bootstrap.tf +++ /dev/null @@ -1,40 +0,0 @@ -resource "aws_lambda_function" "custom_bootstrap" { - function_name = "${var.prefix}-CustomBootstrap" - filename = "${path.module}/../../packages/api/dist/bootstrap/lambda.zip" - source_code_hash = filebase64sha256("${path.module}/../../packages/api/dist/bootstrap/lambda.zip") - handler = "index.handler" - role = var.lambda_processing_role_arn - runtime = "nodejs20.x" - timeout = lookup(var.lambda_timeouts, "CustomBootstrap", 300) - memory_size = lookup(var.lambda_memory_sizes, "CustomBootstrap", 512) - environment { - variables = { - stackName = var.prefix - system_bucket = var.system_bucket - ES_INDEX_SHARDS = var.es_index_shards - } - } - - tags = var.tags - - dynamic "vpc_config" { - for_each = length(var.lambda_subnet_ids) == 0 ? [] : [1] - content { - subnet_ids = var.lambda_subnet_ids - security_group_ids = local.lambda_security_group_ids - } - } -} - -data "aws_lambda_invocation" "custom_bootstrap" { - count = var.elasticsearch_hostname != null ? 
1 : 0 - depends_on = [aws_lambda_function.custom_bootstrap] - function_name = aws_lambda_function.custom_bootstrap.function_name - - input = jsonencode( - { - elasticsearchHostname = var.elasticsearch_hostname - removeAliasConflict = var.elasticsearch_remove_index_alias_conflict - replacementTrigger = timestamp() - }) -} diff --git a/tf-modules/archive/index_from_database.tf b/tf-modules/archive/index_from_database.tf deleted file mode 100644 index 61138dd4664..00000000000 --- a/tf-modules/archive/index_from_database.tf +++ /dev/null @@ -1,112 +0,0 @@ -resource "aws_lambda_function" "index_from_database" { - function_name = "${var.prefix}-IndexFromDatabase" - filename = "${path.module}/../../packages/api/dist/indexFromDatabase/lambda.zip" - source_code_hash = filebase64sha256("${path.module}/../../packages/api/dist/indexFromDatabase/lambda.zip") - handler = "index.handler" - role = aws_iam_role.index_from_database.arn - runtime = "nodejs20.x" - timeout = lookup(var.lambda_timeouts, "IndexFromDatabase", 300) - memory_size = lookup(var.lambda_memory_sizes, "IndexFromDatabase", 512) - environment { - variables = { - CMR_ENVIRONMENT = var.cmr_environment - CMR_HOST = var.cmr_custom_host - databaseCredentialSecretArn = var.rds_user_access_secret_arn - ES_CONCURRENCY = var.es_request_concurrency - ES_HOST = var.elasticsearch_hostname - ReconciliationReportsTable = var.dynamo_tables.reconciliation_reports.name - stackName = var.prefix - } - } - tags = var.tags - - dynamic "vpc_config" { - for_each = length(var.lambda_subnet_ids) == 0 ? [] : [1] - content { - subnet_ids = var.lambda_subnet_ids - security_group_ids = concat(local.lambda_security_group_ids, [var.rds_security_group]) - } - } -} - - -resource "aws_iam_role" "index_from_database" { - name = "${var.prefix}-index_from_database" - assume_role_policy = data.aws_iam_policy_document.lambda_assume_role_policy.json - permissions_boundary = var.permissions_boundary_arn - - tags = var.tags -} - - -resource "aws_iam_role_policy" "index_from_database" { - name = "${var.prefix}_index_from_database_policy" - role = aws_iam_role.index_from_database.id - policy = data.aws_iam_policy_document.index_from_database.json -} - - -data "aws_iam_policy_document" "index_from_database" { - statement { - actions = ["ecs:RunTask"] - resources = [aws_ecs_task_definition.async_operation.arn] - } - - statement { - actions = [ - "ec2:CreateNetworkInterface", - "ec2:DeleteNetworkInterface", - "ec2:DescribeNetworkInterfaces", - "logs:DescribeLogStreams", - "logs:CreateLogGroup", - "logs:CreateLogStream", - "logs:PutLogEvents", - ] - resources = ["*"] - } - - statement { - actions = [ - "dynamodb:GetItem", - "dynamodb:Scan", - ] - resources = [for k, v in var.dynamo_tables : v.arn] - } - - statement { - actions = ["dynamodb:Query"] - resources = [for k, v in var.dynamo_tables : "${v.arn}/index/*"] - } - - statement { - actions = [ - "dynamodb:GetRecords", - "dynamodb:GetShardIterator", - "dynamodb:DescribeStream", - "dynamodb:ListStreams" - ] - resources = [for k, v in var.dynamo_tables : "${v.arn}/stream/*"] - } - - statement { - actions = ["dynamodb:ListTables"] - resources = ["*"] - } - - statement { - actions = ["secretsmanager:GetSecretValue"] - resources = [ - aws_secretsmanager_secret.api_cmr_password.arn, - aws_secretsmanager_secret.api_launchpad_passphrase.arn, - var.rds_user_access_secret_arn - ] - } - - statement { - actions = [ - "ssm:GetParameter" - ] - resources = [aws_ssm_parameter.dynamo_table_names.arn] - } -} - diff --git 
a/tf-modules/archive/reconciliation_report.tf b/tf-modules/archive/reconciliation_report.tf index b09ef5a337f..6db11219711 100644 --- a/tf-modules/archive/reconciliation_report.tf +++ b/tf-modules/archive/reconciliation_report.tf @@ -15,8 +15,6 @@ resource "aws_lambda_function" "create_reconciliation_report" { CMR_HOST = var.cmr_custom_host DISTRIBUTION_ENDPOINT = var.distribution_url ES_HOST = var.elasticsearch_hostname - ES_SCROLL = lookup(var.elasticsearch_client_config, "create_reconciliation_report_es_scroll_duration", "6m") - ES_SCROLL_SIZE = lookup(var.elasticsearch_client_config, "create_reconciliation_report_es_scroll_size", 1000) stackName = var.prefix system_bucket = var.system_bucket cmr_client_id = var.cmr_client_id
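// [Editor's note] Illustrative sketch, not code from this diff: the two environment
// variables removed above supplied the scroll settings for the reconciliation report's
// Elasticsearch queries. Their consumption reduces to reading process.env with the same
// defaults the lookup() calls provided ("6m" and 1000); this helper is hypothetical.
function getEsScrollSettings(env = process.env) {
  const scrollDuration = env.ES_SCROLL || '6m'; // how long ES keeps a scroll context open
  const scrollSize = Number.parseInt(env.ES_SCROLL_SIZE || '1000', 10); // docs per scroll page
  if (Number.isNaN(scrollSize) || scrollSize <= 0) {
    throw new TypeError(`invalid ES_SCROLL_SIZE: ${env.ES_SCROLL_SIZE}`);
  }
  return { scrollDuration, scrollSize };
}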