Skip to content

Commit

Permalink
602 Active Participants Caching (#566)
Browse files Browse the repository at this point in the history
* 602 participants caching - initial implementation

* 602 participants caching - unit tests fixed

* 602 participants caching - new unit tests implemented

* 602 participants caching - integration tests fixed

* 602 participants caching - unit tests fixed

* 602 participants caching - added unit test for cache invalidation

* 602 participants caching - updated configs

* 602 participants caching - fixed integration tests
  • Loading branch information
yosheeck authored Jan 23, 2020
1 parent bd3f72c commit 92fd1d1
Show file tree
Hide file tree
Showing 13 changed files with 236 additions and 24 deletions.
3 changes: 3 additions & 0 deletions docker/central-ledger/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@
]
},
"INTERNAL_TRANSFER_VALIDITY_SECONDS": "432000",
"CACHE": {
"MAX_BYTE_SIZE": 10000000
},
"KAFKA": {
"TOPIC_TEMPLATES": {
"PARTICIPANT_TOPIC_TEMPLATE": {
Expand Down
83 changes: 82 additions & 1 deletion src/lib/cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,29 @@ const Enums = require('../lib/enum')
const ttl = 60 * 1000
let catboxMemoryClient = null

/*
Each client should register its API during module load.
This is to simplify file structure, so that at the same time:
- cache.js can control the life-cycle of underlying data (e.g. init, destroy, refresh, enable/disable, ttl)
- while leaving cached APIs and uncached APIs in their own namespaces (e.g. files or dirs)
*/
const cacheClients = {
participant: {
api: {
getAllNoCache: null
}
}
}

const participantsAllCacheKey = {
segment: 'participants',
id: 'all'
}

const registerParticipantClient = (participantClient) => {
cacheClients.participant.api = participantClient
}

const initCache = async function () {
// Init catbox.
// Note: The strange looking "module.exports.CatboxMemory" reference
Expand All @@ -18,10 +41,12 @@ const initCache = async function () {

// Preload data
await _getAllEnums()
await getParticipantsCached()
}

const destroyCache = function () {
const destroyCache = async function () {
catboxMemoryClient.stop()
catboxMemoryClient = null
}

const _getAllEnums = async function () {
Expand All @@ -46,20 +71,76 @@ const getEnums = async (id) => {
enums = await Enums[id]()
catboxMemoryClient.set(key, enums, ttl)
} else {
// unwrap from catbox structure
enums = enumsFromCache.item
}
}
return enums
}

const buildUnifiedParticipantsData = (allParticipants) => {
// build indexes (or indices?) - optimization for byId and byName access
const indexById = {}
const indexByName = {}

allParticipants.forEach((oneParticipant) => {
// Participant API returns Date type, but cache internals will serialize it to String
// by calling JSON.stringify(), which calls .toISOString() on a Date object.
// Let's ensure all places return same kind of String.
oneParticipant.createdDate = JSON.stringify(oneParticipant.createdDate)

// Add to indexes
indexById[oneParticipant.participantId] = oneParticipant
indexByName[oneParticipant.name] = oneParticipant
})

// build unified structure - indexes + data
const unifiedParticipants = {
indexById,
indexByName,
allParticipants
}

return unifiedParticipants
}

const getParticipantsCached = async () => {
// Do we have valid participants list in the cache ?
let cachedParticipants = catboxMemoryClient.get(participantsAllCacheKey)
if (!cachedParticipants) {
// No participants in the cache, so fetch from participan API
const allParticipants = await cacheClients.participant.api.getAllNoCache()
cachedParticipants = buildUnifiedParticipantsData(allParticipants)

// store in cache
catboxMemoryClient.set(participantsAllCacheKey, cachedParticipants, ttl)
} else {
// unwrap participants list from catbox structure
cachedParticipants = cachedParticipants.item
}
return cachedParticipants
}

const invalidateParticipantsCache = async () => {
catboxMemoryClient.drop(participantsAllCacheKey)
}

module.exports = {
// Clients registration
registerParticipantClient,

// Init & destroy the cache
initCache,
destroyCache,

// enums
getEnums,

// participants
getParticipantsCached,
buildUnifiedParticipantsData,
invalidateParticipantsCache,

// exposed for tests
CatboxMemory
}
41 changes: 33 additions & 8 deletions src/models/participant/participant.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,46 @@

const Db = require('../../lib/db')
const ErrorHandler = require('@mojaloop/central-services-error-handling')
const Cache = require('../../lib/cache')

/*
Private API for Cache-only callbacks and tests
*/
exports.getAllNoCache = async () => {
return Db.participant.find({}, { order: 'name asc' })
}

const participantCacheClient = {
getAllNoCache: exports.getAllNoCache
}

Cache.registerParticipantClient(participantCacheClient)

/*
Public API
*/
exports.getById = async (id) => {
try {
return await Db.participant.findOne({ participantId: id })
const cachedParticipants = await Cache.getParticipantsCached()
return cachedParticipants.indexById[id]
} catch (err) {
throw ErrorHandler.Factory.reformatFSPIOPError(err)
}
}

exports.getByName = async (name) => {
try {
const named = await Db.participant.findOne({ name })
return named
const cachedParticipants = await Cache.getParticipantsCached()
return cachedParticipants.indexByName[name]
} catch (err) {
throw ErrorHandler.Factory.reformatFSPIOPError(err)
}
}

exports.getAll = async () => {
try {
const participants = await Db.participant.find({}, { order: 'name asc' })
return participants
const cachedParticipants = await Cache.getParticipantsCached()
return cachedParticipants.allParticipants
} catch (err) {
throw ErrorHandler.Factory.reformatFSPIOPError(err)
}
Expand All @@ -62,6 +80,7 @@ exports.create = async (participant) => {
name: participant.name,
createdBy: 'unknown'
})
await Cache.invalidateParticipantsCache()
return result
} catch (err) {
throw ErrorHandler.Factory.reformatFSPIOPError(err)
Expand All @@ -70,23 +89,29 @@ exports.create = async (participant) => {

exports.update = async (participant, isActive) => {
try {
return await Db.participant.update({ participantId: participant.participantId }, { isActive })
const result = await Db.participant.update({ participantId: participant.participantId }, { isActive })
await Cache.invalidateParticipantsCache()
return result
} catch (err) {
throw ErrorHandler.Factory.reformatFSPIOPError(err)
}
}

exports.destroyByName = async (name) => {
try {
return await Db.participant.destroy({ name: name })
const result = await Db.participant.destroy({ name: name })
await Cache.invalidateParticipantsCache()
return result
} catch (err) {
throw ErrorHandler.Factory.reformatFSPIOPError(err)
}
}

exports.destroyParticipantEndpointByParticipantId = async (participantId) => {
try {
return Db.participantEndpoint.destroy({ participantId: participantId })
const result = Db.participantEndpoint.destroy({ participantId: participantId })
await Cache.invalidateParticipantsCache()
return result
} catch (err) {
throw ErrorHandler.Factory.reformatFSPIOPError(err)
}
Expand Down
3 changes: 3 additions & 0 deletions test/integration-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@
]
},
"INTERNAL_TRANSFER_VALIDITY_SECONDS": "432000",
"CACHE": {
"MAX_BYTE_SIZE": 10000000
},
"EMAIL_USER": "modusboxemailtest@gmail.com",
"EMAIL_PASSWORD": "April2o0%",
"EMAIL_SMTP": "smtp.gmail.com",
Expand Down
19 changes: 11 additions & 8 deletions test/integration/domain/participant/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
const Test = require('tape')
const Sinon = require('sinon')
const Db = require('../../../../src/lib/db')
const Cache = require('../../../../src/lib/cache')
const Logger = require('@mojaloop/central-services-logger')
const Config = require('../../../../src/lib/config')
const ParticipantService = require('../../../../src/domain/participant')
Expand Down Expand Up @@ -69,6 +70,7 @@ Test('Participant service', async (participantTest) => {
assert.fail()
assert.end()
})
await Cache.initCache()
} catch (err) {
Logger.error(`Setup for test failed with error - ${err}`)
assert.fail()
Expand Down Expand Up @@ -118,15 +120,15 @@ Test('Participant service', async (participantTest) => {
getByNameResult = await ParticipantService.getByName(testData.fsp4Name)
result = await ParticipantHelper.prepareData(testData.fsp4Name, testData.currency, !!getByNameResult)
participantFixtures.push(result.participant)
participantFixtures.forEach(async participant => {
for (const participant of participantFixtures) {
const read = await ParticipantService.getById(participant.participantId)
participantMap.set(participant.participantId, read)
if (debug) assert.comment(`Testing with participant \n ${JSON.stringify(participant, null, 2)}`)
assert.equal(read.name, participant.name, 'names are equal')
assert.deepEqual(read.currencyList, participant.currencyList, 'currency match')
assert.equal(read.isActive, participant.isActive, 'isActive flag matches')
assert.equal(read.createdDate.toString(), participant.createdDate.toString(), 'created date matches')
})
}
assert.end()
} catch (err) {
Logger.error(`create participant failed with error - ${err}`)
Expand All @@ -137,13 +139,13 @@ Test('Participant service', async (participantTest) => {

await participantTest.test('getByName', async (assert) => {
try {
participantFixtures.forEach(async participant => {
for (const participant of participantFixtures) {
const result = await ParticipantService.getByName(participant.name)
assert.equal(result.name, participant.name, 'names are equal')
assert.deepEqual(result.currencyList, participant.currencyList, 'currencies match')
assert.equal(result.isActive, participant.isActive, 'isActive flag matches')
assert.equal(result.createdDate.toString(), participant.createdDate.toString(), 'created date matches')
})
}
assert.end()
} catch (err) {
Logger.error(`get participant by name failed with error - ${err}`)
Expand Down Expand Up @@ -199,11 +201,11 @@ Test('Participant service', async (participantTest) => {
endpointsFixtures.push(result)
result = await ParticipantEndpointHelper.prepareData(participant.name, 'FSPIOP_CALLBACK_URL_TRX_REQ_SERVICE', testData.endpointBase)
endpointsFixtures.push(result)
endpointsFixtures.forEach(async endpoint => {
for (const endpoint of endpointsFixtures) {
const read = await ParticipantService.getEndpoint(participant.name, endpoint.type)
assert.equal(read[0].name, endpoint.type, `endpoint type ${endpoint.type} equal`)
assert.equal(read[0].value, endpoint.value, 'endpoint values match')
})
}
participant = participantFixtures[1]
await ParticipantEndpointHelper.prepareData(participant.name, 'FSPIOP_CALLBACK_URL_TRANSFER_POST', `${testData.endpointBase}/transfers`)
await ParticipantEndpointHelper.prepareData(participant.name, 'FSPIOP_CALLBACK_URL_TRANSFER_PUT', `${testData.endpointBase}/transfers/{{transferId}}`)
Expand Down Expand Up @@ -254,12 +256,12 @@ Test('Participant service', async (participantTest) => {

await participantTest.test('getEndpoint', async (assert) => {
try {
endpointsFixtures.forEach(async endpoint => {
for (const endpoint of endpointsFixtures) {
const result = await ParticipantService.getEndpoint(participantFixtures[0].name, endpoint.type)
assert.equal(result[0].name, endpoint.type, `endpoint type ${endpoint.type} equal`)
assert.equal(result[0].value, endpoint.value, 'endpoint values match')
assert.equal(result[0].isActive, 1, 'isActive flag match')
})
}
assert.end()
} catch (err) {
Logger.error(`get endpoint failed with error - ${err}`)
Expand Down Expand Up @@ -421,6 +423,7 @@ Test('Participant service', async (participantTest) => {
assert.ok(result, `destroy ${participant.name} success`)
}
}
await Cache.destroyCache()
await Db.disconnect()
assert.pass('database connection closed')
// @ggrg: Having the following 3 lines commented prevents the current test from exiting properly when run individually,
Expand Down
3 changes: 3 additions & 0 deletions test/integration/handlers/handlers.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const Logger = require('@mojaloop/central-services-logger')
const Config = require('../../../src/lib/config')
const sleep = require('@mojaloop/central-services-shared').Util.Time.sleep
const Db = require('@mojaloop/central-services-database').Db
const Cache = require('../../../src/lib/cache')
const Consumer = require('@mojaloop/central-services-stream').Util.Consumer
const Producer = require('@mojaloop/central-services-stream').Util.Producer
const Utility = require('@mojaloop/central-services-shared').Util.Kafka
Expand Down Expand Up @@ -244,6 +245,7 @@ Test('Handlers test', async handlersTest => {
await handlersTest.test('registerAllHandlers should', async registerAllHandlers => {
await registerAllHandlers.test('setup handlers', async (test) => {
await Db.connect(Config.DATABASE)
await Cache.initCache()
await Handlers.transfers.registerPrepareHandler()
await Handlers.positions.registerPositionHandler()
await Handlers.transfers.registerFulfilHandler()
Expand Down Expand Up @@ -608,6 +610,7 @@ Test('Handlers test', async handlersTest => {
await handlersTest.test('teardown', async (assert) => {
try {
await Handlers.timeouts.stop()
await Cache.destroyCache()
await Db.disconnect()
assert.pass('database connection closed')

Expand Down
3 changes: 3 additions & 0 deletions test/integration/models/transfer/facade.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

const Test = require('tape')
const Db = require('../../../../src/lib/db')
const Cache = require('../../../../src/lib/cache')
const Logger = require('@mojaloop/central-services-logger')
const Config = require('../../../../src/lib/config')
const TransferFacade = require('../../../../src/models/transfer/facade')
Expand All @@ -42,6 +43,7 @@ Test('Transfer read model test', async (transferReadModelTest) => {
await transferReadModelTest.test('setup', async (assert) => {
try {
await Db.connect(Config.DATABASE).then(async () => {
await Cache.initCache()
transferPrepareResult = await HelperModule.prepareNeededData('transferModel')
assert.pass('setup OK')
assert.end()
Expand Down Expand Up @@ -84,6 +86,7 @@ Test('Transfer read model test', async (transferReadModelTest) => {

await transferReadModelTest.test('teardown', async (assert) => {
try {
await Cache.destroyCache()
await Db.disconnect()
assert.pass('database connection closed')
assert.end()
Expand Down
3 changes: 3 additions & 0 deletions test/integration/models/transfer/ilpPacket.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

const Test = require('tape')
const Db = require('../../../../src/lib/db')
const Cache = require('../../../../src/lib/cache')
const Logger = require('@mojaloop/central-services-logger')
const Config = require('../../../../src/lib/config')
const Service = require('../../../../src/models/transfer/ilpPacket')
Expand All @@ -55,6 +56,7 @@ Test('Ilp service tests', async (ilpTest) => {
assert.fail(`Connecting to database - ${err}`)
assert.end()
})
await Cache.initCache()
} catch (err) {
Logger.error(`Setup for test failed with error - ${err}`)
assert.fail(`Setup for test failed with error - ${err}`)
Expand Down Expand Up @@ -174,6 +176,7 @@ Test('Ilp service tests', async (ilpTest) => {

await ilpTest.test('teardown', async (assert) => {
try {
await Cache.destroyCache()
await Db.disconnect()
assert.pass('database connection closed')
assert.end()
Expand Down
Loading

0 comments on commit 92fd1d1

Please sign in to comment.