diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 31e5ba7..0187c5b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -3,6 +3,22 @@ on: [push] jobs: build: runs-on: ubuntu-latest + services: + postgres: + image: postgres:latest + env: + POSTGRES_DB: postgres + POSTGRES_PASSWORD: postgres + POSTGRES_USER: postgres + ports: + - 5432:5432 + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + env: + DATABASE_URL: postgres://postgres:postgres@localhost:5432/postgres steps: - uses: actions/checkout@v4 - uses: actions/setup-node@v4 diff --git a/README.md b/README.md index 66dca9b..a0e1c32 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,31 @@ Evaluate service - [Meridian spec](https://www.notion.so/pl-strflt/Meridian-Design-Doc-07-Flexible-preprocessing-1b8f2f19ca7d4fd4b74a1e57e7d7ef8a?pvs=4) - [Meridian evaluate service](https://github.com/Meridian-IE/evaluate-service) +## Development + +Set up [PostgreSQL](https://www.postgresql.org/) with default settings: + - Port: 5432 + - User: _your system user name_ + - Password: _blank_ + - Database: voyager_stats + +Alternatively, set the environment variable `$DATABASE_URL` to +`postgres://${USER}:${PASS}@${HOST}:${PORT}/${DATABASE}`. + +The Postgres user and database need to exist already, and the user +needs full management permissions for the database. + +You can also run the following command to set up the PostgreSQL server via Docker: + +```bash +docker run -d --name voyager-db \ + -e POSTGRES_HOST_AUTH_METHOD=trust \ + -e POSTGRES_USER=$USER \ + -e POSTGRES_DB=voyager_stats \ + -p 5432:5432 \ + postgres +``` + ## Run the tests ```bash diff --git a/bin/dry-run.js b/bin/dry-run.js index 0317b5e..1d5e3ed 100644 --- a/bin/dry-run.js +++ b/bin/dry-run.js @@ -101,7 +101,14 @@ await evaluate({ fetchRoundDetails, ieContractWithSigner, logger: console, - recordTelemetry + recordTelemetry, + + // We don't want dry runs to update data in `voyager_stats`, so we pass a stub + // connection factory that creates no-op clients. This also keeps the setup simpler. The person + // executing a dry run does not need access to any Postgres instance. + // Evaluate uses the PG client only for updating the statistics; it does not read any data. + // Thus it's safe to inject a no-op client.
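+ // The no-op client (see createNoopPgClient() appended at the bottom of this file) resolves every query() call with `{ rows: [] }`, so callers that inspect the returned rows keep working.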
+ createPgClient: createNoopPgClient }) console.log(process.memoryUsage()) @@ -177,3 +184,14 @@ async function fetchMeasurementsAddedFromChain (roundIndex) { return events.filter(e => e.roundIndex === roundIndex).map(e => e.cid) } + +function createNoopPgClient () { + return { + async query () { + return { rows: [] } + }, + async end () { + // no-op + } + } +} diff --git a/bin/migrate.js b/bin/migrate.js new file mode 100644 index 0000000..2b7d4ec --- /dev/null +++ b/bin/migrate.js @@ -0,0 +1,4 @@ +import { DATABASE_URL } from '../lib/config.js' +import { migrateWithPgConfig } from '../lib/migrate.js' + +await migrateWithPgConfig({ connectionString: DATABASE_URL }) diff --git a/bin/voyager-evaluate.js b/bin/voyager-evaluate.js index e7348b1..b693e80 100644 --- a/bin/voyager-evaluate.js +++ b/bin/voyager-evaluate.js @@ -1,5 +1,5 @@ import * as Sentry from '@sentry/node' -import { IE_CONTRACT_ADDRESS, RPC_URL, rpcHeaders } from '../lib/config.js' +import { DATABASE_URL, IE_CONTRACT_ADDRESS, RPC_URL, rpcHeaders } from '../lib/config.js' import { startEvaluate } from '../index.js' import { fetchRoundDetails } from '../lib/voyager-api.js' import assert from 'node:assert' @@ -9,6 +9,8 @@ import { newDelegatedEthAddress } from '@glif/filecoin-address' import { recordTelemetry } from '../lib/telemetry.js' import fs from 'node:fs/promises' import { fetchMeasurements } from '../lib/preprocess.js' +import { migrateWithPgConfig } from '../lib/migrate.js' +import pg from 'pg' const { SENTRY_ENVIRONMENT = 'development', @@ -24,6 +26,8 @@ Sentry.init({ assert(WALLET_SEED, 'WALLET_SEED required') +await migrateWithPgConfig({ connectionString: DATABASE_URL }) + const fetchRequest = new ethers.FetchRequest(RPC_URL) fetchRequest.setHeader('Authorization', rpcHeaders.Authorization || '') const provider = new ethers.JsonRpcProvider( @@ -49,6 +53,12 @@ const ieContract = new ethers.Contract( ) const ieContractWithSigner = ieContract.connect(signer) +const createPgClient = async () => { + const pgClient = new pg.Client({ connectionString: DATABASE_URL }) + await pgClient.connect() + return pgClient +} + await startEvaluate({ ieContract, ieContractWithSigner, @@ -58,5 +68,6 @@ await startEvaluate({ fetchMeasurements, fetchRoundDetails, recordTelemetry, + createPgClient, logger: console }) diff --git a/index.js b/index.js index 5567362..aefc013 100644 --- a/index.js +++ b/index.js @@ -1,3 +1,4 @@ +import assert from 'node:assert' import * as Sentry from '@sentry/node' import { preprocess } from './lib/preprocess.js' import { evaluate } from './lib/evaluate.js' @@ -13,8 +14,11 @@ export const startEvaluate = async ({ fetchMeasurements, fetchRoundDetails, recordTelemetry, + createPgClient, logger }) => { + assert(typeof createPgClient === 'function', 'createPgClient must be a function') + const rounds = { current: null, previous: null @@ -87,6 +91,7 @@ export const startEvaluate = async ({ ieContractWithSigner, fetchRoundDetails, recordTelemetry, + createPgClient, logger }).catch(err => { console.error(err) diff --git a/lib/config.js b/lib/config.js index c8fe6b7..e3a0735 100644 --- a/lib/config.js +++ b/lib/config.js @@ -5,6 +5,7 @@ const { // RPC_URLS = 'https://api.node.glif.io/rpc/v0,https://api.chain.love/rpc/v1', RPC_URLS = 'https://api.node.glif.io/rpc/v0', GLIF_TOKEN, + DATABASE_URL = 'postgres://localhost:5432/voyager_stats', VOYAGER_API = 'https://voyager.filstation.app' } = process.env @@ -20,6 +21,7 @@ if (RPC_URL.includes('glif')) { export { IE_CONTRACT_ADDRESS, RPC_URL, + DATABASE_URL, VOYAGER_API, 
rpcHeaders } diff --git a/lib/evaluate.js b/lib/evaluate.js index ecedcd8..ea825d3 100644 --- a/lib/evaluate.js +++ b/lib/evaluate.js @@ -2,6 +2,7 @@ import createDebug from 'debug' import assert from 'node:assert' import * as Sentry from '@sentry/node' import { buildRetrievalStats, recordCommitteeSizes } from './retrieval-stats.js' +import { updatePublicStats } from './public-stats.js' const debug = createDebug('voyager:evaluate') @@ -44,6 +45,7 @@ export const createSetScoresBuckets = participants => { * @param {any} args.ieContractWithSigner * @param {import('./voyager-api').fetchRoundDetails} args.fetchRoundDetails, * @param {import('./typings').RecordTelemetryFn} args.recordTelemetry + * @param {import('./typings').CreatePgClient} args.createPgClient * @param {Console} args.logger */ export const evaluate = async ({ @@ -52,6 +54,7 @@ export const evaluate = async ({ ieContractWithSigner, fetchRoundDetails, recordTelemetry, + createPgClient, logger }) => { // Get measurements @@ -218,6 +221,13 @@ export const evaluate = async ({ console.error('Cannot record committees.', err) Sentry.captureException(err) } + + try { + await updatePublicStats({ createPgClient, honestMeasurements }) + } catch (err) { + console.error('Cannot update public stats.', err) + Sentry.captureException(err) + } } /** diff --git a/lib/migrate.js b/lib/migrate.js new file mode 100644 index 0000000..b48a5cb --- /dev/null +++ b/lib/migrate.js @@ -0,0 +1,43 @@ +import pg from 'pg' +import Postgrator from 'postgrator' +import { fileURLToPath } from 'node:url' +import { dirname, join } from 'node:path' + +const migrationsDirectory = join( + dirname(fileURLToPath(import.meta.url)), + '..', + 'migrations' +) + +/** + * @param {pg.ClientConfig} pgConfig + */ +export const migrateWithPgConfig = async (pgConfig) => { + const client = new pg.Client(pgConfig) + await client.connect() + try { + await migrateWithPgClient(client) + } finally { + await client.end() + } +} + +/** + * @param {pg.Client} client + */ +export const migrateWithPgClient = async (client) => { + const postgrator = new Postgrator({ + migrationPattern: join(migrationsDirectory, '*'), + driver: 'pg', + execQuery: (query) => client.query(query) + }) + console.log( + 'Migrating DB schema from version %s to version %s', + await postgrator.getDatabaseVersion(), + await postgrator.getMaxVersion() + ) + + await postgrator.migrate() + + console.log('Migrated DB schema to version', await postgrator.getDatabaseVersion()) +} diff --git a/lib/public-stats.js b/lib/public-stats.js new file mode 100644 index 0000000..21b8070 --- /dev/null +++ b/lib/public-stats.js @@ -0,0 +1,129 @@ +import assert from 'node:assert' +import createDebug from 'debug' + +const debug = createDebug('voyager:public-stats') + +/** + * @param {object} args + * @param {import('./typings').CreatePgClient} args.createPgClient + * @param {import('./preprocess').Measurement[]} args.honestMeasurements + */ +export const updatePublicStats = async ({ createPgClient, honestMeasurements }) => { + const retrievalStats = { total: 0, successful: 0 } + const participants = new Set() + for (const m of honestMeasurements) { + retrievalStats.total++ + if (m.retrievalResult === 'OK') retrievalStats.successful++ + + participants.add(m.participantAddress) + } + + const pgClient = await createPgClient() + try { + await updateRetrievalStats(pgClient, retrievalStats) + await updateDailyParticipants(pgClient, participants) + } finally { + await pgClient.end() + } +} + +/** + * @param {import('pg').Client} pgClient + * @param 
{object} stats + * @param {number} stats.total + * @param {number} stats.successful + */ +const updateRetrievalStats = async (pgClient, { total, successful }) => { + debug('Updating public retrieval stats: total += %s successful += %s', total, successful) + await pgClient.query(` + INSERT INTO retrieval_stats + (day, total, successful) + VALUES + (now(), $1, $2) + ON CONFLICT(day) DO UPDATE SET + total = retrieval_stats.total + $1, + successful = retrieval_stats.successful + $2 + `, [ + total, + successful + ]) +} + +/** + * @param {import('pg').Client} pgClient + * @param {Set<string>} participants + */ +export const updateDailyParticipants = async (pgClient, participants) => { + debug('Updating daily participants, count=%s', participants.size) + const ids = await mapParticipantsToIds(pgClient, participants) + await pgClient.query(` + INSERT INTO daily_participants (day, participant_id) + SELECT now() as day, UNNEST($1::INT[]) AS participant_id + ON CONFLICT DO NOTHING + `, [ + ids + ]) +} + +/** + * @param {import('pg').Client} pgClient + * @param {Set<string>} participantsSet + * @returns {Promise<number[]>} A list of participant ids. The order of ids is not defined. + */ +export const mapParticipantsToIds = async (pgClient, participantsSet) => { + debug('Mapping participants to id, count=%s', participantsSet.size) + + /** @type {number[]} */ + const ids = [] + + // TODO: We can further optimise the performance of this function by using + // an in-memory LRU cache. Our network currently has ~2k participants; + // we need ~50 bytes for each (address, id) pair, so that's only ~100KB of data. + + // TODO: passing the entire list of participants as a single query parameter + // will probably not scale beyond several thousand addresses. We will + // need to rework the queries to split large arrays into smaller batches. + + // In most rounds, we have already seen most of the participant addresses. + // If we use "INSERT...ON CONFLICT", then PG increments the id counter even for + // existing addresses where we end up skipping the insert. This could quickly + // exhaust the space of all 32-bit integers. + // Solution: query the table for known records before running the insert. + // + // Caveat: In my testing, this query was not able to leverage the (unique) + // index on participants.participant_address and performed a full table scan + // after the array grew past ~10 items. If this becomes a problem, we can + // introduce the LRU cache mentioned above. + const { rows: found } = await pgClient.query( + 'SELECT * FROM participants WHERE participant_address = ANY($1::TEXT[])', + [Array.from(participantsSet.values())] + ) + debug('Known participants count=%s', found.length) + + // eslint-disable-next-line camelcase + for (const { id, participant_address } of found) { + ids.push(id) + participantsSet.delete(participant_address) + } + + debug('New participant addresses count=%s', participantsSet.size) + + // Register the new addresses. Use "INSERT...ON CONFLICT" to handle the race condition + // where another client may have registered these addresses between our previous + // SELECT query and the next INSERT query.
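+ // Note: the ON CONFLICT fallback below still consumes a sequence value when two clients race to register the same address, but only for addresses that are new to the table, so re-registering existing participants cannot exhaust the id space.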
+ const newAddresses = Array.from(participantsSet.values()) + debug('Registering new participant addresses, count=%s', newAddresses.length) + const { rows: created } = await pgClient.query(` + INSERT INTO participants (participant_address) + SELECT UNNEST($1::TEXT[]) AS participant_address + ON CONFLICT(participant_address) DO UPDATE + -- this no-op update is needed to populate "RETURNING id" + SET participant_address = EXCLUDED.participant_address + RETURNING id + `, [ + newAddresses + ]) + + assert.strictEqual(created.length, newAddresses.length) + return ids.concat(created.map(r => r.id)) +} diff --git a/lib/typings.d.ts b/lib/typings.d.ts index 5a07ba5..a451b29 100644 --- a/lib/typings.d.ts +++ b/lib/typings.d.ts @@ -66,3 +66,5 @@ export interface GroupWinningStats { export interface FraudDetectionStats { groupWinning: GroupWinningStats } + +export type CreatePgClient = () => Promise<import('pg').Client>; diff --git a/migrations/001.do.retrieval-stats.sql b/migrations/001.do.retrieval-stats.sql new file mode 100644 index 0000000..cbb71ca --- /dev/null +++ b/migrations/001.do.retrieval-stats.sql @@ -0,0 +1,5 @@ +CREATE TABLE retrieval_stats ( + day DATE NOT NULL PRIMARY KEY, + total INT NOT NULL, + successful INT NOT NULL +); diff --git a/migrations/003.do.daily-participants.sql b/migrations/003.do.daily-participants.sql new file mode 100644 index 0000000..8c629af --- /dev/null +++ b/migrations/003.do.daily-participants.sql @@ -0,0 +1,11 @@ +CREATE TABLE participants ( + id SERIAL NOT NULL PRIMARY KEY, + participant_address TEXT NOT NULL UNIQUE +); + +CREATE TABLE daily_participants ( + day DATE NOT NULL, + participant_id INTEGER NOT NULL REFERENCES participants(id), + PRIMARY KEY (day, participant_id) +); +CREATE INDEX daily_participants_day ON daily_participants (day); diff --git a/package-lock.json b/package-lock.json index 582aa99..ae4d6ed 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,11 +16,12 @@ "ipfs-unixfs-exporter": "^13.5.0", "just-percentile": "^4.2.0", "on-contract-event": "^1.0.2", - "p-retry": "^6.2.0" + "p-retry": "^6.2.0", + "pg": "^8.11.5", + "postgrator": "^7.2.0" }, "devDependencies": { "mocha": "^10.4.0", - "pg": "^8.11.5", "standard": "^17.1.0" } }, "node_modules/balanced-match": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/balanced-match/-/balanced-match-1.0.2.tgz", - "integrity": "sha512-3oSeUO0TMV67hN1AmbXsK4yaqU7tjiHlbxRDZOpH0KW9+CeX4bRAaX0Anxt0tx2MrpRpWwQaPwIlISEJhYU5Pw==", - "dev": true + "integrity": "sha512-3oSeUO0TMV67hN1AmbXsK4yaqU7tjiHlbxRDZOpH0KW9+CeX4bRAaX0Anxt0tx2MrpRpWwQaPwIlISEJhYU5Pw==" }, "node_modules/base64-js": { "version": "1.5.1", @@ -871,7 +871,6 @@ "version": "1.1.11", "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-1.1.11.tgz", "integrity": "sha512-iCuPHDFgrHX7H2vEI/5xpz07zSHB00TpugqhmYtVmMO6518mCuRMoOYFldEBl0g187ufozdaHgWKcYFb61qGiA==", - "dev": true, "dependencies": { "balanced-match": "^1.0.0", "concat-map": "0.0.1" } @@ -1072,8 +1071,7 @@ "node_modules/concat-map": { "version": "0.0.1", "resolved": "https://registry.npmjs.org/concat-map/-/concat-map-0.0.1.tgz", "integrity": "sha512-/Srv4dswyQNBfohGpz9o6Yb3Gz3SrUDqBH5rTuhGR7ahtlbYKnVxw2bCFMRljaA7EXHaXZ8wsHdodFvbkhKmqg==", - "dev": true + "integrity": "sha512-/Srv4dswyQNBfohGpz9o6Yb3Gz3SrUDqBH5rTuhGR7ahtlbYKnVxw2bCFMRljaA7EXHaXZ8wsHdodFvbkhKmqg==" }, "node_modules/cross-spawn": { "version": "7.0.3", @@ -1922,8 +1920,7 @@ "node_modules/fs.realpath": { "version": "1.0.0", "resolved": 
"https://registry.npmjs.org/fs.realpath/-/fs.realpath-1.0.0.tgz", - "integrity": "sha512-OO0pH2lK6a0hZnAdau5ItzHPI6pUlvI7jMVnxUQRtw4owF2wk8lOSabtGDCTP4Ggrg2MbGnWO9X8K1t4+fGMDw==", - "dev": true + "integrity": "sha512-OO0pH2lK6a0hZnAdau5ItzHPI6pUlvI7jMVnxUQRtw4owF2wk8lOSabtGDCTP4Ggrg2MbGnWO9X8K1t4+fGMDw==" }, "node_modules/fsevents": { "version": "2.3.3", @@ -2028,7 +2025,6 @@ "version": "7.2.3", "resolved": "https://registry.npmjs.org/glob/-/glob-7.2.3.tgz", "integrity": "sha512-nFR0zLpU2YCaRxwoCJvL6UvCH2JFyFVIvwTLsIf21AuHlMskA1hhTdk+LlYJtOlYt9v6dvszD2BGRqBL+iQK9Q==", - "dev": true, "dependencies": { "fs.realpath": "^1.0.0", "inflight": "^1.0.4", @@ -2286,7 +2282,6 @@ "version": "1.0.6", "resolved": "https://registry.npmjs.org/inflight/-/inflight-1.0.6.tgz", "integrity": "sha512-k92I/b08q4wvFscXCLvqfsHCrjrF7yiXsQuIVvVE7N82W3+aqpzuUdBbfhWcy/FZR3/4IgflMgKLOsvPDrGCJA==", - "dev": true, "dependencies": { "once": "^1.3.0", "wrappy": "1" @@ -3112,7 +3107,6 @@ "version": "3.1.2", "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-3.1.2.tgz", "integrity": "sha512-J7p63hRiAjw1NDEww1W7i37+ByIrOWO5XQQAzZ3VOcL0PNybwpfmV/N05zFAzwQ9USyEcX6t3UO+K5aqBQOIHw==", - "dev": true, "dependencies": { "brace-expansion": "^1.1.7" }, @@ -3405,7 +3399,6 @@ "version": "1.4.0", "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", "integrity": "sha512-lNaJgI+2Q5URQBkccEKHTQOPaXdUxnZZElQTZY0MFUAuaEqe1E+Nyvgdz/aIyNi6Z9MzO5dv1H8n58/GELp3+w==", - "dev": true, "dependencies": { "wrappy": "1" } @@ -3557,7 +3550,6 @@ "version": "1.0.1", "resolved": "https://registry.npmjs.org/path-is-absolute/-/path-is-absolute-1.0.1.tgz", "integrity": "sha512-AVbw3UJ2e9bq64vSaS9Am0fje1Pa8pbGqTTsmXfaIiMpnr5DlDhfJOuLj9Sf95ZPVDAUerDfEk88MPmPe7UCQg==", - "dev": true, "engines": { "node": ">=0.10.0" } @@ -3581,7 +3573,6 @@ "version": "8.11.5", "resolved": "https://registry.npmjs.org/pg/-/pg-8.11.5.tgz", "integrity": "sha512-jqgNHSKL5cbDjFlHyYsCXmQDrfIX/3RsNwYqpd4N0Kt8niLuNoRNH+aazv6cOd43gPh9Y4DjQCtb+X0MH0Hvnw==", - "dev": true, "dependencies": { "pg-connection-string": "^2.6.4", "pg-pool": "^3.6.2", @@ -3608,20 +3599,17 @@ "version": "1.1.1", "resolved": "https://registry.npmjs.org/pg-cloudflare/-/pg-cloudflare-1.1.1.tgz", "integrity": "sha512-xWPagP/4B6BgFO+EKz3JONXv3YDgvkbVrGw2mTo3D6tVDQRh1e7cqVGvyR3BE+eQgAvx1XhW/iEASj4/jCWl3Q==", - "dev": true, "optional": true }, "node_modules/pg-connection-string": { "version": "2.6.4", "resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.6.4.tgz", - "integrity": "sha512-v+Z7W/0EO707aNMaAEfiGnGL9sxxumwLl2fJvCQtMn9Fxsg+lPpPkdcyBSv/KFgpGdYkMfn+EI1Or2EHjpgLCA==", - "dev": true + "integrity": "sha512-v+Z7W/0EO707aNMaAEfiGnGL9sxxumwLl2fJvCQtMn9Fxsg+lPpPkdcyBSv/KFgpGdYkMfn+EI1Or2EHjpgLCA==" }, "node_modules/pg-int8": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/pg-int8/-/pg-int8-1.0.1.tgz", "integrity": "sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw==", - "dev": true, "engines": { "node": ">=4.0.0" } @@ -3630,7 +3618,6 @@ "version": "3.6.2", "resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.6.2.tgz", "integrity": "sha512-Htjbg8BlwXqSBQ9V8Vjtc+vzf/6fVUuak/3/XXKA9oxZprwW3IMDQTGHP+KDmVL7rtd+R1QjbnCFPuTHm3G4hg==", - "dev": true, "peerDependencies": { "pg": ">=8.0" } @@ -3638,14 +3625,12 @@ "node_modules/pg-protocol": { "version": "1.6.1", "resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.6.1.tgz", - "integrity": 
"sha512-jPIlvgoD63hrEuihvIg+tJhoGjUsLPn6poJY9N5CnlPd91c2T18T/9zBtLxZSb1EhYxBRoZJtzScCaWlYLtktg==", - "dev": true + "integrity": "sha512-jPIlvgoD63hrEuihvIg+tJhoGjUsLPn6poJY9N5CnlPd91c2T18T/9zBtLxZSb1EhYxBRoZJtzScCaWlYLtktg==" }, "node_modules/pg-types": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/pg-types/-/pg-types-2.2.0.tgz", "integrity": "sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA==", - "dev": true, "dependencies": { "pg-int8": "1.0.1", "postgres-array": "~2.0.0", @@ -3661,7 +3646,6 @@ "version": "1.0.5", "resolved": "https://registry.npmjs.org/pgpass/-/pgpass-1.0.5.tgz", "integrity": "sha512-FdW9r/jQZhSeohs1Z3sI1yxFQNFvMcnmfuj4WBMUTxOrAyLMaTcE1aAMBiTlbMNaXvBCQuVi0R7hd8udDSP7ug==", - "dev": true, "dependencies": { "split2": "^4.1.0" } @@ -3761,11 +3745,21 @@ "node": ">=4" } }, + "node_modules/postgrator": { + "version": "7.2.0", + "resolved": "https://registry.npmjs.org/postgrator/-/postgrator-7.2.0.tgz", + "integrity": "sha512-rVi/X5//51Sj5SWsBb2knBn/GCWdFOXRdq4VATnNePLK1h2774j0byr5tsDDb5B+UWVAZftjq5VYCQaB6dXMWw==", + "dependencies": { + "glob": "^7.2.3" + }, + "engines": { + "node": ">=14.0.0" + } + }, "node_modules/postgres-array": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/postgres-array/-/postgres-array-2.0.0.tgz", "integrity": "sha512-VpZrUqU5A69eQyW2c5CA1jtLecCsN2U/bD6VilrFDWq5+5UIEVO7nazS3TEcHf1zuPYO/sqGvUvW62g86RXZuA==", - "dev": true, "engines": { "node": ">=4" } @@ -3774,7 +3768,6 @@ "version": "1.0.0", "resolved": "https://registry.npmjs.org/postgres-bytea/-/postgres-bytea-1.0.0.tgz", "integrity": "sha512-xy3pmLuQqRBZBXDULy7KbaitYqLcmxigw14Q5sj8QBVLqEwXfeybIKVWiqAXTlcvdvb0+xkOtDbfQMOf4lST1w==", - "dev": true, "engines": { "node": ">=0.10.0" } @@ -3783,7 +3776,6 @@ "version": "1.0.7", "resolved": "https://registry.npmjs.org/postgres-date/-/postgres-date-1.0.7.tgz", "integrity": "sha512-suDmjLVQg78nMK2UZ454hAG+OAW+HQPZ6n++TNDUX+L0+uUlLywnoxJKDou51Zm+zTCjrCl0Nq6J9C5hP9vK/Q==", - "dev": true, "engines": { "node": ">=0.10.0" } @@ -3792,7 +3784,6 @@ "version": "1.2.0", "resolved": "https://registry.npmjs.org/postgres-interval/-/postgres-interval-1.2.0.tgz", "integrity": "sha512-9ZhXKM/rw350N1ovuWHbGxnGh/SNJ4cnxHiM0rxE4VN41wsg8P8zWn9hv/buK00RP4WvlOyr/RBDiptyxVbkZQ==", - "dev": true, "dependencies": { "xtend": "^4.0.0" }, @@ -4216,7 +4207,6 @@ "version": "4.2.0", "resolved": "https://registry.npmjs.org/split2/-/split2-4.2.0.tgz", "integrity": "sha512-UcjcJOWknrNkF6PLX83qcHM6KHgVKNkV62Y8a5uYDVv9ydGQVwAHMKqHdJje1VTWpljG0WYpCDhrCdAOYH4TWg==", - "dev": true, "engines": { "node": ">= 10.x" } @@ -4752,8 +4742,7 @@ "node_modules/wrappy": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz", - "integrity": "sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==", - "dev": true + "integrity": "sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==" }, "node_modules/ws": { "version": "8.5.0", @@ -4788,7 +4777,6 @@ "version": "4.0.2", "resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz", "integrity": "sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==", - "dev": true, "engines": { "node": ">=0.4" } diff --git a/package.json b/package.json index 0133fb8..39d9ead 100644 --- a/package.json +++ b/package.json @@ -1,12 +1,12 @@ { "type": "module", "scripts": { + "migrate": "node bin/migrate.js", "start": "node bin/voyager-evaluate.js", 
"test": "standard && mocha" }, "devDependencies": { "mocha": "^10.4.0", - "pg": "^8.11.5", "standard": "^17.1.0" }, "dependencies": { @@ -21,7 +21,9 @@ "ipfs-unixfs-exporter": "^13.5.0", "just-percentile": "^4.2.0", "on-contract-event": "^1.0.2", - "p-retry": "^6.2.0" + "p-retry": "^6.2.0", + "pg": "^8.11.5", + "postgrator": "^7.2.0" }, "standard": { "env": [ diff --git a/test/evaluate.js b/test/evaluate.js index 513865e..88b2ede 100644 --- a/test/evaluate.js +++ b/test/evaluate.js @@ -2,10 +2,13 @@ import { MAX_SCORE, evaluate, runFraudDetection, MAX_SET_SCORES_PARTICIPANTS, cr import { Point } from '../lib/telemetry.js' import assert from 'node:assert' import createDebug from 'debug' -import { VALID_MEASUREMENT, VALID_TASK } from './helpers/test-data.js' +import { VALID_MEASUREMENT, VALID_TASK, today } from './helpers/test-data.js' import { assertPointFieldValue } from './helpers/assertions.js' import { RoundData } from '../lib/round.js' +import { DATABASE_URL } from '../lib/config.js' +import pg from 'pg' import { beforeEach } from 'mocha' +import { migrateWithPgClient } from '../lib/migrate.js' const debug = createDebug('test') const logger = { log: debug, error: debug } @@ -19,7 +22,27 @@ const recordTelemetry = (measurementName, fn) => { } beforeEach(() => telemetry.splice(0)) +const createPgClient = async () => { + const pgClient = new pg.Client({ connectionString: DATABASE_URL }) + await pgClient.connect() + return pgClient +} + describe('evaluate', () => { + let pgClient + before(async () => { + pgClient = await createPgClient() + await migrateWithPgClient(pgClient) + }) + + beforeEach(async () => { + await pgClient.query('DELETE FROM retrieval_stats') + }) + + after(async () => { + await pgClient.end() + }) + it('evaluates measurements', async () => { const round = new RoundData(0) for (let i = 0; i < 10; i++) { @@ -42,6 +65,7 @@ describe('evaluate', () => { ieContractWithSigner, fetchRoundDetails, recordTelemetry, + createPgClient, logger }) assert.strictEqual(setScoresCalls.length, 1) @@ -58,6 +82,13 @@ describe('evaluate', () => { `No telemetry point "evaluate" was recorded. 
Actual points: ${JSON.stringify(telemetry.map(p => p.name))}`) assertPointFieldValue(point, 'total_nodes', '1i') // TODO: assert more point fields + + const { rows: publicStats } = await pgClient.query('SELECT * FROM retrieval_stats') + assert.deepStrictEqual(publicStats, [{ + day: today(), + total: 1, + successful: 1 + }]) }) it('handles empty rounds', async () => { @@ -79,6 +110,7 @@ describe('evaluate', () => { ieContractWithSigner, fetchRoundDetails, recordTelemetry, + createPgClient, logger }) assert.strictEqual(setScoresCalls.length, 1) @@ -130,6 +162,7 @@ describe('evaluate', () => { ieContractWithSigner, fetchRoundDetails, recordTelemetry, + createPgClient, logger }) assert.strictEqual(setScoresCalls.length, 1) @@ -170,6 +203,7 @@ describe('evaluate', () => { ieContractWithSigner, recordTelemetry, fetchRoundDetails, + createPgClient, logger }) assert.strictEqual(setScoresCalls.length, 1) @@ -218,6 +252,7 @@ describe('evaluate', () => { ieContractWithSigner, recordTelemetry, fetchRoundDetails, + createPgClient, logger }) assert.strictEqual(setScoresCalls.length, 1) @@ -257,6 +292,7 @@ describe('evaluate', () => { ieContractWithSigner, recordTelemetry, fetchRoundDetails, + createPgClient, logger }) diff --git a/test/integration.test.js b/test/integration.test.js index ba32b28..ff2e6c4 100644 --- a/test/integration.test.js +++ b/test/integration.test.js @@ -1,7 +1,10 @@ // import createDebug from 'debug' // import { beforeEach } from 'mocha' // import assert from 'node:assert' +// import pg from 'pg' +// import { DATABASE_URL } from '../lib/config.js' // import { evaluate } from '../lib/evaluate.js' +// import { migrateWithPgClient } from '../lib/migrate.js' // import { fetchMeasurements, preprocess } from '../lib/preprocess.js' // import { RoundData } from '../lib/round.js' // import { fetchRoundDetails } from '../lib/voyager-api.js' @@ -35,7 +38,27 @@ // } // }) +// const createPgClient = async () => { +// const pgClient = new pg.Client({ connectionString: DATABASE_URL }) +// await pgClient.connect() +// return pgClient +// } + describe('preprocess-evaluate integration', () => { + // let pgClient + // before(async () => { + // pgClient = await createPgClient() + // await migrateWithPgClient(pgClient) + // }) + + // beforeEach(async () => { + // await pgClient.query('DELETE FROM retrieval_stats') + // }) + + // after(async () => { + // await pgClient.end() + // }) + // TODO: Need existing round with measurements it('produces expected results' /*, async function () { this.timeout(10000) @@ -57,6 +80,7 @@ describe('preprocess-evaluate integration', () => { const ieContractWithSigner = createIeContractWithSigner(MERIDIAN_VERSION) await evaluate({ + createPgClient, fetchRoundDetails, ieContractWithSigner, logger, diff --git a/test/public-stats.test.js b/test/public-stats.test.js new file mode 100644 index 0000000..621b892 --- /dev/null +++ b/test/public-stats.test.js @@ -0,0 +1,143 @@ +import assert from 'node:assert' +import pg from 'pg' + +import { DATABASE_URL } from '../lib/config.js' +import { migrateWithPgClient } from '../lib/migrate.js' +import { VALID_MEASUREMENT } from './helpers/test-data.js' +import { mapParticipantsToIds, updateDailyParticipants, updatePublicStats } from '../lib/public-stats.js' +import { beforeEach } from 'mocha' + +const createPgClient = async () => { + const pgClient = new pg.Client({ connectionString: DATABASE_URL }) + await pgClient.connect() + return pgClient +} + +describe('public-stats', () => { + let pgClient + before(async () => { + pgClient = 
await createPgClient() + await migrateWithPgClient(pgClient) + }) + + let today + beforeEach(async () => { + await pgClient.query('DELETE FROM retrieval_stats') + await pgClient.query('DELETE FROM daily_participants') + // empty the `participants` table so that the next participants.id will always be 1 + await pgClient.query('TRUNCATE TABLE participants RESTART IDENTITY CASCADE') + + // Run all tests inside a transaction to ensure `now()` always returns the same value + // See https://dba.stackexchange.com/a/63549/125312 + // This avoids subtle race conditions when the tests are executed around midnight. + await pgClient.query('BEGIN TRANSACTION') + + today = await getCurrentDate() + }) + + afterEach(async () => { + await pgClient.query('END TRANSACTION') + }) + + after(async () => { + await pgClient.end() + }) + + describe('retrieval_stats', () => { + it('creates or updates the row for today', async () => { + /** @type {import('../lib/preprocess').Measurement[]} */ + const honestMeasurements = [ + { ...VALID_MEASUREMENT, retrievalResult: 'OK' }, + { ...VALID_MEASUREMENT, retrievalResult: 'TIMEOUT' } + ] + await updatePublicStats({ createPgClient, honestMeasurements }) + + const { rows: created } = await pgClient.query( + 'SELECT day::TEXT, total, successful FROM retrieval_stats' + ) + assert.deepStrictEqual(created, [ + { day: today, total: 2, successful: 1 } + ]) + + honestMeasurements.push({ ...VALID_MEASUREMENT, retrievalResult: 'UNKNOWN_ERROR' }) + await updatePublicStats({ createPgClient, honestMeasurements }) + + const { rows: updated } = await pgClient.query( + 'SELECT day::TEXT, total, successful FROM retrieval_stats' + ) + assert.deepStrictEqual(updated, [ + { day: today, total: 2 + 3, successful: 1 + 1 } + ]) + }) + }) + + describe('daily_participants', () => { + it('submits daily_participants data for today', async () => { + /** @type {import('../lib/preprocess').Measurement[]} */ + const honestMeasurements = [ + { ...VALID_MEASUREMENT, participantAddress: '0x10' }, + { ...VALID_MEASUREMENT, participantAddress: '0x10' }, + { ...VALID_MEASUREMENT, participantAddress: '0x20' } + ] + await updatePublicStats({ createPgClient, honestMeasurements }) + + const { rows } = await pgClient.query( + 'SELECT day::TEXT, participant_id FROM daily_participants' + ) + assert.deepStrictEqual(rows, [ + { day: today, participant_id: 1 }, + { day: today, participant_id: 2 } + ]) + }) + + it('creates a new daily_participants row', async () => { + await updateDailyParticipants(pgClient, new Set(['0x10', '0x20'])) + + const { rows: created } = await pgClient.query( + 'SELECT day::TEXT, participant_id FROM daily_participants' + ) + assert.deepStrictEqual(created, [ + { day: today, participant_id: 1 }, + { day: today, participant_id: 2 } + ]) + }) + + it('handles participants already seen today', async () => { + await updateDailyParticipants(pgClient, new Set(['0x10', '0x20'])) + await updateDailyParticipants(pgClient, new Set(['0x10', '0x30', '0x20'])) + + const { rows: created } = await pgClient.query( + 'SELECT day::TEXT, participant_id FROM daily_participants' + ) + assert.deepStrictEqual(created, [ + { day: today, participant_id: 1 }, + { day: today, participant_id: 2 }, + { day: today, participant_id: 3 } + ]) + }) + + it('maps new participant addresses to new ids', async () => { + const ids = await mapParticipantsToIds(pgClient, new Set(['0x10', '0x20'])) + ids.sort() + assert.deepStrictEqual(ids, [1, 2]) + }) + + it('maps existing participants to their existing ids', async () => { + const 
participants = new Set(['0x10', '0x20']) + const first = await mapParticipantsToIds(pgClient, participants) + first.sort() + assert.deepStrictEqual(first, [1, 2]) + + participants.add('0x30') + participants.add('0x40') + const second = await mapParticipantsToIds(pgClient, participants) + second.sort() + assert.deepStrictEqual(second, [1, 2, 3, 4]) + }) + }) + + const getCurrentDate = async () => { + const { rows: [{ today }] } = await pgClient.query('SELECT now()::DATE::TEXT as today') + return today + } +})
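
A quick way to sanity-check the new tables locally, after `npm run migrate` has been applied: the minimal sketch below (not part of this change) reads back the aggregated data that `updatePublicStats()` writes into `retrieval_stats`. It assumes the default `DATABASE_URL` fallback from `lib/config.js`; the derived `success_rate_pct` column is illustrative.

```js
// Sketch: read back the public retrieval stats written by updatePublicStats().
// Assumes the schema from migrations/001.do.retrieval-stats.sql is in place.
import pg from 'pg'

// Same fallback as lib/config.js
const { DATABASE_URL = 'postgres://localhost:5432/voyager_stats' } = process.env

const client = new pg.Client({ connectionString: DATABASE_URL })
await client.connect()
try {
  const { rows } = await client.query(`
    SELECT
      day::TEXT,
      total,
      successful,
      ROUND(100.0 * successful / NULLIF(total, 0), 2) AS success_rate_pct
    FROM retrieval_stats
    ORDER BY day DESC
    LIMIT 7
  `)
  console.table(rows)
} finally {
  await client.end()
}
```

The `day::TEXT` cast mirrors what the new tests do when comparing dates as strings.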