From 7330c3439475cb82683ab838e6e6055c746225ad Mon Sep 17 00:00:00 2001 From: Stepan Kiryakov Date: Thu, 20 Apr 2023 19:16:25 +0400 Subject: [PATCH] fix #2019 Signed-off-by: Stepan Kiryakov --- guardian-service/.env | 1 - guardian-service/.env.docker | 1 - guardian-service/.env.docker.example | 1 - guardian-service/.env.example | 1 - guardian-service/src/app.ts | 2 - .../multi-policy-service/index.ts | 2 - .../multi-policy-service/mint-service.ts | 319 ------------------ .../synchronization-service.ts | 319 ------------------ policy-service/.env | 1 + policy-service/.env.docker | 1 + policy-service/.env.docker.example | 1 + policy-service/.env.example | 1 + policy-service/src/api/policy-process.ts | 11 +- .../synchronization-service.ts | 97 +++--- 14 files changed, 64 insertions(+), 694 deletions(-) delete mode 100644 guardian-service/src/policy-engine/multi-policy-service/index.ts delete mode 100644 guardian-service/src/policy-engine/multi-policy-service/mint-service.ts delete mode 100644 guardian-service/src/policy-engine/multi-policy-service/synchronization-service.ts diff --git a/guardian-service/.env b/guardian-service/.env index 86a032e723..292347d816 100644 --- a/guardian-service/.env +++ b/guardian-service/.env @@ -13,7 +13,6 @@ INITIALIZATION_TOPIC_ID="0.0.2030" MESSAGE_LANG="en-US" TRANSACTION_LOG_LEVEL="1" SEND_KEYS_TO_VAULT="TRUE" -MULTI_POLICY_SCHEDULER="0 0 * * *" CONTRACT_FILE_ID="0.0.6276" BBS_SIGNATURES_MODE="WASM" MQ_MAX_PAYLOAD="1048576" diff --git a/guardian-service/.env.docker b/guardian-service/.env.docker index f440f0718a..e4290dd61b 100644 --- a/guardian-service/.env.docker +++ b/guardian-service/.env.docker @@ -12,7 +12,6 @@ INITIALIZATION_TOPIC_ID="0.0.2030" MESSAGE_LANG="en-US" TRANSACTION_LOG_LEVEL="1" SEND_KEYS_TO_VAULT="TRUE" -MULTI_POLICY_SCHEDULER="0 0 * * *" CONTRACT_FILE_ID="0.0.6276" BBS_SIGNATURES_MODE="WASM" MQ_MAX_PAYLOAD="1048576" diff --git a/guardian-service/.env.docker.example b/guardian-service/.env.docker.example index a9fbd49fa7..865fdcfba9 100644 --- a/guardian-service/.env.docker.example +++ b/guardian-service/.env.docker.example @@ -12,7 +12,6 @@ INITIALIZATION_TOPIC_ID="0.0.2030" MESSAGE_LANG="en-US" TRANSACTION_LOG_LEVEL="1" SEND_KEYS_TO_VAULT="TRUE" -MULTI_POLICY_SCHEDULER="0 0 * * *" CONTRACT_FILE_ID="0.0.6276" BBS_SIGNATURES_MODE="WASM" #LOG_LEVEL="2" diff --git a/guardian-service/.env.example b/guardian-service/.env.example index d842d7b585..7e647eee5e 100644 --- a/guardian-service/.env.example +++ b/guardian-service/.env.example @@ -14,7 +14,6 @@ HEDERA_NET="testnet" MESSAGE_LANG="en-US" TRANSACTION_LOG_LEVEL="1" SEND_KEYS_TO_VAULT="TRUE" -MULTI_POLICY_SCHEDULER="0 0 * * *" CONTRACT_FILE_ID="0.0.6276" BBS_SIGNATURES_MODE="WASM" #LOG_LEVEL="2" diff --git a/guardian-service/src/app.ts b/guardian-service/src/app.ts index 140fa49e3b..294dbfcc0c 100644 --- a/guardian-service/src/app.ts +++ b/guardian-service/src/app.ts @@ -42,7 +42,6 @@ import { MongoDriver } from '@mikro-orm/mongodb'; import { ipfsAPI } from '@api/ipfs.service'; import { artifactAPI } from '@api/artifact.service'; import { sendKeysToVault } from '@helpers/send-keys-to-vault'; -import { SynchronizationService } from '@policy-engine/multi-policy-service'; import { contractAPI } from '@api/contract.service'; import { analyticsAPI } from '@api/analytics.service'; import { PolicyServiceChannelsContainer } from '@helpers/policy-service-channels-container'; @@ -248,7 +247,6 @@ Promise.all([ await policyService.init(); policyService.registerListeners(); await policyEngine.init(); - SynchronizationService.start(); } catch (error) { console.error(error.message); process.exit(0); diff --git a/guardian-service/src/policy-engine/multi-policy-service/index.ts b/guardian-service/src/policy-engine/multi-policy-service/index.ts deleted file mode 100644 index b4997aeb71..0000000000 --- a/guardian-service/src/policy-engine/multi-policy-service/index.ts +++ /dev/null @@ -1,2 +0,0 @@ -export { SynchronizationService } from './synchronization-service'; -export { MintService } from './mint-service'; \ No newline at end of file diff --git a/guardian-service/src/policy-engine/multi-policy-service/mint-service.ts b/guardian-service/src/policy-engine/multi-policy-service/mint-service.ts deleted file mode 100644 index 07cddf85ee..0000000000 --- a/guardian-service/src/policy-engine/multi-policy-service/mint-service.ts +++ /dev/null @@ -1,319 +0,0 @@ -import { - ExternalMessageEvents, IRootConfig, - WorkerTaskType -} from '@guardian/interfaces'; -import { ExternalEventChannel, Logger, Token, KeyType, Wallet, Workers} from '@guardian/common'; - -/** - * Token Config - */ -interface TokenConfig { - /** - * Treasury Account Id - */ - treasuryId: any; - /** - * Token ID - */ - tokenId: any; - /** - * Supply Key - */ - supplyKey: string; - /** - * Treasury Account Key - */ - treasuryKey: string; -} - -/** - * Mint Service - */ -export class MintService { - /** - * Size of mint NFT batch - */ - public static readonly BATCH_NFT_MINT_SIZE = - Math.floor(Math.abs(+process.env.BATCH_NFT_MINT_SIZE)) || 10; - - /** - * Wallet service - */ - private static readonly wallet = new Wallet(); - /** - * Logger service - */ - private static readonly logger = new Logger(); - - /** - * Get error message - * @param error - */ - private static getErrorMessage(error: string | Error | any): string { - if (typeof error === 'string') { - return error; - } else if (error.message) { - return error.message; - } else if (error.error) { - return error.error; - } else if (error.name) { - return error.name; - } else { - console.error(error); - return 'Unidentified error'; - } - } - - /** - * Split chunk - * @param array - * @param chunk - */ - private static splitChunk(array: T[], chunk: number): T[][] { - const res: T[][] = []; - let i: number; - let j: number; - for (i = 0, j = array.length; i < j; i += chunk) { - res.push(array.slice(i, i + chunk)); - } - return res; - } - - /** - * Mint Non Fungible Tokens - * @param token - * @param tokenValue - * @param root - * @param targetAccount - * @param uuid - * @param transactionMemo - * @param ref - */ - private static async mintNonFungibleTokens( - token: TokenConfig, - tokenValue: number, - root: IRootConfig, - targetAccount: string, - uuid: string, - transactionMemo: string - ) { - const mintNFT = (metaData) => - workers.addRetryableTask( - { - type: WorkerTaskType.MINT_NFT, - data: { - hederaAccountId: root.hederaAccountId, - hederaAccountKey: root.hederaAccountKey, - dryRun: false, - tokenId: token.tokenId, - supplyKey: token.supplyKey, - metaData, - transactionMemo, - }, - }, - 1, 10 - ); - const transferNFT = (serials) => - { - MintService.logger.debug( - `Transfer ${token?.tokenId} serials: ${JSON.stringify(serials)}`, - ['GUARDIAN_SERVICE', mintId.toString()] - ); - return workers.addRetryableTask( - { - type: WorkerTaskType.TRANSFER_NFT, - data: { - hederaAccountId: - root.hederaAccountId, - hederaAccountKey: - root.hederaAccountKey, - dryRun: false, - tokenId: token.tokenId, - targetAccount, - treasuryId: token.treasuryId, - treasuryKey: token.treasuryKey, - element: serials, - transactionMemo, - }, - }, - 1, 10 - ); - }; - const mintAndTransferNFT = (metaData) => - mintNFT(metaData).then(transferNFT); - const mintId = Date.now(); - MintService.log(`Mint(${mintId}): Start (Count: ${tokenValue})`); - - const workers = new Workers(); - const data = new Array(Math.floor(tokenValue)); - data.fill(uuid); - const dataChunks = MintService.splitChunk(data, 10); - const tasks = MintService.splitChunk( - dataChunks, - MintService.BATCH_NFT_MINT_SIZE - ); - for (let i = 0; i < tasks.length; i++) { - const dataChunk = tasks[i]; - MintService.log( - `Mint(${mintId}): Minting and transferring (Chunk: ${ - i * MintService.BATCH_NFT_MINT_SIZE + 1 - }/${tasks.length * MintService.BATCH_NFT_MINT_SIZE})` - ); - try { - await Promise.all(dataChunk.map(mintAndTransferNFT)); - } catch (error) { - MintService.error( - `Mint(${mintId}): Error (${MintService.getErrorMessage(error)})`, - ); - throw error; - } - } - - MintService.log( - `Mint(${mintId}): Minted (Count: ${Math.floor(tokenValue)})` - ); - MintService.log( - `Mint(${mintId}): Transferred ${token.treasuryId} -> ${targetAccount} ` - ); - MintService.log(`Mint(${mintId}): End`); - } - - /** - * Mint Fungible Tokens - * @param token - * @param tokenValue - * @param root - * @param targetAccount - * @param uuid - * @param transactionMemo - * @param ref - */ - private static async mintFungibleTokens( - token: TokenConfig, - tokenValue: number, - root: IRootConfig, - targetAccount: string, - uuid: string, - transactionMemo: string - ) { - const mintId = Date.now(); - MintService.log(`Mint(${mintId}): Start (Count: ${tokenValue})`); - - try { - const workers = new Workers(); - await workers.addRetryableTask({ - type: WorkerTaskType.MINT_FT, - data: { - hederaAccountId: root.hederaAccountId, - hederaAccountKey: root.hederaAccountKey, - dryRun: false, - tokenId: token.tokenId, - supplyKey: token.supplyKey, - tokenValue, - transactionMemo - } - }, 10); - await workers.addRetryableTask({ - type: WorkerTaskType.TRANSFER_FT, - data: { - hederaAccountId: root.hederaAccountId, - hederaAccountKey: root.hederaAccountKey, - dryRun: false, - tokenId: token.tokenId, - targetAccount, - treasuryId: token.treasuryId, - treasuryKey: token.treasuryKey, - tokenValue, - transactionMemo - } - }, 10); - } catch (error) { - MintService.error(`Mint FT(${mintId}): Mint/Transfer Error (${MintService.getErrorMessage(error)})`); - } - - MintService.log(`Mint(${mintId}): End`); - } - - /** - * Mint - * @param ref - * @param token - * @param tokenValue - * @param documentOwner - * @param root - * @param targetAccount - * @param uuid - */ - public static async multiMint( - root: IRootConfig, - token: Token, - tokenValue: number, - targetAccount: string, - ids: string[] - ): Promise { - const memo = ids.join(','); - const tokenConfig: TokenConfig = { - treasuryId: token.adminId, - tokenId: token.tokenId, - supplyKey: null, - treasuryKey: null - } - const [treasuryKey, supplyKey] = await Promise.all([ - MintService.wallet.getUserKey( - token.owner, - KeyType.TOKEN_TREASURY_KEY, - token.tokenId - ), - MintService.wallet.getUserKey( - token.owner, - KeyType.TOKEN_SUPPLY_KEY, - token.tokenId - ), - ]); - tokenConfig.supplyKey = supplyKey; - tokenConfig.treasuryKey = treasuryKey; - - if (token.tokenType === 'non-fungible') { - await MintService.mintNonFungibleTokens( - tokenConfig, tokenValue, root, targetAccount, memo, memo - ); - } else { - await MintService.mintFungibleTokens( - tokenConfig, tokenValue, root, targetAccount, memo, memo - ); - } - - new ExternalEventChannel().publishMessage( - ExternalMessageEvents.TOKEN_MINTED, - { - tokenId: token.tokenId, - tokenValue, - memo - } - ); - } - - /** - * Write log message - * @param message - */ - public static log(message: string) { - MintService.logger.info(message, ['GUARDIAN_SERVICE']); - } - - /** - * Write error message - * @param message - */ - public static error(message: string) { - MintService.logger.error(message, ['GUARDIAN_SERVICE']); - } - - /** - * Write warn message - * @param message - */ - public static warn(message: string) { - MintService.logger.warn(message, ['GUARDIAN_SERVICE']); - } -} diff --git a/guardian-service/src/policy-engine/multi-policy-service/synchronization-service.ts b/guardian-service/src/policy-engine/multi-policy-service/synchronization-service.ts deleted file mode 100644 index bf4eda2a0b..0000000000 --- a/guardian-service/src/policy-engine/multi-policy-service/synchronization-service.ts +++ /dev/null @@ -1,319 +0,0 @@ -import { - IRootConfig, - PolicyType, - WorkerTaskType -} from '@guardian/interfaces'; -import { CronJob } from 'cron'; -import { MintService } from './mint-service'; -import { - Logger, - Policy, - Token, - MultiPolicyTransaction, - DatabaseServer, - Users, - Workers, - MessageAction, - MessageServer, - SynchronizationMessage, - TopicConfig, -} from '@guardian/common'; - -/** - * Synchronization Service - */ -export class SynchronizationService { - /** - * Cron job - * @private - */ - private static job: CronJob; - /** - * Cron Mask - * @private - */ - private static readonly cronMask = `0 0 * * *`; - /** - * Task Status - * @private - */ - private static taskStatus: boolean = false; - /** - * Users service - * @private - */ - private static readonly users = new Users(); - - /** - * Start scheduler - */ - public static start() { - if (SynchronizationService.job) { - return; - } - const cronMask = process.env.MULTI_POLICY_SCHEDULER || SynchronizationService.cronMask; - SynchronizationService.taskStatus = false; - SynchronizationService.job = new CronJob(cronMask, () => { - SynchronizationService.task().then(); - }, null, false, 'UTC'); - SynchronizationService.job.start(); - new Logger().info(`Start synchronization: ${cronMask}`, ['GUARDIAN_SERVICE', 'SYNCHRONIZATION_SERVICE']); - } - - /** - * Stop scheduler - */ - public static stop() { - if (SynchronizationService.job) { - SynchronizationService.job.stop(); - SynchronizationService.job = null; - SynchronizationService.taskStatus = false; - } - } - - /** - * Tick - * @private - */ - private static async task() { - try { - if (SynchronizationService.taskStatus) { - return; - } - new Logger().info('Start synchronization task', ['GUARDIAN_SERVICE', 'SYNCHRONIZATION_SERVICE']); - - SynchronizationService.taskStatus = true; - - const policies = await DatabaseServer.getPolicies({ - where: { - status: PolicyType.PUBLISH, - synchronizationTopicId: { $exists: true } - } - }, { - fields: ['id', 'owner', 'instanceTopicId', 'synchronizationTopicId'] - }); - - const tasks: any[] = []; - for (const policy of policies) { - tasks.push(SynchronizationService.taskByPolicy(policy)); - } - await Promise.all(tasks); - - SynchronizationService.taskStatus = false; - - new Logger().info('Complete synchronization task', ['GUARDIAN_SERVICE', 'SYNCHRONIZATION_SERVICE']); - } catch (error) { - SynchronizationService.taskStatus = false; - console.error(error); - new Logger().error(error, ['GUARDIAN_SERVICE', 'SYNCHRONIZATION_SERVICE']); - } - } - - /** - * Group by policy - * @param policy - * @private - */ - private static async taskByPolicy(policy: Policy) { - try { - const root = await SynchronizationService.users.getHederaAccount(policy.owner); - const count = await DatabaseServer.countMultiPolicyTransactions(policy.id); - if (!count) { - return; - } - - const topic = new TopicConfig({ topicId: policy.synchronizationTopicId }, null, null); - const messageServer = new MessageServer(root.hederaAccountId, root.hederaAccountKey).setTopicObject(topic); - - const workers = new Workers(); - const messages = await workers.addRetryableTask({ - type: WorkerTaskType.GET_TOPIC_MESSAGES, - data: { - operatorId: null, - operatorKey: null, - dryRun: false, - topic: policy.synchronizationTopicId - } - }, 10); - - const policyMap: { [x: string]: SynchronizationMessage[] } = {}; - const vpMap: { [x: string]: Map } = {}; - for (const message of messages) { - try { - const synchronizationMessage = SynchronizationMessage.fromMessage(message.message); - const user = synchronizationMessage.user; - if (synchronizationMessage.action === MessageAction.CreateMultiPolicy) { - if (synchronizationMessage.user !== message.payer_account_id) { - continue; - } - if (!policyMap[user]) { - policyMap[user] = []; - } - policyMap[user].push(synchronizationMessage); - } else if (synchronizationMessage.action === MessageAction.Mint) { - if (synchronizationMessage.policyOwner !== message.payer_account_id) { - continue; - } - if (!vpMap[user]) { - vpMap[user] = new Map(); - } - const amount = parseFloat(synchronizationMessage.amount); - if (isFinite(amount) && amount < 1) { - vpMap[user].delete(synchronizationMessage.getMessageId()); - } else { - vpMap[user].set(synchronizationMessage.getMessageId(), synchronizationMessage); - } - } - } catch (error) { - console.log(`${message?.id}: ${error}`); - } - } - const users = Object.keys(policyMap); - const tasks: any[] = []; - for (const user of users) { - tasks.push(SynchronizationService.taskByUser( - messageServer, root, policy, user, policyMap[user], vpMap[user]) - ); - } - await Promise.all(tasks); - } catch (error) { - console.error(error); - new Logger().error(error, ['GUARDIAN_SERVICE', 'SYNCHRONIZATION_SERVICE']); - } - } - - /** - * Group by user - * @param messageServer - * @param root - * @param policy - * @param user - * @param policies - * @param vps - * @private - */ - private static async taskByUser( - messageServer: MessageServer, - root: IRootConfig, - policy: Policy, - user: string, - policies: SynchronizationMessage[], - vps: Map - ) { - if (!vps) { - return; - } - const vpMap: { [x: string]: SynchronizationMessage[] } = {}; - const vpCountMap: { [x: string]: number } = {}; - const policyOwnerMap: { [x: string]: string } = {}; - for (const p of policies) { - vpMap[p.policy] = []; - vpCountMap[p.policy] = 0; - policyOwnerMap[p.policy] = p.policyOwner; - } - for (const vp of vps.values()) { - const policyId = vp.policy; - if (vpMap[policyId] && policyOwnerMap[policyId] === vp.policyOwner) { - vpMap[policyId].push(vp); - vpCountMap[policyId] += vp.amount; - } - } - let min = Infinity; - for (const p of policies) { - min = Math.min(min, vpCountMap[p.policy]); - } - - const transactions = await DatabaseServer.getMultiPolicyTransactions(policy.id, user); - for (const transaction of transactions) { - if (transaction.amount <= min) { - const token = await DatabaseServer.getToken(transaction.tokenId); - const status = await SynchronizationService.completeTransaction( - messageServer, root, token, transaction, policies, vpMap - ); - if (status) { - min -= transaction.amount; - } - } - } - } - - /** - * Complete Transaction - * @param messageServer - * @param root - * @param token - * @param transaction - * @param policies - * @param vpMap - * @private - */ - private static async completeTransaction( - messageServer: MessageServer, - root: IRootConfig, - token: Token, - transaction: MultiPolicyTransaction, - policies: SynchronizationMessage[], - vpMap: { [x: string]: SynchronizationMessage[] } - ): Promise { - try { - if (!token) { - throw new Error('Bad token id'); - } - const messagesIDs: string[] = []; - const updateMessages: SynchronizationMessage[] = []; - for (const p of policies) { - let amount = transaction.amount; - let i = 0; - const count = vpMap[p.policy].length; - while (i < count && amount > 0) { - const m = vpMap[p.policy][i]; - if (m.amount > 0) { - if (amount > m.amount) { - updateMessages.push(m); - messagesIDs.push(m.messageId); - amount -= m.amount; - m.amount = 0; - } else { - updateMessages.push(m); - messagesIDs.push(m.messageId); - m.amount -= amount; - amount = 0; - } - } - i++; - } - } - await SynchronizationService.updateMessages(messageServer, updateMessages); - await MintService.multiMint( - root, - token, - transaction.amount, - transaction.target, - messagesIDs - ); - transaction.status = 'Completed'; - await DatabaseServer.updateMultiPolicyTransactions(transaction); - return true; - } catch (error) { - transaction.status = 'Failed'; - await DatabaseServer.updateMultiPolicyTransactions(transaction); - return false; - } - } - - /** - * Update Messages - * @param messageServer - * @param updateMessages - * @private - */ - private static async updateMessages( - messageServer: MessageServer, - updateMessages: SynchronizationMessage[] - ): Promise { - for (const message of updateMessages) { - await messageServer.sendMessage(message); - } - return true; - } -} diff --git a/policy-service/.env b/policy-service/.env index 43673c128c..f72c304531 100644 --- a/policy-service/.env +++ b/policy-service/.env @@ -8,6 +8,7 @@ DB_DATABASE="guardian_db" MESSAGE_LANG="en-US" BBS_SIGNATURES_MODE="WASM" MQ_MAX_PAYLOAD="1048576" +MULTI_POLICY_SCHEDULER="0 0 * * *" #LOG_LEVEL="2" #HEDERA_CUSTOM_NODES={"0.testnet.hedera.com:50211":"0.0.3"} #HEDERA_CUSTOM_MIRROR_NODES=["testnet.mirrornode.hedera.com:443"] diff --git a/policy-service/.env.docker b/policy-service/.env.docker index 8407eac40a..f56f48e3da 100644 --- a/policy-service/.env.docker +++ b/policy-service/.env.docker @@ -7,6 +7,7 @@ DB_DATABASE="guardian_db" MESSAGE_LANG="en-US" BBS_SIGNATURES_MODE="WASM" MQ_MAX_PAYLOAD="1048576" +MULTI_POLICY_SCHEDULER="0 0 * * *" #LOG_LEVEL="2" #HEDERA_CUSTOM_NODES={"0.testnet.hedera.com:50211":"0.0.3"} #HEDERA_CUSTOM_MIRROR_NODES=["testnet.mirrornode.hedera.com:443"] diff --git a/policy-service/.env.docker.example b/policy-service/.env.docker.example index 90ef13d3cb..54561f277d 100644 --- a/policy-service/.env.docker.example +++ b/policy-service/.env.docker.example @@ -4,6 +4,7 @@ DB_HOST="mongo" DB_DATABASE="guardian_db" HEDERA_NET="testnet" MESSAGE_LANG="en-US" +MULTI_POLICY_SCHEDULER="0 0 * * *" #LOG_LEVEL="2" #MQ_MESSAGE_CHUNK=5000000 diff --git a/policy-service/.env.example b/policy-service/.env.example index 7a098864a0..f38c4825a7 100644 --- a/policy-service/.env.example +++ b/policy-service/.env.example @@ -5,6 +5,7 @@ DB_DATABASE="guardian_db" HEDERA_NET="testnet" MESSAGE_LANG="en-US" LOG_LEVEL="2" +MULTI_POLICY_SCHEDULER="0 0 * * *" #MQ_MESSAGE_CHUNK=5000000 # Vault Secret Manager Configs diff --git a/policy-service/src/api/policy-process.ts b/policy-service/src/api/policy-process.ts index 6336f4147b..a33dad8515 100644 --- a/policy-service/src/api/policy-process.ts +++ b/policy-service/src/api/policy-process.ts @@ -19,9 +19,10 @@ import { BlockTreeGenerator } from '@policy-engine/block-tree-generator'; import { PolicyValidator } from '@policy-engine/block-validators'; import process from 'process'; import { CommonVariables } from '@helpers/common-variables'; -import { PolicyEvents } from '@guardian/interfaces'; +import { PolicyEvents, PolicyType } from '@guardian/interfaces'; import { GridFSBucket } from 'mongodb'; import { OldSecretManager } from '@guardian/common/dist/secret-manager/old-style/old-secret-manager'; +import { SynchronizationService } from '@policy-engine/multi-policy-service'; const { policy, @@ -72,7 +73,7 @@ Promise.all([ } catch (error) { await new Logger().warn( 'HEDERA_CUSTOM_MIRROR_NODES field in settings: ' + - error.message, + error.message, ['POLICY', policy.name, policyId.toString()] ); console.warn(error); @@ -101,7 +102,11 @@ Promise.all([ await generator.generate(policyConfig, skipRegistration, policyValidator); + const synchronizationService = new SynchronizationService(policyConfig); + synchronizationService.start(); + generator.getPolicyMessages(PolicyEvents.DELETE_POLICY, policyId, () => { + synchronizationService.stop(); process.exit(0); }); @@ -109,9 +114,11 @@ Promise.all([ policyId: policyId.toString(), data: policyValidator.getSerializedErrors() }); + const maxPayload = parseInt(process.env.MQ_MAX_PAYLOAD, 10); if (Number.isInteger(maxPayload)) { new LargePayloadContainer().runServer(); } + new Logger().info('Start policy', ['POLICY', policy.name, policyId.toString()]); }); diff --git a/policy-service/src/policy-engine/multi-policy-service/synchronization-service.ts b/policy-service/src/policy-engine/multi-policy-service/synchronization-service.ts index 6ba4012baa..153eaad44c 100644 --- a/policy-service/src/policy-engine/multi-policy-service/synchronization-service.ts +++ b/policy-service/src/policy-engine/multi-policy-service/synchronization-service.ts @@ -27,47 +27,63 @@ export class SynchronizationService { * Cron job * @private */ - private static job: CronJob; + private job: CronJob; /** * Cron Mask * @private */ - private static readonly cronMask = `0 0 * * *`; + private readonly cronMask = `0 0 * * *`; /** * Task Status * @private */ - private static taskStatus: boolean = false; + private taskStatus: boolean = false; /** * Users service * @private */ - private static readonly users = new Users(); + private readonly users = new Users(); + /** + * Policy + * @private + */ + private readonly policy: Policy; + + constructor(policy: Policy) { + this.policy = policy; + } /** * Start scheduler */ - public static start() { - if (SynchronizationService.job) { - return; + public start(): boolean { + if ( + this.policy.status !== PolicyType.PUBLISH || + !this.policy.synchronizationTopicId + ) { + return false; + } + if (this.job) { + return true; } - const cronMask = process.env.MULTI_POLICY_SCHEDULER || SynchronizationService.cronMask; - SynchronizationService.taskStatus = false; - SynchronizationService.job = new CronJob(cronMask, () => { - SynchronizationService.task().then(); + const cronMask = process.env.MULTI_POLICY_SCHEDULER || this.cronMask; + this.taskStatus = false; + this.job = new CronJob(cronMask, () => { + this.task().then(); }, null, false, 'UTC'); - SynchronizationService.job.start(); + this.job.start(); new Logger().info(`Start synchronization: ${cronMask}`, ['GUARDIAN_SERVICE', 'SYNCHRONIZATION_SERVICE']); + return true; } /** * Stop scheduler */ - public static stop() { - if (SynchronizationService.job) { - SynchronizationService.job.stop(); - SynchronizationService.job = null; - SynchronizationService.taskStatus = false; + public stop() { + if (this.job) { + this.job.stop(); + this.job = null; + this.taskStatus = false; } } @@ -75,35 +91,20 @@ export class SynchronizationService { * Tick * @private */ - private static async task() { + private async task() { try { - if (SynchronizationService.taskStatus) { + if (this.taskStatus) { return; } new Logger().info('Start synchronization task', ['GUARDIAN_SERVICE', 'SYNCHRONIZATION_SERVICE']); - SynchronizationService.taskStatus = true; - - const policies = await DatabaseServer.getPolicies({ - where: { - status: PolicyType.PUBLISH, - synchronizationTopicId: { $exists: true } - } - }, { - fields: ['id', 'owner', 'instanceTopicId', 'synchronizationTopicId'] - }); - - const tasks: any[] = []; - for (const policy of policies) { - tasks.push(SynchronizationService.taskByPolicy(policy)); - } - await Promise.all(tasks); - - SynchronizationService.taskStatus = false; + this.taskStatus = true; + await this.taskByPolicy(this.policy); + this.taskStatus = false; new Logger().info('Complete synchronization task', ['GUARDIAN_SERVICE', 'SYNCHRONIZATION_SERVICE']); } catch (error) { - SynchronizationService.taskStatus = false; + this.taskStatus = false; console.error(error); new Logger().error(error, ['GUARDIAN_SERVICE', 'SYNCHRONIZATION_SERVICE']); } @@ -114,10 +115,11 @@ export class SynchronizationService { * @param policy * @private */ - private static async taskByPolicy(policy: Policy) { + private async taskByPolicy(policy: Policy) { try { - const root = await SynchronizationService.users.getHederaAccount(policy.owner); + const root = await this.users.getHederaAccount(policy.owner); const count = await DatabaseServer.countMultiPolicyTransactions(policy.id); + if (!count) { return; } @@ -168,10 +170,11 @@ export class SynchronizationService { console.log(`${message?.id}: ${error}`); } } + const users = Object.keys(policyMap); const tasks: any[] = []; for (const user of users) { - tasks.push(SynchronizationService.taskByUser( + tasks.push(this.taskByUser( messageServer, root, policy, user, policyMap[user], vpMap[user]) ); } @@ -192,7 +195,7 @@ export class SynchronizationService { * @param vps * @private */ - private static async taskByUser( + private async taskByUser( messageServer: MessageServer, root: IRootConfig, policy: Policy, @@ -227,7 +230,7 @@ export class SynchronizationService { for (const transaction of transactions) { if (transaction.amount <= min) { const token = await DatabaseServer.getToken(transaction.tokenId); - const status = await SynchronizationService.completeTransaction( + const status = await this.completeTransaction( messageServer, root, token, transaction, policies, vpMap ); if (status) { @@ -247,7 +250,7 @@ export class SynchronizationService { * @param vpMap * @private */ - private static async completeTransaction( + private async completeTransaction( messageServer: MessageServer, root: IRootConfig, token: Token, @@ -283,7 +286,7 @@ export class SynchronizationService { i++; } } - await SynchronizationService.updateMessages(messageServer, updateMessages); + await this.updateMessages(messageServer, updateMessages); await MintService.multiMint( root, token, @@ -296,6 +299,8 @@ export class SynchronizationService { return true; } catch (error) { transaction.status = 'Failed'; + console.error(error); + new Logger().error(error, ['GUARDIAN_SERVICE', 'SYNCHRONIZATION_SERVICE']); await DatabaseServer.updateMultiPolicyTransactions(transaction); return false; } @@ -307,7 +312,7 @@ export class SynchronizationService { * @param updateMessages * @private */ - private static async updateMessages( + private async updateMessages( messageServer: MessageServer, updateMessages: SynchronizationMessage[] ): Promise {