Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

listen to new blocks + put changes in transaction #737

Merged
merged 3 commits into from
Jul 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 104 additions & 68 deletions packages/core/src/subscriber/chainSubscribers.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { NotificationType } from '../interfaces/app/appNotification';
import { Transaction } from 'sequelize';
import { config, contracts, database, services, utils } from '../../';
import { ethers } from 'ethers';
import { getAddress } from '@ethersproject/address';
Expand All @@ -11,7 +12,6 @@ class ChainSubscribers {
ifaceCommunityAdmin: ethers.utils.Interface;
ifaceCommunity: ethers.utils.Interface;
ifaceMicrocredit: ethers.utils.Interface;
filterTopics: string[][];
communities: Map<string, number>;

constructor(
Expand All @@ -25,18 +25,6 @@ class ChainSubscribers {
this.ifaceCommunity = new ethers.utils.Interface(contracts.CommunityABI);
this.ifaceMicrocredit = new ethers.utils.Interface(contracts.MicrocreditABI);
this.communities = communities;
this.filterTopics = [
[
ethers.utils.id(
'CommunityAdded(address,address[],uint256,uint256,uint256,uint256,uint256,uint256,uint256)'
),
ethers.utils.id('CommunityRemoved(address)'),
ethers.utils.id('BeneficiaryAdded(address,address)'),
ethers.utils.id('BeneficiaryRemoved(address,address)'),
ethers.utils.id('LoanAdded(address,uint256,uint256,uint256,uint256,uint256)'),
ethers.utils.id('ManagerChanged(address,address)')
]
];
this.recover();
}

Expand Down Expand Up @@ -90,60 +78,99 @@ class ChainSubscribers {
});

// iterate
for (let x = 0; x < logs.length; x += 1) {
// verify if cusd or community and do things
await this._filterAndProcessEvent(logs[x]);
try {
await database.sequelize.transaction(async t => {
const transactions: Promise<void>[] = [];
for (let x = 0; x < logs.length; x++) {
// verify if cusd or community and do things
transactions.push(this._filterAndProcessEvent(logs[x], t));
}
await Promise.all(transactions);
services.app.ImMetadataService.setLastBlock(logs[logs.length - 1].blockNumber);
});
} catch (error) {
// TODO: add error handling
}
utils.Logger.info('Past events recovered successfully!');
}

async _getLogs(startFromBlock: number, provider: ethers.providers.JsonRpcProvider) {
const filterTopics = [
[
ethers.utils.id(
'CommunityAdded(address,address[],uint256,uint256,uint256,uint256,uint256,uint256,uint256)'
),
ethers.utils.id('CommunityRemoved(address)'),
ethers.utils.id('BeneficiaryAdded(address,address)'),
ethers.utils.id('BeneficiaryRemoved(address,address)'),
ethers.utils.id('LoanAdded(address,uint256,uint256,uint256,uint256,uint256)'),
ethers.utils.id('ManagerChanged(address,address)')
]
];
// TODO: recover 100 blocks at a time
return provider.getLogs({
fromBlock: startFromBlock,
toBlock: 'latest',
topics: this.filterTopics
topics: filterTopics
});
}

_setupListener(provider: ethers.providers.JsonRpcProvider) {
utils.Logger.info('Starting subscribers...');
const filter = {
topics: this.filterTopics
};

database.redisClient.set('blockCount', 0);

provider.on(filter, async (log: ethers.providers.Log) => {
utils.Logger.info('Receiving new event');
await this._filterAndProcessEvent(log);
database.redisClient.set('lastBlock', log.blockNumber);
provider.on('block', async (blockNumber: number) => {
utils.Logger.info('Receiving new block');
const block = await provider.getBlock(blockNumber);
// get all the logs happening only on transactions in impactMarket contracts
const transactions: ethers.providers.Log[] = [];
for (let x = 0; x < block.transactions.length; x++) {
const transaction = await provider.getTransactionReceipt(block.transactions[x]);
if (
transaction.to === config.communityAdminAddress ||
this.communities.get(transaction.to) ||
transaction.to === config.microcreditContractAddress
) {
transactions.push(...transaction.logs);
}
}
try {
await database.sequelize.transaction(async t => {
const processingLogsPromises: Promise<void>[] = [];
for (let x = 0; x < transactions.length; x++) {
processingLogsPromises.push(this._filterAndProcessEvent(transactions[x], t));
}
await Promise.all(processingLogsPromises);
});
} catch (error) {
// TODO: add error handling
}

database.redisClient.set('lastBlock', blockNumber);
const blockCount = await database.redisClient.get('blockCount');

if (!!blockCount && blockCount > '16560') {
services.app.ImMetadataService.setLastBlock(log.blockNumber);
services.app.ImMetadataService.setLastBlock(blockNumber);
database.redisClient.set('blockCount', 0);
} else {
database.redisClient.incr('blockCount');
}
});
}

async _filterAndProcessEvent(log: ethers.providers.Log) {
let parsedLog: ethers.utils.LogDescription | undefined;
async _filterAndProcessEvent(log: ethers.providers.Log, transaction: Transaction) {
if (log.address === config.communityAdminAddress) {
await this._processCommunityAdminEvents(log);
await this._processCommunityAdminEvents(log, transaction);
} else if (this.communities.get(log.address)) {
parsedLog = await this._processCommunityEvents(log);
await this._processCommunityEvents(log, transaction);
} else if (log.address === config.microcreditContractAddress) {
await this._processMicrocreditEvents(log);
await this._processMicrocreditEvents(log, transaction);
}
return parsedLog;
}

async _processCommunityAdminEvents(log: ethers.providers.Log): Promise<ethers.utils.LogDescription | undefined> {
async _processCommunityAdminEvents(log: ethers.providers.Log, transaction: Transaction): Promise<void> {
try {
const parsedLog = this.ifaceCommunityAdmin.parseLog(log);
let result: ethers.utils.LogDescription | undefined = undefined;

if (parsedLog.name === 'CommunityRemoved') {
utils.Logger.info('Remove Community event');
Expand All @@ -163,12 +190,12 @@ class ChainSubscribers {
deletedAt: new Date()
},
{
where: { contractAddress: communityAddress }
where: { contractAddress: communityAddress },
transaction
}
);

this.communities.delete(communityAddress);
result = parsedLog;
}
} else if (parsedLog.name === 'CommunityAdded') {
utils.Logger.info('Add Community event');
Expand All @@ -185,6 +212,7 @@ class ChainSubscribers {
where: {
requestByAddress: managerAddress[0]
},
transaction,
returning: true
}
);
Expand All @@ -200,25 +228,27 @@ class ChainSubscribers {
});

if (user) {
await sendNotification([user.toJSON()], NotificationType.COMMUNITY_CREATED, true, true, {
communityId: community[1][0].id
});
await sendNotification(
[user.toJSON()],
NotificationType.COMMUNITY_CREATED,
true,
true,
{
communityId: community[1][0].id
},
transaction
);
}
}

result = parsedLog;
}

return result;
} catch (error) {
utils.Logger.error('Failed to process Community Admin Events:', error);
}
}

