From 3c50c792e373bf48ddf94df05a7497558e581d8e Mon Sep 17 00:00:00 2001 From: Julian Gruber Date: Tue, 20 Feb 2024 12:02:18 +0100 Subject: [PATCH 1/2] meridian (wip) --- bin/{spark.js => meridian.js} | 0 index.js | 126 +++++------------------- lib/ie-contract.js | 6 +- lib/round-tracker.js | 74 +++++++------- lib/spark.js | 66 +++++++++++++ lib/voyager.js | 9 ++ migrations/042.do.meridian-platform.sql | 30 ++++++ spark-publish/ie-contract-config.js | 1 - spark-publish/index.js | 26 +---- 9 files changed, 175 insertions(+), 163 deletions(-) rename bin/{spark.js => meridian.js} (100%) create mode 100644 lib/spark.js create mode 100644 lib/voyager.js create mode 100644 migrations/042.do.meridian-platform.sql diff --git a/bin/spark.js b/bin/meridian.js similarity index 100% rename from bin/spark.js rename to bin/meridian.js diff --git a/index.js b/index.js index 970f3ef7..e2356882 100644 --- a/index.js +++ b/index.js @@ -3,8 +3,10 @@ import Sentry from '@sentry/node' import getRawBody from 'raw-body' import assert from 'http-assert' import { validate } from './lib/validate.js' -import { mapRequestToInetGroup } from './lib/inet-grouping.js' -import { satisfies } from 'compare-versions' +import * as spark from './lib/spark.js' +import * as voyager from './lib/voyager.js' + +const moduleImplementations = { spark, voyager } const handler = async (req, res, client, getCurrentRound, domain) => { if (req.headers.host.split(':')[0] !== domain) { @@ -36,13 +38,8 @@ const createMeasurement = async (req, res, client, getCurrentRound) => { const { sparkRoundNumber } = getCurrentRound() const body = await getRawBody(req, { limit: '100kb' }) const measurement = JSON.parse(body) - validate(measurement, 'sparkVersion', { type: 'string', required: false }) + validate(measurement, 'zinniaVersion', { type: 'string', required: false }) - assert( - typeof measurement.sparkVersion === 'string' && satisfies(measurement.sparkVersion, '>=1.9.0'), - 410, 'OUTDATED CLIENT' - ) - // Backwards-compatibility with older clients sending walletAddress instead of participantAddress // We can remove this after enough SPARK clients are running the new version (mid-October 2023) if (!('participantAddress' in measurement) && ('walletAddress' in measurement)) { @@ -50,99 +47,35 @@ const createMeasurement = async (req, res, client, getCurrentRound) => { measurement.participantAddress = measurement.walletAddress delete measurement.walletAddress } - - validate(measurement, 'cid', { type: 'string', required: true }) - validate(measurement, 'providerAddress', { type: 'string', required: true }) - validate(measurement, 'protocol', { type: 'string', required: true }) validate(measurement, 'participantAddress', { type: 'string', required: true }) - validate(measurement, 'timeout', { type: 'boolean', required: false }) - validate(measurement, 'startAt', { type: 'date', required: true }) - validate(measurement, 'statusCode', { type: 'number', required: false }) - validate(measurement, 'firstByteAt', { type: 'date', required: false }) - validate(measurement, 'endAt', { type: 'date', required: false }) - validate(measurement, 'byteLength', { type: 'number', required: false }) - validate(measurement, 'attestation', { type: 'string', required: false }) - validate(measurement, 'carTooLarge', { type: 'boolean', required: false }) - validate(measurement, 'carChecksum', { type: 'string', required: false }) - validate(measurement, 'indexerResult', { type: 'string', required: false }) - const inetGroup = await mapRequestToInetGroup(client, req) + const moduleName = measurement.moduleName || 'spark' + const moduleImplementation = moduleImplementations[moduleName] + assert(moduleImplementation, `Unknown module: ${moduleName}`) + + moduleImplementation.validateMeasurement(measurement) const { rows } = await client.query(` - INSERT INTO measurements ( - spark_version, - zinnia_version, - cid, - provider_address, - protocol, - participant_address, - timeout, - start_at, - status_code, - first_byte_at, - end_at, - byte_length, - attestation, - inet_group, - car_too_large, - car_checksum, - indexer_result, - completed_at_round - ) - VALUES ( - $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18 - ) - RETURNING id - `, [ - measurement.sparkVersion, - measurement.zinniaVersion, - measurement.cid, - measurement.providerAddress, - measurement.protocol, - measurement.participantAddress, - measurement.timeout || false, - parseOptionalDate(measurement.startAt), - measurement.statusCode, - parseOptionalDate(measurement.firstByteAt), - parseOptionalDate(measurement.endAt), - measurement.byteLength, - measurement.attestation, - inetGroup, - measurement.carTooLarge ?? false, - measurement.carChecksum, - measurement.indexerResult, - sparkRoundNumber + INSERT INTO measurements (data) + VALUES ($1) + RETURNING id + `, [ + JSON.stringify(moduleImplementation.sanitizeMeasurement(measurement)) ]) + json(res, { id: rows[0].id }) } const getMeasurement = async (req, res, client, measurementId) => { assert(!Number.isNaN(measurementId), 400, 'Invalid RetrievalResult ID') - const { rows: [resultRow] } = await client.query(` - SELECT * - FROM measurements - WHERE id = $1 - `, [ - measurementId - ]) + const { rows: [resultRow] } = await client.query( + `SELECT data FROM measurements WHERE id = $1`, + [measurementId] + ) assert(resultRow, 404, 'Measurement Not Found') json(res, { - id: resultRow.id, - cid: resultRow.cid, - providerAddress: resultRow.provider_address, - protocol: resultRow.protocol, - sparkVersion: resultRow.spark_version, - zinniaVersion: resultRow.zinnia_version, - createdAt: resultRow.created_at, - finishedAt: resultRow.finished_at, - timeout: resultRow.timeout, - startAt: resultRow.start_at, - statusCode: resultRow.status_code, - firstByteAt: resultRow.first_byte_at, - endAt: resultRow.end_at, - byteLength: resultRow.byte_length, - carTooLarge: resultRow.car_too_large, - attestation: resultRow.attestation + ...JSON.parse(resultRow.data), + id: measurementId }) } @@ -291,18 +224,3 @@ export const createHandler = async ({ }) } } - -/** - * Parse a date string field that may be `undefined` or `null`. - * - * - undefined -> undefined - * - null -> undefined - * - "iso-date-string" -> new Date("iso-date-string") - * - * @param {string | null | undefined} str - * @returns {Date | undefined} - */ -const parseOptionalDate = (str) => { - if (str === undefined || str === null) return undefined - return new Date(str) -} diff --git a/lib/ie-contract.js b/lib/ie-contract.js index 7c9e4806..5d5ab453 100644 --- a/lib/ie-contract.js +++ b/lib/ie-contract.js @@ -1,5 +1,5 @@ import { ethers } from 'ethers' -import { IE_CONTRACT_ABI, IE_CONTRACT_ADDRESS, RPC_URL, GLIF_TOKEN } from '../spark-publish/ie-contract-config.js' +import { IE_CONTRACT_ABI, RPC_URL, GLIF_TOKEN } from '../spark-publish/ie-contract-config.js' const provider = new ethers.providers.JsonRpcProvider({ url: RPC_URL, @@ -11,8 +11,8 @@ const provider = new ethers.providers.JsonRpcProvider({ // Uncomment for troubleshooting // provider.on('debug', d => console.log('[ethers:debug] %s\nrequest: %o\nresponse: %o', d.action, d.request, d.response)) -export const createMeridianContract = async () => new ethers.Contract( - IE_CONTRACT_ADDRESS, +export const createMeridianContract = address => new ethers.Contract( + address, IE_CONTRACT_ABI, provider ) diff --git a/lib/round-tracker.js b/lib/round-tracker.js index 50c7fec3..7b0cb10d 100644 --- a/lib/round-tracker.js +++ b/lib/round-tracker.js @@ -16,48 +16,53 @@ export const MAX_TASKS_PER_NODE = 15 /** * @param {import('pg').Pool} pgPool * @returns {() => { - * sparkRoundNumber: bigint; + * moduleRoundNumber: bigint; * meridianContractAddress: string; * meridianRoundIndex: bigint; * }} */ export async function createRoundGetter (pgPool) { - const contract = await createMeridianContract() - - let sparkRoundNumber, meridianContractAddress, meridianRoundIndex - - const updateSparkRound = async (newRoundIndex) => { - meridianRoundIndex = BigInt(newRoundIndex) - meridianContractAddress = contract.address + const { rows: modules } = await client.query( + 'SELECT id, contract_address AS contractAddress FROM modules' + ) - const pgClient = await pgPool.connect() - try { - await pgClient.query('BEGIN') - sparkRoundNumber = await mapCurrentMeridianRoundToSparkRound({ - meridianContractAddress, - meridianRoundIndex, - pgClient - }) - await pgClient.query('COMMIT') - console.log('SPARK round started: %s', sparkRoundNumber) - } catch (err) { - await pgClient.query('ROLLBACK') - } finally { - pgClient.release() + for (const mod of modules) { + const contract = createMeridianContract(mod.contractAddress) + let moduleRoundNumber, meridianRoundIndex + + const updateModuleRound = async (newRoundIndex) => { + meridianRoundIndex = BigInt(newRoundIndex) + + const pgClient = await pgPool.connect() + try { + await pgClient.query('BEGIN') + moduleRoundNumber = await mapCurrentMeridianRoundToModuleRound({ + moduleId: mod.id, + moduleContractAddress: mod.contractAddress, + meridianRoundIndex, + pgClient + }) + await pgClient.query('COMMIT') + console.log('%s round started: %s', mod.name, moduleRoundNumber) + } catch (err) { + await pgClient.query('ROLLBACK') + } finally { + pgClient.release() + } } - } - contract.on('RoundStart', (newRoundIndex) => { - updateSparkRound(newRoundIndex).catch(err => { - console.error('Cannot handle RoundStart:', err) - Sentry.captureException(err) + contract.on('RoundStart', (newRoundIndex) => { + updateModuleRound(newRoundIndex).catch(err => { + console.error('Cannot handle RoundStart:', err) + Sentry.captureException(err) + }) }) - }) - - await updateSparkRound(await contract.currentRoundIndex()) + await updateModuleRound(await contract.currentRoundIndex()) + } + return () => ({ - sparkRoundNumber, + moduleRoundNumber, meridianContractAddress, meridianRoundIndex }) @@ -132,15 +137,16 @@ LIMIT 1 ``` */ -export async function mapCurrentMeridianRoundToSparkRound ({ +export async function mapCurrentMeridianRoundToModuleRound ({ + moduleId, meridianContractAddress, meridianRoundIndex, pgClient }) { - let sparkRoundNumber + let moduleRoundNumber const { rows: [contractVersionOfPreviousSparkRound] } = await pgClient.query( - 'SELECT * FROM meridian_contract_versions ORDER BY last_spark_round_number DESC LIMIT 1' + 'SELECT * FROM meridian_contract_versions ORDER BY last_module_round_number DESC LIMIT 1' ) // More events coming from the same meridian contract diff --git a/lib/spark.js b/lib/spark.js new file mode 100644 index 00000000..3f54f384 --- /dev/null +++ b/lib/spark.js @@ -0,0 +1,66 @@ +import { validate } from './lib/validate.js' +import { satisfies } from 'compare-versions' +import assert from 'http-assert' + +export const validateMeasurement = measurement => { + validate(measurement, 'sparkVersion', { type: 'string', required: false }) + assert( + typeof measurement.sparkVersion === 'string' && satisfies(measurement.sparkVersion, '>=1.9.0'), + 410, 'OUTDATED CLIENT' + ) + + validate(measurement, 'cid', { type: 'string', required: true }) + validate(measurement, 'providerAddress', { type: 'string', required: true }) + validate(measurement, 'protocol', { type: 'string', required: true }) + + validate(measurement, 'timeout', { type: 'boolean', required: false }) + validate(measurement, 'startAt', { type: 'date', required: true }) + validate(measurement, 'statusCode', { type: 'number', required: false }) + validate(measurement, 'firstByteAt', { type: 'date', required: false }) + validate(measurement, 'endAt', { type: 'date', required: false }) + validate(measurement, 'byteLength', { type: 'number', required: false }) + validate(measurement, 'attestation', { type: 'string', required: false }) + validate(measurement, 'carTooLarge', { type: 'boolean', required: false }) + validate(measurement, 'carChecksum', { type: 'string', required: false }) + validate(measurement, 'indexerResult', { type: 'string', required: false }) +} + +export const sanitizeMeasurement = ({ + measurement, + sparkRoundNumber, + inetGroup +}) => ({ + sparkVersion: measurement.sparkVersion, + zinniaVersion: measurement.zinniaVersion, + cid: measurement.cid, + providerAddress: measurement.providerAddress, + protocol: measurement.protocol, + participantAddress: measurement.participantAddress, + timeout: measurement.timeout || false, + startAt: parseOptionalDate(measurement.startAt), + statusCode: measurement.statusCode, + firstByteAt: parseOptionalDate(measurement.firstByteAt), + endAt: parseOptionalDate(measurement.endAt), + byteLength: measurement.byteLength, + attestation: measurement.attestation, + inetGroup, + carTooLarge: measurement.carTooLarge ?? false, + carChecksum: measurement.carChecksum, + indexerResult: measurement.indexerResult, + sparkRoundNumber +}) + +/** + * Parse a date string field that may be `undefined` or `null`. + * + * - undefined -> undefined + * - null -> undefined + * - "iso-date-string" -> new Date("iso-date-string") + * + * @param {string | null | undefined} str + * @returns {Date | undefined} + */ +const parseOptionalDate = (str) => { + if (str === undefined || str === null) return undefined + return new Date(str) +} diff --git a/lib/voyager.js b/lib/voyager.js new file mode 100644 index 00000000..0806dfc4 --- /dev/null +++ b/lib/voyager.js @@ -0,0 +1,9 @@ +import assert from 'http-assert' + +export const validateMeasurement = measurement => { + assert.fail('Not implemented') +} + +export const sanitizeMeasurement = () => { + assert.fail('Not implemented') +} diff --git a/migrations/042.do.meridian-platform.sql b/migrations/042.do.meridian-platform.sql new file mode 100644 index 00000000..cb97c571 --- /dev/null +++ b/migrations/042.do.meridian-platform.sql @@ -0,0 +1,30 @@ +TRUNCATE TABLE measurements; + +ALTER TABLE measurements + ADD COLUMN data TYPE TEXT + DROP COLUMN participant_address, + DROP COLUMN finished_at, + DROP COLUMN start_at, + DROP COLUMN status_code, + DROP COLUMN first_byte_at, + DROP COLUMN end_at, + DROP COLUMN byte_length, + DROP COLUMN timeout, + DROP COLUMN attestation, + DROP COLUMN completed_at_round, + DROP COLUMN spark_version, + DROP COLUMN zinnia_version, + DROP COLUMN cid, + DROP COLUMN provider_address, + DROP COLUMN protocol, + DROP COLUMN inet_group, + DROP COLUMN car_too_large; + +CREATE TABLE modules ( + id SERIAL NOT NULL PRIMARY KEY, + name TEXT NOT NULL, + slug TEXT NOT NULL, + contract_address TEXT NOT NULL +); +INSERT INTO modules (name, slug, contract_address) VALUES ('SPARK', 'spark', '0x8460766edc62b525fc1fa4d628fc79229dc73031'); +INSERT INTO modules (name, slug, contract_address) VALUES ('Voyager', 'voyager', '0xc524b83bf85021e674a7c9f18f5381179fabaf6c'); diff --git a/spark-publish/ie-contract-config.js b/spark-publish/ie-contract-config.js index 0673f342..d21a4c69 100644 --- a/spark-publish/ie-contract-config.js +++ b/spark-publish/ie-contract-config.js @@ -2,7 +2,6 @@ import fs from 'node:fs/promises' import { fileURLToPath } from 'node:url' const { - IE_CONTRACT_ADDRESS = '0x8460766Edc62B525fc1FA4D628FC79229dC73031', RPC_URLS = 'https://api.node.glif.io/rpc/v0,https://api.chain.love/rpc/v1', GLIF_TOKEN } = process.env diff --git a/spark-publish/index.js b/spark-publish/index.js index a2e3e4f3..24420046 100644 --- a/spark-publish/index.js +++ b/spark-publish/index.js @@ -11,26 +11,7 @@ export const publish = async ({ }) => { // Fetch measurements const { rows: measurements } = await pgPool.query(` - SELECT - id, - spark_version, - zinnia_version, - participant_address, - finished_at, - timeout, - start_at, - status_code, - first_byte_at, - end_at, - byte_length, - attestation, - inet_group, - car_too_large, - car_checksum, - indexer_result, - cid, - provider_address, - protocol + SELECT id, data FROM measurements LIMIT $1 `, [ @@ -49,7 +30,10 @@ export const publish = async ({ // Share measurements let start = new Date() const file = new File( - [measurements.map(m => JSON.stringify(m)).join('\n')], + [measurements.map(m => JSON.stringify({ + ...JSON.parse(m.data), + id: m.id, + })).join('\n')], 'measurements.ndjson', { type: 'application/json' } ) From 90f9ed7cf27416aed82b8f00832ed3b450e9ac89 Mon Sep 17 00:00:00 2001 From: Julian Gruber Date: Wed, 21 Feb 2024 18:16:34 +0100 Subject: [PATCH 2/2] progress --- bin/meridian.js | 6 +- index.js | 33 +++-- lib/round-tracker.js | 159 +++++++++++++----------- migrations/042.do.meridian-platform.sql | 5 +- 4 files changed, 112 insertions(+), 91 deletions(-) diff --git a/bin/meridian.js b/bin/meridian.js index a1081e2d..a36c764a 100644 --- a/bin/meridian.js +++ b/bin/meridian.js @@ -62,8 +62,10 @@ await migrate(client) const getCurrentRound = await createRoundGetter(client) const round = getCurrentRound() -assert(!!round, 'cannot obtain the current Spark round number') -console.log('SPARK round number at service startup:', round.sparkRoundNumber) +assert(!!round, 'cannot obtain the current module round numbers') +for (const [moduleId, moduleRoundNumber] of round.moduleRoundNumbers.entries()) { + console.log('%s round number at service startup: %s', moduleId, moduleRoundNumber) +} const logger = { error: console.error, diff --git a/index.js b/index.js index e2356882..fa90b3cd 100644 --- a/index.js +++ b/index.js @@ -6,7 +6,10 @@ import { validate } from './lib/validate.js' import * as spark from './lib/spark.js' import * as voyager from './lib/voyager.js' -const moduleImplementations = { spark, voyager } +const moduleImplementations = { + 0: spark, + 1: voyager +} const handler = async (req, res, client, getCurrentRound, domain) => { if (req.headers.host.split(':')[0] !== domain) { @@ -24,8 +27,10 @@ const handler = async (req, res, client, getCurrentRound, domain) => { } else if (segs[0] === 'measurements' && req.method === 'GET') { await getMeasurement(req, res, client, Number(segs[1])) } else if (segs[0] === 'rounds' && segs[1] === 'meridian' && req.method === 'GET') { + // TODO: Add moduleId await getMeridianRoundDetails(req, res, client, segs[2], segs[3]) } else if (segs[0] === 'rounds' && req.method === 'GET') { + // TODO: Add moduleId await getRoundDetails(req, res, client, getCurrentRound, segs[1]) } else if (segs[0] === 'inspect-request' && req.method === 'GET') { await inspectRequest(req, res) @@ -35,7 +40,6 @@ const handler = async (req, res, client, getCurrentRound, domain) => { } const createMeasurement = async (req, res, client, getCurrentRound) => { - const { sparkRoundNumber } = getCurrentRound() const body = await getRawBody(req, { limit: '100kb' }) const measurement = JSON.parse(body) @@ -48,19 +52,22 @@ const createMeasurement = async (req, res, client, getCurrentRound) => { delete measurement.walletAddress } validate(measurement, 'participantAddress', { type: 'string', required: true }) + validate(measurement.moduleId, { type: 'number', required: false }) - const moduleName = measurement.moduleName || 'spark' - const moduleImplementation = moduleImplementations[moduleName] - assert(moduleImplementation, `Unknown module: ${moduleName}`) + const moduleId = measurement.moduleId || 0 + const moduleImplementation = moduleImplementations[moduleId] + assert(moduleImplementation, `Unknown moduleId: ${moduleId}`) moduleImplementation.validateMeasurement(measurement) const { rows } = await client.query(` - INSERT INTO measurements (data) - VALUES ($1) + INSERT INTO measurements (module_id, data, completed_at_round) + VALUES ($1, $2, $3) RETURNING id `, [ - JSON.stringify(moduleImplementation.sanitizeMeasurement(measurement)) + moduleId, + JSON.stringify(moduleImplementation.sanitizeMeasurement(measurement)), + getCurrentRound().moduleRoundNumbers.get(moduleId) ]) json(res, { id: rows[0].id }) @@ -69,19 +76,21 @@ const createMeasurement = async (req, res, client, getCurrentRound) => { const getMeasurement = async (req, res, client, measurementId) => { assert(!Number.isNaN(measurementId), 400, 'Invalid RetrievalResult ID') const { rows: [resultRow] } = await client.query( - `SELECT data FROM measurements WHERE id = $1`, + `SELECT module_id, data, completed_at_round FROM measurements WHERE id = $1`, [measurementId] ) assert(resultRow, 404, 'Measurement Not Found') json(res, { ...JSON.parse(resultRow.data), - id: measurementId + id: measurementId, + moduleId: resultRow.module_id, + moduleRound: resultRow.completed_at_round }) } -const getRoundDetails = async (req, res, client, getCurrentRound, roundParam) => { +const getRoundDetails = async (req, res, client, getCurrentRound, roundParam, moduleId) => { if (roundParam === 'current') { - const { meridianContractAddress, meridianRoundIndex } = getCurrentRound() + const { meridianContractAddresses, meridianRoundIndexes } = getCurrentRound() const addr = encodeURIComponent(meridianContractAddress) const idx = encodeURIComponent(meridianRoundIndex) const location = `/rounds/meridian/${addr}/${idx}` diff --git a/lib/round-tracker.js b/lib/round-tracker.js index 7b0cb10d..bb89f6b1 100644 --- a/lib/round-tracker.js +++ b/lib/round-tracker.js @@ -1,47 +1,50 @@ import Sentry from '@sentry/node' import { createMeridianContract } from './ie-contract.js' -// The number of tasks per round is proportionate to the SPARK round length - longer rounds require +// The number of tasks per round is proportionate to the module round length - longer rounds require // more tasks per round. // -// See https://www.notion.so/pl-strflt/SPARK-tasking-v2-604e26d57f6b4892946525bcb3a77104?pvs=4#ded1cd98c2664a2289453d38e2715643 +// See https://www.notion.so/pl-strflt/module-tasking-v2-604e26d57f6b4892946525bcb3a77104?pvs=4#ded1cd98c2664a2289453d38e2715643 // for more details, this constant represents TC (tasks per committee). // // We will need to tweak this value based on measurements; that's why I put it here as a constant. export const TASKS_PER_ROUND = 1000 -// How many tasks is each SPARK checker node expected to complete every round (at most). +// How many tasks is each module node expected to complete every round (at most). export const MAX_TASKS_PER_NODE = 15 /** * @param {import('pg').Pool} pgPool * @returns {() => { - * moduleRoundNumber: bigint; - * meridianContractAddress: string; - * meridianRoundIndex: bigint; + * moduleRoundNumberes: Map; + * meridianContractAddress: Map; + * meridianRoundIndex: Map; * }} */ export async function createRoundGetter (pgPool) { const { rows: modules } = await client.query( 'SELECT id, contract_address AS contractAddress FROM modules' ) + const moduleRoundNumbers = new Map() + const meridianContractAddresses = new Map() + const meridianRoundIndexes = new Map() for (const mod of modules) { + meridianContractAddresses.set(mod.id, mod.contractAddress) const contract = createMeridianContract(mod.contractAddress) - let moduleRoundNumber, meridianRoundIndex const updateModuleRound = async (newRoundIndex) => { - meridianRoundIndex = BigInt(newRoundIndex) + meridianRoundIndexes.set(mod.id, BigInt(newRoundIndex)) const pgClient = await pgPool.connect() try { await pgClient.query('BEGIN') - moduleRoundNumber = await mapCurrentMeridianRoundToModuleRound({ + moduleRoundNumbers.set(mod.id, await mapCurrentMeridianRoundToModuleRound({ moduleId: mod.id, moduleContractAddress: mod.contractAddress, - meridianRoundIndex, + meridianRoundIndex: meridianRoundIndexes.get(mod.id), pgClient - }) + })) await pgClient.query('COMMIT') console.log('%s round started: %s', mod.name, moduleRoundNumber) } catch (err) { @@ -62,9 +65,9 @@ export async function createRoundGetter (pgPool) { } return () => ({ - moduleRoundNumber, - meridianContractAddress, - meridianRoundIndex + moduleRoundNumbers, + meridianContractAddresses, + meridianRoundIndexes }) } @@ -72,13 +75,13 @@ export async function createRoundGetter (pgPool) { There are three cases we need to handle: 1. Business as usual - the IE contract advanced the round by one -2. Fresh start, e.g. a new spark-api instance is deployed, or we deploy this PR to an existing instance. +2. Fresh start, e.g. a new meridian-api instance is deployed, or we deploy this PR to an existing instance. 3. Upgrade of the IE contract For each IE version (defined as the smart contract address), we are keeping track of three fields: - `contractAddress` -- `sparkRoundOffset` -- `lastSparkRoundNumber` +- `moduleRoundOffset` +- `lastmoduleRoundNumber` Whenever a new IE round is started, we know the current IE round number (`meridianRoundIndex`) @@ -86,53 +89,53 @@ Let me explain how are the different cases handled. **Business as usual** -We want to map IE round number to SPARK round number. This assumes we have already initialised our +We want to map IE round number to module round number. This assumes we have already initialised our DB for the current IE contract version we are working with. ``` -sparkRoundNumber = meridianRoundIndex + sparkRoundOffset +moduleRoundNumber = meridianRoundIndex + moduleRoundOffset ``` -For example, if we observe IE round 123, then `sparkRoundOffset` is `-122` and we calculate the -spark round as `123 + (-122) = 1`. +For example, if we observe IE round 123, then `moduleRoundOffset` is `-122` and we calculate the +module round as `123 + (-122) = 1`. We update the record for the current IE contract address -to set `last_spark_round_number = sparkRoundNumber`. +to set `last_module_round_number = moduleRoundNumber`. **Fresh start** -There is no record in our DB. We want to map the current IE round number to SPARK round 1. Also, we -want to setup `sparkRoundOffset` so that the algorithm above produces correct SPARK round numbers. +There is no record in our DB. We want to map the current IE round number to module round 1. Also, we +want to setup `moduleRoundOffset` so that the algorithm above produces correct module round numbers. ``` -sparkRoundNumber = 1 -sparkRoundOffset = sparkRoundNumber - meridianRoundIndex +moduleRoundNumber = 1 +moduleRoundOffset = moduleRoundNumber - meridianRoundIndex ``` -We insert a new record to our DB with the address of the current IE contract, `sparkRoundOffset`, -and `last_spark_round_number = sparkRoundNumber`. +We insert a new record to our DB with the address of the current IE contract, `moduleRoundOffset`, +and `last_module_round_number = moduleRoundNumber`. **Upgrading IE contract** -We have one or more existing records in our DB. We know what is the last SPARK round that we -calculated from the previous version of the IE contract (`lastSparkRoundNumber`). We also know what +We have one or more existing records in our DB. We know what is the last module round that we +calculated from the previous version of the IE contract (`lastmoduleRoundNumber`). We also know what is the round number of the new IE contract. ``` -sparkRoundNumber = lastSparkRoundNumber + 1 -sparkRoundOffset = sparkRoundNumber - meridianRoundIndex +moduleRoundNumber = lastmoduleRoundNumber + 1 +moduleRoundOffset = moduleRoundNumber - meridianRoundIndex ``` -We insert a new record to our DB with the address of the current IE contract, `sparkRoundOffset`, -and `last_spark_round_number = sparkRoundNumber`. +We insert a new record to our DB with the address of the current IE contract, `moduleRoundOffset`, +and `last_module_round_number = moduleRoundNumber`. -If you are wondering how to find out what is the last SPARK round that we calculated from the +If you are wondering how to find out what is the last module round that we calculated from the previous version of the IE contract - we can easily find it in our DB: ```sql -SELECT last_spark_round_number +SELECT last_module_round_number FROM meridian_contract_versions -ORDER BY last_spark_round_number DESC +ORDER BY last_module_round_number DESC LIMIT 1 ``` */ @@ -145,88 +148,96 @@ export async function mapCurrentMeridianRoundToModuleRound ({ }) { let moduleRoundNumber - const { rows: [contractVersionOfPreviousSparkRound] } = await pgClient.query( - 'SELECT * FROM meridian_contract_versions ORDER BY last_module_round_number DESC LIMIT 1' - ) + const { rows: [contractVersionOfPreviousModuleRound] } = await pgClient.query(` + SELECT * FROM meridian_contract_versions + WHERE module_id = $1 + ORDER BY last_module_round_number DESC + LIMIT 1 + `, [moduleId]) // More events coming from the same meridian contract - if (contractVersionOfPreviousSparkRound?.contract_address === meridianContractAddress) { - sparkRoundNumber = BigInt(contractVersionOfPreviousSparkRound.spark_round_offset) + meridianRoundIndex + if (contractVersionOfPreviousModuleRound?.contract_address === meridianContractAddress) { + moduleRoundNumber = BigInt(contractVersionOfPreviousModuleRound.module_round_offset) + meridianRoundIndex await pgClient.query( - 'UPDATE meridian_contract_versions SET last_spark_round_number = $1 WHERE contract_address = $2', - [sparkRoundNumber, meridianContractAddress] + 'UPDATE meridian_contract_versions SET last_module_round_number = $1 WHERE contract_address = $2', + [moduleRoundNumber, meridianContractAddress] ) - console.log('Mapped %s IE round index %s to SPARK round number %s', + console.log('Mapped %s IE round index %s to module round number %s', meridianContractAddress, meridianRoundIndex, - sparkRoundNumber + moduleRoundNumber ) } else { - // We are running for the first time and need to map the meridian round to spark round 1 + // We are running for the first time and need to map the meridian round to module round 1 // Or the contract address has changed - const lastSparkRoundNumber = BigInt(contractVersionOfPreviousSparkRound?.last_spark_round_number ?? 0) - sparkRoundNumber = lastSparkRoundNumber + 1n - const sparkRoundOffset = sparkRoundNumber - meridianRoundIndex + const lastmoduleRoundNumber = BigInt(contractVersionOfPreviousModuleRound?.last_module_round_number ?? 0) + moduleRoundNumber = lastmoduleRoundNumber + 1n + const moduleRoundOffset = moduleRoundNumber - meridianRoundIndex // TODO(bajtos) If we are were are reverting back to a contract address (version) we were // using sometime in the past, the query above will fail. We can fix the problem and support // this edge case by telling Postgres to ignore conflicts (`ON CONFLICT DO NOTHING)` await pgClient.query(` INSERT INTO meridian_contract_versions - (contract_address, spark_round_offset, last_spark_round_number, first_spark_round_number) - VALUES ($1, $2, $3, $3) + (contract_address, module_round_offset, last_module_round_number, first_module_round_number, module_id) + VALUES ($1, $2, $3, $3, $4) `, [ meridianContractAddress, - sparkRoundOffset, - sparkRoundNumber + moduleRoundOffset, + moduleRoundNumber, + moduleId ]) console.log( - 'Upgraded meridian contract from %s to %s, mapping IE round index %s to SPARK round number %s', - contractVersionOfPreviousSparkRound?.contract_address ?? '', + 'Upgraded meridian contract from %s to %s, mapping IE round index %s to module round number %s', + contractVersionOfPreviousModuleRound?.contract_address ?? '', meridianContractAddress, meridianRoundIndex, - sparkRoundNumber + moduleRoundNumber ) } - await maybeCreateSparkRound(pgClient, { sparkRoundNumber, meridianContractAddress, meridianRoundIndex }) + await maybeCreateModuleRound(pgClient, { moduleRoundNumber, meridianContractAddress, meridianRoundIndex, moduleId }) - return sparkRoundNumber + return moduleRoundNumber } -export async function maybeCreateSparkRound (pgClient, { - sparkRoundNumber, +export async function maybeCreateModuleRound (pgClient, { + moduleRoundNumber, meridianContractAddress, - meridianRoundIndex + meridianRoundIndex, + moduleId }) { const { rowCount } = await pgClient.query(` - INSERT INTO spark_rounds - (id, created_at, meridian_address, meridian_round, max_tasks_per_node) - VALUES ($1, now(), $2, $3, $4) + INSERT INTO module_round + (id, created_at, meridian_address, meridian_round, max_tasks_per_node, module_id) + VALUES ($1, now(), $2, $3, $4, $5) ON CONFLICT DO NOTHING `, [ - sparkRoundNumber, + moduleRoundNumber, meridianContractAddress, meridianRoundIndex, - MAX_TASKS_PER_NODE + MAX_TASKS_PER_NODE, + moduleId ]) if (rowCount) { - // We created a new SPARK round. Let's define retrieval tasks for this new round. + // We created a new module round. Let's define retrieval tasks for this new round. // This is a short- to medium-term solution until we move to fully decentralized tasking - await defineTasksForRound(pgClient, sparkRoundNumber) + await defineTasksForRound(pgClient, moduleRoundNumber, moduleId) } } -async function defineTasksForRound (pgClient, sparkRoundNumber) { +async function defineTasksForRound (pgClient, moduleRoundNumber, moduleId) { await pgClient.query(` - INSERT INTO retrieval_tasks (round_id, cid, provider_address, protocol) - SELECT $1 as round_id, cid, provider_address, protocol + INSERT INTO retrieval_tasks (round_id, cid, provider_address, protocol, module_id) + SELECT $1 as round_id, cid, provider_address, protocol, module_id FROM retrieval_templates + WHERE module_id = $3 ORDER BY random() LIMIT $2; `, [ - sparkRoundNumber, - TASKS_PER_ROUND + moduleRoundNumber, + TASKS_PER_ROUND, + moduleId ]) } diff --git a/migrations/042.do.meridian-platform.sql b/migrations/042.do.meridian-platform.sql index cb97c571..8fd3dac0 100644 --- a/migrations/042.do.meridian-platform.sql +++ b/migrations/042.do.meridian-platform.sql @@ -23,8 +23,7 @@ ALTER TABLE measurements CREATE TABLE modules ( id SERIAL NOT NULL PRIMARY KEY, name TEXT NOT NULL, - slug TEXT NOT NULL, contract_address TEXT NOT NULL ); -INSERT INTO modules (name, slug, contract_address) VALUES ('SPARK', 'spark', '0x8460766edc62b525fc1fa4d628fc79229dc73031'); -INSERT INTO modules (name, slug, contract_address) VALUES ('Voyager', 'voyager', '0xc524b83bf85021e674a7c9f18f5381179fabaf6c'); +INSERT INTO modules (name, slug, contract_address) VALUES ('SPARK', '0x8460766edc62b525fc1fa4d628fc79229dc73031'); +INSERT INTO modules (name, slug, contract_address) VALUES ('Voyager', '0xc524b83bf85021e674a7c9f18f5381179fabaf6c');