Skip to content

Commit

Permalink
added drain pause/resume
Browse files Browse the repository at this point in the history
Signed-off-by: Jeromy Cannon <jeromy@swirldslabs.com>
  • Loading branch information
jeromy-cannon committed Sep 12, 2024
1 parent 6b7459c commit 860e4b5
Showing 1 changed file with 43 additions and 16 deletions.
59 changes: 43 additions & 16 deletions src/core/k8.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -573,34 +573,52 @@ export class K8 {
return new Promise((resolve, reject) => {
const execInstance = new k8s.Exec(this.kubeConfig)
const command = ['tar', 'cf', '-', '-C', srcDir, srcFile]
const writerStream = fs.createWriteStream(tmpFile)
const writerPassthroughStream = new stream.PassThrough({ highWaterMark: 1024 * 1024 })
const outputFileStream = fs.createWriteStream(tmpFile)
const outputPassthroughStream = new stream.PassThrough({ highWaterMark: 1024 * 1024 })
const errStream = new stream.PassThrough()
let additionalErrorMessageDetail = ''

// Use pipe() to automatically handle backpressure between streams
writerPassthroughStream.pipe(writerStream);
outputPassthroughStream.pipe(outputFileStream);

outputPassthroughStream.on('data', (chunk) => {
const canWrite = outputFileStream.write(chunk); // Write chunk to file and check if buffer is full

if (!canWrite) {
console.log(`Buffer is full, pausing data stream... for copying from ${podName}:${srcDir}/${srcFile} to ${destPath}`);
outputPassthroughStream.pause(); // Pause the data stream if buffer is full
}

})

outputFileStream.on('drain', () => {
outputPassthroughStream.resume()
this.logger.debug(`stream drained, resume write for copying from ${podName}:${srcDir}/${srcFile} to ${destPath}`)
})

execInstance.exec(
namespace,
podName,
containerName,
command,
writerStream,
outputFileStream,
errStream,
null,
false,
async ({ status }) => {
this.logger.debug(`copyFrom.callback(status)=${status}`)
writerStream.close()
outputFileStream.end()
outputFileStream.close(() => {
this.logger.debug(`finished closing writerStream copying from ${podName}:${srcDir}/${srcFile} to ${destPath}`)
})
if (status === 'Failure') {
self._deleteTempFile(tmpFile)
}
})
.then(conn => {
conn.on('close', async (code, reason) => {
if (code !== 1000) { // code 1000 is the success code
return reject(new FullstackTestingError(`failed to copy because of error (${code}): ${reason}`))
return reject(new FullstackTestingError(`failed copying from ${podName}:${srcDir}/${srcFile} to ${destPath} because of error (${code}): ${reason}`))
}

try {
Expand All @@ -622,40 +640,49 @@ export class K8 {
}
}
} catch (e) {
return reject(new FullstackTestingError(`failed to extract file: ${destPath}`, e))
return reject(new FullstackTestingError(`failed copying from ${podName}:${srcDir}/${srcFile} to ${destPath} to extract file: ${destPath}`, e))
}

return reject(new FullstackTestingError(`failed to download file completely: ${destPath}${additionalErrorMessageDetail}`))
return reject(new FullstackTestingError(`failed copying from ${podName}:${srcDir}/${srcFile} to ${destPath} to download file completely: ${destPath}${additionalErrorMessageDetail}`))
})

conn.on('error', (e) => {
self._deleteTempFile(tmpFile)
return reject(new FullstackTestingError(
`failed to copy file ${destPath} because of connection error: ${e.message}`, e))
`failed copying from ${podName}:${srcDir}/${srcFile} to ${destPath} because of connection error: ${e.message}`, e))
})
})

errStream.on('data', (data) => {
return reject(new FullstackTestingError(`error encountered during download of file: ${destPath}, error: ${data.toString()}`))
return reject(new FullstackTestingError(`error encountered copying from ${podName}:${srcDir}/${srcFile} to ${destPath}, error: ${data.toString()}`))
})

writerStream.on('close', () => {
outputFileStream.on('close', () => {
this.logger.debug(`finished copying from ${podName}:${srcDir}/${srcFile} to ${destPath}`)
})

writerStream.on('error', (err) => {
return reject(new FullstackTestingError(`error encountered during download of file: ${destPath}, err: ${err.toString()}`, err))
outputFileStream.on('error', (err) => {
return reject(new FullstackTestingError(`writerStream error encountered copying from ${podName}:${srcDir}/${srcFile} to ${destPath}, err: ${err.toString()}`, err))

})

writerStream.on('end', () => {
outputFileStream.on('end', () => {
this.logger.debug(`writerStream has ended for copying from ${podName}:${srcDir}/${srcFile} to ${destPath}`)
})

writerStream.on('finish', () => {
writerStream.end()
outputPassthroughStream.on('end', () => {
this.logger.debug(`writerPassthroughStream has ended for copying from ${podName}:${srcDir}/${srcFile} to ${destPath}`)
})

outputFileStream.on('finish', () => {
outputFileStream.end()
this.logger.debug(`stopping copy, writerStream has finished for copying from ${podName}:${srcDir}/${srcFile} to ${destPath}`)
})

outputPassthroughStream.on('finish', () => {
outputFileStream.end()
this.logger.debug(`stopping copy, writerPassthroughStream has finished for copying from ${podName}:${srcDir}/${srcFile} to ${destPath}`)
})
})
} catch (e) {
throw new FullstackTestingError(
Expand Down

0 comments on commit 860e4b5

Please sign in to comment.