async _processCommunityEvents(log: ethers.providers.Log): Promise<ethers.utils.LogDescription | undefined> {
async _processCommunityEvents(log: ethers.providers.Log, transaction: Transaction): Promise<void> {
try {
const parsedLog = this.ifaceCommunity.parseLog(log);
let result: ethers.utils.LogDescription | undefined = undefined;

if (parsedLog.name === 'BeneficiaryAdded') {
utils.Logger.info('Add Beneficiary event');
Expand All @@ -238,12 +268,17 @@ class ChainSubscribers {
});

if (user) {
await sendNotification([user.toJSON()], NotificationType.BENEFICIARY_ADDED, true, true, {
communityId: community
});
await sendNotification(
[user.toJSON()],
NotificationType.BENEFICIARY_ADDED,
true,
true,
{
communityId: community
},
transaction
);
}

result = parsedLog;
} else if (parsedLog.name === 'BeneficiaryRemoved') {
utils.Logger.info('Remove Beneficiary event');

Expand All @@ -253,19 +288,15 @@ class ChainSubscribers {
if (community) {
utils.cache.cleanBeneficiaryCache(community);
}

result = parsedLog;
}
return result;
} catch (error) {
utils.Logger.error('Failed to process Community Events:', error);
}
}

async _processMicrocreditEvents(log: ethers.providers.Log): Promise<ethers.utils.LogDescription | undefined> {
async _processMicrocreditEvents(log: ethers.providers.Log, transaction: Transaction): Promise<void> {
try {
const parsedLog = this.ifaceMicrocredit.parseLog(log);
let result: ethers.utils.LogDescription | undefined = undefined;
const userAddress = parsedLog.args[0];

if (parsedLog.name === 'LoanAdded') {
Expand All @@ -279,15 +310,23 @@ class ChainSubscribers {
});

if (user) {
await models.microCreditBorrowers.create({
userId: user.id,
performance: 0,
manager: (await this.provider.getTransaction(log.transactionHash)).from
});
await sendNotification([user.toJSON()], NotificationType.LOAN_ADDED);
await models.microCreditBorrowers.create(
{
userId: user.id,
performance: 0,
manager: (await this.provider.getTransaction(log.transactionHash)).from
},
{ transaction }
);
await sendNotification(
[user.toJSON()],
NotificationType.LOAN_ADDED,
true,
true,
undefined,
transaction
);
}

result = parsedLog;
} else if (parsedLog.name === 'ManagerChanged') {
utils.Logger.info('ManagerChanged event');

Expand All @@ -306,15 +345,12 @@ class ChainSubscribers {
{
where: {
userId: user.id
}
},
transaction
}
);
}

result = parsedLog;
}

return result;
} catch (error) {
utils.Logger.error('Failed to process Microcredit Events:', error);
}
Expand Down
7 changes: 5 additions & 2 deletions packages/core/src/utils/pushNotification.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import localesConfig from '../utils/locale.json';
// it needs to be imported this way, or the initialize will fail
import { AppUser } from '../interfaces/app/appUser';
import { Logger } from './logger';
import { Transaction } from 'sequelize';
import { utils } from '../../index';
import admin from 'firebase-admin';
import config from '../config';
Expand All @@ -15,7 +16,8 @@ export async function sendNotification(
type: NotificationType,
isWallet: boolean = true,
isWebApp: boolean = true,
params?: object
params: object | undefined = undefined,
transaction: Transaction | undefined = undefined
) {
try {
// registry notification
Expand All @@ -26,7 +28,8 @@ export async function sendNotification(
isWallet,
isWebApp,
params
}))
})),
{ transaction }
);

// filter users that have walletPNT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { ChainSubscribers } from '../../../src/subscriber/chainSubscribers';
import CommunityAdminContractJSON from './CommunityAdmin.json';
import cUSDContractJSON from './cUSD.json';

describe('communityAdmin', () => {
describe('communityAdmin (chainSubscribers)', () => {
let provider: ethers.providers.Web3Provider;
let subscribers: ChainSubscribers;
let accounts: string[] = [];
Expand Down Expand Up @@ -104,6 +104,7 @@ describe('communityAdmin', () => {
where: {
requestByAddress: accounts[1]
},
transaction: match.any,
returning: true
}
);
Expand Down Expand Up @@ -141,7 +142,8 @@ describe('communityAdmin', () => {
{
where: {
contractAddress
}
},
transaction: match.any
}
);
});
Expand Down
Loading