Skip to content

Commit b6f80d1

Browse files
committed
Add streams to fork test
1 parent 1416ee2 commit b6f80d1

File tree

6 files changed

+138
-83
lines changed

6 files changed

+138
-83
lines changed

forks/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ The main approach creates intentional conflicts by running parallel operations o
6161
- **installationCount**: `5` - How many installations to use randomly in the createInstallation operations
6262
- **typeofStreamForTest**: `typeofStream.None` - No streams started by default (configured on-demand)
6363
- **typeOfSyncForTest**: `typeOfSync.None` - No automatic syncing (configured on-demand)
64+
- **streams**: `false` - Enable message streams on all workers (configured via `--streams` CLI flag)
6465

6566
### Test setup in local network
6667

@@ -92,6 +93,9 @@ yarn fork --clean-all
9293
# Run on a specific environment
9394
yarn fork --count 200 --env local
9495

96+
# Enable message streams on all workers
97+
yarn fork --streams
98+
9599
# Show help
96100
yarn fork --help
97101
```

forks/cli.ts

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ interface ForkOptions {
2525
env?: string; // XMTP environment (local, dev, production)
2626
chaosEnabled: boolean; // Enable network chaos
2727
chaosLevel: ChaosLevel; // Chaos level (low, medium, high)
28+
streams: boolean; // Enable message streams on all workers
2829
}
2930

3031
function showHelp() {
@@ -42,6 +43,7 @@ OPTIONS:
4243
--env <environment> XMTP environment (local, dev, production) [default: dev]
4344
--chaos-enabled Enable network chaos testing (requires --env local)
4445
--chaos-level <level> Chaos level: low, medium, high [default: medium]
46+
--streams Enable message streams on all workers [default: false]
4547
-h, --help Show this help message
4648
4749
CHAOS LEVELS:
@@ -56,6 +58,7 @@ EXAMPLES:
5658
yarn fork --count 200 --env local # Run 200 times on local environment
5759
yarn fork --env local --chaos-enabled # Run with medium network chaos
5860
yarn fork --env local --chaos-enabled --chaos-level high # Run with high chaos
61+
yarn fork --streams # Run with message streams enabled
5962
6063
For more information, see: forks/README.md
6164
`);
@@ -73,9 +76,9 @@ function getForkCount(): number {
7376
}
7477

