Skip to content
This repository has been archived by the owner on Jun 5, 2024. It is now read-only.

Commit

Permalink
Merge branch 'main' into index-measurements-finished-at
Browse files Browse the repository at this point in the history
  • Loading branch information
bajtos authored May 6, 2024
2 parents bb5b498 + c6e6d3b commit 7019500
Show file tree
Hide file tree
Showing 11 changed files with 453 additions and 220 deletions.
1 change: 1 addition & 0 deletions migrations/045.do.measurement-lock.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE measurements ADD COLUMN locked_by_pid INTEGER;
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

69 changes: 40 additions & 29 deletions voyager-publish/bin/voyager-publish.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { spawn } from 'node:child_process'
import { once } from 'events'
import { fileURLToPath } from 'node:url'
import { rpcUrls } from '../ie-contract-config.js'
import pg from 'pg'

const {
SENTRY_ENVIRONMENT = 'development',
Expand All @@ -15,7 +16,9 @@ const {
MAX_MEASUREMENTS_PER_ROUND = 1000,
// See https://web3.storage/docs/how-to/upload/#bring-your-own-agent
W3UP_PRIVATE_KEY,
W3UP_PROOF
W3UP_PROOF,
CONCURRENCY = 2,
DATABASE_URL
} = process.env

Sentry.init({
Expand All @@ -40,34 +43,42 @@ console.log(

let rpcUrlIndex = 0

while (true) {
const lastStart = new Date()
const ps = spawn(
'node',
[
'--unhandled-rejections=strict',
fileURLToPath(new URL('publish-batch.js', import.meta.url))
],
{
env: {
...process.env,
MIN_ROUND_LENGTH_SECONDS,
MAX_MEASUREMENTS_PER_ROUND,
WALLET_SEED,
W3UP_PRIVATE_KEY,
W3UP_PROOF,
RPC_URLS: rpcUrls[rpcUrlIndex % rpcUrls.length]
const client = new pg.Pool({ connectionString: DATABASE_URL })
await client.query('UPDATE measurements SET locked_by_pid = NULL')
await Promise.all(new Array(CONCURRENCY).fill().map(() => async () => {
while (true) {
const lastStart = new Date()
const ps = spawn(
'node',
[
'--unhandled-rejections=strict',
fileURLToPath(new URL('publish-batch.js', import.meta.url))
],
{
env: {
...process.env,
MIN_ROUND_LENGTH_SECONDS,
MAX_MEASUREMENTS_PER_ROUND,
WALLET_SEED,
W3UP_PRIVATE_KEY,
W3UP_PROOF,
RPC_URLS: rpcUrls[rpcUrlIndex % rpcUrls.length]
}
}
)
ps.stdout.pipe(process.stdout)
ps.stderr.pipe(process.stderr)
const [code] = await once(ps, 'exit')
if (code !== 0) {
console.error(`Bad exit code: ${code}`)
Sentry.captureMessage(`Bad exit code: ${code}`)
rpcUrlIndex++
}
)
ps.stdout.pipe(process.stdout)
ps.stderr.pipe(process.stderr)
const [code] = await once(ps, 'exit')
if (code !== 0) {
console.error(`Bad exit code: ${code}`)
Sentry.captureMessage(`Bad exit code: ${code}`)
rpcUrlIndex++
await client.query(
'UPDATE measurements SET locked_by_pid = NULL WHERE locked_by_pid = $1',
[ps.pid]
)
const dt = new Date() - lastStart
if (dt < minRoundLength) await timers.setTimeout(minRoundLength - dt)
}
const dt = new Date() - lastStart
if (dt < minRoundLength) await timers.setTimeout(minRoundLength - dt)
}
}))
126 changes: 82 additions & 44 deletions voyager-publish/index.js
Original file line number Diff line number Diff line change
@@ -1,30 +1,66 @@
/* global File */

import { record } from './lib/telemetry.js'
import pRetry from 'p-retry'

const fetchMeasurements = async ({ pgPool, maxMeasurements, pid }) => {
let measurements
{
const pgClient = await pgPool.connect()
try {
await pgClient.query('BEGIN ISOLATION LEVEL SERIALIZABLE')

// Select measurements for publishing and lock them
const { rows } = await pgClient.query(`
WITH rows AS (
SELECT id
FROM measurements
WHERE locked_by_pid IS NULL
ORDER BY id
LIMIT $1
)
UPDATE measurements
SET locked_by_pid = $2
WHERE EXISTS (SELECT * FROM rows WHERE measurements.id = rows.id)
RETURNING
id,
zinnia_version,
participant_address,
status_code,
end_at,
inet_group,
car_too_large,
cid
`, [
maxMeasurements,
pid
])
measurements = rows

await pgClient.query('COMMIT')
} catch (err) {
await pgClient.query('ROLLBACK')
throw err
} finally {
pgClient.release()
}
}

return measurements
}

export const publish = async ({
client: pgPool,
web3Storage,
ieContract,
recordTelemetry,
maxMeasurements = 1000,
pid = process.pid,
logger = console
}) => {
// Fetch measurements
const { rows: measurements } = await pgPool.query(`
SELECT
id,
zinnia_version,
participant_address,
status_code,
end_at,
inet_group,
car_too_large,
cid
FROM measurements
LIMIT $1
`, [
maxMeasurements
])
const measurements = await pRetry(
() => fetchMeasurements({ pgPool, maxMeasurements, pid }),
{ retries: 3 }
)

// Fetch the count of all unpublished measurements - we need this for monitoring
// Note: this number will be higher than `measurements.length` because voyager-api adds more
Expand Down Expand Up @@ -60,32 +96,34 @@ export const publish = async ({
// const ieAddMeasurementsDuration = new Date() - start
// logger.log('Measurements added to round', roundIndex.toString())

const pgClient = await pgPool.connect()
try {
await pgClient.query('BEGIN')

// Delete published measurements
await pgClient.query(`
DELETE FROM measurements
WHERE id = ANY($1::bigint[])
`, [
measurements.map(m => m.id)
])

// FIXME: Since we're not publishing to the contract, also don't record any
// commitment
// // Record the commitment for future queries
// // TODO: store also ieContract.address and roundIndex
// await pgClient.query('INSERT INTO commitments (cid, published_at) VALUES ($1, $2)', [
// cid.toString(), new Date()
// ])

await pgClient.query('COMMIT')
} catch (err) {
await pgClient.query('ROLLBACK')
throw err
} finally {
pgClient.release()
{
const pgClient = await pgPool.connect()
try {
await pgClient.query('BEGIN')

// Delete published measurements
await pgClient.query(`
DELETE FROM measurements
WHERE locked_by_pid = $1
`, [
pid
])

// FIXME: Since we're not publishing to the contract, also don't record any
// commitment
// // Record the commitment for future queries
// // TODO: store also ieContract.address and roundIndex
// await pgClient.query('INSERT INTO commitments (cid, published_at) VALUES ($1, $2)', [
// cid.toString(), new Date()
// ])

await pgClient.query('COMMIT')
} catch (err) {
await pgClient.query('ROLLBACK')
throw err
} finally {
pgClient.release()
}
}

await pgPool.query('VACUUM measurements')
Expand All @@ -96,7 +134,7 @@ export const publish = async ({

logger.log('Done!')

record('publish', point => {
recordTelemetry('publish', point => {
// FIXME
// point.intField('round_index', roundIndex)
point.intField('measurements', measurements.length)
Expand Down
6 changes: 5 additions & 1 deletion voyager-publish/lib/telemetry.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@ setInterval(() => {
writeClient.flush().catch(console.error)
}, 10_000).unref()

export const record = (name, fn) => {
export const recordTelemetry = (name, fn) => {
const point = new Point(name)
fn(point)
writeClient.writePoint(point)
}

export const close = () => writeClient.close()

export {
Point
}
51 changes: 42 additions & 9 deletions voyager-publish/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions voyager-publish/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"type": "module",
"scripts": {
"migrate": "node ../bin/migrate.js",
"start": "node bin/voyager-publish.js",
"test": "standard && mocha"
},
Expand All @@ -14,6 +15,7 @@
"@web3-storage/access": "^18.0.3",
"@web3-storage/w3up-client": "^11.0.2",
"ethers": "^6.10.0",
"p-retry": "^6.2.0",
"pg": "^8.11.3"
},
"devDependencies": {
Expand Down
Loading

0 comments on commit 7019500

Please sign in to comment.