From 860e4b511d137a930433b680bb16c36420f889c3 Mon Sep 17 00:00:00 2001 From: Jeromy Cannon Date: Thu, 12 Sep 2024 09:36:06 +0100 Subject: [PATCH] added drain pause/resume Signed-off-by: Jeromy Cannon --- src/core/k8.mjs | 59 +++++++++++++++++++++++++++++++++++-------------- 1 file changed, 43 insertions(+), 16 deletions(-) diff --git a/src/core/k8.mjs b/src/core/k8.mjs index 32de5d3f6..4c2baf012 100644 --- a/src/core/k8.mjs +++ b/src/core/k8.mjs @@ -573,26 +573,44 @@ export class K8 { return new Promise((resolve, reject) => { const execInstance = new k8s.Exec(this.kubeConfig) const command = ['tar', 'cf', '-', '-C', srcDir, srcFile] - const writerStream = fs.createWriteStream(tmpFile) - const writerPassthroughStream = new stream.PassThrough({ highWaterMark: 1024 * 1024 }) + const outputFileStream = fs.createWriteStream(tmpFile) + const outputPassthroughStream = new stream.PassThrough({ highWaterMark: 1024 * 1024 }) const errStream = new stream.PassThrough() let additionalErrorMessageDetail = '' // Use pipe() to automatically handle backpressure between streams - writerPassthroughStream.pipe(writerStream); + outputPassthroughStream.pipe(outputFileStream); + + outputPassthroughStream.on('data', (chunk) => { + const canWrite = outputFileStream.write(chunk); // Write chunk to file and check if buffer is full + + if (!canWrite) { + console.log(`Buffer is full, pausing data stream... for copying from ${podName}:${srcDir}/${srcFile} to ${destPath}`); + outputPassthroughStream.pause(); // Pause the data stream if buffer is full + } + + }) + + outputFileStream.on('drain', () => { + outputPassthroughStream.resume() + this.logger.debug(`stream drained, resume write for copying from ${podName}:${srcDir}/${srcFile} to ${destPath}`) + }) execInstance.exec( namespace, podName, containerName, command, - writerStream, + outputFileStream, errStream, null, false, async ({ status }) => { this.logger.debug(`copyFrom.callback(status)=${status}`) - writerStream.close() + outputFileStream.end() + outputFileStream.close(() => { + this.logger.debug(`finished closing writerStream copying from ${podName}:${srcDir}/${srcFile} to ${destPath}`) + }) if (status === 'Failure') { self._deleteTempFile(tmpFile) } @@ -600,7 +618,7 @@ export class K8 { .then(conn => { conn.on('close', async (code, reason) => { if (code !== 1000) { // code 1000 is the success code - return reject(new FullstackTestingError(`failed to copy because of error (${code}): ${reason}`)) + return reject(new FullstackTestingError(`failed copying from ${podName}:${srcDir}/${srcFile} to ${destPath} because of error (${code}): ${reason}`)) } try { @@ -622,40 +640,49 @@ export class K8 { } } } catch (e) { - return reject(new FullstackTestingError(`failed to extract file: ${destPath}`, e)) + return reject(new FullstackTestingError(`failed copying from ${podName}:${srcDir}/${srcFile} to ${destPath} to extract file: ${destPath}`, e)) } - return reject(new FullstackTestingError(`failed to download file completely: ${destPath}${additionalErrorMessageDetail}`)) + return reject(new FullstackTestingError(`failed copying from ${podName}:${srcDir}/${srcFile} to ${destPath} to download file completely: ${destPath}${additionalErrorMessageDetail}`)) }) conn.on('error', (e) => { self._deleteTempFile(tmpFile) return reject(new FullstackTestingError( - `failed to copy file ${destPath} because of connection error: ${e.message}`, e)) + `failed copying from ${podName}:${srcDir}/${srcFile} to ${destPath} because of connection error: ${e.message}`, e)) }) }) errStream.on('data', (data) => { - return reject(new FullstackTestingError(`error encountered during download of file: ${destPath}, error: ${data.toString()}`)) + return reject(new FullstackTestingError(`error encountered copying from ${podName}:${srcDir}/${srcFile} to ${destPath}, error: ${data.toString()}`)) }) - writerStream.on('close', () => { + outputFileStream.on('close', () => { this.logger.debug(`finished copying from ${podName}:${srcDir}/${srcFile} to ${destPath}`) }) - writerStream.on('error', (err) => { - return reject(new FullstackTestingError(`error encountered during download of file: ${destPath}, err: ${err.toString()}`, err)) + outputFileStream.on('error', (err) => { + return reject(new FullstackTestingError(`writerStream error encountered copying from ${podName}:${srcDir}/${srcFile} to ${destPath}, err: ${err.toString()}`, err)) }) - writerStream.on('end', () => { + outputFileStream.on('end', () => { this.logger.debug(`writerStream has ended for copying from ${podName}:${srcDir}/${srcFile} to ${destPath}`) }) - writerStream.on('finish', () => { - writerStream.end() + outputPassthroughStream.on('end', () => { + this.logger.debug(`writerPassthroughStream has ended for copying from ${podName}:${srcDir}/${srcFile} to ${destPath}`) + }) + + outputFileStream.on('finish', () => { + outputFileStream.end() this.logger.debug(`stopping copy, writerStream has finished for copying from ${podName}:${srcDir}/${srcFile} to ${destPath}`) }) + + outputPassthroughStream.on('finish', () => { + outputFileStream.end() + this.logger.debug(`stopping copy, writerPassthroughStream has finished for copying from ${podName}:${srcDir}/${srcFile} to ${destPath}`) + }) }) } catch (e) { throw new FullstackTestingError(