Skip to content

Commit

Permalink
fix: app subprocess restart on error and better reporting (#34193)
Browse files Browse the repository at this point in the history
Co-authored-by: Guilherme Gazzo <5263975+ggazzo@users.noreply.github.com>
Co-authored-by: Douglas Gubert <1810309+d-gubert@users.noreply.github.com>
  • Loading branch information
3 people authored Dec 17, 2024
1 parent b7427fa commit 78694f3
Show file tree
Hide file tree
Showing 11 changed files with 111 additions and 45 deletions.
5 changes: 5 additions & 0 deletions .changeset/giant-nails-trade.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@rocket.chat/apps-engine': patch
---

Adds simple app subprocess metrics report
5 changes: 5 additions & 0 deletions .changeset/honest-kings-allow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@rocket.chat/apps-engine': patch
---

Attempts to restart an app subprocess if the spawn command fails
5 changes: 5 additions & 0 deletions .changeset/quiet-radios-fry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@rocket.chat/apps-engine': patch
---

Fixes an issue while collecting the error message from a failed restart attempt of an app subprocess
5 changes: 5 additions & 0 deletions .changeset/young-dots-cheat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@rocket.chat/apps-engine': patch
---

Prevents app:getStatus requests from timing out in some cases
56 changes: 28 additions & 28 deletions packages/apps-engine/deno-runtime/handlers/app/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,41 @@ import handleOnUpdate from './handleOnUpdate.ts';
export default async function handleApp(method: string, params: unknown): Promise<Defined | JsonRpcError> {
const [, appMethod] = method.split(':');

// We don't want the getStatus method to generate logs, so we handle it separately
if (appMethod === 'getStatus') {
return handleGetStatus();
}
try {
// We don't want the getStatus method to generate logs, so we handle it separately
if (appMethod === 'getStatus') {
return await handleGetStatus();
}

// `app` will be undefined if the method here is "app:construct"
const app = AppObjectRegistry.get<App>('app');
// `app` will be undefined if the method here is "app:construct"
const app = AppObjectRegistry.get<App>('app');

app?.getLogger().debug(`'${appMethod}' is being called...`);
app?.getLogger().debug(`'${appMethod}' is being called...`);

if (uikitInteractions.includes(appMethod)) {
return handleUIKitInteraction(appMethod, params).then((result) => {
if (result instanceof JsonRpcError) {
app?.getLogger().debug(`'${appMethod}' was unsuccessful.`, result.message);
} else {
app?.getLogger().debug(`'${appMethod}' was successfully called! The result is:`, result);
}
if (uikitInteractions.includes(appMethod)) {
return handleUIKitInteraction(appMethod, params).then((result) => {
if (result instanceof JsonRpcError) {
app?.getLogger().debug(`'${appMethod}' was unsuccessful.`, result.message);
} else {
app?.getLogger().debug(`'${appMethod}' was successfully called! The result is:`, result);
}

return result;
});
}
return result;
});
}

if (appMethod.startsWith('check') || appMethod.startsWith('execute')) {
return handleListener(appMethod, params).then((result) => {
if (result instanceof JsonRpcError) {
app?.getLogger().debug(`'${appMethod}' was unsuccessful.`, result.message);
} else {
app?.getLogger().debug(`'${appMethod}' was successfully called! The result is:`, result);
}
if (appMethod.startsWith('check') || appMethod.startsWith('execute')) {
return handleListener(appMethod, params).then((result) => {
if (result instanceof JsonRpcError) {
app?.getLogger().debug(`'${appMethod}' was unsuccessful.`, result.message);
} else {
app?.getLogger().debug(`'${appMethod}' was successfully called! The result is:`, result);
}

return result;
});
}
return result;
});
}

try {
let result: Defined | JsonRpcError;

switch (appMethod) {
Expand Down
4 changes: 4 additions & 0 deletions packages/apps-engine/deno-runtime/lib/messenger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ export const Queue = new (class Queue {
this.queue.push(encoder.encode(message));
this.processQueue();
}

public getCurrentSize() {
return this.queue.length;
}
});

export const Transport = new (class Transporter {
Expand Down
20 changes: 20 additions & 0 deletions packages/apps-engine/deno-runtime/lib/metricsCollector.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { writeAll } from "https://deno.land/std@0.216.0/io/write_all.ts";
import { Queue } from "./messenger.ts";

export function collectMetrics() {
return {
queueSize: Queue.getCurrentSize(),
}
};

const encoder = new TextEncoder();

export async function sendMetrics() {
const metrics = collectMetrics();

await writeAll(Deno.stderr, encoder.encode(JSON.stringify(metrics)));
}

export function startMetricsReport() {
setInterval(sendMetrics, 5000);
}
3 changes: 3 additions & 0 deletions packages/apps-engine/deno-runtime/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import apiHandler from './handlers/api-handler.ts';
import handleApp from './handlers/app/handler.ts';
import handleScheduler from './handlers/scheduler-handler.ts';
import registerErrorListeners from './error-handlers.ts';
import { startMetricsReport } from "./lib/metricsCollector.ts";

type Handlers = {
app: typeof handleApp;
Expand Down Expand Up @@ -130,3 +131,5 @@ async function main() {
registerErrorListeners();

main();

startMetricsReport();
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ export class DenoRuntimeSubprocessController extends EventEmitter {

logger.info('Successfully restarted app subprocess');
} catch (e) {
logger.error("Failed to restart app's subprocess", { error: e });
logger.error("Failed to restart app's subprocess", { error: e.message || e });
} finally {
await this.logStorage.storeEntries(AppConsole.toStorageEntry(this.getAppId(), logger));
}
Expand All @@ -321,18 +321,24 @@ export class DenoRuntimeSubprocessController extends EventEmitter {
}

private waitUntilReady(): Promise<void> {
if (this.state === 'ready') {
return;
}

return new Promise((resolve, reject) => {
const timeoutId = setTimeout(() => reject(new Error(`[${this.getAppId()}] Timeout: app process not ready`)), this.options.timeout);
let timeoutId: NodeJS.Timeout;

if (this.state === 'ready') {
const handler = () => {
clearTimeout(timeoutId);
return resolve();
}
resolve();
};

this.once('ready', () => {
clearTimeout(timeoutId);
return resolve();
});
timeoutId = setTimeout(() => {
this.off('ready', handler);
reject(new Error(`[${this.getAppId()}] Timeout: app process not ready`));
}, this.options.timeout);

this.once('ready', handler);
});
}

Expand Down Expand Up @@ -636,6 +642,12 @@ export class DenoRuntimeSubprocessController extends EventEmitter {
}

private async parseError(chunk: Buffer): Promise<void> {
console.error('Subprocess stderr', chunk.toString());
try {
const data = JSON.parse(chunk.toString());

this.debug('Metrics received from subprocess: %o', data);
} catch (e) {
console.error('Subprocess stderr', chunk.toString());
}
}
}
15 changes: 11 additions & 4 deletions packages/apps-engine/src/server/runtime/deno/LivenessManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ export class LivenessManager {

this.controller.once('ready', () => this.ping());
this.subprocess.once('exit', this.handleExit.bind(this));
this.subprocess.once('error', this.handleError.bind(this));
}

/**
Expand Down Expand Up @@ -155,6 +156,11 @@ export class LivenessManager {
this.messenger.send(COMMAND_PING);
}

private handleError(err: Error) {
this.debug('App has failed to start.`', err);
this.restartProcess();
}

private handleExit(exitCode: number, signal: string) {
this.pingAbortController.emit('abort');

Expand All @@ -174,21 +180,22 @@ export class LivenessManager {
this.restartProcess();
}

private restartProcess() {
private async restartProcess() {
if (this.restartCount >= this.options.maxRestarts) {
this.debug('Limit of restarts reached (%d). Aborting restart...', this.options.maxRestarts);
this.controller.stopApp();
return;
}

this.pingTimeoutConsecutiveCount = 0;
this.restartCount++;
this.restartLog.push({
restartedAt: new Date(),
source: 'liveness-manager',
pid: this.subprocess.pid,
});

this.controller.restartApp();
await this.controller.restartApp();

this.pingTimeoutConsecutiveCount = 0;
this.restartCount++;
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { ChildProcess } from 'child_process';
import type { ChildProcess } from 'child_process';

import type { JsonRpc } from 'jsonrpc-lite';

import { encoder } from './codec';

export class ProcessMessenger {
private deno: ChildProcess;
private deno: ChildProcess | undefined;

private _sendStrategy: (message: JsonRpc) => void;

Expand All @@ -30,7 +30,7 @@ export class ProcessMessenger {
}

private switchStrategy() {
if (this.deno instanceof ChildProcess) {
if (this.deno?.stdin?.writable) {
this._sendStrategy = this.strategySend.bind(this);
} else {
this._sendStrategy = this.strategyError.bind(this);
Expand Down

0 comments on commit 78694f3

Please sign in to comment.