From 19e63273e6f1e42ff79db6fb14dc745adc0b7442 Mon Sep 17 00:00:00 2001 From: Julian Gruber Date: Thu, 12 Sep 2024 00:44:59 +0200 Subject: [PATCH 1/2] get recent participants from `spark-evaluations-recent-participants` --- observer/bin/dry-run.js | 18 ++++++++++++------ observer/bin/spark-observer.js | 14 ++++++++++---- observer/lib/observer.js | 18 +++++++----------- observer/test/observer.test.js | 23 ++++++++++++++++++----- package-lock.json | 8 ++++++++ package.json | 3 +++ 6 files changed, 58 insertions(+), 26 deletions(-) diff --git a/observer/bin/dry-run.js b/observer/bin/dry-run.js index fa862be..4a7a1e5 100644 --- a/observer/bin/dry-run.js +++ b/observer/bin/dry-run.js @@ -1,23 +1,29 @@ import * as SparkImpactEvaluator from '@filecoin-station/spark-impact-evaluator' +import * as SparkEvaluationsRecentParticipants from '@filecoin-station/spark-evaluations-recent-participants' import { ethers } from 'ethers' import { RPC_URL, rpcHeaders } from '../lib/config.js' import { observeTransferEvents, observeScheduledRewards } from '../lib/observer.js' -import { getPgPools } from '@filecoin-station/spark-stats-db' +import { getStatsPgPool } from '@filecoin-station/spark-stats-db' -const pgPools = await getPgPools() +const pgPoolStats = await getStatsPgPool() const fetchRequest = new ethers.FetchRequest(RPC_URL) fetchRequest.setHeader('Authorization', rpcHeaders.Authorization || '') const provider = new ethers.JsonRpcProvider(fetchRequest, null, { polling: true }) const ieContract = new ethers.Contract(SparkImpactEvaluator.ADDRESS, SparkImpactEvaluator.ABI, provider) +const recentParticipantsContract = new ethers.Contract( + SparkEvaluationsRecentParticipants.ADDRESS, + SparkEvaluationsRecentParticipants.ABI, + provider +) -await pgPools.stats.query('DELETE FROM daily_reward_transfers') +await pgPoolStats.query('DELETE FROM daily_reward_transfers') await Promise.all([ - observeTransferEvents(pgPools.stats, ieContract, provider), - observeScheduledRewards(pgPools, ieContract) + observeTransferEvents(pgPoolStats, ieContract, provider), + observeScheduledRewards(pgPoolStats, ieContract, recentParticipantsContract) ]) -await pgPools.stats.end() +await pgPoolStats.end() diff --git a/observer/bin/spark-observer.js b/observer/bin/spark-observer.js index 3a67248..cd552dd 100644 --- a/observer/bin/spark-observer.js +++ b/observer/bin/spark-observer.js @@ -1,23 +1,29 @@ import '../lib/instrument.js' import * as SparkImpactEvaluator from '@filecoin-station/spark-impact-evaluator' +import * as SparkEvaluationsRecentParticipants from '@filecoin-station/spark-evaluations-recent-participants' import { ethers } from 'ethers' import * as Sentry from '@sentry/node' import timers from 'node:timers/promises' import { RPC_URL, rpcHeaders } from '../lib/config.js' -import { getPgPools } from '@filecoin-station/spark-stats-db' +import { getStatsPgPool } from '@filecoin-station/spark-stats-db' import { observeTransferEvents, observeScheduledRewards } from '../lib/observer.js' -const pgPools = await getPgPools() +const pgPoolStats = await getStatsPgPool() const fetchRequest = new ethers.FetchRequest(RPC_URL) fetchRequest.setHeader('Authorization', rpcHeaders.Authorization || '') const provider = new ethers.JsonRpcProvider(fetchRequest, null, { polling: true }) const ieContract = new ethers.Contract(SparkImpactEvaluator.ADDRESS, SparkImpactEvaluator.ABI, provider) +const recentParticipantsContract = new ethers.Contract( + SparkEvaluationsRecentParticipants.ADDRESS, + SparkEvaluationsRecentParticipants.ABI, + provider +) const ONE_HOUR = 60 * 60 * 1000 @@ -25,7 +31,7 @@ const loopObserveTransferEvents = async () => { while (true) { const start = Date.now() try { - await observeTransferEvents(pgPools.stats, ieContract, provider) + await observeTransferEvents(pgPoolStats, ieContract, provider) } catch (e) { console.error(e) Sentry.captureException(e) @@ -40,7 +46,7 @@ const loopObserveScheduledRewards = async () => { while (true) { const start = Date.now() try { - await observeScheduledRewards(pgPools, ieContract) + await observeScheduledRewards(pgPoolStats, ieContract, recentParticipantsContract) } catch (e) { console.error(e) Sentry.captureException(e) diff --git a/observer/lib/observer.js b/observer/lib/observer.js index f3c3e4a..0694000 100644 --- a/observer/lib/observer.js +++ b/observer/lib/observer.js @@ -37,18 +37,14 @@ export const observeTransferEvents = async (pgPoolStats, ieContract, provider) = /** * Observe scheduled rewards on the Filecoin blockchain - * @param {import('@filecoin-station/spark-stats-db').PgPools} pgPools + * @param {import('@filecoin-station/spark-stats-db').Queryable} pgPoolStats * @param {import('ethers').Contract} ieContract + * @param {import('ethers').Contract} recentParticipantsContract */ -export const observeScheduledRewards = async (pgPools, ieContract) => { +export const observeScheduledRewards = async (pgPoolStats, ieContract, recentParticipantsContract) => { console.log('Querying scheduled rewards from impact evaluator') - const { rows } = await pgPools.evaluate.query(` - SELECT participant_address - FROM participants p - JOIN daily_participants d ON p.id = d.participant_id - WHERE d.day >= now() - interval '3 days' - `) - for (const { participant_address: address } of rows) { + const participants = await recentParticipantsContract.get() + for (const address of participants) { let scheduledRewards try { scheduledRewards = await ieContract.rewardsScheduledFor(address) @@ -59,10 +55,10 @@ export const observeScheduledRewards = async (pgPools, ieContract) => { address, { cause: err } ) - continue + continue } console.log('Scheduled rewards for', address, scheduledRewards) - await pgPools.stats.query(` + await pgPoolStats.query(` INSERT INTO daily_scheduled_rewards (day, participant_address, scheduled_rewards) VALUES (now(), $1, $2) diff --git a/observer/test/observer.test.js b/observer/test/observer.test.js index 52bcab1..cacf0c1 100644 --- a/observer/test/observer.test.js +++ b/observer/test/observer.test.js @@ -131,11 +131,8 @@ describe('observer', () => { beforeEach(async () => { await pgPools.evaluate.query('DELETE FROM recent_station_details') await pgPools.evaluate.query('DELETE FROM recent_participant_subnets') - await pgPools.evaluate.query('DELETE FROM daily_participants') await pgPools.evaluate.query('DELETE FROM participants') await pgPools.stats.query('DELETE FROM daily_scheduled_rewards') - await givenDailyParticipants(pgPools.evaluate, today(), ['0xCURRENT']) - await givenDailyParticipants(pgPools.evaluate, '2000-01-01', ['0xOLD']) }) it('observes scheduled rewards', async () => { @@ -149,7 +146,15 @@ describe('observer', () => { } } } - await observeScheduledRewards(pgPools, ieContract) + /** @type {any} */ + const recentParticipantsContract = { + get: async () => ['0xCURRENT'] + } + await observeScheduledRewards( + pgPools.stats, + ieContract, + recentParticipantsContract + ) const { rows } = await pgPools.stats.query(` SELECT participant_address, scheduled_rewards FROM daily_scheduled_rewards @@ -164,7 +169,15 @@ describe('observer', () => { const ieContract = { rewardsScheduledFor: async () => 200n } - await observeScheduledRewards(pgPools, ieContract) + /** @type {any} */ + const recentParticipantsContract = { + get: async () => ['0xCURRENT'] + } + await observeScheduledRewards( + pgPools.stats, + ieContract, + recentParticipantsContract + ) const { rows } = await pgPools.stats.query(` SELECT participant_address, scheduled_rewards FROM daily_scheduled_rewards diff --git a/package-lock.json b/package-lock.json index 1069ebb..1edf797 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,6 +10,9 @@ "observer", "stats" ], + "dependencies": { + "@filecoin-station/spark-evaluations-recent-participants": "^3.0.0" + }, "devDependencies": { "@types/mocha": "^10.0.7", "@types/pg": "^8.11.8", @@ -105,6 +108,11 @@ "node": "^12.22.0 || ^14.17.0 || >=16.0.0" } }, + "node_modules/@filecoin-station/spark-evaluations-recent-participants": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/@filecoin-station/spark-evaluations-recent-participants/-/spark-evaluations-recent-participants-3.0.0.tgz", + "integrity": "sha512-t55W0+1gqgIaUW/GvNEGzfRegnn0oKYlfHpNfhj2hb0/5fDyphSwoeQYZkVD4BZg758gJci18uC5wnocUUx0uQ==" + }, "node_modules/@filecoin-station/spark-impact-evaluator": { "version": "1.1.1", "license": "(Apache-2.0 AND MIT)" diff --git a/package.json b/package.json index 3c040ee..43038ad 100644 --- a/package.json +++ b/package.json @@ -25,5 +25,8 @@ "env": [ "mocha" ] + }, + "dependencies": { + "@filecoin-station/spark-evaluations-recent-participants": "^3.0.0" } } From ee308751c9d4ddbfa19892e0c6aa7f6bed7077ff Mon Sep 17 00:00:00 2001 From: Julian Gruber Date: Thu, 12 Sep 2024 10:31:21 +0200 Subject: [PATCH 2/2] participants contains duplicates --- observer/lib/observer.js | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/observer/lib/observer.js b/observer/lib/observer.js index 0694000..49fd3ac 100644 --- a/observer/lib/observer.js +++ b/observer/lib/observer.js @@ -44,7 +44,14 @@ export const observeTransferEvents = async (pgPoolStats, ieContract, provider) = export const observeScheduledRewards = async (pgPoolStats, ieContract, recentParticipantsContract) => { console.log('Querying scheduled rewards from impact evaluator') const participants = await recentParticipantsContract.get() + const participantsSeen = new Map() for (const address of participants) { + // participants contains duplicates + if (participantsSeen.has(address)) { + continue + } + participantsSeen.set(address, true) + let scheduledRewards try { scheduledRewards = await ieContract.rewardsScheduledFor(address)