
fix: guarantee db is empty before performing a replay #1374


Merged
merged 10 commits into from
Oct 27, 2022
17 changes: 17 additions & 0 deletions .vscode/launch.json
@@ -155,6 +155,23 @@
"preLaunchTask": "stacks-node:deploy-dev",
"postDebugTask": "stacks-node:stop-dev"
},
{
"type": "node",
"request": "launch",
"name": "Jest: Event Replay",
"program": "${workspaceFolder}/node_modules/.bin/jest",
"args": [
"--testTimeout=3600000",
"--runInBand",
"--no-cache",
"--config",
"${workspaceRoot}/jest.config.event-replay.js"
],
"outputCapture": "std",
"console": "integratedTerminal",
"preLaunchTask": "stacks-node:deploy-dev",
"postDebugTask": "stacks-node:stop-dev"
},
{
"type": "node",
"request": "launch",
15 changes: 15 additions & 0 deletions jest.config.event-replay.js
@@ -0,0 +1,15 @@
module.exports = {
preset: 'ts-jest',
rootDir: 'src',
testMatch: ['<rootDir>/tests-event-replay/**/*.ts'],
testPathIgnorePatterns: [
'<rootDir>/tests-event-replay/setup.ts',
'<rootDir>/tests-event-replay/teardown.ts',
],
collectCoverageFrom: ['<rootDir>/**/*.ts'],
coveragePathIgnorePatterns: ['<rootDir>/tests*'],
coverageDirectory: '../coverage',
globalSetup: '<rootDir>/tests-event-replay/setup.ts',
globalTeardown: '<rootDir>/tests-event-replay/teardown.ts',
testTimeout: 20000,
};
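For reference, here is a minimal sketch (not part of the PR) of driving the same configuration programmatically through Jest's public runCLI API; in practice the suite is launched via the new "Jest: Event Replay" VS Code configuration above or an equivalent `jest --config jest.config.event-replay.js --runInBand` command. The wrapper function below is an assumption for illustration only.

// Illustrative only: runs the event-replay suite using the config added in this PR.
import { runCLI } from 'jest';

async function runEventReplayTests(): Promise<void> {
  const { results } = await runCLI(
    {
      config: 'jest.config.event-replay.js',
      runInBand: true, // the replay tests share one Postgres database, so avoid parallel workers
      _: [],
      $0: 'jest',
    },
    [process.cwd()]
  );
  if (!results.success) {
    process.exitCode = 1;
  }
}

runEventReplayTests().catch(err => {
  console.error(err);
  process.exitCode = 1;
});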
32 changes: 23 additions & 9 deletions src/datastore/event-requests.ts
@@ -1,7 +1,7 @@
import { pipelineAsync } from '../helpers';
import { Readable, Writable } from 'stream';
import { DbRawEventRequest } from './common';
import { PgServer } from './connection';
import { connectPostgres, PgServer } from './connection';
import { connectPgPool, connectWithRetry } from './connection-legacy';
import * as pgCopyStreams from 'pg-copy-streams';
import * as PgCursor from 'pg-cursor';
@@ -119,22 +119,36 @@ export async function* getRawEventRequests(
}
}

export async function containsAnyRawEventRequests(): Promise<boolean> {
const pool = await connectPgPool({
usageName: 'contains-raw-events-check',
/**
* Check the `pg_class` table for any data structures contained in the database. We will consider
* any and all results here as "data" contained in the DB, since anything that is not a completely
* empty DB could lead to strange errors when running the API. See:
* https://www.postgresql.org/docs/current/catalog-pg-class.html
 * @returns `true` if the DB contains any data, `false` otherwise
*/
export async function databaseHasData(args?: {
ignoreMigrationTables?: boolean;
}): Promise<boolean> {
const sql = await connectPostgres({
usageName: 'contains-data-check',
pgServer: PgServer.primary,
});
const client = await pool.connect();
try {
const result = await client.query('SELECT id from event_observer_requests LIMIT 1');
return result.rowCount > 0;
const ignoreMigrationTables = args?.ignoreMigrationTables ?? false;
const result = await sql<{ count: number }[]>`
SELECT COUNT(*)
FROM pg_class c
JOIN pg_namespace s ON s.oid = c.relnamespace
WHERE s.nspname = ${sql.options.connection.search_path}
${ignoreMigrationTables ? sql`AND c.relname NOT LIKE 'pgmigrations%'` : sql``}
`;
return result.count > 0 && result[0].count > 0;
} catch (error: any) {
if (error.message?.includes('does not exist')) {
return false;
}
throw error;
} finally {
client.release();
await pool.end();
await sql.end();
}
}
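A brief usage sketch of the new check (illustrative only; the PR's actual call sites appear in the migrations.ts and event-replay.ts diffs below):

// Before a replay, any pre-existing relation counts as "data":
if (await databaseHasData()) {
  throw new Error('Database contains existing data. Add --wipe-db to drop the existing tables.');
}

// After a full "down" migration only the pgmigrations bookkeeping table should
// survive, so the emptiness check is told to ignore it:
if (await databaseHasData({ ignoreMigrationTables: true })) {
  throw new Error('Migration down process did not completely remove DB tables');
}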
53 changes: 30 additions & 23 deletions src/datastore/migrations.ts
@@ -3,6 +3,8 @@ import PgMigrate, { RunnerOption } from 'node-pg-migrate';
import { Client } from 'pg';
import { APP_DIR, isDevEnv, isTestEnv, logError, logger } from '../helpers';
import { getPgClientConfig, PgClientConfig } from './connection-legacy';
import { connectPostgres, PgServer } from './connection';
import { databaseHasData } from './event-requests';

const MIGRATIONS_TABLE = 'pgmigrations';
const MIGRATIONS_DIR = path.join(APP_DIR, 'migrations');
@@ -52,10 +54,14 @@ export async function runMigrations(
export async function cycleMigrations(opts?: {
// Bypass the NODE_ENV check when performing a "down" migration which irreversibly drops data.
dangerousAllowDataLoss?: boolean;
checkForEmptyData?: boolean;
}): Promise<void> {
const clientConfig = getPgClientConfig({ usageName: 'cycle-migrations' });

await runMigrations(clientConfig, 'down', opts);
if (opts?.checkForEmptyData && (await databaseHasData({ ignoreMigrationTables: true }))) {
throw new Error('Migration down process did not completely remove DB tables');
}
await runMigrations(clientConfig, 'up', opts);
}

@@ -65,30 +71,31 @@ export async function dangerousDropAllTables(opts?: {
if (opts?.acknowledgePotentialCatastrophicConsequences !== 'yes') {
throw new Error('Dangerous usage error.');
}
const clientConfig = getPgClientConfig({ usageName: 'dangerous-drop-all-tables' });
const client = new Client(clientConfig);
const sql = await connectPostgres({
usageName: 'dangerous-drop-all-tables',
pgServer: PgServer.primary,
});
const schema = sql.options.connection.search_path;
try {
await client.connect();
await client.query('BEGIN');
const getTablesQuery = await client.query<{ table_name: string }>(
`
SELECT table_name
FROM information_schema.tables
WHERE table_schema = $1
AND table_catalog = $2
AND table_type = 'BASE TABLE'
`,
[clientConfig.schema, clientConfig.database]
);
const tables = getTablesQuery.rows.map(r => r.table_name);
for (const table of tables) {
await client.query(`DROP TABLE IF EXISTS ${table} CASCADE`);
}
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
await sql.begin(async sql => {
const relNamesQuery = async (kind: string) => sql<{ relname: string }[]>`
SELECT relname
FROM pg_class c
JOIN pg_namespace s ON s.oid = c.relnamespace
WHERE s.nspname = ${schema} AND c.relkind = ${kind}
`;
Review thread on lines +81 to +86:

zone117x (Member) commented on Oct 25, 2022:

Is there a way to constrain this query to the database specified in clientConfig.database? Otherwise, it seems like this could drop everything from the entire postgres instance rather than only the specific database.

The previous code was marked as "dangerous"/"unsafe" because it's conceivable that devs might also use the same database for their app-specific data, e.g. something like a stacks_blockchain_api db with tables like:

transactions
blocks
...<the other tables configured by migration files>
my_app_users
my_app_posts

So the idea behind requiring the --force flag for wiping is that devs might think twice about this situation. However, it seems like the above code would drop data even if devs put their app-specific data in a separate database that is still in the same postgres instance.

But maybe that is prevented by implicit database scoping set up in the sql connection? I just want to verify the behavior here.

Collaborator (Author) replied:

The pg_class system catalog is actually database-specific. Some catalogs are shared across the cluster, but pg_class is not:

"Most system catalogs are copied from the template database during database creation and are thereafter database-specific. A few catalogs are physically shared across all databases in a cluster; these are noted in the descriptions of the individual catalogs."

https://www.postgresql.org/docs/14/catalogs-overview.html

(A short sketch illustrating this scoping is included after this file's diff.)

> it's conceivable that devs might also use the same database for their app-specific data

This is a great point, though, and one I think we should definitely discourage somehow. The difficulty is that when we change schemas across API versions, we'd need to somehow keep track of historical migrations to validate that we're only dropping things the API created, as opposed to things the developer created.

I could expand the warning message we display during replay to explain this situation (i.e. check for an empty DB; if it isn't empty, ask the user to use --wipe-db --force, but also explain that they shouldn't keep other data in the same DB).

Member replied:

> The pg_class system catalog is actually database-specific. There are some which are shared but pg_class is not

Got it. In that case I think this PR is good to go. The extra log warning sounds helpful, but it doesn't have to be in this PR since there doesn't seem to be any extra danger introduced by this change.

// Remove materialized views first and tables second.
// Using CASCADE in these DROP statements also removes associated indexes and constraints.
const views = await relNamesQuery('m');
for (const view of views) {
await sql`DROP MATERIALIZED VIEW IF EXISTS ${sql(view.relname)} CASCADE`;
}
const tables = await relNamesQuery('r');
for (const table of tables) {
await sql`DROP TABLE IF EXISTS ${sql(table.relname)} CASCADE`;
}
});
} finally {
await client.end();
await sql.end();
}
}
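To illustrate the point settled in the review thread above (pg_class only exposes relations belonging to the connected database and schema), here is a minimal sketch using the same postgres.js connection helpers the PR relies on. The standalone function and its import path are assumptions for illustration, not repo code.

// Assumes this sketch lives next to the datastore modules (src/datastore).
import { connectPostgres, PgServer } from './connection';

// Lists the ordinary tables ('r') and materialized views ('m') visible to the
// API's configured database and schema. Relations in other databases of the
// same Postgres cluster never show up here, which is why dangerousDropAllTables
// cannot touch them.
async function listApiRelations(): Promise<string[]> {
  const sql = await connectPostgres({
    usageName: 'relkind-demo',
    pgServer: PgServer.primary,
  });
  try {
    const rows = await sql<{ relname: string }[]>`
      SELECT relname
      FROM pg_class c
      JOIN pg_namespace s ON s.oid = c.relnamespace
      WHERE s.nspname = ${sql.options.connection.search_path}
        AND c.relkind IN ('r', 'm')
    `;
    return rows.map(r => r.relname);
  } finally {
    await sql.end();
  }
}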
16 changes: 10 additions & 6 deletions src/event-replay/event-replay.ts
@@ -5,7 +5,7 @@ import { defaultLogLevel, getApiConfiguredChainID, httpPostRequest, logger } fro
import { findBnsGenesisBlockData, findTsvBlockHeight, getDbBlockHeight } from './helpers';
import { importV1BnsNames, importV1BnsSubdomains, importV1TokenOfferingData } from '../import-v1';
import {
containsAnyRawEventRequests,
databaseHasData,
exportRawEventRequests,
getRawEventRequests,
} from '../datastore/event-requests';
@@ -90,18 +90,22 @@ export async function importEventsFromTsv(
default:
throw new Error(`Invalid event import mode: ${importMode}`);
}
const hasData = await containsAnyRawEventRequests();
const hasData = await databaseHasData();
if (!wipeDb && hasData) {
throw new Error(`Database contains existing data. Add --wipe-db to drop the existing tables.`);
}
if (force) {
await dangerousDropAllTables({ acknowledgePotentialCatastrophicConsequences: 'yes' });
}

// This performs a "migration down" which drops the tables, then re-creates them.
// If there's a breaking change in the migration files, this will throw and the pg database will need to be wiped manually,
// or the `--force` option can be used.
await cycleMigrations({ dangerousAllowDataLoss: true });
try {
await cycleMigrations({ dangerousAllowDataLoss: true, checkForEmptyData: true });
} catch (error) {
logger.error(error);
throw new Error(
`DB migration cycle failed, possibly due to an incompatible API version upgrade. Add --wipe-db --force or perform a manual DB wipe before importing.`
);
}

// Look for the TSV's block height and determine the prunable block window.
const tsvBlockHeight = await findTsvBlockHeight(resolvedFilePath);
@@ -1,8 +1,8 @@
import * as fs from 'fs';
import { findTsvBlockHeight } from '../event-replay/helpers';
import { findBnsGenesisBlockData, findTsvBlockHeight } from '../event-replay/helpers';
import { ReverseFileStream } from '../event-replay/reverse-file-stream';

describe('event replay tests', () => {
describe('helper tests', () => {
function writeTmpFile(fileName: string, contents: string): string {
try {
fs.mkdirSync('./.tmp');
@@ -119,4 +119,17 @@ line4`;
fs.unlinkSync(testFilePath);
}
});

test('BNS genesis block data is found', async () => {
const genesisBlock = await findBnsGenesisBlockData('src/tests-event-replay/tsv/mainnet.tsv');
expect(genesisBlock).toEqual({
index_block_hash: '0x918697ef63f9d8bdf844c3312b299e72a231cde542f3173f7755bb8c1cdaf3a7',
parent_index_block_hash: '0x55c9861be5cff984a20ce6d99d4aa65941412889bdc665094136429b84f8c2ee',
microblock_hash: '0x0000000000000000000000000000000000000000000000000000000000000000',
microblock_sequence: 0,
microblock_canonical: true,
tx_id: '0x2f079994c9bd92b2272258b9de73e278824d76efe1b5a83a3b00941f9559de8a',
tx_index: 7,
});
});
});
101 changes: 101 additions & 0 deletions src/tests-event-replay/import-export-tests.ts
@@ -0,0 +1,101 @@
import * as fs from 'fs';
import { exportEventsAsTsv, importEventsFromTsv } from '../event-replay/event-replay';
import { PgWriteStore } from '../datastore/pg-write-store';
import { dangerousDropAllTables, runMigrations } from '../datastore/migrations';
import { databaseHasData } from '../datastore/event-requests';
import { getPgClientConfig } from '../datastore/connection-legacy';

describe('import/export tests', () => {
let db: PgWriteStore;

beforeEach(async () => {
process.env.PG_DATABASE = 'postgres';
db = await PgWriteStore.connect({
usageName: 'tests',
withNotifier: false,
skipMigrations: true,
});
});

test('event import and export cycle', async () => {
// Import from mocknet TSV
await importEventsFromTsv('src/tests-event-replay/tsv/mocknet.tsv', 'archival', true, true);
const chainTip = await db.getUnanchoredChainTip();
expect(chainTip.found).toBe(true);
expect(chainTip.result?.blockHeight).toBe(28);
expect(chainTip.result?.indexBlockHash).toBe(
'0x76cd67a65c0dfd5ea450bb9efe30da89fa125bfc077c953802f718353283a533'
);
expect(chainTip.result?.blockHash).toBe(
'0x7682af212d3c1ef62613412f9b5a727269b4548f14eca2e3f941f7ad8b3c11b2'
);

// Export into temp TSV
const tmpDir = 'src/tests-event-replay/.tmp';
try {
fs.mkdirSync(tmpDir);
} catch (error: any) {
if (error.code != 'EEXIST') throw error;
}
const tmpTsvPath = `${tmpDir}/export.tsv`;
await exportEventsAsTsv(tmpTsvPath, true);

// Re-import with exported TSV and check that chain tip matches.
try {
await importEventsFromTsv(`${tmpDir}/export.tsv`, 'archival', true, true);
const newChainTip = await db.getUnanchoredChainTip();
expect(newChainTip.found).toBe(true);
expect(newChainTip.result?.blockHeight).toBe(28);
expect(newChainTip.result?.indexBlockHash).toBe(
'0x76cd67a65c0dfd5ea450bb9efe30da89fa125bfc077c953802f718353283a533'
);
expect(newChainTip.result?.blockHash).toBe(
'0x7682af212d3c1ef62613412f9b5a727269b4548f14eca2e3f941f7ad8b3c11b2'
);
} finally {
fs.rmSync(tmpDir, { force: true, recursive: true });
}
});

test('import with db wipe options', async () => {
// Migrate first so we have some data.
const clientConfig = getPgClientConfig({ usageName: 'cycle-migrations' });
await runMigrations(clientConfig, 'up', {});
await expect(
importEventsFromTsv('src/tests-event-replay/tsv/mocknet.tsv', 'archival', false, false)
).rejects.toThrowError('contains existing data');

// Create strange table
await db.sql`CREATE TABLE IF NOT EXISTS test (a varchar(10))`;
await expect(
importEventsFromTsv('src/tests-event-replay/tsv/mocknet.tsv', 'archival', true, false)
).rejects.toThrowError('migration cycle failed');

// Force and test
await expect(
importEventsFromTsv('src/tests-event-replay/tsv/mocknet.tsv', 'archival', true, true)
).resolves.not.toThrow();
});

test('db contains data', async () => {
const clientConfig = getPgClientConfig({ usageName: 'cycle-migrations' });
await runMigrations(clientConfig, 'up', {});

// Having tables counts as having data, since table schemas may change across major versions.
await expect(databaseHasData()).resolves.toBe(true);

// Dropping all tables removes everything.
await dangerousDropAllTables({ acknowledgePotentialCatastrophicConsequences: 'yes' });
await expect(databaseHasData()).resolves.toBe(false);

// Cycling migrations leaves the `pgmigrations` table.
await runMigrations(clientConfig, 'up', {});
await runMigrations(clientConfig, 'down', {});
await expect(databaseHasData()).resolves.toBe(true);
await expect(databaseHasData({ ignoreMigrationTables: true })).resolves.toBe(false);
});

afterEach(async () => {
await db?.close();
});
});
11 changes: 11 additions & 0 deletions src/tests-event-replay/setup.ts
@@ -0,0 +1,11 @@
import { loadDotEnv } from '../helpers';

// ts-unused-exports:disable-next-line
export default (): void => {
console.log('Jest - setup..');
if (!process.env.NODE_ENV) {
process.env.NODE_ENV = 'test';
}
loadDotEnv();
console.log('Jest - setup done');
};
5 changes: 5 additions & 0 deletions src/tests-event-replay/teardown.ts
@@ -0,0 +1,5 @@
// ts-unused-exports:disable-next-line
export default (): void => {
console.log('Jest - teardown');
console.log('Jest - teardown done');
};
10 changes: 10 additions & 0 deletions src/tests-event-replay/tsv/mainnet.tsv

Large diffs are not rendered by default.

28 changes: 28 additions & 0 deletions src/tests-event-replay/tsv/mocknet.tsv

Large diffs are not rendered by default.