diff --git a/docker/docker-compose.dev.postgres.yml b/docker/docker-compose.dev.postgres.yml index 188c6075e7..5db7be8a03 100644 --- a/docker/docker-compose.dev.postgres.yml +++ b/docker/docker-compose.dev.postgres.yml @@ -9,3 +9,7 @@ services: POSTGRES_PASSWORD: postgres POSTGRES_DB: stacks_blockchain_api POSTGRES_PORT: 5432 + command: > + -c work_mem=256MB + -c maintenance_work_mem=256MB + -c max_wal_size=1GB diff --git a/src/datastore/event-requests.ts b/src/datastore/event-requests.ts index 45f68eae5b..e914b4d47c 100644 --- a/src/datastore/event-requests.ts +++ b/src/datastore/event-requests.ts @@ -5,6 +5,7 @@ import { connectPostgres, PgServer } from './connection'; import { connectPgPool, connectWithRetry } from './connection-legacy'; import * as pgCopyStreams from 'pg-copy-streams'; import * as PgCursor from 'pg-cursor'; +import * as readline from 'readline'; export async function exportRawEventRequests(targetStream: Writable): Promise { const pool = await connectPgPool({ @@ -28,94 +29,23 @@ export async function exportRawEventRequests(targetStream: Writable): Promise void -): AsyncGenerator { - // 1. Pipe input stream into a temp table - // 2. Use `pg-cursor` to async read rows from temp table (order by `id` ASC) - // 3. Drop temp table - // 4. Close db connection - const pool = await connectPgPool({ - usageName: 'get-raw-events', - pgServer: PgServer.primary, + readStream: Readable +): AsyncGenerator { + const rl = readline.createInterface({ + input: readStream, + crlfDelay: Infinity, }); try { - const client = await pool.connect(); - try { - await client.query('BEGIN'); - await client.query(` - CREATE TEMPORARY TABLE temp_event_observer_requests( - id bigint PRIMARY KEY, - receive_timestamp timestamptz NOT NULL, - event_path text NOT NULL, - payload jsonb NOT NULL - ) ON COMMIT DROP - `); - // Use a `temp_raw_tsv` table first to store the raw TSV data as it might come with duplicate - // rows which would trigger the `PRIMARY KEY` constraint in `temp_event_observer_requests`. - // We will "upsert" from the former to the latter before event ingestion. - await client.query(` - CREATE TEMPORARY TABLE temp_raw_tsv - (LIKE temp_event_observer_requests) - ON COMMIT DROP - `); - onStatusUpdate?.('Importing raw event requests into temporary table...'); - const importStream = client.query(pgCopyStreams.from(`COPY temp_raw_tsv FROM STDIN`)); - await pipelineAsync(readStream, importStream); - onStatusUpdate?.('Removing any duplicate raw event requests...'); - await client.query(` - INSERT INTO temp_event_observer_requests - SELECT * - FROM temp_raw_tsv - ON CONFLICT DO NOTHING; - `); - const totallengthQuery = await client.query<{ count: string }>( - `SELECT COUNT(id) count FROM temp_event_observer_requests` - ); - const totallength = parseInt(totallengthQuery.rows[0].count); - let lastStatusUpdatePercent = 0; - onStatusUpdate?.('Streaming raw event requests from temporary table...'); - const cursor = new PgCursor<{ id: string; event_path: string; payload: string }>( - ` - SELECT id, event_path, payload::text - FROM temp_event_observer_requests - ORDER BY id ASC - ` - ); - const cursorQuery = client.query(cursor); - const rowBatchSize = 100; - let rowsReadCount = 0; - let rows: DbRawEventRequest[] = []; - do { - rows = await new Promise((resolve, reject) => { - cursorQuery.read(rowBatchSize, (error, rows) => { - if (error) { - reject(error); - } else { - rowsReadCount += rows.length; - if ((rowsReadCount / totallength) * 100 > lastStatusUpdatePercent + 1) { - lastStatusUpdatePercent = Math.floor((rowsReadCount / totallength) * 100); - onStatusUpdate?.( - `Raw event requests processed: ${lastStatusUpdatePercent}% (${rowsReadCount} / ${totallength})` - ); - } - resolve(rows); - } - }); - }); - if (rows.length > 0) { - yield rows; - } - } while (rows.length > 0); - await client.query('COMMIT'); - } catch (error) { - await client.query('ROLLBACK'); - throw error; - } finally { - client.release(); + for await (const line of rl) { + const columns = line.split('\t'); + const rawRequest: DbRawEventRequest = { + event_path: columns[2], + payload: columns[3], + }; + yield rawRequest; } } finally { - await pool.end(); + rl.close(); } } diff --git a/src/event-replay/event-replay.ts b/src/event-replay/event-replay.ts index 478e1eccea..bd9818d7cb 100644 --- a/src/event-replay/event-replay.ts +++ b/src/event-replay/event-replay.ts @@ -134,38 +134,44 @@ export async function importEventsFromTsv( // Import TSV chain data const readStream = fs.createReadStream(resolvedFilePath); - const rawEventsIterator = getRawEventRequests(readStream, status => { - console.log(status); - }); + const rawEventsIterator = getRawEventRequests(readStream); // Set logger to only output for warnings/errors, otherwise the event replay will result // in the equivalent of months/years of API log output. logger.level = 'warn'; // The current import block height. Will be updated with every `/new_block` event. let blockHeight = 0; + let lastStatusUpdatePercent = 0; const responses = []; - for await (const rawEvents of rawEventsIterator) { - for (const rawEvent of rawEvents) { - if (eventImportMode === EventImportMode.pruned) { - if (blockHeight === prunedBlockHeight) { - console.log(`Resuming prunable event import...`); - } + for await (const rawEvent of rawEventsIterator) { + if (eventImportMode === EventImportMode.pruned) { + if (blockHeight === prunedBlockHeight) { + console.log(`Resuming prunable event import...`); } - const response = await httpPostRequest({ - host: '127.0.0.1', - port: eventServer.serverAddress.port, - path: rawEvent.event_path, - headers: { 'Content-Type': 'application/json' }, - body: Buffer.from(rawEvent.payload, 'utf8'), - throwOnNotOK: true, - }); - if (rawEvent.event_path === '/new_block') { - blockHeight = await getDbBlockHeight(db); - if (blockHeight && blockHeight % 1000 === 0) { + } + const response = await httpPostRequest({ + host: '127.0.0.1', + port: eventServer.serverAddress.port, + path: rawEvent.event_path, + headers: { 'Content-Type': 'application/json' }, + body: Buffer.from(rawEvent.payload, 'utf8'), + throwOnNotOK: true, + }); + if (rawEvent.event_path === '/new_block') { + blockHeight = await getDbBlockHeight(db); + if (blockHeight) { + if (blockHeight % 1000 === 0) { console.log(`Event file block height reached: ${blockHeight}`); } + const percentProgress = (blockHeight / tsvBlockHeight) * 100; + if (percentProgress > lastStatusUpdatePercent + 1) { + lastStatusUpdatePercent = Math.floor(percentProgress); + console.log( + `Blocks processed: ${lastStatusUpdatePercent}% (${blockHeight} / ${tsvBlockHeight})` + ); + } } - responses.push(response); } + responses.push(response); } await db.finishEventReplay(); console.log(`Event import and playback successful.`); diff --git a/src/tests-event-replay/import-export-tests.ts b/src/tests-event-replay/import-export-tests.ts index 3ff2da9e70..20c8a48e45 100644 --- a/src/tests-event-replay/import-export-tests.ts +++ b/src/tests-event-replay/import-export-tests.ts @@ -170,21 +170,19 @@ describe('IBD', () => { return [eventServer, eventServer.closeAsync] as const; }, async (rawEventsIterator, eventServer) => { - for await (const rawEvents of rawEventsIterator) { - for (const rawEvent of rawEvents) { - ibdRoutesVisited.add(rawEvent.event_path); - const response = await httpPostRequest({ - host: '127.0.0.1', - port: eventServer.serverAddress.port, - path: rawEvent.event_path, - headers: { 'Content-Type': 'application/json' }, - body: Buffer.from(rawEvent.payload, 'utf8'), - throwOnNotOK: true, - }); - if (ibdRoutes.includes(rawEvent.event_path)) { - expect(response.statusCode).toBe(200); - expect(response.response).toBe('IBD mode active.'); - } + for await (const rawEvent of rawEventsIterator) { + ibdRoutesVisited.add(rawEvent.event_path); + const response = await httpPostRequest({ + host: '127.0.0.1', + port: eventServer.serverAddress.port, + path: rawEvent.event_path, + headers: { 'Content-Type': 'application/json' }, + body: Buffer.from(rawEvent.payload, 'utf8'), + throwOnNotOK: true, + }); + if (ibdRoutes.includes(rawEvent.event_path)) { + expect(response.statusCode).toBe(200); + expect(response.response).toBe('IBD mode active.'); } } } @@ -214,27 +212,25 @@ describe('IBD', () => { return [eventServer, eventServer.closeAsync] as const; }, async (rawEventsIterator, eventServer) => { - for await (const rawEvents of rawEventsIterator) { - for (const rawEvent of rawEvents) { - ibdRoutesVisited.add(rawEvent.event_path); - const response = await httpPostRequest({ - host: '127.0.0.1', - port: eventServer.serverAddress.port, - path: rawEvent.event_path, - headers: { 'Content-Type': 'application/json' }, - body: Buffer.from(rawEvent.payload, 'utf8'), - throwOnNotOK: true, - }); - if (ibdRoutes.includes(rawEvent.event_path)) { - const chainTip = await db.getChainTip(client, false); - const ibdThreshold = Number.parseInt(process.env.IBD_MODE_UNTIL_BLOCK as string); - if (chainTip.blockHeight < ibdThreshold) { - expect(response.statusCode).toBe(200); - expect(response.response).toBe('IBD mode active.'); - } else { - expect(response.statusCode).toBe(200); - expect(response.response).not.toBe('IBD mode active.'); - } + for await (const rawEvent of rawEventsIterator) { + ibdRoutesVisited.add(rawEvent.event_path); + const response = await httpPostRequest({ + host: '127.0.0.1', + port: eventServer.serverAddress.port, + path: rawEvent.event_path, + headers: { 'Content-Type': 'application/json' }, + body: Buffer.from(rawEvent.payload, 'utf8'), + throwOnNotOK: true, + }); + if (ibdRoutes.includes(rawEvent.event_path)) { + const chainTip = await db.getChainTip(client, false); + const ibdThreshold = Number.parseInt(process.env.IBD_MODE_UNTIL_BLOCK as string); + if (chainTip.blockHeight < ibdThreshold) { + expect(response.statusCode).toBe(200); + expect(response.response).toBe('IBD mode active.'); + } else { + expect(response.statusCode).toBe(200); + expect(response.response).not.toBe('IBD mode active.'); } } } diff --git a/src/tests-event-replay/raw-event-request-tests.ts b/src/tests-event-replay/raw-event-request-tests.ts index 0dedaf8edf..4ef1a3d2a7 100644 --- a/src/tests-event-replay/raw-event-request-tests.ts +++ b/src/tests-event-replay/raw-event-request-tests.ts @@ -55,31 +55,29 @@ describe('Events table', () => { return [eventServer, eventServer.closeAsync] as const; }, async (rawEventsIterator, eventServer) => { - for await (const rawEvents of rawEventsIterator) { - for (const rawEvent of rawEvents) { - try { - if (rawEvent.event_path === '/new_block') { - const payloadJson = JSON.parse(rawEvent.payload); - payloadJson.transactions = undefined; - rawEvent.payload = JSON.stringify(payloadJson); - } - } catch (error) {} - const rawEventRequestCountResultBefore = await getRawEventCount(); - const rawEventRequestCountBefore = rawEventRequestCountResultBefore[0]; - const response = await httpPostRequest({ - host: '127.0.0.1', - port: eventServer.serverAddress.port, - path: rawEvent.event_path, - headers: { 'Content-Type': 'application/json' }, - body: Buffer.from(rawEvent.payload, 'utf8'), - throwOnNotOK: false, - }); + for await (const rawEvent of rawEventsIterator) { + try { if (rawEvent.event_path === '/new_block') { - expect(response.statusCode).toBe(500); - const rawEventRequestCountResultAfter = await getRawEventCount(); - const rawEventRequestCountAfter = rawEventRequestCountResultAfter[0]; - expect(rawEventRequestCountBefore).toEqual(rawEventRequestCountAfter); + const payloadJson = JSON.parse(rawEvent.payload); + payloadJson.transactions = undefined; + rawEvent.payload = JSON.stringify(payloadJson); } + } catch (error) {} + const rawEventRequestCountResultBefore = await getRawEventCount(); + const rawEventRequestCountBefore = rawEventRequestCountResultBefore[0]; + const response = await httpPostRequest({ + host: '127.0.0.1', + port: eventServer.serverAddress.port, + path: rawEvent.event_path, + headers: { 'Content-Type': 'application/json' }, + body: Buffer.from(rawEvent.payload, 'utf8'), + throwOnNotOK: false, + }); + if (rawEvent.event_path === '/new_block') { + expect(response.statusCode).toBe(500); + const rawEventRequestCountResultAfter = await getRawEventCount(); + const rawEventRequestCountAfter = rawEventRequestCountResultAfter[0]; + expect(rawEventRequestCountBefore).toEqual(rawEventRequestCountAfter); } } } diff --git a/src/tests/microblock-tests.ts b/src/tests/microblock-tests.ts index 9f7aae41cf..7d647f17cb 100644 --- a/src/tests/microblock-tests.ts +++ b/src/tests/microblock-tests.ts @@ -91,17 +91,15 @@ describe('microblock tests', () => { return [apiServer, apiServer.terminate] as const; }, async (_, rawEventsIterator, eventServer, api) => { - for await (const rawEvents of rawEventsIterator) { - for (const rawEvent of rawEvents) { - await httpPostRequest({ - host: '127.0.0.1', - port: eventServer.serverAddress.port, - path: rawEvent.event_path, - headers: { 'Content-Type': 'application/json' }, - body: Buffer.from(rawEvent.payload, 'utf8'), - throwOnNotOK: true, - }); - } + for await (const rawEvent of rawEventsIterator) { + await httpPostRequest({ + host: '127.0.0.1', + port: eventServer.serverAddress.port, + path: rawEvent.event_path, + headers: { 'Content-Type': 'application/json' }, + body: Buffer.from(rawEvent.payload, 'utf8'), + throwOnNotOK: true, + }); } // test that the out-of-order microblocks were not stored const mbHash1 = '0xb714e75a7dae26fee0e77788317a0c84e513d1d8647a376b21b1c864e55c135a'; @@ -153,17 +151,15 @@ describe('microblock tests', () => { return [apiServer, apiServer.terminate] as const; }, async (_, rawEventsIterator, eventServer, api) => { - for await (const rawEvents of rawEventsIterator) { - for (const rawEvent of rawEvents) { - await httpPostRequest({ - host: '127.0.0.1', - port: eventServer.serverAddress.port, - path: rawEvent.event_path, - headers: { 'Content-Type': 'application/json' }, - body: Buffer.from(rawEvent.payload, 'utf8'), - throwOnNotOK: true, - }); - } + for await (const rawEvent of rawEventsIterator) { + await httpPostRequest({ + host: '127.0.0.1', + port: eventServer.serverAddress.port, + path: rawEvent.event_path, + headers: { 'Content-Type': 'application/json' }, + body: Buffer.from(rawEvent.payload, 'utf8'), + throwOnNotOK: true, + }); } const txResult2 = await supertest(api.server).get(`/extended/v1/tx/${lostTx}`); const { body: txBody }: { body: Transaction } = txResult2; @@ -219,17 +215,15 @@ describe('microblock tests', () => { return [apiServer, apiServer.terminate] as const; }, async (_, rawEventsIterator, eventServer, api) => { - for await (const rawEvents of rawEventsIterator) { - for (const rawEvent of rawEvents) { - await httpPostRequest({ - host: '127.0.0.1', - port: eventServer.serverAddress.port, - path: rawEvent.event_path, - headers: { 'Content-Type': 'application/json' }, - body: Buffer.from(rawEvent.payload, 'utf8'), - throwOnNotOK: true, - }); - } + for await (const rawEvent of rawEventsIterator) { + await httpPostRequest({ + host: '127.0.0.1', + port: eventServer.serverAddress.port, + path: rawEvent.event_path, + headers: { 'Content-Type': 'application/json' }, + body: Buffer.from(rawEvent.payload, 'utf8'), + throwOnNotOK: true, + }); } const txResult2 = await supertest(api.server).get(`/extended/v1/tx/${lostTx}`); const { body: txBody }: { body: Transaction } = txResult2;