This repository has been archived by the owner on Jun 3, 2024. It is now read-only.

add back stats, remove spark specific code (#9)
* add back stats, remove spark specific code

* fix db name
juliangruber authored Apr 10, 2024
1 parent 53179b3 commit e33e814
Showing 18 changed files with 511 additions and 37 deletions.
16 changes: 16 additions & 0 deletions .github/workflows/ci.yml
@@ -3,6 +3,22 @@ on: [push]
 jobs:
   build:
     runs-on: ubuntu-latest
+    services:
+      postgres:
+        image: postgres:latest
+        env:
+          POSTGRES_DB: postgres
+          POSTGRES_PASSWORD: postgres
+          POSTGRES_USER: postgres
+        ports:
+          - 5432:5432
+        options: >-
+          --health-cmd pg_isready
+          --health-interval 10s
+          --health-timeout 5s
+          --health-retries 5
+    env:
+      DATABASE_URL: postgres://postgres:postgres@localhost:5432/postgres
     steps:
       - uses: actions/checkout@v4
       - uses: actions/setup-node@v4
25 changes: 25 additions & 0 deletions README.md
@@ -4,6 +4,31 @@ Evaluate service
 - [Meridian spec](https://www.notion.so/pl-strflt/Meridian-Design-Doc-07-Flexible-preprocessing-1b8f2f19ca7d4fd4b74a1e57e7d7ef8a?pvs=4)
 - [Meridian evaluate service](https://github.com/Meridian-IE/evaluate-service)
 
+## Development
+
+Set up [PostgreSQL](https://www.postgresql.org/) with default settings:
+- Port: 5432
+- User: _your system user name_
+- Password: _blank_
+- Database: voyager_stats
+
+Alternatively, set the environment variable `$DATABASE_URL` with
+`postgres://${USER}:${PASS}@${HOST}:${PORT}/${DATABASE}`.
+
+The Postgres user and database need to exist already, and the user
+needs full management permissions for the database.
+
+You can also run the following command to set up the PostgreSQL server via Docker:
+
+```bash
+docker run -d --name voyager-db \
+  -e POSTGRES_HOST_AUTH_METHOD=trust \
+  -e POSTGRES_USER=$USER \
+  -e POSTGRES_DB=voyager_stats \
+  -p 5432:5432 \
+  postgres
+```
+
 ## Run the tests
 
 ```bash
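
A quick way to sanity-check the connection string described in this README section is a short script; a minimal sketch, assuming the repository's `pg` dependency (the script itself is not part of this commit):

```js
// check-db.js — hypothetical helper, not part of this commit
import pg from 'pg'

// Mirror the default used by lib/config.js further down in this diff
const client = new pg.Client({
  connectionString: process.env.DATABASE_URL ?? 'postgres://localhost:5432/voyager_stats'
})
await client.connect()
// A trivial round-trip proves both the credentials and the database name are valid
const { rows } = await client.query('SELECT current_database() AS db')
console.log('Connected to', rows[0].db)
await client.end()
```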
20 changes: 19 additions & 1 deletion bin/dry-run.js
@@ -101,7 +101,14 @@ await evaluate({
   fetchRoundDetails,
   ieContractWithSigner,
   logger: console,
-  recordTelemetry
+  recordTelemetry,
+
+  // We don't want dry runs to update data in `voyager_stats`, therefore we are passing a stub
+  // connection factory that creates no-op clients. This also keeps the setup simpler. The person
+  // executing a dry run does not need access to any Postgres instance.
+  // Evaluate uses the PG client only for updating the statistics, it's not reading any data.
+  // Thus it's safe to inject a no-op client.
+  createPgClient: createNoopPgClient
 })
 
 console.log(process.memoryUsage())
@@ -177,3 +184,14 @@ async function fetchMeasurementsAddedFromChain (roundIndex) {
 
   return events.filter(e => e.roundIndex === roundIndex).map(e => e.cid)
 }
+
+function createNoopPgClient () {
+  return {
+    async query () {
+      return { rows: [] }
+    },
+    async end () {
+      // no-op
+    }
+  }
+}
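
The stub factory above returns an object that looks enough like a `pg.Client` for the evaluation code's purposes: every query resolves to an empty result set and `end()` resolves immediately. A sketch of what any call site sees when handed the no-op client:

```js
// Sketch of the no-op client's observable behaviour (illustration only)
const client = createNoopPgClient()
const { rows } = await client.query('INSERT INTO retrieval_stats ...')
console.log(rows) // => [] — nothing is read or written
await client.end() // resolves immediately; there is no connection to close
```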
4 changes: 4 additions & 0 deletions bin/migrate.js
@@ -0,0 +1,4 @@
+import { DATABASE_URL } from '../lib/config.js'
+import { migrateWithPgConfig } from '../lib/migrate.js'
+
+await migrateWithPgConfig({ connectionString: DATABASE_URL })
13 changes: 12 additions & 1 deletion bin/voyager-evaluate.js
@@ -1,5 +1,5 @@
 import * as Sentry from '@sentry/node'
-import { IE_CONTRACT_ADDRESS, RPC_URL, rpcHeaders } from '../lib/config.js'
+import { DATABASE_URL, IE_CONTRACT_ADDRESS, RPC_URL, rpcHeaders } from '../lib/config.js'
 import { startEvaluate } from '../index.js'
 import { fetchRoundDetails } from '../lib/voyager-api.js'
 import assert from 'node:assert'
@@ -9,6 +9,8 @@ import { newDelegatedEthAddress } from '@glif/filecoin-address'
 import { recordTelemetry } from '../lib/telemetry.js'
 import fs from 'node:fs/promises'
 import { fetchMeasurements } from '../lib/preprocess.js'
+import { migrateWithPgConfig } from '../lib/migrate.js'
+import pg from 'pg'
 
 const {
   SENTRY_ENVIRONMENT = 'development',
@@ -24,6 +26,8 @@ Sentry.init({
 
 assert(WALLET_SEED, 'WALLET_SEED required')
 
+await migrateWithPgConfig({ connectionString: DATABASE_URL })
+
 const fetchRequest = new ethers.FetchRequest(RPC_URL)
 fetchRequest.setHeader('Authorization', rpcHeaders.Authorization || '')
 const provider = new ethers.JsonRpcProvider(
@@ -49,6 +53,12 @@ const ieContract = new ethers.Contract(
 )
 const ieContractWithSigner = ieContract.connect(signer)
 
+const createPgClient = async () => {
+  const pgClient = new pg.Client({ connectionString: DATABASE_URL })
+  await pgClient.connect()
+  return pgClient
+}
+
 await startEvaluate({
   ieContract,
   ieContractWithSigner,
@@ -58,5 +68,6 @@ await startEvaluate({
   fetchMeasurements,
   fetchRoundDetails,
   recordTelemetry,
+  createPgClient,
   logger: console
 })
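
Note the design choice here: `createPgClient` is a factory, not a shared client. Each stats update (see lib/public-stats.js below) opens a fresh connection and closes it in a `finally` block, so no connection pool is needed. The lifecycle every factory-produced client goes through, as wired in this commit:

```js
// Per-call lifecycle of a client produced by createPgClient
const pgClient = await createPgClient()
try {
  await pgClient.query('SELECT 1') // stands in for the actual stats updates
} finally {
  await pgClient.end() // always closed, even if the update throws
}
```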
5 changes: 5 additions & 0 deletions index.js
@@ -1,3 +1,4 @@
+import assert from 'node:assert'
 import * as Sentry from '@sentry/node'
 import { preprocess } from './lib/preprocess.js'
 import { evaluate } from './lib/evaluate.js'
@@ -13,8 +14,11 @@ export const startEvaluate = async ({
   fetchMeasurements,
   fetchRoundDetails,
   recordTelemetry,
+  createPgClient,
   logger
 }) => {
+  assert(typeof createPgClient === 'function', 'createPgClient must be a function')
+
   const rounds = {
     current: null,
     previous: null
@@ -87,6 +91,7 @@
       ieContractWithSigner,
       fetchRoundDetails,
       recordTelemetry,
+      createPgClient,
       logger
     }).catch(err => {
       console.error(err)
2 changes: 2 additions & 0 deletions lib/config.js
@@ -5,6 +5,7 @@ const {
   // RPC_URLS = 'https://api.node.glif.io/rpc/v0,https://api.chain.love/rpc/v1',
   RPC_URLS = 'https://api.node.glif.io/rpc/v0',
   GLIF_TOKEN,
+  DATABASE_URL = 'postgres://localhost:5432/voyager_stats',
   VOYAGER_API = 'https://voyager.filstation.app'
 } = process.env
 
@@ -20,6 +21,7 @@ if (RPC_URL.includes('glif')) {
 export {
   IE_CONTRACT_ADDRESS,
   RPC_URL,
+  DATABASE_URL,
   VOYAGER_API,
   rpcHeaders
 }
10 changes: 10 additions & 0 deletions lib/evaluate.js
@@ -2,6 +2,7 @@ import createDebug from 'debug'
 import assert from 'node:assert'
 import * as Sentry from '@sentry/node'
 import { buildRetrievalStats, recordCommitteeSizes } from './retrieval-stats.js'
+import { updatePublicStats } from './public-stats.js'
 
 const debug = createDebug('voyager:evaluate')
 
@@ -44,6 +45,7 @@ export const createSetScoresBuckets = participants => {
  * @param {any} args.ieContractWithSigner
  * @param {import('./voyager-api').fetchRoundDetails} args.fetchRoundDetails,
  * @param {import('./typings').RecordTelemetryFn} args.recordTelemetry
+ * @param {import('./typings').CreatePgClient} args.createPgClient
  * @param {Console} args.logger
  */
 export const evaluate = async ({
@@ -52,6 +54,7 @@
   ieContractWithSigner,
   fetchRoundDetails,
   recordTelemetry,
+  createPgClient,
   logger
 }) => {
   // Get measurements
@@ -218,6 +221,13 @@
     console.error('Cannot record committees.', err)
     Sentry.captureException(err)
   }
+
+  try {
+    await updatePublicStats({ createPgClient, honestMeasurements })
+  } catch (err) {
+    console.error('Cannot update public stats.', err)
+    Sentry.captureException(err)
+  }
 }
 
 /**
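
`updatePublicStats` only reads two fields from each honest measurement, as the new module below shows. A hypothetical minimal measurement for illustration (the full `Measurement` type lives in lib/preprocess.js, which this diff does not show):

```js
// Only these two fields matter to the public-stats aggregation (values are made up)
const measurement = {
  participantAddress: '0x1234…', // collected into the daily_participants table
  retrievalResult: 'OK' // any other value counts as an unsuccessful retrieval
}
```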
43 changes: 43 additions & 0 deletions lib/migrate.js
@@ -0,0 +1,43 @@
+import pg from 'pg'
+import Postgrator from 'postgrator'
+import { fileURLToPath } from 'node:url'
+import { dirname, join } from 'node:path'
+
+const migrationsDirectory = join(
+  dirname(fileURLToPath(import.meta.url)),
+  '..',
+  'migrations'
+)
+
+/**
+ * @param {pg.Config} pgConfig
+ */
+export const migrateWithPgConfig = async (pgConfig) => {
+  const client = new pg.Client(pgConfig)
+  await client.connect()
+  try {
+    await migrateWithPgClient(client)
+  } finally {
+    await client.end()
+  }
+}
+
+/**
+ * @param {pg.Client} client
+ */
+export const migrateWithPgClient = async (client) => {
+  const postgrator = new Postgrator({
+    migrationPattern: join(migrationsDirectory, '*'),
+    driver: 'pg',
+    execQuery: (query) => client.query(query)
+  })
+  console.log(
+    'Migrating DB schema from version %s to version %s',
+    await postgrator.getDatabaseVersion(),
+    await postgrator.getMaxVersion()
+  )
+
+  await postgrator.migrate()
+
+  console.log('Migrated DB schema to version', await postgrator.getDatabaseVersion())
+}
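
Besides the boot-time call in bin/voyager-evaluate.js, `migrateWithPgClient` is useful wherever a connected client already exists, e.g. in test setup. A sketch of assumed usage (no test file is shown in this diff):

```js
// Hypothetical test setup: migrate once, then run assertions against the schema
import pg from 'pg'
import { migrateWithPgClient } from '../lib/migrate.js'
import { DATABASE_URL } from '../lib/config.js'

const client = new pg.Client({ connectionString: DATABASE_URL })
await client.connect()
await migrateWithPgClient(client) // idempotent: Postgrator skips already-applied versions
// ... exercise updatePublicStats & friends here ...
await client.end()
```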
129 changes: 129 additions & 0 deletions lib/public-stats.js
@@ -0,0 +1,129 @@
+import assert from 'node:assert'
+import createDebug from 'debug'
+
+const debug = createDebug('voyager:public-stats')
+
+/**
+ * @param {object} args
+ * @param {import('./typings').CreatePgClient} args.createPgClient
+ * @param {import('./preprocess').Measurement[]} args.honestMeasurements
+ */
+export const updatePublicStats = async ({ createPgClient, honestMeasurements }) => {
+  const retrievalStats = { total: 0, successful: 0 }
+  const participants = new Set()
+  for (const m of honestMeasurements) {
+    retrievalStats.total++
+    if (m.retrievalResult === 'OK') retrievalStats.successful++
+
+    participants.add(m.participantAddress)
+  }
+
+  const pgClient = await createPgClient()
+  try {
+    await updateRetrievalStats(pgClient, retrievalStats)
+    await updateDailyParticipants(pgClient, participants)
+  } finally {
+    await pgClient.end()
+  }
+}
+
+/**
+ * @param {import('pg').Client} pgClient
+ * @param {object} stats
+ * @param {number} stats.total
+ * @param {number} stats.successful
+ */
+const updateRetrievalStats = async (pgClient, { total, successful }) => {
+  debug('Updating public retrieval stats: total += %s successful += %s', total, successful)
+  await pgClient.query(`
+    INSERT INTO retrieval_stats
+      (day, total, successful)
+    VALUES
+      (now(), $1, $2)
+    ON CONFLICT(day) DO UPDATE SET
+      total = retrieval_stats.total + $1,
+      successful = retrieval_stats.successful + $2
+  `, [
+    total,
+    successful
+  ])
+}
+
+/**
+ * @param {import('pg').Client} pgClient
+ * @param {Set<string>} participants
+ */
+export const updateDailyParticipants = async (pgClient, participants) => {
+  debug('Updating daily participants, count=%s', participants.size)
+  const ids = await mapParticipantsToIds(pgClient, participants)
+  await pgClient.query(`
+    INSERT INTO daily_participants (day, participant_id)
+    SELECT now() as day, UNNEST($1::INT[]) AS participant_id
+    ON CONFLICT DO NOTHING
+  `, [
+    ids
+  ])
+}
+
+/**
+ * @param {import('pg').Client} pgClient
+ * @param {Set<string>} participantsSet
+ * @returns {Promise<string[]>} A list of participant ids. The order of ids is not defined.
+ */
+export const mapParticipantsToIds = async (pgClient, participantsSet) => {
+  debug('Mapping participants to id, count=%s', participantsSet.size)
+
+  /** @type {string[]} */
+  const ids = []
+
+  // TODO: We can further optimise performance of this function by using
+  // an in-memory LRU cache. Our network has currently ~2k participants,
+  // we need ~50 bytes for each (address, id) pair, that's only ~100KB of data.
+
+  // TODO: passing the entire list of participants as a single query parameter
+  // will probably not scale beyond several thousands of addresses. We will
+  // need to rework the queries to split large arrays into smaller batches.
+
+  // In most rounds, we have already seen most of the participant addresses.
+  // If we use "INSERT...ON CONFLICT", then PG increments the id counter even for
+  // existing addresses where we end up skipping the insert. This could quickly
+  // exhaust the space of all 32bit integers.
+  // Solution: query the table for known records before running the insert.
+  //
+  // Caveat: In my testing, this query was not able to leverage the (unique)
+  // index on participants.participant_address and performed a full table scan
+  // after the array grew past ~10 items. If this becomes a problem, we can
+  // introduce the LRU cache mentioned above.
+  const { rows: found } = await pgClient.query(
+    'SELECT * FROM participants WHERE participant_address = ANY($1::TEXT[])',
+    [Array.from(participantsSet.values())]
+  )
+  debug('Known participants count=%s', found.length)
+
+  // eslint-disable-next-line camelcase
+  for (const { id, participant_address } of found) {
+    ids.push(id)
+    participantsSet.delete(participant_address)
+  }
+
+  debug('New participant addresses count=%s', participantsSet.size)
+
+  // Register the new addresses. Use "INSERT...ON CONFLICT" to handle the race condition
+  // where another client may have registered these addresses between our previous
+  // SELECT query and the next INSERT query.
+  const newAddresses = Array.from(participantsSet.values())
+  debug('Registering new participant addresses, count=%s', newAddresses.length)
+  const { rows: created } = await pgClient.query(`
+    INSERT INTO participants (participant_address)
+    SELECT UNNEST($1::TEXT[]) AS participant_address
+    ON CONFLICT(participant_address) DO UPDATE
+      -- this no-op update is needed to populate "RETURNING id"
+      SET participant_address = EXCLUDED.participant_address
+    RETURNING id
+  `, [
+    newAddresses
+  ])
+
+  assert.strictEqual(created.length, newAddresses.length)
+  return ids.concat(created.map(r => r.id))
+}
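
The migration files themselves are among the collapsed entries of this commit, so the exact schema is not visible here. A hypothetical reconstruction, inferred purely from the queries above (column types and constraints are assumptions):

```js
// Hypothetical schema sketch — inferred from the INSERT/SELECT statements in this file,
// not copied from the actual migrations, which this view does not show.
export const createPublicStatsSchema = async (pgClient) => {
  // "ON CONFLICT(day)" requires day to be unique, hence the primary key
  await pgClient.query(`
    CREATE TABLE IF NOT EXISTS retrieval_stats (
      day DATE PRIMARY KEY,
      total BIGINT NOT NULL,
      successful BIGINT NOT NULL
    )
  `)
  // "ON CONFLICT(participant_address)" requires a unique index on the address
  await pgClient.query(`
    CREATE TABLE IF NOT EXISTS participants (
      id SERIAL PRIMARY KEY,
      participant_address TEXT NOT NULL UNIQUE
    )
  `)
  // "ON CONFLICT DO NOTHING" relies on the composite primary key below
  await pgClient.query(`
    CREATE TABLE IF NOT EXISTS daily_participants (
      day DATE NOT NULL,
      participant_id INTEGER NOT NULL REFERENCES participants (id),
      PRIMARY KEY (day, participant_id)
    )
  `)
}
```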
2 changes: 2 additions & 0 deletions lib/typings.d.ts
@@ -66,3 +66,5 @@ export interface GroupWinningStats {
 export interface FraudDetectionStats {
   groupWinning: GroupWinningStats
 }
+
+export type CreatePgClient = () => Promise<import('pg').Client>;
