diff --git a/forks/README.md b/forks/README.md index 4f716cd40..59faa51d8 100644 --- a/forks/README.md +++ b/forks/README.md @@ -61,6 +61,7 @@ The main approach creates intentional conflicts by running parallel operations o - **installationCount**: `5` - How many installations to use randomly in the createInstallation operations - **typeofStreamForTest**: `typeofStream.None` - No streams started by default (configured on-demand) - **typeOfSyncForTest**: `typeOfSync.None` - No automatic syncing (configured on-demand) +- **streams**: `false` - Enable message streams on all workers (configured via `--streams` CLI flag) ### Test setup in local network @@ -92,6 +93,9 @@ yarn fork --clean-all # Run on a specific environment yarn fork --count 200 --env local +# Enable message streams on all workers +yarn fork --streams + # Show help yarn fork --help ``` diff --git a/forks/cli.ts b/forks/cli.ts index 85830f839..8dc44388e 100644 --- a/forks/cli.ts +++ b/forks/cli.ts @@ -25,6 +25,7 @@ interface ForkOptions { env?: string; // XMTP environment (local, dev, production) chaosEnabled: boolean; // Enable network chaos chaosLevel: ChaosLevel; // Chaos level (low, medium, high) + streams: boolean; // Enable message streams on all workers } function showHelp() { @@ -42,6 +43,7 @@ OPTIONS: --env XMTP environment (local, dev, production) [default: dev] --chaos-enabled Enable network chaos testing (requires --env local) --chaos-level Chaos level: low, medium, high [default: medium] + --streams Enable message streams on all workers [default: false] -h, --help Show this help message CHAOS LEVELS: @@ -56,6 +58,7 @@ EXAMPLES: yarn fork --count 200 --env local # Run 200 times on local environment yarn fork --env local --chaos-enabled # Run with medium network chaos yarn fork --env local --chaos-enabled --chaos-level high # Run with high chaos + yarn fork --streams # Run with message streams enabled For more information, see: forks/README.md `); @@ -73,9 +76,9 @@ function getForkCount(): number { } /** - * Run the fork test (suppress output) + * Run the fork test (suppress output) and return whether it completed successfully */ -function runForkTest(options: ForkOptions): void { +function runForkTest(options: ForkOptions): boolean { const envFlag = options.env ? `--env ${options.env}` : ""; const command = `yarn test forks ${envFlag} --log warn --file`.trim(); @@ -86,10 +89,14 @@ function runForkTest(options: ForkOptions): void { ...process.env, CHAOS_ENABLED: options.chaosEnabled ? "true" : "false", CHAOS_LEVEL: options.chaosLevel, + STREAMS_ENABLED: options.streams ? "true" : "false", }, }); + + return true; } catch (e) { console.error("Error running fork test", e); + return false; // Test may fail if forks are detected, that's expected // We'll analyze the logs afterward } @@ -114,6 +121,7 @@ function logForkMatrixParameters(options: ForkOptions): void { console.info(`randomInboxIdsCount: ${randomInboxIdsCount}`); console.info(`installationCount: ${installationCount}`); console.info(`testName: ${testName}`); + console.info(`streams: ${options.streams}`); if (options.chaosEnabled) { const preset = chaosPresets[options.chaosLevel]; @@ -156,6 +164,7 @@ async function runForkDetection(options: ForkOptions): Promise { forksDetected: 0, runsWithForks: 0, runsWithoutForks: 0, + runsWithErrors: 0, }; // Clean logs if requested before starting @@ -168,7 +177,11 @@ async function runForkDetection(options: ForkOptions): Promise { // Run the test N times for (let i = 1; i <= options.count; i++) { // Run the fork test (silently) - runForkTest(options); + const success = runForkTest(options); + if (!success) { + stats.runsWithErrors++; + console.log(`❌ Error in run ${i}/${options.count}`); + } // Clean and analyze fork logs after the test (suppress output) const originalConsoleDebug = console.debug; @@ -201,6 +214,7 @@ async function runForkDetection(options: ForkOptions): Promise { console.info(`Total forks detected: ${stats.forksDetected}`); console.info(`Runs with forks: ${stats.runsWithForks}`); console.info(`Runs without forks: ${stats.runsWithoutForks}`); + console.info(`Runs with errors: ${stats.runsWithErrors}`); console.info( `Fork detection rate: ${( (stats.runsWithForks / stats.totalRuns) * @@ -242,6 +256,7 @@ async function main() { env: process.env.XMTP_ENV || "dev", chaosEnabled: false, chaosLevel: "medium", + streams: false, }; // Parse arguments @@ -310,6 +325,9 @@ async function main() { process.exit(1); } break; + case "--streams": + options.streams = true; + break; default: console.error(`Unknown option: ${arg}`); console.error("Use --help for usage information"); diff --git a/forks/config.ts b/forks/config.ts index 9e4d7707a..23a369284 100644 --- a/forks/config.ts +++ b/forks/config.ts @@ -1,7 +1,7 @@ import { getActiveVersion } from "@helpers/versions"; // Fork matrix parameters - shared between test and CLI -export const groupCount = 5; +export const groupCount = 1; export const parallelOperations = 5; // How many operations to perform in parallel export const NODE_VERSION = getActiveVersion().nodeBindings; // default to latest version, can be overridden with --nodeBindings=3.1.1 // By calling workers with prefix random1, random2, etc. we guarantee that creates a new key each run @@ -23,10 +23,11 @@ export const epochRotationOperations = { export const otherOperations = { createInstallation: false, // creates a new installation for a random worker sendMessage: true, // sends a message to the group + sync: true, // syncs the group }; export const targetEpoch = 30n; // The target epoch to stop the test (epochs are when performing forks to the group) export const network = process.env.XMTP_ENV; // Network environment setting -export const randomInboxIdsCount = 10; // How many inboxIds to use randomly in the add/remove operations +export const randomInboxIdsCount = 50; // How many inboxIds to use randomly in the add/remove operations export const installationCount = 2; // How many installations to use randomly in the createInstallation operations export const testName = "forks"; @@ -68,7 +69,7 @@ export const chaosPresets: Record = { jitterMin: 50, jitterMax: 200, lossMin: 0, - lossMax: 10, + lossMax: 25, interval: 10000, // 10 seconds }, }; @@ -84,6 +85,9 @@ export const chaosConfig: ChaosConfig = { level: (process.env.CHAOS_LEVEL as ChaosLevel) || "medium", }; +// Parse streams config from environment +export const streamsEnabled = process.env.STREAMS_ENABLED === "true"; + // Multinode container names for local environment chaos testing export const multinodeContainers = [ "multinode-node1-1", diff --git a/forks/forks.test.ts b/forks/forks.test.ts index d9d432da4..6927c3f32 100644 --- a/forks/forks.test.ts +++ b/forks/forks.test.ts @@ -2,8 +2,9 @@ import { getTime } from "@helpers/logger"; import { type Group } from "@helpers/versions"; import { setupDurationTracking } from "@helpers/vitest"; import { getInboxes } from "@inboxes/utils"; +import { typeofStream } from "@workers/main"; import { getWorkers, type Worker } from "@workers/manager"; -import { describe, it } from "vitest"; +import { describe, expect, it } from "vitest"; import { DockerContainer } from "../network-stability/container"; import { chaosConfig, @@ -17,71 +18,17 @@ import { otherOperations, parallelOperations, randomInboxIdsCount, + streamsEnabled, targetEpoch, testName, workerNames, - type ChaosPreset, } from "./config"; - -const startChaos = ( - allNodes: DockerContainer[], - preset: ChaosPreset, -): NodeJS.Timeout => { - console.log(`[chaos] Initialized ${allNodes.length} Docker containers`); - - // Validate containers are running - for (const node of allNodes) { - try { - // Test if container exists by trying to get its IP - if (!node.ip) { - throw new Error(`Container ${node.name} has no IP address`); - } - } catch { - throw new Error( - `Docker container ${node.name} is not running. Network chaos requires local multinode setup (./dev/up).`, - ); - } - } - console.log("[chaos] All Docker containers validated"); - - // Function to apply chaos to all nodes - const applyChaos = () => { - console.log( - "[chaos] Applying jitter, delay, and drop rules to all nodes...", - ); - for (const node of allNodes) { - const delay = Math.floor( - preset.delayMin + Math.random() * (preset.delayMax - preset.delayMin), - ); - const jitter = Math.floor( - preset.jitterMin + - Math.random() * (preset.jitterMax - preset.jitterMin), - ); - const loss = - preset.lossMin + Math.random() * (preset.lossMax - preset.lossMin); - - try { - node.addJitter(delay, jitter); - if (Math.random() < 0.5) node.addLoss(loss); - } catch (err) { - console.warn(`[chaos] Error applying netem on ${node.name}:`, err); - } - } - }; - - // Apply chaos immediately - applyChaos(); - - return setInterval(applyChaos, preset.interval); -}; +import { clearChaos, startChaos } from "./utils"; describe(testName, () => { setupDurationTracking({ testName }); - const createOperations = async (worker: Worker, group: Group) => { - // This syncs all and can contribute to the fork - await worker.client.conversations.syncAll(); - + const createOperations = (worker: Worker, group: Group) => { // Fetches the group from the worker perspective const getGroup = () => worker.client.conversations.getConversationById( @@ -121,6 +68,7 @@ describe(testName, () => { getGroup().then((g) => g.send(`Message from ${worker.name}`).then(() => {}), ), + sync: () => getGroup().then((g) => g.sync()), }; }; @@ -129,13 +77,21 @@ describe(testName, () => { let allNodes: DockerContainer[] = []; let chaosInterval: NodeJS.Timeout | undefined; let verifyInterval: NodeJS.Timeout | undefined; + let mustFail = false; try { let workers = await getWorkers(workerNames, { env: network as "local" | "dev" | "production", nodeBindings: NODE_VERSION, }); - // Note: typeofStreamForTest and typeOfSyncForTest are set to None, so no streams or syncs to start + + // Enable message streams if configured + if (streamsEnabled) { + console.log("[streams] Enabling message streams on all workers"); + workers.getAll().forEach((worker) => { + worker.worker.startStream(typeofStream.Message); + }); + } // Initialize network chaos if enabled if (chaosConfig.enabled) { @@ -159,7 +115,6 @@ describe(testName, () => { await workers.checkForks(); } catch (e) { console.warn("[verify] Skipping check due to exception:", e); - throw e; } })(); }, 10 * 1000); @@ -175,6 +130,9 @@ describe(testName, () => { const group = await workers.createGroupBetweenAll(); let currentEpoch = 0n; + await Promise.all( + workers.getAll().map((w) => w.client.conversations.sync()), + ); while (currentEpoch < targetEpoch) { const parallelOperationsArray = Array.from( @@ -186,7 +144,7 @@ describe(testName, () => { Math.floor(Math.random() * workers.getAll().length) ]; - const ops = await createOperations(randomWorker, group); + const ops = createOperations(randomWorker, group); const operationList = [ ...(epochRotationOperations.updateName ? [ops.updateName] @@ -203,6 +161,7 @@ describe(testName, () => { ? [ops.createInstallation] : []), ...(otherOperations.sendMessage ? [ops.sendMessage] : []), + ...(otherOperations.sync ? [ops.sync] : []), ]; const randomOperation = @@ -217,11 +176,19 @@ describe(testName, () => { await randomOperation(); await otherRandomOperation(); } catch (e) { - console.log(`Group ${groupIndex + 1} operation failed:`, e); + console.error( + `Group ${groupIndex + 1} operation failed:`, + e, + ); } })(), ); - await Promise.all(parallelOperationsArray); + try { + await Promise.all(parallelOperationsArray); + } catch (e) { + console.error(`Group ${groupIndex + 1} operation failed:`, e); + } + await workers.checkForksForGroup(group.id); currentEpoch = (await group.debugInfo()).epoch; } @@ -232,8 +199,9 @@ describe(testName, () => { await Promise.all(groupOperationPromises); await workers.checkForks(); - } catch (e) { + } catch (e: any) { console.error("Error during fork testing:", e); + mustFail = true; } finally { if (verifyInterval) { clearInterval(verifyInterval); @@ -241,26 +209,19 @@ describe(testName, () => { // Clean up chaos if it was enabled if (chaosConfig.enabled) { console.log("[chaos] Cleaning up network chaos..."); - // Clear intervals if (chaosInterval) { clearInterval(chaosInterval); } - // Clear network rules - for (const node of allNodes) { - try { - node.clearLatency(); - } catch (err) { - console.warn( - `[chaos] Error clearing latency on ${node.name}:`, - err, - ); - } - } + clearChaos(allNodes); console.log("[chaos] Cleanup complete"); } + + if (mustFail) { + expect.fail(`Test failed`); + } } }); }); diff --git a/forks/utils.ts b/forks/utils.ts new file mode 100644 index 000000000..7cafff60b --- /dev/null +++ b/forks/utils.ts @@ -0,0 +1,68 @@ +import { type DockerContainer } from "network-stability/container"; +import { type ChaosPreset } from "./config"; + +const applyPresetToNode = (node: DockerContainer, preset: ChaosPreset) => { + const delay = Math.floor( + preset.delayMin + Math.random() * (preset.delayMax - preset.delayMin), + ); + const jitter = Math.floor( + preset.jitterMin + Math.random() * (preset.jitterMax - preset.jitterMin), + ); + const loss = + preset.lossMin + Math.random() * (preset.lossMax - preset.lossMin); + + try { + node.addJitter(delay, jitter); + node.addLoss(loss); + } catch (err) { + console.warn(`[chaos] Error applying netem on ${node.name}:`, err); + } +}; + +const validateContainers = (allNodes: DockerContainer[]) => { + for (const node of allNodes) { + try { + // Test if container exists by trying to get its IP + if (!node.ip || !node.veth) { + throw new Error(`Container ${node.name} has no IP address`); + } + } catch { + throw new Error( + `Docker container ${node.name} is not running. Network chaos requires local multinode setup (./dev/up).`, + ); + } + } +}; + +export const startChaos = ( + allNodes: DockerContainer[], + preset: ChaosPreset, +): NodeJS.Timeout => { + validateContainers(allNodes); + console.log(`[chaos] Initialized ${allNodes.length} Docker containers`); + // Function to apply chaos to all nodes + const applyChaos = () => { + console.log( + "[chaos] Applying jitter, delay, and drop rules to all nodes...", + ); + for (const node of allNodes) { + applyPresetToNode(node, preset); + } + }; + + // Apply chaos immediately + applyChaos(); + + return setInterval(applyChaos, preset.interval); +}; + +export const clearChaos = (allNodes: DockerContainer[]) => { + // Clear network rules + for (const node of allNodes) { + try { + node.clearLatency(); + } catch (err) { + console.warn(`[chaos] Error clearing latency on ${node.name}:`, err); + } + } +}; diff --git a/workers/manager.ts b/workers/manager.ts index 6fbe69ebb..ef46d240d 100644 --- a/workers/manager.ts +++ b/workers/manager.ts @@ -354,7 +354,7 @@ export class WorkerManager implements IWorkerManager { if (baseName in this.keysCache) { //They persist in memory in the same test run - console.debug(`Using cached keys for ${baseName}`); + console.log(`Using cached keys for ${baseName}`); return this.keysCache[baseName]; } @@ -435,7 +435,7 @@ export class WorkerManager implements IWorkerManager { // Check if the worker already exists in our production storage if (providedInstallId && this.workers[baseName]?.[providedInstallId]) { - console.debug(`Reusing existing worker for ${descriptor}`); + console.log(`Reusing existing worker for ${descriptor}`); return this.workers[baseName][providedInstallId]; }