Skip to content

Commit

Permalink
listen to new blocks + put changes in transaction (#737)
Browse files Browse the repository at this point in the history
  • Loading branch information
Bernardo Vieira authored Jul 17, 2023
1 parent 8db54d8 commit fc33438
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 72 deletions.
172 changes: 104 additions & 68 deletions packages/core/src/subscriber/chainSubscribers.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { NotificationType } from '../interfaces/app/appNotification';
import { Transaction } from 'sequelize';
import { config, contracts, database, services, utils } from '../../';
import { ethers } from 'ethers';
import { getAddress } from '@ethersproject/address';
Expand All @@ -11,7 +12,6 @@ class ChainSubscribers {
ifaceCommunityAdmin: ethers.utils.Interface;
ifaceCommunity: ethers.utils.Interface;
ifaceMicrocredit: ethers.utils.Interface;
filterTopics: string[][];
communities: Map<string, number>;

constructor(
Expand All @@ -25,18 +25,6 @@ class ChainSubscribers {
this.ifaceCommunity = new ethers.utils.Interface(contracts.CommunityABI);
this.ifaceMicrocredit = new ethers.utils.Interface(contracts.MicrocreditABI);
this.communities = communities;
this.filterTopics = [
[
ethers.utils.id(
'CommunityAdded(address,address[],uint256,uint256,uint256,uint256,uint256,uint256,uint256)'
),
ethers.utils.id('CommunityRemoved(address)'),
ethers.utils.id('BeneficiaryAdded(address,address)'),
ethers.utils.id('BeneficiaryRemoved(address,address)'),
ethers.utils.id('LoanAdded(address,uint256,uint256,uint256,uint256,uint256)'),
ethers.utils.id('ManagerChanged(address,address)')
]
];
this.recover();
}

Expand Down Expand Up @@ -90,60 +78,99 @@ class ChainSubscribers {
});

// iterate
for (let x = 0; x < logs.length; x += 1) {
// verify if cusd or community and do things
await this._filterAndProcessEvent(logs[x]);
try {
await database.sequelize.transaction(async t => {
const transactions: Promise<void>[] = [];
for (let x = 0; x < logs.length; x++) {
// verify if cusd or community and do things
transactions.push(this._filterAndProcessEvent(logs[x], t));
}
await Promise.all(transactions);
services.app.ImMetadataService.setLastBlock(logs[logs.length - 1].blockNumber);
});
} catch (error) {
// TODO: add error handling
}
utils.Logger.info('Past events recovered successfully!');
}

async _getLogs(startFromBlock: number, provider: ethers.providers.JsonRpcProvider) {
const filterTopics = [
[
ethers.utils.id(
'CommunityAdded(address,address[],uint256,uint256,uint256,uint256,uint256,uint256,uint256)'
),
ethers.utils.id('CommunityRemoved(address)'),
ethers.utils.id('BeneficiaryAdded(address,address)'),
ethers.utils.id('BeneficiaryRemoved(address,address)'),
ethers.utils.id('LoanAdded(address,uint256,uint256,uint256,uint256,uint256)'),
ethers.utils.id('ManagerChanged(address,address)')
]
];
// TODO: recover 100 blocks at a time
return provider.getLogs({
fromBlock: startFromBlock,
toBlock: 'latest',
topics: this.filterTopics
topics: filterTopics
});
}

_setupListener(provider: ethers.providers.JsonRpcProvider) {
utils.Logger.info('Starting subscribers...');
const filter = {
topics: this.filterTopics
};

database.redisClient.set('blockCount', 0);

provider.on(filter, async (log: ethers.providers.Log) => {
utils.Logger.info('Receiving new event');
await this._filterAndProcessEvent(log);
database.redisClient.set('lastBlock', log.blockNumber);
provider.on('block', async (blockNumber: number) => {
utils.Logger.info('Receiving new block');
const block = await provider.getBlock(blockNumber);
// get all the logs happening only on transactions in impactMarket contracts
const transactions: ethers.providers.Log[] = [];
for (let x = 0; x < block.transactions.length; x++) {
const transaction = await provider.getTransactionReceipt(block.transactions[x]);
if (
transaction.to === config.communityAdminAddress ||
this.communities.get(transaction.to) ||
transaction.to === config.microcreditContractAddress
) {
transactions.push(...transaction.logs);
}
}
try {
await database.sequelize.transaction(async t => {
const processingLogsPromises: Promise<void>[] = [];
for (let x = 0; x < transactions.length; x++) {
processingLogsPromises.push(this._filterAndProcessEvent(transactions[x], t));
}
await Promise.all(processingLogsPromises);
});
} catch (error) {
// TODO: add error handling
}

database.redisClient.set('lastBlock', blockNumber);
const blockCount = await database.redisClient.get('blockCount');

if (!!blockCount && blockCount > '16560') {
services.app.ImMetadataService.setLastBlock(log.blockNumber);
services.app.ImMetadataService.setLastBlock(blockNumber);
database.redisClient.set('blockCount', 0);
} else {
database.redisClient.incr('blockCount');
}
});
}

async _filterAndProcessEvent(log: ethers.providers.Log) {
let parsedLog: ethers.utils.LogDescription | undefined;
async _filterAndProcessEvent(log: ethers.providers.Log, transaction: Transaction) {
if (log.address === config.communityAdminAddress) {
await this._processCommunityAdminEvents(log);
await this._processCommunityAdminEvents(log, transaction);
} else if (this.communities.get(log.address)) {
parsedLog = await this._processCommunityEvents(log);
await this._processCommunityEvents(log, transaction);
} else if (log.address === config.microcreditContractAddress) {
await this._processMicrocreditEvents(log);
await this._processMicrocreditEvents(log, transaction);
}
return parsedLog;
}

async _processCommunityAdminEvents(log: ethers.providers.Log): Promise<ethers.utils.LogDescription | undefined> {
async _processCommunityAdminEvents(log: ethers.providers.Log, transaction: Transaction): Promise<void> {
try {
const parsedLog = this.ifaceCommunityAdmin.parseLog(log);
let result: ethers.utils.LogDescription | undefined = undefined;

if (parsedLog.name === 'CommunityRemoved') {
utils.Logger.info('Remove Community event');
Expand All @@ -163,12 +190,12 @@ class ChainSubscribers {
deletedAt: new Date()
},
{
where: { contractAddress: communityAddress }
where: { contractAddress: communityAddress },
transaction
}
);

this.communities.delete(communityAddress);
result = parsedLog;
}
} else if (parsedLog.name === 'CommunityAdded') {
utils.Logger.info('Add Community event');
Expand All @@ -185,6 +212,7 @@ class ChainSubscribers {
where: {
requestByAddress: managerAddress[0]
},
transaction,
returning: true
}
);
Expand All @@ -200,25 +228,27 @@ class ChainSubscribers {
});

if (user) {
await sendNotification([user.toJSON()], NotificationType.COMMUNITY_CREATED, true, true, {
communityId: community[1][0].id
});
await sendNotification(
[user.toJSON()],
NotificationType.COMMUNITY_CREATED,
true,
true,
{
communityId: community[1][0].id
},
transaction
);
}
}

result = parsedLog;
}

return result;
} catch (error) {
utils.Logger.error('Failed to process Community Admin Events:', error);
}
}

async _processCommunityEvents(log: ethers.providers.Log): Promise<ethers.utils.LogDescription | undefined> {
async _processCommunityEvents(log: ethers.providers.Log, transaction: Transaction): Promise<void> {
try {
const parsedLog = this.ifaceCommunity.parseLog(log);
let result: ethers.utils.LogDescription | undefined = undefined;

if (parsedLog.name === 'BeneficiaryAdded') {
utils.Logger.info('Add Beneficiary event');
Expand All @@ -238,12 +268,17 @@ class ChainSubscribers {
});

if (user) {
await sendNotification([user.toJSON()], NotificationType.BENEFICIARY_ADDED, true, true, {
communityId: community
});
await sendNotification(
[user.toJSON()],
NotificationType.BENEFICIARY_ADDED,
true,
true,
{
communityId: community
},
transaction
);
}

result = parsedLog;
} else if (parsedLog.name === 'BeneficiaryRemoved') {
utils.Logger.info('Remove Beneficiary event');

Expand All @@ -253,19 +288,15 @@ class ChainSubscribers {
if (community) {
utils.cache.cleanBeneficiaryCache(community);
}

result = parsedLog;
}
return result;
} catch (error) {
utils.Logger.error('Failed to process Community Events:', error);
}
}

async _processMicrocreditEvents(log: ethers.providers.Log): Promise<ethers.utils.LogDescription | undefined> {
async _processMicrocreditEvents(log: ethers.providers.Log, transaction: Transaction): Promise<void> {
try {
const parsedLog = this.ifaceMicrocredit.parseLog(log);
let result: ethers.utils.LogDescription | undefined = undefined;
const userAddress = parsedLog.args[0];

if (parsedLog.name === 'LoanAdded') {
Expand All @@ -279,15 +310,23 @@ class ChainSubscribers {
});

if (user) {
await models.microCreditBorrowers.create({
userId: user.id,
performance: 0,
manager: (await this.provider.getTransaction(log.transactionHash)).from
});
await sendNotification([user.toJSON()], NotificationType.LOAN_ADDED);
await models.microCreditBorrowers.create(
{
userId: user.id,
performance: 0,
manager: (await this.provider.getTransaction(log.transactionHash)).from
},
{ transaction }
);
await sendNotification(
[user.toJSON()],
NotificationType.LOAN_ADDED,
true,
true,
undefined,
transaction
);
}

result = parsedLog;
} else if (parsedLog.name === 'ManagerChanged') {
utils.Logger.info('ManagerChanged event');

Expand All @@ -306,15 +345,12 @@ class ChainSubscribers {
{
where: {
userId: user.id
}
},
transaction
}
);
}

result = parsedLog;
}

return result;
} catch (error) {
utils.Logger.error('Failed to process Microcredit Events:', error);
}
Expand Down
7 changes: 5 additions & 2 deletions packages/core/src/utils/pushNotification.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import localesConfig from '../utils/locale.json';
// it needs to be imported this way, or the initialize will fail
import { AppUser } from '../interfaces/app/appUser';
import { Logger } from './logger';
import { Transaction } from 'sequelize';
import { utils } from '../../index';
import admin from 'firebase-admin';
import config from '../config';
Expand All @@ -15,7 +16,8 @@ export async function sendNotification(
type: NotificationType,
isWallet: boolean = true,
isWebApp: boolean = true,
params?: object
params: object | undefined = undefined,
transaction: Transaction | undefined = undefined
) {
try {
// registry notification
Expand All @@ -26,7 +28,8 @@ export async function sendNotification(
isWallet,
isWebApp,
params
}))
})),
{ transaction }
);

// filter users that have walletPNT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { ChainSubscribers } from '../../../src/subscriber/chainSubscribers';
import CommunityAdminContractJSON from './CommunityAdmin.json';
import cUSDContractJSON from './cUSD.json';

describe('communityAdmin', () => {
describe('communityAdmin (chainSubscribers)', () => {
let provider: ethers.providers.Web3Provider;
let subscribers: ChainSubscribers;
let accounts: string[] = [];
Expand Down Expand Up @@ -103,6 +103,7 @@ describe('communityAdmin', () => {
where: {
requestByAddress: accounts[1]
},
transaction: match.any,
returning: true
}
);
Expand Down Expand Up @@ -140,7 +141,8 @@ describe('communityAdmin', () => {
{
where: {
contractAddress
}
},
transaction: match.any
}
);
});
Expand Down

0 comments on commit fc33438

Please sign in to comment.