Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions forks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ The main approach creates intentional conflicts by running parallel operations o
- **installationCount**: `5` - How many installations to use randomly in the createInstallation operations
- **typeofStreamForTest**: `typeofStream.None` - No streams started by default (configured on-demand)
- **typeOfSyncForTest**: `typeOfSync.None` - No automatic syncing (configured on-demand)
- **streams**: `false` - Enable message streams on all workers (configured via `--streams` CLI flag)

### Test setup in local network

Expand Down Expand Up @@ -92,6 +93,9 @@ yarn fork --clean-all
# Run on a specific environment
yarn fork --count 200 --env local

# Enable message streams on all workers
yarn fork --streams

# Show help
yarn fork --help
```
Expand Down
24 changes: 21 additions & 3 deletions forks/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ interface ForkOptions {
env?: string; // XMTP environment (local, dev, production)
chaosEnabled: boolean; // Enable network chaos
chaosLevel: ChaosLevel; // Chaos level (low, medium, high)
streams: boolean; // Enable message streams on all workers
}

function showHelp() {
Expand All @@ -42,6 +43,7 @@ OPTIONS:
--env <environment> XMTP environment (local, dev, production) [default: dev]
--chaos-enabled Enable network chaos testing (requires --env local)
--chaos-level <level> Chaos level: low, medium, high [default: medium]
--streams Enable message streams on all workers [default: false]
-h, --help Show this help message

CHAOS LEVELS:
Expand All @@ -56,6 +58,7 @@ EXAMPLES:
yarn fork --count 200 --env local # Run 200 times on local environment
yarn fork --env local --chaos-enabled # Run with medium network chaos
yarn fork --env local --chaos-enabled --chaos-level high # Run with high chaos
yarn fork --streams # Run with message streams enabled

For more information, see: forks/README.md
`);
Expand All @@ -73,9 +76,9 @@ function getForkCount(): number {
}

/**
* Run the fork test (suppress output)
* Run the fork test (suppress output) and return whether it completed successfully
*/
function runForkTest(options: ForkOptions): void {
function runForkTest(options: ForkOptions): boolean {
const envFlag = options.env ? `--env ${options.env}` : "";
const command = `yarn test forks ${envFlag} --log warn --file`.trim();

Expand All @@ -86,10 +89,14 @@ function runForkTest(options: ForkOptions): void {
...process.env,
CHAOS_ENABLED: options.chaosEnabled ? "true" : "false",
CHAOS_LEVEL: options.chaosLevel,
STREAMS_ENABLED: options.streams ? "true" : "false",
},
});

