Skip to content

Fix deploy timeout issues #1661

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/tender-cycles-melt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"trigger.dev": patch
"@trigger.dev/core": patch
---

Fixed deploy timeout issues and improve the output of logs when deploying
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { z } from "zod";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { FinalizeDeploymentV2Service } from "~/v3/services/finalizeDeploymentV2";
import { FinalizeDeploymentV2Service } from "~/v3/services/finalizeDeploymentV2.server";

const ParamsSchema = z.object({
deploymentId: z.string(),
Expand Down
104 changes: 104 additions & 0 deletions apps/webapp/app/routes/api.v3.deployments.$deploymentId.finalize.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
import { FinalizeDeploymentRequestBody } from "@trigger.dev/core/v3";
import { z } from "zod";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { FinalizeDeploymentV2Service } from "~/v3/services/finalizeDeploymentV2.server";

const ParamsSchema = z.object({
deploymentId: z.string(),
});

export async function action({ request, params }: ActionFunctionArgs) {
// Ensure this is a POST request
if (request.method.toUpperCase() !== "POST") {
return { status: 405, body: "Method Not Allowed" };
}

const parsedParams = ParamsSchema.safeParse(params);

if (!parsedParams.success) {
return json({ error: "Invalid params" }, { status: 400 });
}

// Next authenticate the request
const authenticationResult = await authenticateApiRequest(request);

if (!authenticationResult) {
logger.info("Invalid or missing api key", { url: request.url });
return json({ error: "Invalid or Missing API key" }, { status: 401 });
}

const authenticatedEnv = authenticationResult.environment;

const { deploymentId } = parsedParams.data;

const rawBody = await request.json();
const body = FinalizeDeploymentRequestBody.safeParse(rawBody);

if (!body.success) {
return json({ error: "Invalid body", issues: body.error.issues }, { status: 400 });
}

try {
// Create a text stream chain
const stream = new TransformStream();
const encoder = new TextEncoderStream();
const writer = stream.writable.getWriter();

const service = new FinalizeDeploymentV2Service();

// Chain the streams: stream -> encoder -> response
const response = new Response(stream.readable.pipeThrough(encoder), {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});

const pingInterval = setInterval(() => {
writer.write("event: ping\ndata: {}\n\n");
}, 10000); // 10 seconds

service
.call(authenticatedEnv, deploymentId, body.data, writer)
.then(async () => {
clearInterval(pingInterval);

await writer.write(`event: complete\ndata: ${JSON.stringify({ id: deploymentId })}\n\n`);
await writer.close();
})
.catch(async (error) => {
let errorMessage;

if (error instanceof ServiceValidationError) {
errorMessage = { error: error.message };
} else if (error instanceof Error) {
logger.error("Error finalizing deployment", { error: error.message });
errorMessage = { error: `Internal server error: ${error.message}` };
} else {
logger.error("Error finalizing deployment", { error: String(error) });
errorMessage = { error: "Internal server error" };
}

clearInterval(pingInterval);

await writer.write(`event: error\ndata: ${JSON.stringify(errorMessage)}\n\n`);
await writer.close();
});

return response;
} catch (error) {
if (error instanceof ServiceValidationError) {
return json({ error: error.message }, { status: 400 });
} else if (error instanceof Error) {
logger.error("Error finalizing deployment", { error: error.message });
return json({ error: `Internal server error: ${error.message}` }, { status: 500 });
} else {
logger.error("Error finalizing deployment", { error: String(error) });
return json({ error: "Internal server error" }, { status: 500 });
}
}
}
6 changes: 6 additions & 0 deletions apps/webapp/app/v3/services/finalizeDeployment.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ export class FinalizeDeploymentService extends BaseService {
throw new ServiceValidationError("Worker deployment does not have a worker");
}

if (deployment.status === "DEPLOYED") {
logger.debug("Worker deployment is already deployed", { id });

return deployment;
}

if (deployment.status !== "DEPLOYING") {
logger.error("Worker deployment is not in DEPLOYING status", { id });
throw new ServiceValidationError("Worker deployment is not in DEPLOYING status");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ export class FinalizeDeploymentV2Service extends BaseService {
public async call(
authenticatedEnv: AuthenticatedEnvironment,
id: string,
body: FinalizeDeploymentRequestBody
body: FinalizeDeploymentRequestBody,
writer?: WritableStreamDefaultWriter
) {
// if it's self hosted, lets just use the v1 finalize deployment service
if (body.selfHosted) {
Expand Down Expand Up @@ -83,24 +84,27 @@ export class FinalizeDeploymentV2Service extends BaseService {
throw new ServiceValidationError("Missing depot token");
}

const pushResult = await executePushToRegistry({
depot: {
buildId: externalBuildData.data.buildId,
orgToken: env.DEPOT_TOKEN,
projectId: externalBuildData.data.projectId,
},
registry: {
host: env.DEPLOY_REGISTRY_HOST,
namespace: env.DEPLOY_REGISTRY_NAMESPACE,
username: env.DEPLOY_REGISTRY_USERNAME,
password: env.DEPLOY_REGISTRY_PASSWORD,
},
deployment: {
version: deployment.version,
environmentSlug: deployment.environment.slug,
projectExternalRef: deployment.worker.project.externalRef,
const pushResult = await executePushToRegistry(
{
depot: {
buildId: externalBuildData.data.buildId,
orgToken: env.DEPOT_TOKEN,
projectId: externalBuildData.data.projectId,
},
registry: {
host: env.DEPLOY_REGISTRY_HOST,
namespace: env.DEPLOY_REGISTRY_NAMESPACE,
username: env.DEPLOY_REGISTRY_USERNAME,
password: env.DEPLOY_REGISTRY_PASSWORD,
},
deployment: {
version: deployment.version,
environmentSlug: deployment.environment.slug,
projectExternalRef: deployment.worker.project.externalRef,
},
},
});
writer
);

if (!pushResult.ok) {
throw new ServiceValidationError(pushResult.error);
Expand Down Expand Up @@ -148,11 +152,10 @@ type ExecutePushResult =
logs: string;
};

async function executePushToRegistry({
depot,
registry,
deployment,
}: ExecutePushToRegistryOptions): Promise<ExecutePushResult> {
async function executePushToRegistry(
{ depot, registry, deployment }: ExecutePushToRegistryOptions,
writer?: WritableStreamDefaultWriter
): Promise<ExecutePushResult> {
// Step 1: We need to "login" to the digital ocean registry
const configDir = await ensureLoggedIntoDockerRegistry(registry.host, {
username: registry.username,
Expand Down Expand Up @@ -180,7 +183,7 @@ async function executePushToRegistry({
try {
const processCode = await new Promise<number | null>((res, rej) => {
// For some reason everything is output on stderr, not stdout
childProcess.stderr?.on("data", (data: Buffer) => {
childProcess.stderr?.on("data", async (data: Buffer) => {
const text = data.toString();

// Emitted data chunks can contain multiple lines. Remove empty lines.
Expand All @@ -191,6 +194,13 @@ async function executePushToRegistry({
imageTag,
deployment,
});

// Now we can write strings directly
if (writer) {
for (const line of lines) {
await writer.write(`event: log\ndata: ${JSON.stringify({ message: line })}\n\n`);
}
}
Comment on lines +198 to +203
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider error handling for stream writes.

The code writes to the stream without handling potential write errors. Stream writes can fail, especially in case of network issues or if the client disconnects.

Apply this diff to add error handling:

 if (writer) {
   for (const line of lines) {
-    await writer.write(`event: log\ndata: ${JSON.stringify({ message: line })}\n\n`);
+    try {
+      await writer.write(`event: log\ndata: ${JSON.stringify({ message: line })}\n\n`);
+    } catch (error) {
+      logger.error("Failed to write to stream", { error });
+      break;
+    }
   }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Now we can write strings directly
if (writer) {
for (const line of lines) {
await writer.write(`event: log\ndata: ${JSON.stringify({ message: line })}\n\n`);
}
}
// Now we can write strings directly
if (writer) {
for (const line of lines) {
try {
await writer.write(`event: log\ndata: ${JSON.stringify({ message: line })}\n\n`);
} catch (error) {
logger.error("Failed to write to stream", { error });
break;
}
}
}

});

childProcess.on("error", (e) => rej(e));
Expand Down
65 changes: 57 additions & 8 deletions packages/cli-v3/src/apiClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
FailDeploymentResponseBody,
FinalizeDeploymentRequestBody,
} from "@trigger.dev/core/v3";
import { zodfetch, ApiError } from "@trigger.dev/core/v3/zodfetch";
import { zodfetch, ApiError, zodfetchSSE } from "@trigger.dev/core/v3/zodfetch";

export class CliApiClient {
constructor(
Expand Down Expand Up @@ -247,23 +247,72 @@ export class CliApiClient {
);
}

async finalizeDeployment(id: string, body: FinalizeDeploymentRequestBody) {
async finalizeDeployment(
id: string,
body: FinalizeDeploymentRequestBody,
onLog?: (message: string) => void
): Promise<ApiResult<FailDeploymentResponseBody>> {
if (!this.accessToken) {
throw new Error("finalizeDeployment: No access token");
}

return wrapZodFetch(
FailDeploymentResponseBody,
`${this.apiURL}/api/v2/deployments/${id}/finalize`,
{
let resolvePromise: (value: ApiResult<FailDeploymentResponseBody>) => void;
let rejectPromise: (reason: any) => void;

const promise = new Promise<ApiResult<FailDeploymentResponseBody>>((resolve, reject) => {
resolvePromise = resolve;
rejectPromise = reject;
});

const source = zodfetchSSE({
url: `${this.apiURL}/api/v3/deployments/${id}/finalize`,
request: {
method: "POST",
headers: {
Authorization: `Bearer ${this.accessToken}`,
"Content-Type": "application/json",
},
body: JSON.stringify(body),
}
);
},
messages: {
error: z.object({ error: z.string() }),
log: z.object({ message: z.string() }),
complete: FailDeploymentResponseBody,
},
});

source.onConnectionError((error) => {
rejectPromise({
success: false,
error,
});
});

source.onMessage("complete", (message) => {
resolvePromise({
success: true,
data: message,
});
});

source.onMessage("error", ({ error }) => {
rejectPromise({
success: false,
error,
});
});

if (onLog) {
source.onMessage("log", ({ message }) => {
onLog(message);
});
}

const result = await promise;

source.stop();

return result;
}

async startDeploymentIndexing(deploymentId: string, body: StartDeploymentIndexingRequestBody) {
Expand Down
39 changes: 31 additions & 8 deletions packages/cli-v3/src/commands/deploy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,10 @@ async function _deployCommand(dir: string, options: DeployCommandOptions) {
forcedExternals,
listener: {
onBundleStart() {
$buildSpinner.start("Building project");
$buildSpinner.start("Building trigger code");
},
onBundleComplete(result) {
$buildSpinner.stop("Successfully built project");
$buildSpinner.stop("Successfully built code");

logger.debug("Bundle result", result);
},
Expand Down Expand Up @@ -328,9 +328,9 @@ async function _deployCommand(dir: string, options: DeployCommandOptions) {
const $spinner = spinner();

if (isLinksSupported) {
$spinner.start(`Deploying version ${version} ${deploymentLink}`);
$spinner.start(`Building version ${version} ${deploymentLink}`);
} else {
$spinner.start(`Deploying version ${version}`);
$spinner.start(`Building version ${version}`);
}

const selfHostedRegistryHost = deployment.registryHost ?? options.registry;
Expand Down Expand Up @@ -359,6 +359,13 @@ async function _deployCommand(dir: string, options: DeployCommandOptions) {
compilationPath: destination.path,
buildEnvVars: buildManifest.build.env,
network: options.network,
onLog: (logMessage) => {
if (isLinksSupported) {
$spinner.message(`Building version ${version} ${deploymentLink}: ${logMessage}`);
} else {
$spinner.message(`Building version ${version}: ${logMessage}`);
}
},
});

logger.debug("Build result", buildResult);
Expand Down Expand Up @@ -426,10 +433,26 @@ async function _deployCommand(dir: string, options: DeployCommandOptions) {
}`
: `${buildResult.image}${buildResult.digest ? `@${buildResult.digest}` : ""}`;

const finalizeResponse = await projectClient.client.finalizeDeployment(deployment.id, {
imageReference,
selfHosted: options.selfHosted,
});
if (isLinksSupported) {
$spinner.message(`Deploying version ${version} ${deploymentLink}`);
} else {
$spinner.message(`Deploying version ${version}`);
}

const finalizeResponse = await projectClient.client.finalizeDeployment(
deployment.id,
{
imageReference,
selfHosted: options.selfHosted,
},
(logMessage) => {
if (isLinksSupported) {
$spinner.message(`Deploying version ${version} ${deploymentLink}: ${logMessage}`);
} else {
$spinner.message(`Deploying version ${version}: ${logMessage}`);
}
}
);

if (!finalizeResponse.success) {
await failDeploy(
Expand Down
Loading