7578
/**
76-
* Run the fork test (suppress output)
79+
* Run the fork test (suppress output) and return whether it completed successfully
7780
*/
78-
function runForkTest(options: ForkOptions): void {
81+
function runForkTest(options: ForkOptions): boolean {
7982
const envFlag = options.env ? `--env ${options.env}` : "";
8083
const command = `yarn test forks ${envFlag} --log warn --file`.trim();
8184

@@ -86,10 +89,14 @@ function runForkTest(options: ForkOptions): void {
8689
...process.env,
8790
CHAOS_ENABLED: options.chaosEnabled ? "true" : "false",
8891
CHAOS_LEVEL: options.chaosLevel,
92+
STREAMS_ENABLED: options.streams ? "true" : "false",
8993
},
9094
});
95+
96+
return true;
9197
} catch (e) {
9298
console.error("Error running fork test", e);
99+
return false;
93100
// Test may fail if forks are detected, that's expected
94101
// We'll analyze the logs afterward
95102
}
@@ -114,6 +121,7 @@ function logForkMatrixParameters(options: ForkOptions): void {
114121
console.info(`randomInboxIdsCount: ${randomInboxIdsCount}`);
115122
console.info(`installationCount: ${installationCount}`);
116123
console.info(`testName: ${testName}`);
124+
console.info(`streams: ${options.streams}`);
117125

118126
if (options.chaosEnabled) {
119127
const preset = chaosPresets[options.chaosLevel];
@@ -156,6 +164,7 @@ async function runForkDetection(options: ForkOptions): Promise<void> {
156164
forksDetected: 0,
157165
runsWithForks: 0,
158166
runsWithoutForks: 0,
167+
runsWithErrors: 0,
159168
};
160169

161170
// Clean logs if requested before starting
@@ -168,7 +177,11 @@ async function runForkDetection(options: ForkOptions): Promise<void> {
168177
// Run the test N times
169178
for (let i = 1; i <= options.count; i++) {
170179
// Run the fork test (silently)
171-
runForkTest(options);
180+
const success = runForkTest(options);
181+
if (!success) {
182+
stats.runsWithErrors++;
183+
console.log(`❌ Error in run ${i}/${options.count}`);
184+
}
172185

173186
// Clean and analyze fork logs after the test (suppress output)
174187
const originalConsoleDebug = console.debug;
@@ -201,6 +214,7 @@ async function runForkDetection(options: ForkOptions): Promise<void> {
201214
console.info(`Total forks detected: ${stats.forksDetected}`);
202215
console.info(`Runs with forks: ${stats.runsWithForks}`);
203216
console.info(`Runs without forks: ${stats.runsWithoutForks}`);
217+
console.info(`Runs with errors: ${stats.runsWithErrors}`);
204218
console.info(
205219
`Fork detection rate: ${(
206220
(stats.runsWithForks / stats.totalRuns) *
@@ -242,6 +256,7 @@ async function main() {
242256
env: process.env.XMTP_ENV || "dev",
243257
chaosEnabled: false,
244258
chaosLevel: "medium",
259+
streams: false,
245260
};
246261

247262
// Parse arguments
@@ -310,6 +325,9 @@ async function main() {
310325
process.exit(1);
311326
}
312327
break;
328+
case "--streams":
329+
options.streams = true;
330+
break;
313331
default:
314332
console.error(`Unknown option: ${arg}`);
315333
console.error("Use --help for usage information");

forks/config.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,11 @@ export const epochRotationOperations = {
2323
export const otherOperations = {
2424
createInstallation: false, // creates a new installation for a random worker
2525
sendMessage: true, // sends a message to the group
26+
sync: true, // syncs the group
2627
};
2728
export const targetEpoch = 30n; // The target epoch to stop the test (epochs are when performing forks to the group)
2829
export const network = process.env.XMTP_ENV; // Network environment setting
29-
export const randomInboxIdsCount = 10; // How many inboxIds to use randomly in the add/remove operations
30+
export const randomInboxIdsCount = 50; // How many inboxIds to use randomly in the add/remove operations
3031
export const installationCount = 2; // How many installations to use randomly in the createInstallation operations
3132
export const testName = "forks";
3233

@@ -68,7 +69,7 @@ export const chaosPresets: Record<ChaosLevel, ChaosPreset> = {
6869
jitterMin: 50,
6970
jitterMax: 200,
7071
lossMin: 0,
71-
lossMax: 10,
72+
lossMax: 25,
7273
interval: 10000, // 10 seconds
7374
},
7475
};
@@ -84,6 +85,9 @@ export const chaosConfig: ChaosConfig = {
8485
level: (process.env.CHAOS_LEVEL as ChaosLevel) || "medium",
8586
};
8687

88+
// Parse streams config from environment
89+
export const streamsEnabled = process.env.STREAMS_ENABLED === "true";
90+
8791
// Multinode container names for local environment chaos testing
8892
export const multinodeContainers = [
8993
"multinode-node1-1",

forks/forks.test.ts

Lines changed: 37 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@ import { getTime } from "@helpers/logger";
22
import { type Group } from "@helpers/versions";
33
import { setupDurationTracking } from "@helpers/vitest";
44
import { getInboxes } from "@inboxes/utils";
5+
import { typeofStream } from "@workers/main";
56
import { getWorkers, type Worker } from "@workers/manager";
6-
import { describe, it } from "vitest";
7+
import { describe, expect, it } from "vitest";
78
import { DockerContainer } from "../network-stability/container";
89
import {
910
chaosConfig,
@@ -17,71 +18,17 @@ import {
1718
otherOperations,
1819
parallelOperations,
1920
randomInboxIdsCount,
21+
streamsEnabled,
2022
targetEpoch,
2123
testName,
2224
workerNames,
23-
type ChaosPreset,
2425
} from "./config";
25-
26-
const startChaos = (
27-
allNodes: DockerContainer[],
28-
preset: ChaosPreset,
29-
): NodeJS.Timeout => {
30-
console.log(`[chaos] Initialized ${allNodes.length} Docker containers`);
31-
32-
// Validate containers are running
33-
for (const node of allNodes) {
34-
try {
35-
// Test if container exists by trying to get its IP
36-
if (!node.ip) {
37-
throw new Error(`Container ${node.name} has no IP address`);
38-
}
39-
} catch {
40-
throw new Error(
41-
`Docker container ${node.name} is not running. Network chaos requires local multinode setup (./dev/up).`,
42-
);
43-
}
44-
}
45-
console.log("[chaos] All Docker containers validated");
46-
47-
// Function to apply chaos to all nodes
48-
const applyChaos = () => {
49-
console.log(
50-
"[chaos] Applying jitter, delay, and drop rules to all nodes...",
51-
);
52-
for (const node of allNodes) {
53-
const delay = Math.floor(
54-
preset.delayMin + Math.random() * (preset.delayMax - preset.delayMin),
55-
);
56-
const jitter = Math.floor(
57-
preset.jitterMin +
58-
Math.random() * (preset.jitterMax - preset.jitterMin),
59-
);
60-
const loss =
61-
preset.lossMin + Math.random() * (preset.lossMax - preset.lossMin);
62-
63-
try {
64-
node.addJitter(delay, jitter);
65-
if (Math.random() < 0.5) node.addLoss(loss);
66-
} catch (err) {
67-
console.warn(`[chaos] Error applying netem on ${node.name}:`, err);
68-
}
69-
}
70-
};
71-
72-
// Apply chaos immediately
73-
applyChaos();
74-
75-
return setInterval(applyChaos, preset.interval);
76-
};
26+
import { clearChaos, startChaos } from "./utils";
7727

7828
describe(testName, () => {
7929
setupDurationTracking({ testName });
8030

81-
const createOperations = async (worker: Worker, group: Group) => {
82-
// This syncs all and can contribute to the fork
83-
await worker.client.conversations.syncAll();
84-
31+
const createOperations = (worker: Worker, group: Group) => {
8532
// Fetches the group from the worker perspective
8633
const getGroup = () =>
8734
worker.client.conversations.getConversationById(
@@ -121,6 +68,7 @@ describe(testName, () => {
12168
getGroup().then((g) =>
12269
g.send(`Message from ${worker.name}`).then(() => {}),
12370
),
71+
sync: () => getGroup().then((g) => g.sync()),
12472
};
12573
};
12674

@@ -129,13 +77,21 @@ describe(testName, () => {
12977
let allNodes: DockerContainer[] = [];
13078
let chaosInterval: NodeJS.Timeout | undefined;
13179
let verifyInterval: NodeJS.Timeout | undefined;
80+
let mustFail = false;
13281

13382
try {
13483
let workers = await getWorkers(workerNames, {
13584
env: network as "local" | "dev" | "production",
13685
nodeBindings: NODE_VERSION,
13786
});
138-
// Note: typeofStreamForTest and typeOfSyncForTest are set to None, so no streams or syncs to start
87+
88+
// Enable message streams if configured
89+
if (streamsEnabled) {
90+
console.log("[streams] Enabling message streams on all workers");
91+
workers.getAll().forEach((worker) => {
92+
worker.worker.startStream(typeofStream.Message);
93+
});
94+
}
13995

14096
// Initialize network chaos if enabled
14197
if (chaosConfig.enabled) {
@@ -159,7 +115,6 @@ describe(testName, () => {
159115
await workers.checkForks();
160116
} catch (e) {
161117
console.warn("[verify] Skipping check due to exception:", e);
162-
throw e;
163118
}
164119
})();
165120
}, 10 * 1000);
@@ -175,6 +130,9 @@ describe(testName, () => {
175130
const group = await workers.createGroupBetweenAll();
176131

177132
let currentEpoch = 0n;
133+
await Promise.all(
134+
workers.getAll().map((w) => w.client.conversations.sync()),
135+
);
178136

179137
while (currentEpoch < targetEpoch) {
180138
const parallelOperationsArray = Array.from(
@@ -186,7 +144,7 @@ describe(testName, () => {
186144
Math.floor(Math.random() * workers.getAll().length)
187145
];
188146

189-
const ops = await createOperations(randomWorker, group);
147+
const ops = createOperations(randomWorker, group);
190148
const operationList = [
191149
...(epochRotationOperations.updateName
192150
? [ops.updateName]
@@ -203,6 +161,7 @@ describe(testName, () => {
203161
? [ops.createInstallation]
204162
: []),
205163
...(otherOperations.sendMessage ? [ops.sendMessage] : []),
164+
...(otherOperations.sync ? [ops.sync] : []),
206165
];
207166

208167
const randomOperation =
@@ -217,11 +176,19 @@ describe(testName, () => {
217176
await randomOperation();
218177
await otherRandomOperation();
219178
} catch (e) {
220-
console.log(`Group ${groupIndex + 1} operation failed:`, e);
179+
console.error(
180+
`Group ${groupIndex + 1} operation failed:`,
181+
e,
182+
);
221183
}
222184
})(),
223185
);
224-
await Promise.all(parallelOperationsArray);
186+
try {
187+
await Promise.all(parallelOperationsArray);
188+
} catch (e) {
189+
console.error(`Group ${groupIndex + 1} operation failed:`, e);
190+
}
191+
225192
await workers.checkForksForGroup(group.id);
226193
currentEpoch = (await group.debugInfo()).epoch;
227194
}
@@ -232,35 +199,29 @@ describe(testName, () => {
232199

233200
await Promise.all(groupOperationPromises);
234201
await workers.checkForks();
235-
} catch (e) {
202+
} catch (e: any) {
236203
console.error("Error during fork testing:", e);
204+
mustFail = true;
237205
} finally {
238206
if (verifyInterval) {
239207
clearInterval(verifyInterval);
240208
}
241209
// Clean up chaos if it was enabled
242210
if (chaosConfig.enabled) {
243211
console.log("[chaos] Cleaning up network chaos...");
244-
245212
// Clear intervals
246213
if (chaosInterval) {
247214
clearInterval(chaosInterval);
248215
}
249216

250-
// Clear network rules
251-
for (const node of allNodes) {
252-
try {
253-
node.clearLatency();
254-
} catch (err) {
255-
console.warn(
256-
`[chaos] Error clearing latency on ${node.name}:`,
257-
err,
258-
);
259-
}
260-
}
217+
clearChaos(allNodes);
261218

262219
console.log("[chaos] Cleanup complete");
263220
}
221+
222+
if (mustFail) {
223+
expect.fail(`Test failed`);
224+
}
264225
}
265226
});
266227
});

0 commit comments

Comments
 (0)