Skip to content

Commit 96bd80a

Browse files
committed
refactor(satellite): extract ProcessManager into composed handler classes
1 parent 715e9b5 commit 96bd80a

15 files changed

+2229
-1565
lines changed

services/satellite/src/core/mcp-server-wrapper.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { FastifyBaseLogger, FastifyRequest, FastifyReply, FastifyInstance } from
1010
import { randomUUID } from 'crypto';
1111
import { AsyncLocalStorage } from 'async_hooks';
1212
import { UnifiedToolDiscoveryManager } from '../services/unified-tool-discovery-manager';
13-
import { ProcessManager } from '../process/manager';
13+
import { ProcessManager } from '../process';
1414
import { ToolSearchService } from '../services/tool-search-service';
1515
import { DynamicConfigManager } from '../services/dynamic-config-manager';
1616
import { OAuthTokenService } from '../services/oauth-token-service';

services/satellite/src/jobs/idle-process-cleanup-job.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { Job, JobStats } from './base-job';
2-
import { ProcessManager } from '../process/manager';
3-
import { RuntimeState } from '../process/runtime-state';
2+
import { ProcessManager, RuntimeState } from '../process';
43
import { IDLE_TIMEOUT_MS, SPAWN_GRACE_PERIOD_MS } from '../config/process';
54
import { FastifyBaseLogger } from 'fastify';
65

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
import { Logger } from 'pino';
2+
import { ProcessInfo, MCPServerConfig } from './types';
3+
import type { RuntimeState } from './runtime-state';
4+
import type { EventBus } from '../services/event-bus';
5+
6+
/**
7+
* Callback type for spawning a new process
8+
*/
9+
export type SpawnFunction = (config: MCPServerConfig) => Promise<ProcessInfo>;
10+
11+
/**
12+
* Callback type for terminating a process
13+
*/
14+
export type TerminateFunction = (processInfo: ProcessInfo, timeout?: number) => Promise<void>;
15+
16+
/**
17+
* Callback type for getting a process by name
18+
*/
19+
export type GetProcessFunction = (installationName: string) => ProcessInfo | null;
20+
21+
/**
22+
* DormantManager handles dormant process tracking and respawning
23+
* Manages idle process termination and on-demand respawning
24+
*/
25+
export class DormantManager {
26+
private respawningProcesses = new Map<string, Promise<ProcessInfo>>(); // installationName -> respawn promise
27+
28+
constructor(
29+
private logger: Logger,
30+
private runtimeState: RuntimeState | undefined,
31+
private eventBus?: EventBus
32+
) {}
33+
34+
/**
35+
* Get all dormant process names from RuntimeState
36+
*/
37+
getAllDormantProcessNames(): string[] {
38+
if (!this.runtimeState) {
39+
return [];
40+
}
41+
return this.runtimeState.getAllDormantProcessNames();
42+
}
43+
44+
/**
45+
* Get or respawn a process if it's dormant
46+
* This method checks active processes first, then dormant configs, and respawns if needed
47+
* Prevents concurrent respawn attempts for the same process
48+
*/
49+
async getOrRespawnProcess(
50+
installationName: string,
51+
getProcess: GetProcessFunction,
52+
spawnProcess: SpawnFunction
53+
): Promise<ProcessInfo> {
54+
// Check if process is already active
55+
const existingProcess = getProcess(installationName);
56+
if (existingProcess && existingProcess.status === 'running') {
57+
return existingProcess;
58+
}
59+
60+
// Check if process is currently being respawned
61+
const respawningPromise = this.respawningProcesses.get(installationName);
62+
if (respawningPromise) {
63+
this.logger.debug({
64+
operation: 'dormant_process_respawn_waiting',
65+
installation_name: installationName
66+
}, `Waiting for in-progress respawn: ${installationName}`);
67+
return await respawningPromise;
68+
}
69+
70+
// Check if process config exists in dormant map
71+
if (!this.runtimeState) {
72+
throw new Error(`Process ${installationName} not found and RuntimeState not available`);
73+
}
74+
75+
const dormantConfig = this.runtimeState.getDormantConfig(installationName);
76+
if (!dormantConfig) {
77+
throw new Error(`Process ${installationName} not found in active or dormant maps`);
78+
}
79+
80+
// Start respawning process
81+
const respawnStartTime = Date.now();
82+
this.logger.info({
83+
operation: 'dormant_process_respawn_start',
84+
installation_name: installationName,
85+
team_id: dormantConfig.team_id
86+
}, `Respawning dormant process: ${installationName}`);
87+
88+
// Create respawn promise to prevent concurrent attempts
89+
const respawnPromise = (async () => {
90+
try {
91+
// Spawn the process
92+
const processInfo = await spawnProcess(dormantConfig);
93+
94+
// Remove from dormant map
95+
this.runtimeState!.removeDormantConfig(installationName);
96+
97+
const dormantDuration = respawnStartTime - (processInfo.startTime - 1000); // Approximate
98+
99+
this.logger.info({
100+
operation: 'dormant_process_respawned',
101+
installation_name: installationName,
102+
team_id: dormantConfig.team_id,
103+
respawn_duration_ms: Date.now() - respawnStartTime,
104+
dormant_duration_ms: dormantDuration,
105+
pid: processInfo.process.pid
106+
}, `Dormant process respawned successfully: ${installationName}`);
107+
108+
// Emit mcp.server.respawned event
109+
try {
110+
this.eventBus?.emit('mcp.server.respawned', {
111+
server_id: dormantConfig.installation_id,
112+
server_slug: installationName,
113+
team_id: dormantConfig.team_id,
114+
user_id: dormantConfig.user_id,
115+
process_id: processInfo.process.pid || 0,
116+
dormant_duration_seconds: Math.round(dormantDuration / 1000),
117+
respawn_duration_ms: Date.now() - respawnStartTime
118+
});
119+
} catch (error) {
120+
this.logger.warn({ error }, 'Failed to emit mcp.server.respawned event (non-fatal)');
121+
}
122+
123+
return processInfo;
124+
125+
} finally {
126+
// Remove from respawning map
127+
this.respawningProcesses.delete(installationName);
128+
}
129+
})();
130+
131+
// Store respawn promise
132+
this.respawningProcesses.set(installationName, respawnPromise);
133+
134+
return await respawnPromise;
135+
}
136+
137+
/**
138+
* Terminate a process and mark it as dormant for later respawning
139+
*/
140+
async terminateAndMarkDormant(
141+
installationName: string,
142+
getProcess: GetProcessFunction,
143+
terminateProcess: TerminateFunction,
144+
timeout: number = 10000
145+
): Promise<void> {
146+
const processInfo = getProcess(installationName);
147+
if (!processInfo) {
148+
this.logger.warn({
149+
operation: 'terminate_dormant_not_found',
150+
installation_name: installationName
151+
}, `Process not found for dormant marking: ${installationName}`);
152+
return;
153+
}
154+
155+
if (!this.runtimeState) {
156+
this.logger.error({
157+
operation: 'terminate_dormant_no_runtime_state',
158+
installation_name: installationName
159+
}, 'Cannot mark process as dormant: RuntimeState not available');
160+
return;
161+
}
162+
163+
const idleDuration = Date.now() - processInfo.lastActivity;
164+
165+
this.logger.info({
166+
operation: 'process_marked_dormant_start',
167+
installation_name: installationName,
168+
team_id: processInfo.config.team_id,
169+
idle_duration_ms: idleDuration,
170+
last_activity: new Date(processInfo.lastActivity).toISOString()
171+
}, `Marking process as dormant due to inactivity: ${installationName}`);
172+
173+
// Store config in dormant map before terminating
174+
this.runtimeState.markProcessDormant(installationName, processInfo.config);
175+
176+
// Mark as dormant shutdown to skip crash detection
177+
processInfo.isDormantShutdown = true;
178+
179+
// Emit mcp.server.dormant event
180+
try {
181+
this.eventBus?.emit('mcp.server.dormant', {
182+
server_id: processInfo.config.installation_id,
183+
server_slug: installationName,
184+
team_id: processInfo.config.team_id,
185+
user_id: processInfo.config.user_id,
186+
process_id: processInfo.process.pid || 0,
187+
idle_duration_seconds: Math.round(idleDuration / 1000),
188+
last_activity_at: new Date(processInfo.lastActivity).toISOString()
189+
});
190+
} catch (error) {
191+
this.logger.warn({ error }, 'Failed to emit mcp.server.dormant event (non-fatal)');
192+
}
193+
194+
// Terminate the process
195+
await terminateProcess(processInfo, timeout);
196+
197+
this.logger.info({
198+
operation: 'process_marked_dormant_success',
199+
installation_name: installationName,
200+
team_id: processInfo.config.team_id
201+
}, `Process marked as dormant and terminated: ${installationName}`);
202+
}
203+
204+
/**
205+
* Check if a process is currently being respawned
206+
*/
207+
isRespawning(installationName: string): boolean {
208+
return this.respawningProcesses.has(installationName);
209+
}
210+
211+
/**
212+
* Get count of currently respawning processes
213+
*/
214+
getRespawningCount(): number {
215+
return this.respawningProcesses.size;
216+
}
217+
218+
/**
219+
* Set the EventBus reference (for late initialization)
220+
* Called when EventBus becomes available after construction
221+
*/
222+
setEventBus(eventBus: EventBus): void {
223+
this.eventBus = eventBus;
224+
}
225+
}

0 commit comments

Comments
 (0)