Skip to content

Commit

Permalink
chore: updates to address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mdebarros committed Dec 9, 2022
1 parent 2aff1fc commit 326545b
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 57 deletions.
50 changes: 0 additions & 50 deletions src/models/transfer/facade.js
Original file line number Diff line number Diff line change
Expand Up @@ -598,26 +598,7 @@ const timeoutExpireReserved = async (segmentId, intervalMin, intervalMax) => {
const knex = await Db.getKnex()
await knex.transaction(async (trx) => {
try {
// const logger = Logger.child({
// ctx: {
// loc: 'models/transfers/facade.timeoutExpireReserved',
// segmentId,
// intervalMin,
// intervalMax
// }
// })

// logger.isDebugEnabled && logger.debug({
// intervalMin,
// intervalMax,
// transferStateId: [
// Enum.Transfers.TransferInternalState.RECEIVED_PREPARE,
// Enum.Transfers.TransferState.RESERVED
// ]
// })

// Insert `transferTimeout` records for transfers found between the interval intervalMin <= intervalMax
// const q1 = await knex.from(knex.raw('transferTimeout (transferId, expirationDate)')).transacting(trx)
await knex.from(knex.raw('transferTimeout (transferId, expirationDate)')).transacting(trx)
.insert(function () {
this.from('transfer AS t')
Expand All @@ -635,17 +616,8 @@ const timeoutExpireReserved = async (segmentId, intervalMin, intervalMax) => {
.select('t.transferId', 't.expirationDate')
}) // .toSQL().sql
// console.log('SQL: ' + q1)
// logger.isDebugEnabled && logger.debug(q1)

// logger.isDebugEnabled && logger.debug({
// expirationDate: transactionTimestamp,
// transferStateId: [
// Enum.Transfers.TransferInternalState.RECEIVED_PREPARE
// ]
// })

// Insert `transferStateChange` records for RECEIVED_PREPARE
// const q2 = await knex.from(knex.raw('transferStateChange (transferId, transferStateId, reason)')).transacting(trx)
await knex.from(knex.raw('transferStateChange (transferId, transferStateId, reason)')).transacting(trx)
.insert(function () {
this.from('transferTimeout AS tt')
Expand All @@ -661,17 +633,8 @@ const timeoutExpireReserved = async (segmentId, intervalMin, intervalMax) => {
.select('tt.transferId', knex.raw('?', Enum.Transfers.TransferInternalState.EXPIRED_PREPARED), knex.raw('?', 'Aborted by Timeout Handler'))
}) // .toSQL().sql
// console.log('SQL: ' + q2)
// logger.isDebugEnabled && logger.debug(q2)

// logger.isDebugEnabled && logger.debug({
// expirationDate: transactionTimestamp,
// transferStateId: [
// Enum.Transfers.TransferInternalState.RESERVED
// ]
// })

// Insert `transferStateChange` records for RESERVED
// const q3 = await knex.from(knex.raw('transferStateChange (transferId, transferStateId, reason)')).transacting(trx)
await knex.from(knex.raw('transferStateChange (transferId, transferStateId, reason)')).transacting(trx)
.insert(function () {
this.from('transferTimeout AS tt')
Expand All @@ -687,19 +650,8 @@ const timeoutExpireReserved = async (segmentId, intervalMin, intervalMax) => {
.select('tt.transferId', knex.raw('?', Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT), knex.raw('?', 'Marked for expiration by Timeout Handler'))
}) // .toSQL().sql
// console.log('SQL: ' + q3)
// logger.isDebugEnabled && logger.debug(q3)

// logger.isDebugEnabled && logger.debug({
// expirationDate: transactionTimestamp,
// transferStateId: [
// Enum.Transfers.TransferInternalState.RESERVED
// ],
// errorCode: ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED.code,
// errorDescription: ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED.message
// })

// Insert `transferError` records
// const q4 = await knex.from(knex.raw('transferError (transferId, transferStateChangeId, errorCode, errorDescription)')).transacting(trx)
await knex.from(knex.raw('transferError (transferId, transferStateChangeId, errorCode, errorDescription)')).transacting(trx)
.insert(function () {
this.from('transferTimeout AS tt')
Expand All @@ -715,7 +667,6 @@ const timeoutExpireReserved = async (segmentId, intervalMin, intervalMax) => {
.select('tt.transferId', 'tsc.transferStateChangeId', knex.raw('?', ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED.code), knex.raw('?', ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED.message))
}) // .toSQL().sql
// console.log('SQL: ' + q4)
// Logger.isDebugEnabled && Logger.debug(q4)

if (segmentId === 0) {
const segment = {
Expand All @@ -724,7 +675,6 @@ const timeoutExpireReserved = async (segmentId, intervalMin, intervalMax) => {
tableName: 'transferStateChange',
value: intervalMax
}
// logger.isDebugEnabled && logger.debug(segment)
await knex('segment').transacting(trx).insert(segment)
} else {
await knex('segment').transacting(trx).where({ segmentId }).update({ value: intervalMax })
Expand Down
22 changes: 15 additions & 7 deletions test/integration/handlers/transfers/handlers.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1029,45 +1029,53 @@ Test('Handlers test', async handlersTest => {
// Nothing to do here...

// Act

// Re-try function with conditions
const inspectTransferState = async () => {
try {
// Fetch Transfer record
const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {}
// console.dir(transfer)

// Check Transfer for correct state
if (transfer.transferState === Enum.Transfers.TransferInternalState.EXPIRED_RESERVED) {
// We have a Transfer with the correct state, lets check if we can get the TransferError record
try {
// Fetch the TransferError record
const transferError = await TransferService.getTransferErrorByTransferId(td.messageProtocolPrepare.content.payload.transferId)
// console.dir(transferError)
// TransferError record found, so lets return it
return {
transfer,
transferError
}
} catch (err) {
// NO TransferError record found, so lets return the transfer and the error
return {
transfer,
err
}
}
} else {
// NO Transfer with the correct state was found, so we return false
return false
}
} catch (err) {
// NO Transfer with the correct state was found, so we return false
Logger.error(err)
return false
}
}
// wait until we know the position reset, or throw after 5 tries

// wait until we inspect a transfer with the correct status, or return false if all re-try attempts have failed
const result = await wrapWithRetries(inspectTransferState, 10, 4)

// Assert
if (result === false) {
test.fail(`Transfer['${td.messageProtocolPrepare.content.payload.transferId}'].TransferState failed to transition to ${Enum.Transfers.TransferInternalState.EXPIRED_RESERVED}`)
test.end()
} else {
// console.dir(result)
test.equal(result.transfer.transferState, Enum.Transfers.TransferInternalState.EXPIRED_RESERVED, `Transfer['${td.messageProtocolPrepare.content.payload.transferId}'].TransferState = ${Enum.Transfers.TransferInternalState.EXPIRED_RESERVED}`)
test.equal(result.transferError.errorCode, ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED.code, `Transfer['${td.messageProtocolPrepare.content.payload.transferId}'].transferError.errorCode = ${ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED.code}`)
test.equal(result.transferError.errorDescription, ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED.message, `Transfer['${td.messageProtocolPrepare.content.payload.transferId}'].transferError.errorDescription = ${ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED.message}`)
test.equal(result.transfer && result.transfer.transferState, Enum.Transfers.TransferInternalState.EXPIRED_RESERVED, `Transfer['${td.messageProtocolPrepare.content.payload.transferId}'].TransferState = ${Enum.Transfers.TransferInternalState.EXPIRED_RESERVED}`)
test.equal(result.transferError && result.transferError.errorCode, ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED.code, `Transfer['${td.messageProtocolPrepare.content.payload.transferId}'].transferError.errorCode = ${ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED.code}`)
test.equal(result.transferError && result.transferError.errorDescription, ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED.message, `Transfer['${td.messageProtocolPrepare.content.payload.transferId}'].transferError.errorDescription = ${ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED.message}`)
test.pass()
test.end()
}
Expand Down

0 comments on commit 326545b

Please sign in to comment.