return true;
} catch (e) {
console.error("Error running fork test", e);
return false;
// Test may fail if forks are detected, that's expected
// We'll analyze the logs afterward
}
Expand All @@ -114,6 +121,7 @@ function logForkMatrixParameters(options: ForkOptions): void {
console.info(`randomInboxIdsCount: ${randomInboxIdsCount}`);
console.info(`installationCount: ${installationCount}`);
console.info(`testName: ${testName}`);
console.info(`streams: ${options.streams}`);

if (options.chaosEnabled) {
const preset = chaosPresets[options.chaosLevel];
Expand Down Expand Up @@ -156,6 +164,7 @@ async function runForkDetection(options: ForkOptions): Promise<void> {
forksDetected: 0,
runsWithForks: 0,
runsWithoutForks: 0,
runsWithErrors: 0,
};

// Clean logs if requested before starting
Expand All @@ -168,7 +177,11 @@ async function runForkDetection(options: ForkOptions): Promise<void> {
// Run the test N times
for (let i = 1; i <= options.count; i++) {
// Run the fork test (silently)
runForkTest(options);
const success = runForkTest(options);
if (!success) {
stats.runsWithErrors++;
console.log(`❌ Error in run ${i}/${options.count}`);
}

// Clean and analyze fork logs after the test (suppress output)
const originalConsoleDebug = console.debug;
Expand Down Expand Up @@ -201,6 +214,7 @@ async function runForkDetection(options: ForkOptions): Promise<void> {
console.info(`Total forks detected: ${stats.forksDetected}`);
console.info(`Runs with forks: ${stats.runsWithForks}`);
console.info(`Runs without forks: ${stats.runsWithoutForks}`);
console.info(`Runs with errors: ${stats.runsWithErrors}`);
console.info(
`Fork detection rate: ${(
(stats.runsWithForks / stats.totalRuns) *
Expand Down Expand Up @@ -242,6 +256,7 @@ async function main() {
env: process.env.XMTP_ENV || "dev",
chaosEnabled: false,
chaosLevel: "medium",
streams: false,
};

// Parse arguments
Expand Down Expand Up @@ -310,6 +325,9 @@ async function main() {
process.exit(1);
}
break;
case "--streams":
options.streams = true;
break;
default:
console.error(`Unknown option: ${arg}`);
console.error("Use --help for usage information");
Expand Down
10 changes: 7 additions & 3 deletions forks/config.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { getActiveVersion } from "@helpers/versions";

// Fork matrix parameters - shared between test and CLI
export const groupCount = 5;
export const groupCount = 1;
export const parallelOperations = 5; // How many operations to perform in parallel
export const NODE_VERSION = getActiveVersion().nodeBindings; // default to latest version, can be overridden with --nodeBindings=3.1.1
// By calling workers with prefix random1, random2, etc. we guarantee that creates a new key each run
Expand All @@ -23,10 +23,11 @@ export const epochRotationOperations = {
export const otherOperations = {
createInstallation: false, // creates a new installation for a random worker
sendMessage: true, // sends a message to the group
sync: true, // syncs the group
};
export const targetEpoch = 30n; // The target epoch to stop the test (epochs are when performing forks to the group)
export const network = process.env.XMTP_ENV; // Network environment setting
export const randomInboxIdsCount = 10; // How many inboxIds to use randomly in the add/remove operations
export const randomInboxIdsCount = 50; // How many inboxIds to use randomly in the add/remove operations
export const installationCount = 2; // How many installations to use randomly in the createInstallation operations
export const testName = "forks";

Expand Down Expand Up @@ -68,7 +69,7 @@ export const chaosPresets: Record<ChaosLevel, ChaosPreset> = {
jitterMin: 50,
jitterMax: 200,
lossMin: 0,
lossMax: 10,
lossMax: 25,
interval: 10000, // 10 seconds
},
};
Expand All @@ -84,6 +85,9 @@ export const chaosConfig: ChaosConfig = {
level: (process.env.CHAOS_LEVEL as ChaosLevel) || "medium",
};

// Parse streams config from environment
export const streamsEnabled = process.env.STREAMS_ENABLED === "true";

// Multinode container names for local environment chaos testing
export const multinodeContainers = [
"multinode-node1-1",
Expand Down
113 changes: 37 additions & 76 deletions forks/forks.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ import { getTime } from "@helpers/logger";
import { type Group } from "@helpers/versions";
import { setupDurationTracking } from "@helpers/vitest";
import { getInboxes } from "@inboxes/utils";
import { typeofStream } from "@workers/main";
import { getWorkers, type Worker } from "@workers/manager";
import { describe, it } from "vitest";
import { describe, expect, it } from "vitest";
import { DockerContainer } from "../network-stability/container";
import {
chaosConfig,
Expand All @@ -17,71 +18,17 @@ import {
otherOperations,
parallelOperations,
randomInboxIdsCount,
streamsEnabled,
targetEpoch,
testName,
workerNames,
type ChaosPreset,
} from "./config";

const startChaos = (
allNodes: DockerContainer[],
preset: ChaosPreset,
): NodeJS.Timeout => {
console.log(`[chaos] Initialized ${allNodes.length} Docker containers`);

// Validate containers are running
for (const node of allNodes) {
try {
// Test if container exists by trying to get its IP
if (!node.ip) {
throw new Error(`Container ${node.name} has no IP address`);
}
} catch {
throw new Error(
`Docker container ${node.name} is not running. Network chaos requires local multinode setup (./dev/up).`,
);
}
}
console.log("[chaos] All Docker containers validated");

// Function to apply chaos to all nodes
const applyChaos = () => {
console.log(
"[chaos] Applying jitter, delay, and drop rules to all nodes...",
);
for (const node of allNodes) {
const delay = Math.floor(
preset.delayMin + Math.random() * (preset.delayMax - preset.delayMin),
);
const jitter = Math.floor(
preset.jitterMin +
Math.random() * (preset.jitterMax - preset.jitterMin),
);
const loss =
preset.lossMin + Math.random() * (preset.lossMax - preset.lossMin);

try {
node.addJitter(delay, jitter);
if (Math.random() < 0.5) node.addLoss(loss);
} catch (err) {
console.warn(`[chaos] Error applying netem on ${node.name}:`, err);
}
}
};

// Apply chaos immediately
applyChaos();

return setInterval(applyChaos, preset.interval);
};
import { clearChaos, startChaos } from "./utils";

describe(testName, () => {
setupDurationTracking({ testName });

const createOperations = async (worker: Worker, group: Group) => {
// This syncs all and can contribute to the fork
await worker.client.conversations.syncAll();

const createOperations = (worker: Worker, group: Group) => {
// Fetches the group from the worker perspective
const getGroup = () =>
worker.client.conversations.getConversationById(
Expand Down Expand Up @@ -121,6 +68,7 @@ describe(testName, () => {
getGroup().then((g) =>
g.send(`Message from ${worker.name}`).then(() => {}),
),
sync: () => getGroup().then((g) => g.sync()),
};
};

Expand All @@ -129,13 +77,21 @@ describe(testName, () => {
let allNodes: DockerContainer[] = [];
let chaosInterval: NodeJS.Timeout | undefined;
let verifyInterval: NodeJS.Timeout | undefined;
let mustFail = false;

try {
let workers = await getWorkers(workerNames, {
env: network as "local" | "dev" | "production",
nodeBindings: NODE_VERSION,
});
// Note: typeofStreamForTest and typeOfSyncForTest are set to None, so no streams or syncs to start

// Enable message streams if configured
if (streamsEnabled) {
console.log("[streams] Enabling message streams on all workers");
workers.getAll().forEach((worker) => {
worker.worker.startStream(typeofStream.Message);
});
}

// Initialize network chaos if enabled
if (chaosConfig.enabled) {
Expand All @@ -159,7 +115,6 @@ describe(testName, () => {
await workers.checkForks();
} catch (e) {
console.warn("[verify] Skipping check due to exception:", e);
throw e;
}
})();
}, 10 * 1000);
Expand All @@ -175,6 +130,9 @@ describe(testName, () => {
const group = await workers.createGroupBetweenAll();

let currentEpoch = 0n;
await Promise.all(
workers.getAll().map((w) => w.client.conversations.sync()),
);

while (currentEpoch < targetEpoch) {
const parallelOperationsArray = Array.from(
Expand All @@ -186,7 +144,7 @@ describe(testName, () => {
Math.floor(Math.random() * workers.getAll().length)
];

const ops = await createOperations(randomWorker, group);
const ops = createOperations(randomWorker, group);
const operationList = [
...(epochRotationOperations.updateName
? [ops.updateName]
Expand All @@ -203,6 +161,7 @@ describe(testName, () => {
? [ops.createInstallation]
: []),
...(otherOperations.sendMessage ? [ops.sendMessage] : []),
...(otherOperations.sync ? [ops.sync] : []),
];

const randomOperation =
Expand All @@ -217,11 +176,19 @@ describe(testName, () => {
await randomOperation();
await otherRandomOperation();
} catch (e) {
console.log(`Group ${groupIndex + 1} operation failed:`, e);
console.error(
`Group ${groupIndex + 1} operation failed:`,
e,
);
}
})(),
);
await Promise.all(parallelOperationsArray);
try {
await Promise.all(parallelOperationsArray);
} catch (e) {
console.error(`Group ${groupIndex + 1} operation failed:`, e);
}

await workers.checkForksForGroup(group.id);
currentEpoch = (await group.debugInfo()).epoch;
}
Expand All @@ -232,35 +199,29 @@ describe(testName, () => {

await Promise.all(groupOperationPromises);
await workers.checkForks();
} catch (e) {
} catch (e: any) {
console.error("Error during fork testing:", e);
mustFail = true;
} finally {
if (verifyInterval) {
clearInterval(verifyInterval);
}
// Clean up chaos if it was enabled
if (chaosConfig.enabled) {
console.log("[chaos] Cleaning up network chaos...");

// Clear intervals
if (chaosInterval) {
clearInterval(chaosInterval);
}

// Clear network rules
for (const node of allNodes) {
try {
node.clearLatency();
} catch (err) {
console.warn(
`[chaos] Error clearing latency on ${node.name}:`,
err,
);
}
}
clearChaos(allNodes);

console.log("[chaos] Cleanup complete");
}

if (mustFail) {
expect.fail(`Test failed`);
}
}
});
});
Loading