diff --git a/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts index d629225a9c..5cf2131921 100644 --- a/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts @@ -74,6 +74,11 @@ export class DevQueueConsumer { return; } + logger.debug("[DevQueueConsumer] Deprecating background worker", { + backgroundWorker: backgroundWorker.id, + env: this.env.id, + }); + this._deprecatedWorkers.set(id, backgroundWorker); this._backgroundWorkers.delete(id); } @@ -96,9 +101,10 @@ export class DevQueueConsumer { this._backgroundWorkers.set(backgroundWorker.id, backgroundWorker); - logger.debug("Registered background worker", { + logger.debug("[DevQueueConsumer] Registered background worker", { backgroundWorker: backgroundWorker.id, inProgressRuns, + env: this.env.id, }); const subscriber = await devPubSub.subscribe(`backgroundWorker:${backgroundWorker.id}:*`); @@ -138,6 +144,7 @@ export class DevQueueConsumer { logger.debug("[DevQueueConsumer] taskAttemptCompleted()", { taskRunCompletion: completion, execution, + env: this.env.id, }); const service = new CompleteAttemptService(); @@ -151,7 +158,7 @@ export class DevQueueConsumer { public async taskRunFailed(workerId: string, completion: TaskRunFailedExecutionResult) { this._taskFailures++; - logger.debug("[DevQueueConsumer] taskRunFailed()", { completion }); + logger.debug("[DevQueueConsumer] taskRunFailed()", { completion, env: this.env.id }); this._inProgressRuns.delete(completion.id); @@ -188,7 +195,7 @@ export class DevQueueConsumer { return; } - logger.debug("Stopping dev queue consumer", { env: this.env }); + logger.debug("[DevQueueConsumer] Stopping dev queue consumer", { env: this.env }); this._enabled = false; @@ -335,6 +342,10 @@ export class DevQueueConsumer { }); if (!existingTaskRun) { + logger.debug("Failed to find existing task run, acking", { + messageId: message.messageId, + }); + await marqs?.acknowledgeMessage(message.messageId); setTimeout(() => this.#doWork(), 100); return; @@ -346,6 +357,14 @@ export class DevQueueConsumer { : this.#getLatestBackgroundWorker(); if (!backgroundWorker) { + logger.debug("Failed to find background worker, acking", { + messageId: message.messageId, + lockedToVersionId: existingTaskRun.lockedToVersionId, + deprecatedWorkers: Array.from(this._deprecatedWorkers.keys()), + backgroundWorkers: Array.from(this._backgroundWorkers.keys()), + latestWorker: this.#getLatestBackgroundWorker(), + }); + await marqs?.acknowledgeMessage(message.messageId); setTimeout(() => this.#doWork(), 100); return; diff --git a/scripts/unpack-worker.js b/scripts/unpack-worker.js new file mode 100644 index 0000000000..de356960e4 --- /dev/null +++ b/scripts/unpack-worker.js @@ -0,0 +1,68 @@ +const fs = require("fs"); +const zlib = require("zlib"); +const path = require("path"); + +// Get the file paths from command line arguments +let [jsonFilePath, destDir] = process.argv.slice(2); + +if (!jsonFilePath || !destDir) { + console.error("Usage: node script.js "); + process.exit(1); +} + +// Function to decompress the content +function decompressContent(base64Encoded) { + // Decode base64 string to buffer + const compressedData = Buffer.from(base64Encoded, "base64"); + + // Decompress the data + const decompressedData = zlib.inflateSync(compressedData); + + // Convert buffer to string + return decompressedData.toString(); +} + +try { + // Read and parse the JSON file + const jsonContent = fs.readFileSync(jsonFilePath, "utf8"); + + const data = JSON.parse(jsonContent)[0]; + + console.log(data); + + const id = data.id; + + console.log(`Extracting files for: ${id} to ${destDir}`); + + destDir = path.join(destDir, id); + + console.log(`Extracting files to: ${destDir}`); + + // Create the destination directory if it doesn't exist + fs.mkdirSync(destDir, { recursive: true }); + + // Process each item in the array + const sourceFiles = data.metadata.sourceFiles; + + sourceFiles.forEach((file) => { + // Decompress the contents + const decompressedContent = decompressContent(file.contents); + + // Combine destination directory with file path + const fullPath = path.join(destDir, file.filePath); + + // Create directory structure if it doesn't exist + const dirPath = path.dirname(fullPath); + fs.mkdirSync(dirPath, { recursive: true }); + + // Write the decompressed content to the file + fs.writeFileSync(fullPath, decompressedContent); + + console.log(`Created file: ${fullPath}`); + }); + + console.log(`\nAll files have been extracted to: ${destDir}`); +} catch (error) { + console.error(error); + process.exit(1); +}