Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create streams only after Promise.all() #608

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions lib/resources/odata.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,11 @@ module.exports = (service, endpoint) => {
const options = QueryOptions.fromODataRequest(params, query);
return Promise.all([
Forms.getFields(form.def.id),
Submissions.streamForExport(form.id, draft, undefined, options),
((params.table === 'Submissions') && options.hasPaging())
? Submissions.countByFormId(form.id, draft, options) : resolve(null)
])
.then(([ fields, stream, count ]) =>
json(rowStreamToOData(fields, params.table, env.domain, originalUrl, query, stream, count)));
.then(([ fields, count ]) => Submissions.streamForExport(form.id, draft, undefined, options)
.then((stream) => json(rowStreamToOData(fields, params.table, env.domain, originalUrl, query, stream, count))));
})));
};

Expand Down
24 changes: 12 additions & 12 deletions lib/resources/submissions.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// including this file, may be copied, modified, propagated, or distributed
// except according to the terms contained in the LICENSE file.

const { always, identity } = require('ramda');
const { always, identity, call } = require('ramda');
const multer = require('multer');
const sanitize = require('sanitize-filename');
const { Blob, Form, Submission } = require('../model/frames');
Expand Down Expand Up @@ -269,20 +269,20 @@ module.exports = (service, endpoint) => {
const options = QueryOptions.fromSubmissionCsvRequest(query);
return Promise.all([
(options.deletedFields === true) ? Forms.getMergedFields(form.id) : Forms.getFields(form.def.id),
Submissions.streamForExport(form.id, draft, keys, options),
(options.splitSelectMultiples !== true) ? null : Submissions.getSelectMultipleValuesForExport(form.id, draft, options),
SubmissionAttachments.streamForExport(form.id, draft, keys, options),
ClientAudits.streamForExport(form.id, draft, keys, options),
draft ? null : Audits.log(auth.actor, 'form.submission.export', form)
]).then(([ fields, rows, selectValues, attachments, clientAudits ]) => {
]).then(([ fields, selectValues ]) => {
const filename = sanitize(form.xmlFormId);
response.append('Content-Disposition', contentDisposition(`${filename}.zip`));
response.append('Content-Type', 'application/zip');
return zipStreamFromParts(
// TODO: not 100% sure that these streams close right on crash.
streamBriefcaseCsvs(rows, fields, form.xmlFormId, selectValues, decryptor, false, options),
streamAttachments(attachments, decryptor),
streamClientAudits(clientAudits, form, decryptor)
() => Submissions.streamForExport(form.id, draft, keys, options)
.then((rows) => streamBriefcaseCsvs(rows, fields, form.xmlFormId, selectValues, decryptor, false, options)),
() => SubmissionAttachments.streamForExport(form.id, draft, keys, options)
.then((attachments) => streamAttachments(attachments, decryptor)),
() => ClientAudits.streamForExport(form.id, draft, keys, options)
.then((clientAudits) => streamClientAudits(clientAudits, form, decryptor))
);
});
}));
Expand All @@ -295,18 +295,18 @@ module.exports = (service, endpoint) => {
const options = QueryOptions.fromSubmissionCsvRequest(query);
return Promise.all([
(options.deletedFields === true) ? Forms.getMergedFields(form.id) : Forms.getFields(form.def.id),
Submissions.streamForExport(form.id, draft, Object.keys(passphrases), options),
(options.splitSelectMultiples !== true) ? null : Submissions.getSelectMultipleValuesForExport(form.id, draft, options),
Keys.getDecryptor(passphrases),
draft ? null : Audits.log(auth.actor, 'form.submission.export', form)
])
.then(([ fields, rows, selectValues, decryptor ]) => {
.then(([ fields, selectValues, decryptor ]) => {
const filename = sanitize(form.xmlFormId);
const extension = (rootOnly === true) ? 'csv' : 'csv.zip';
response.append('Content-Disposition', contentDisposition(`${filename}.${extension}`));
response.append('Content-Type', (rootOnly === true) ? 'text/csv' : 'application/zip');
const envelope = (rootOnly === true) ? identity : zipStreamFromParts;
return envelope(streamBriefcaseCsvs(rows, fields, form.xmlFormId, selectValues, decryptor, rootOnly, options));
const envelope = (rootOnly === true) ? call : zipStreamFromParts;
return envelope(() => Submissions.streamForExport(form.id, draft, Object.keys(passphrases), options)
.then((rows) => streamBriefcaseCsvs(rows, fields, form.xmlFormId, selectValues, decryptor, rootOnly, options)));
});
});

Expand Down
54 changes: 30 additions & 24 deletions lib/util/zip.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
const { Readable } = require('stream');
const { PartialPipe } = require('./stream');
const archiver = require('archiver');
const { resolve } = require('./promise');

// Returns an object that can add files to an archive, without having that archive
// object directly nor knowing what else is going into it. Call append() to add a
Expand All @@ -36,8 +37,7 @@ const zipPart = () => {
// if the final component in the pipeline emitted the error, archiver would then
// emit it again, but if it was an intermediate component archiver wouldn't know
// about it. by manually aborting, we always emit the error and archiver never does.
const zipStreamFromParts = (...zipParts) => {
let completed = 0;
const zipStreamFromParts = (...zipPartFns) => {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend hiding whitespace when viewing the diff for this file.

const resultStream = archiver('zip', { zlib: { level: 9 } });

// track requested callbacks and call them when they are fully added to the zip.
Expand All @@ -47,29 +47,35 @@ const zipStreamFromParts = (...zipParts) => {
if (cb != null) cb();
});

for (const part of zipParts) {
part.stream.on('data', ({ stream, options, cb }) => {
const s = (stream instanceof PartialPipe)
? stream.pipeline((err) => { resultStream.emit('error', err); resultStream.abort(); })
: stream;
const next = () => {
if (zipPartFns.length === 0) {
resultStream.finalize();
return;
}

if (cb == null) {
resultStream.append(s, options);
} else {
// using the String object will result in still an empty comment, but allows
// separate instance equality check when the entry is recorded.
const sentinel = new String(); // eslint-disable-line no-new-wrappers
callbacks.set(sentinel, cb);
resultStream.append(s, Object.assign(options, { comment: sentinel }));
}
});
part.stream.on('error', (err) => { resultStream.emit('error', err); });
part.stream.on('end', () => { // eslint-disable-line no-loop-func
completed += 1;
if (completed === zipParts.length)
resultStream.finalize();
});
}
resolve(zipPartFns.shift()())
Copy link
Member Author

@matthew-white matthew-white Sep 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the main difference from the approach in #564. In this PR / with Slonik, stream() returns a Promise of a stream, so zipStreamFromParts() needs some Promise-related logic. However, in #564 / with postgres.js, stream() simply returns a stream.

.then((part) => {
part.stream.on('data', ({ stream, options, cb }) => {
const s = (stream instanceof PartialPipe)
? stream.pipeline((err) => { resultStream.emit('error', err); resultStream.abort(); })
: stream;

if (cb == null) {
resultStream.append(s, options);
} else {
// using the String object will result in still an empty comment, but allows
// separate instance equality check when the entry is recorded.
const sentinel = new String(); // eslint-disable-line no-new-wrappers
callbacks.set(sentinel, cb);
resultStream.append(s, Object.assign(options, { comment: sentinel }));
}
});
part.stream.on('error', (err) => { resultStream.emit('error', err); });
part.stream.on('end', next);
})
.catch((err) => { resultStream.emit('error', err); });
};
next();

return resultStream;
};
Expand Down
14 changes: 7 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
"prompt": "~1",
"ramda": "~0",
"sanitize-filename": "~1",
"slonik": "~23",
"slonik": "npm:@getodk/slonik@23.6.0-1",
"slonik-sql-tag-raw": "1.0.3",
"tmp-promise": "~3",
"uuid": "~3",
Expand Down
Loading