Skip to content

Commit

Permalink
fix(mojaloop/#2644): missing error-code for the transfer in the centr…
Browse files Browse the repository at this point in the history
…al-ledger db (#929)

fix(mojaloop/#2644): missing error-code for the transfer in the central-ledger db - mojaloop/project#2644
- added payer_transfer_timeout.json test-case to feature_tests/transfer_negative_scenarios
- updated dependencies
- Added work-around for Kafka Producer Client disconnect/connect and created a following story to investigate this further as I believe its an issue with the Kafka Producer Client Lib: mojaloop/project#3067
- Added a new Helper `KafkaHelper` to manage connect/disconnect in an effort to resolve the above issue by implementing re-usable helper functions
- Updated docker-compose to include kafka-init to pre-create Kafka topics to improve stability of Integration Tests
- Fixed issue with Wait Retry helper functions which did not set the Timeout correct (i.e. it was hard-coded to 2, instead of being set by the input parameter)
- Added env config (TST_RETRY_COUNT, TST_RETRY_TIMEOUT) for Integration Tests to set input params for Wait Retry helper functions being called by the `./transfers/handlers.test.js` test-cases
- Updated `test:int` to include env configs for TST_RETRY_COUNT, TST_RETRY_TIMEOUT and UV_THREADPOOL_SIZE (increased avaialable threads for NODE-RDKAFKA). This should include consistency for Integration Tests.
  • Loading branch information
mdebarros authored Dec 12, 2022
1 parent 77cb536 commit 7e49456
Show file tree
Hide file tree
Showing 10 changed files with 407 additions and 130 deletions.
4 changes: 2 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,10 @@ jobs:
# docker-compose build
## Lets pull only the Services needed for the Integration Test
docker-compose pull mysql kafka
docker-compose pull mysql kafka init-kafka
## Lets startup only the Services needed for the Integration Test
docker-compose up -d mysql kafka
docker-compose up -d mysql kafka init-kafka
## Check straight away to see if any containers have exited
docker-compose ps
Expand Down
21 changes: 21 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,27 @@ services:
start_period: 40s
interval: 30s

init-kafka:
container_name: cl_init_kafka
networks:
- cl-mojaloop-net
image: docker.io/bitnami/kafka:3.2
depends_on:
- kafka
entrypoint: [ '/bin/sh', '-c' ]
command: |
"
# blocks until kafka is reachable
kafka-topics.sh --bootstrap-server kafka:29092 --list
echo -e 'Creating kafka topics'
kafka-topics.sh --bootstrap-server kafka:29092 --create --if-not-exists --topic topic-transfer-prepare --replication-factor 1 --partitions 1
kafka-topics.sh --bootstrap-server kafka:29092 --create --if-not-exists --topic topic-transfer-position --replication-factor 1 --partitions 1
kafka-topics.sh --bootstrap-server kafka:29092 --create --if-not-exists --topic topic-transfer-fulfil --replication-factor 1 --partitions 1
kafka-topics.sh --bootstrap-server kafka:29092 --create --if-not-exists --topic topic-notification-event --replication-factor 1 --partitions 1
echo -e 'Successfully created the following topics:'
kafka-topics.sh --bootstrap-server kafka:29092 --list
"
objstore:
image: mongo:latest
container_name: cl_objstore
Expand Down
92 changes: 46 additions & 46 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 8 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@mojaloop/central-ledger",
"version": "16.3.2",
"version": "16.3.3-snapshot.1",
"description": "Central ledger hosted by a scheme to record and settle transfers",
"license": "Apache-2.0",
"author": "ModusBox",
Expand Down Expand Up @@ -48,7 +48,7 @@
"test:xunit": "npx tape 'test/unit/**/**.test.js' | tap-xunit > ./test/results/xunit.xml",
"test:coverage": "npx nyc --reporter=lcov --reporter=text-summary tapes -- 'test/unit/**/**.test.js'",
"test:coverage-check": "npm run test:coverage && nyc check-coverage",
"test:int": "npm run migrate && npx tape 'test/integration/**/*.test.js' | tap-spec",
"test:int": "npm run migrate && TST_RETRY_COUNT=20 TST_RETRY_TIMEOUT=3 UV_THREADPOOL_SIZE=12 npx tape 'test/integration/**/*.test.js' | tap-spec",
"migrate": "npm run migrate:latest && npm run seed:run",
"migrate:latest": "npx knex $npm_package_config_knex migrate:latest",
"migrate:create": "npx knex migrate:make $npm_package_config_knex",
Expand All @@ -60,6 +60,7 @@
"docker:build:api": "docker build --no-cache -f Dockerfile -t $npm_package_config_images_api:latest .",
"docker:up": "docker-compose -f docker-compose.yml up",
"docker:up:backend": "docker-compose up -d ml-api-adapter mysql mockserver kafka kowl temp_curl",
"docker:up:int": "docker compose up -d kafka init-kafka objstore mysql",
"docker:script:populateTestData": "sh ./test/util/scripts/populateTestData.sh",
"docker:stop": "docker-compose -f docker-compose.yml stop",
"docker:rm": "docker-compose -f docker-compose.yml rm -f -v",
Expand All @@ -76,7 +77,7 @@
},
"dependencies": {
"@hapi/good": "9.0.1",
"@hapi/hapi": "21.0.0",
"@hapi/hapi": "21.1.0",
"@hapi/inert": "7.0.0",
"@hapi/joi": "17.1.1",
"@hapi/vision": "7.0.0",
Expand All @@ -98,7 +99,7 @@
"catbox-memory": "4.0.1",
"commander": "9.4.1",
"cron": "2.1.0",
"decimal.js": "10.4.2",
"decimal.js": "10.4.3",
"docdash": "2.0.0",
"event-stream": "4.0.1",
"five-bells-condition": "5.0.1",
Expand All @@ -119,17 +120,17 @@
},
"devDependencies": {
"async-retry": "1.3.3",
"audit-ci": "^6.3.0",
"audit-ci": "^6.4.0",
"get-port": "5.1.1",
"jsdoc": "4.0.0",
"jsonpath": "1.1.1",
"nodemon": "2.0.20",
"npm-check-updates": "16.4.3",
"npm-check-updates": "16.5.6",
"nyc": "15.1.0",
"pre-commit": "1.2.2",
"proxyquire": "2.1.3",
"replace": "^1.2.2",
"sinon": "14.0.2",
"sinon": "15.0.0",
"standard": "17.0.0",
"standard-version": "^9.5.0",
"tap-spec": "^5.0.0",
Expand Down
32 changes: 26 additions & 6 deletions src/models/transfer/facade.js
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ const timeoutExpireReserved = async (segmentId, intervalMin, intervalMax) => {
const knex = await Db.getKnex()
await knex.transaction(async (trx) => {
try {
// Insert `transferTimeout` records for transfers found between the interval intervalMin <= intervalMax
await knex.from(knex.raw('transferTimeout (transferId, expirationDate)')).transacting(trx)
.insert(function () {
this.from('transfer AS t')
Expand All @@ -613,9 +614,10 @@ const timeoutExpireReserved = async (segmentId, intervalMin, intervalMax) => {
.whereNull('tt.transferId')
.whereIn('tsc.transferStateId', [`${Enum.Transfers.TransferInternalState.RECEIVED_PREPARE}`, `${Enum.Transfers.TransferState.RESERVED}`])
.select('t.transferId', 't.expirationDate')
})// .toSQL().sql
// console.log('SQL: ' + q)
}) // .toSQL().sql
// console.log('SQL: ' + q1)

// Insert `transferStateChange` records for RECEIVED_PREPARE
await knex.from(knex.raw('transferStateChange (transferId, transferStateId, reason)')).transacting(trx)
.insert(function () {
this.from('transferTimeout AS tt')
Expand All @@ -629,9 +631,10 @@ const timeoutExpireReserved = async (segmentId, intervalMin, intervalMax) => {
.where('tt.expirationDate', '<', transactionTimestamp)
.andWhere('tsc.transferStateId', `${Enum.Transfers.TransferInternalState.RECEIVED_PREPARE}`)
.select('tt.transferId', knex.raw('?', Enum.Transfers.TransferInternalState.EXPIRED_PREPARED), knex.raw('?', 'Aborted by Timeout Handler'))
})// .toSQL().sql
// console.log('SQL: ' + q)
}) // .toSQL().sql
// console.log('SQL: ' + q2)

// Insert `transferStateChange` records for RESERVED
await knex.from(knex.raw('transferStateChange (transferId, transferStateId, reason)')).transacting(trx)
.insert(function () {
this.from('transferTimeout AS tt')
Expand All @@ -645,8 +648,25 @@ const timeoutExpireReserved = async (segmentId, intervalMin, intervalMax) => {
.where('tt.expirationDate', '<', transactionTimestamp)
.andWhere('tsc.transferStateId', `${Enum.Transfers.TransferState.RESERVED}`)
.select('tt.transferId', knex.raw('?', Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT), knex.raw('?', 'Marked for expiration by Timeout Handler'))
})// .toSQL().sql
// console.log('SQL: ' + q)
}) // .toSQL().sql
// console.log('SQL: ' + q3)

// Insert `transferError` records
await knex.from(knex.raw('transferError (transferId, transferStateChangeId, errorCode, errorDescription)')).transacting(trx)
.insert(function () {
this.from('transferTimeout AS tt')
.innerJoin(knex('transferStateChange AS tsc1')
.select('tsc1.transferId')
.max('tsc1.transferStateChangeId AS maxTransferStateChangeId')
.innerJoin('transferTimeout AS tt1', 'tt1.transferId', 'tsc1.transferId')
.groupBy('tsc1.transferId').as('ts'), 'ts.transferId', 'tt.transferId'
)
.innerJoin('transferStateChange AS tsc', 'tsc.transferStateChangeId', 'ts.maxTransferStateChangeId')
.where('tt.expirationDate', '<', transactionTimestamp)
.andWhere('tsc.transferStateId', `${Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT}`)
.select('tt.transferId', 'tsc.transferStateChangeId', knex.raw('?', ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED.code), knex.raw('?', ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED.message))
}) // .toSQL().sql
// console.log('SQL: ' + q4)

if (segmentId === 0) {
const segment = {
Expand Down
Loading

0 comments on commit 7e49456

Please sign in to comment.