Skip to content

Commit

Permalink
Rename symbols that use elasticsearch to generic or opensearch (#323)
Browse files Browse the repository at this point in the history
  • Loading branch information
Phil Varner authored Dec 19, 2022
1 parent 592d80a commit 7e4c11f
Show file tree
Hide file tree
Showing 27 changed files with 104 additions and 101 deletions.
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
- [Architecture](#architecture)
- [Migration](#migration)
- [0.4.x -\> 0.5.x](#04x---05x)
- [OpenSearch migration](#opensearch-migration)
- [Elasticsearch to OpenSearch Migration](#elasticsearch-to-opensearch-migration)
- [Preferred Elasticsearch to OpenSearch Migration Process](#preferred-elasticsearch-to-opensearch-migration-process)
- [0.3.x -\> 0.4.x](#03x---04x)
- [Elasticsearch upgrade from 7.9 to 7.10](#elasticsearch-upgrade-from-79-to-710)
Expand Down Expand Up @@ -119,7 +119,7 @@ apiLambda --> opensearch

### 0.4.x -> 0.5.x

#### OpenSearch migration
#### Elasticsearch to OpenSearch Migration

By default, a new deployment of 0.5.x will use OpenSearch instead of Elasticsearch. There
are three options if you have an existing deployment that uses Elasticsearch:
Expand All @@ -142,6 +142,9 @@ are three options if you have an existing deployment that uses Elasticsearch:
upgrade the Elasticsearch domain to OpenSearch, and connect the domain to the new CF Stack.
This is described below.

Additionally, the `ES_HOST` variable used in the serverless.yml file has been
renamed `OPENSEARCH_HOST`.

#### Preferred Elasticsearch to OpenSearch Migration Process

**Note! The migration must be done carefully to avoid losing the database!**
Expand Down
38 changes: 19 additions & 19 deletions src/lambdas/api/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const createError = require('http-errors')
const express = require('express')
const logger = require('morgan')
const path = require('path')
const es = require('../../lib/es')
const database = require('../../lib/database')
const api = require('../../lib/api')
const { readFile } = require('../../lib/fs')
const { addEndpoint } = require('./middleware/add-endpoint')
Expand All @@ -28,7 +28,7 @@ app.use(addEndpoint)

app.get('/', async (req, res, next) => {
try {
res.json(await api.getCatalog(txnEnabled, es, req.endpoint))
res.json(await api.getCatalog(txnEnabled, database, req.endpoint))
} catch (error) {
next(error)
}
Expand Down Expand Up @@ -63,7 +63,7 @@ app.get('/conformance', async (_req, res, next) => {
app.get('/search', async (req, res, next) => {
try {
res.type('application/geo+json')
res.json(await api.searchItems(null, req.query, es, req.endpoint, 'GET'))
res.json(await api.searchItems(null, req.query, database, req.endpoint, 'GET'))
} catch (error) {
if (error instanceof api.ValidationError) {
next(createError(400, error.message))
Expand All @@ -76,7 +76,7 @@ app.get('/search', async (req, res, next) => {
app.post('/search', async (req, res, next) => {
try {
res.type('application/geo+json')
res.json(await api.searchItems(null, req.body, es, req.endpoint, 'POST'))
res.json(await api.searchItems(null, req.body, database, req.endpoint, 'POST'))
} catch (error) {
if (error instanceof api.ValidationError) {
next(createError(400, error.message))
Expand All @@ -88,7 +88,7 @@ app.post('/search', async (req, res, next) => {

app.get('/aggregate', async (req, res, next) => {
try {
res.json(await api.aggregate(req.query, es, req.endpoint, 'GET'))
res.json(await api.aggregate(req.query, database, req.endpoint, 'GET'))
} catch (error) {
if (error instanceof api.ValidationError) {
next(createError(400, error.message))
Expand All @@ -100,7 +100,7 @@ app.get('/aggregate', async (req, res, next) => {

app.post('/aggregate', async (req, res, next) => {
try {
res.json(await api.aggregate(req.body, es, req.endpoint, 'POST'))
res.json(await api.aggregate(req.body, database, req.endpoint, 'POST'))
} catch (error) {
if (error instanceof api.ValidationError) {
next(createError(400, error.message))
Expand All @@ -112,7 +112,7 @@ app.post('/aggregate', async (req, res, next) => {

app.get('/collections', async (req, res, next) => {
try {
res.json(await api.getCollections(es, req.endpoint))
res.json(await api.getCollections(database, req.endpoint))
} catch (error) {
next(error)
}
Expand All @@ -122,7 +122,7 @@ app.post('/collections', async (req, res, next) => {
if (txnEnabled) {
const collectionId = req.body.collection
try {
await api.createCollection(req.body, es)
await api.createCollection(req.body, database)
res.location(`${req.endpoint}/collections/${collectionId}`)
res.sendStatus(201)
} catch (error) {
Expand All @@ -142,7 +142,7 @@ app.post('/collections', async (req, res, next) => {
app.get('/collections/:collectionId', async (req, res, next) => {
const { collectionId } = req.params
try {
const response = await api.getCollection(collectionId, es, req.endpoint)
const response = await api.getCollection(collectionId, database, req.endpoint)

if (response instanceof Error) next(createError(404))
else res.json(response)
Expand All @@ -154,14 +154,14 @@ app.get('/collections/:collectionId', async (req, res, next) => {
app.get('/collections/:collectionId/items', async (req, res, next) => {
const { collectionId } = req.params
try {
const response = await api.getCollection(collectionId, es, req.endpoint)
const response = await api.getCollection(collectionId, database, req.endpoint)

if (response instanceof Error) next(createError(404))
else {
const items = await api.searchItems(
collectionId,
req.query,
es,
database,
req.endpoint,
'GET'
)
Expand All @@ -185,11 +185,11 @@ app.post('/collections/:collectionId/items', async (req, res, next) => {
if (req.body.collection && req.body.collection !== collectionId) {
next(createError(400, 'Collection resource URI must match collection in body'))
} else {
const collectionRes = await api.getCollection(collectionId, es, req.endpoint)
const collectionRes = await api.getCollection(collectionId, database, req.endpoint)
if (collectionRes instanceof Error) next(createError(404))
try {
req.body.collection = collectionId
await api.createItem(req.body, es)
await api.createItem(req.body, database)
res.location(`${req.endpoint}/collections/${collectionId}/items/${itemId}`)
res.sendStatus(201)
} catch (error) {
Expand All @@ -214,7 +214,7 @@ app.get('/collections/:collectionId/items/:itemId', async (req, res, next) => {
const response = await api.getItem(
collectionId,
itemId,
es,
database,
req.endpoint
)

Expand Down Expand Up @@ -242,13 +242,13 @@ app.put('/collections/:collectionId/items/:itemId', async (req, res, next) => {
} else if (req.body.id && req.body.id !== itemId) {
next(createError(400, 'Item ID in resource URI must match id in body'))
} else {
const itemRes = await api.getItem(collectionId, itemId, es, req.endpoint)
const itemRes = await api.getItem(collectionId, itemId, database, req.endpoint)
if (itemRes instanceof Error) next(createError(404))
else {
req.body.collection = collectionId
req.body.id = itemId
try {
await api.updateItem(req.body, es)
await api.updateItem(req.body, database)
res.sendStatus(204)
} catch (error) {
if (error instanceof Error
Expand All @@ -275,13 +275,13 @@ app.patch('/collections/:collectionId/items/:itemId', async (req, res, next) =>
} else if (req.body.id && req.body.id !== itemId) {
next(createError(400, 'Item ID in resource URI must match id in body'))
} else {
const itemRes = await api.getItem(collectionId, itemId, es, req.endpoint)
const itemRes = await api.getItem(collectionId, itemId, database, req.endpoint)
if (itemRes instanceof Error) next(createError(404))
else {
try {
//const item =
await api.partialUpdateItem(
collectionId, itemId, req.body, es, req.endpoint
collectionId, itemId, req.body, database, req.endpoint
)
// res.type('application/geo+json')
// res.json(item)
Expand All @@ -300,7 +300,7 @@ app.delete('/collections/:collectionId/items/:itemId', async (req, res, next) =>
if (txnEnabled) {
const { collectionId, itemId } = req.params
try {
const response = await api.deleteItem(collectionId, itemId, es)
const response = await api.deleteItem(collectionId, itemId, database)
if (response instanceof Error) next(createError(500))
else {
res.sendStatus(204)
Expand Down
6 changes: 3 additions & 3 deletions src/lambdas/ingest/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const { default: got } = require('got')
const esClient = require('../../lib/esClient.js')
const stream = require('../../lib/esStream.js')
const dbClient = require('../../lib/databaseClient.js')
const stream = require('../../lib/databaseStream.js')
const ingest = require('../../lib/ingest.js')
const s3Utils = require('../../lib/s3-utils')

Expand Down Expand Up @@ -53,7 +53,7 @@ module.exports.handler = async function handler(event, context) {
logger.debug(`Event: ${JSON.stringify(event, undefined, 2)}`)

if (event.create_indices) {
await esClient.createIndex('collections')
await dbClient.createIndex('collections')
}

const stacItems = isSqsEvent(event)
Expand Down
2 changes: 1 addition & 1 deletion src/lib/api.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const { pickBy, assign, get: getNested } = require('lodash')
const extent = require('@mapbox/extent')
const { DateTime } = require('luxon')
const { isIndexNotFoundError } = require('./es')
const { isIndexNotFoundError } = require('./database')
const logger = console

// max number of collections to retrieve
Expand Down
34 changes: 17 additions & 17 deletions src/lib/es.js → src/lib/database.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const esClient = require('./esClient')
const dbClient = require('./databaseClient')
const logger = console //require('./logger')

const COLLECTIONS_INDEX = process.env.COLLECTIONS_INDEX || 'collections'
Expand Down Expand Up @@ -228,11 +228,11 @@ function buildFieldsFilter(parameters) {
*
*/
async function indexCollection(collection) {
const client = await esClient.client()
const client = await dbClient.client()

const exists = await client.indices.exists({ index: COLLECTIONS_INDEX })
if (!exists.body) {
await esClient.createIndex(COLLECTIONS_INDEX)
await dbClient.createIndex(COLLECTIONS_INDEX)
}

const collectionDocResponse = await client.index({
Expand All @@ -242,7 +242,7 @@ async function indexCollection(collection) {
opType: 'create'
})

const indexCreateResponse = await esClient.createIndex(collection.id)
const indexCreateResponse = await dbClient.createIndex(collection.id)

return [collectionDocResponse, indexCreateResponse]
}
Expand All @@ -252,7 +252,7 @@ async function indexCollection(collection) {
*
*/
async function indexItem(item) {
const client = await esClient.client()
const client = await dbClient.client()

const exists = await client.indices.exists({ index: item.collection })
if (!exists.body) {
Expand Down Expand Up @@ -282,7 +282,7 @@ async function indexItem(item) {
*
*/
async function partialUpdateItem(collectionId, itemId, updateFields) {
const client = await esClient.client()
const client = await dbClient.client()

// Handle inserting required default properties to `updateFields`
const requiredProperties = {
Expand All @@ -309,7 +309,7 @@ async function partialUpdateItem(collectionId, itemId, updateFields) {
}

async function deleteItem(collectionId, itemId) {
const client = await esClient.client()
const client = await dbClient.client()
if (client === undefined) throw new Error('Client is undefined')
return await client.delete_by_query({
index: collectionId,
Expand All @@ -318,9 +318,9 @@ async function deleteItem(collectionId, itemId) {
})
}

async function esQuery(parameters) {
async function dbQuery(parameters) {
logger.info(`Search database query: ${JSON.stringify(parameters)}`)
const client = await esClient.client()
const client = await dbClient.client()
if (client === undefined) throw new Error('Client is undefined')
const response = await client.search(parameters)
logger.info(`Response: ${JSON.stringify(response)}`)
Expand All @@ -329,7 +329,7 @@ async function esQuery(parameters) {

// get single collection
async function getCollection(collectionId) {
const response = await esQuery({
const response = await dbQuery({
index: COLLECTIONS_INDEX,
body: buildIdQuery(collectionId)
})
Expand All @@ -342,7 +342,7 @@ async function getCollection(collectionId) {
// get all collections
async function getCollections(page = 1, limit = 100) {
try {
const response = await esQuery({
const response = await dbQuery({
index: COLLECTIONS_INDEX,
size: limit,
from: (page - 1) * limit
Expand Down Expand Up @@ -391,18 +391,18 @@ async function constructSearchParams(parameters, page, limit) {

async function search(parameters, page, limit = 10) {
const searchParams = await constructSearchParams(parameters, page, limit)
const esResponse = await esQuery({
const dbResponse = await dbQuery({
ignore_unavailable: true,
allow_no_indices: true,
...searchParams
})

const results = esResponse.body.hits.hits.map((r) => (r._source))
const results = dbResponse.body.hits.hits.map((r) => (r._source))
const response = {
results,
context: {
limit: Number(limit),
matched: esResponse.body.hits.total.value,
matched: dbResponse.body.hits.total.value,
returned: results.length
}
}
Expand Down Expand Up @@ -475,13 +475,13 @@ async function aggregate(parameters) {
}
}

const esResponse = await esQuery({
const dbResponse = await dbQuery({
ignore_unavailable: true,
allow_no_indices: true,
...searchParams
})

return esResponse
return dbResponse
}

const getItem = async (collectionId, itemId) => {
Expand All @@ -505,7 +505,7 @@ const getItemCreated = async (collectionId, itemId) => {
*
*/
async function updateItem(item) {
const client = await esClient.client()
const client = await dbClient.client()

const exists = await client.indices.exists({ index: item.collection })
if (!exists.body) {
Expand Down
14 changes: 7 additions & 7 deletions src/lib/esClient.js → src/lib/databaseClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const logger = console //require('./logger')
const { collectionsIndexConfiguration } = require('../../fixtures/collections')
const { itemsIndexConfiguration } = require('../../fixtures/items')

let _esClient
let _dbClient

function createClientWithUsernameAndPassword(host, username, password) {
const protocolAndHost = host.split('://')
Expand Down Expand Up @@ -71,19 +71,19 @@ async function connect() {
}

// get existing search database client or create a new one
async function esClient() {
if (_esClient) {
async function dbClient() {
if (_dbClient) {
logger.debug('Using existing search database connection')
} else {
_esClient = await connect()
_dbClient = await connect()
logger.debug('Connected to search database')
}

return _esClient
return _dbClient
}

async function createIndex(index) {
const client = await esClient()
const client = await dbClient()
const exists = await client.indices.exists({ index })
const indexConfiguration = index === 'collections'
? collectionsIndexConfiguration() : itemsIndexConfiguration()
Expand All @@ -101,7 +101,7 @@ async function createIndex(index) {
}

module.exports = {
client: esClient,
client: dbClient,
createIndex,
connect
}
Loading

0 comments on commit 7e4c11f

Please sign in to comment.