feat: data export bulk/resume #1035

Conversation
jsforce's bulk2 query pagination keeps batches in memory and does multiple passes (CSV from API -> parse CSV -> JSON -> back to CSV). The `data export` commands do the pagination manually to improve memory consumption (we parse/write each batch and then drop it).
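A minimal sketch of that manual pagination, with hypothetical helper names (signatures simplified; `getBodyAndHeaders` and `writeBatch` are sketched in the review comments below). The Bulk API 2.0 results endpoint returns the next batch's locator in the `Sforce-Locator` response header, and the header is the literal string `null` once all batches are consumed:

```ts
import type { Connection } from '@salesforce/core';

// hypothetical helpers (signatures simplified), sketched further down
declare function getBodyAndHeaders(
  conn: Connection,
  path: string
): Promise<{ body: string; headers: Record<string, string> }>;
declare function writeBatch(batchCsv: string): Promise<void>;

async function exportAllBatches(conn: Connection, jobId: string): Promise<void> {
  let locator: string | undefined;
  do {
    // Bulk API 2.0: GET .../jobs/query/{jobId}/results, with a locator after the first batch
    const path = `/jobs/query/${jobId}/results${locator ? `?locator=${locator}` : ''}`;
    // eslint-disable-next-line no-await-in-loop
    const res = await getBodyAndHeaders(conn, path);
    // eslint-disable-next-line no-await-in-loop
    await writeBatch(res.body); // parse/write this batch, then let it be GC'd
    // the header is the string 'null' when there are no more batches to fetch
    locator = res.headers['sforce-locator'];
  } while (locator !== undefined && locator !== 'null');
}
```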
```ts
isState: true,
filename: BulkExportRequestCache.getFileName(),
stateFolder: Global.SF_STATE_FOLDER,
ttl: Duration.days(7),
```
bulk ingest/query job results are available for 7 days after being created:
https://developer.salesforce.com/docs/atlas.en-us.252.0.salesforce_app_limits_cheatsheet.meta/salesforce_app_limits_cheatsheet/salesforce_app_limits_platform_bulkapi.htm
```ts
  body,
  headers,
};
}
```
Small wrapper around jsforce's `HttpApi` class to be able to make requests and get both the response body and headers. `conn.request` only returns the body; we need the `Sforce-Locator` header to fetch the next batch.
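A sketch of one way to get both, assuming Node 18+'s global `fetch` rather than jsforce's internal class (the PR wraps `HttpApi` itself; this only shows the shape of the idea, and `path` is assumed to be a full REST path):

```ts
import type { Connection } from '@salesforce/core';

async function getBodyAndHeaders(
  conn: Connection,
  path: string
): Promise<{ body: string; headers: Record<string, string> }> {
  const res = await fetch(`${conn.instanceUrl}${path}`, {
    headers: { Authorization: `Bearer ${conn.accessToken ?? ''}` },
  });
  return {
    body: await res.text(),
    // keep the headers so the caller can read sforce-locator (fetch lowercases keys)
    headers: Object.fromEntries(res.headers.entries()),
  };
}
```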
```ts
};
}

export async function exportRecords(
```
We can't use jsforce's `bulk2.query` because it fetches all batches into memory and does 2-3 passes (raw CSV from API -> parse -> back to CSV), so you run out of memory quickly on mid-size/big queries. This helper function fetches one batch, parses it (only if output=json; the API returns batches as CSV, so we can write them to a file as-is), and drops it.
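Roughly, the per-batch handling that comment describes (hypothetical names; `csv-parse`'s sync API used for brevity where the real code streams, and trailing-comma handling for the final record is omitted — see the Transform sketch further down):

```ts
import { EOL } from 'node:os';
import type { Writable } from 'node:stream';
import { parse } from 'csv-parse/sync';

function writeBatch(batchCsv: string, format: 'csv' | 'json', out: Writable): void {
  if (format === 'csv') {
    out.write(batchCsv); // raw passthrough: the API already returns CSV
  } else {
    // parse into objects only when the user asked for JSON
    const records = parse(batchCsv, { columns: true }) as Array<Record<string, string>>;
    for (const record of records) out.write(`  ${JSON.stringify(record)},${EOL}`);
  }
  // batchCsv goes out of scope here, so the batch can be garbage-collected
}
```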
```ts
if (!locator) {
  // first write, start JSON array
  jsonWritable.write(`[${EOL}`);
```
We output all batches as a single JSON array of records; this line writes the opening bracket.
```ts
// eslint-disable-next-line no-await-in-loop
await pipeline(
  Readable.from(res.body),
  new csvParse({ columns: true, delimiter: ColumnDelimiter[outputInfo.columnDelimiter] }),
```
Same options as jsforce (`columns: true`): https://github.com/jsforce/jsforce/blob/main/src/csv.ts. `sf data export bulk` lets you specify the delimiter, so we pass it through here for CSV parsing.
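For reference, what `columns: true` does to a batch (the comma below stands in for whatever `--column-delimiter` mapped to):

```ts
import { parse } from 'csv-parse/sync';

const csv = 'Id,Name\n001xx0000001,Acme\n001xx0000002,Globex\n';
const records = parse(csv, { columns: true, delimiter: ',' });
// the first CSV line becomes the keys of each emitted record:
// [ { Id: '001xx0000001', Name: 'Acme' }, { Id: '001xx0000002', Name: 'Globex' } ]
```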
```ts
// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
transform(chunk, _encoding, callback) {
  if (recordsWritten === totalRecords - 1) {
    callback(null, ` ${JSON.stringify(chunk)}${EOL}]`);
```
If we're writing the last record, close the JSON array.
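Putting the two comments together, a simplified sketch of the JSON-array streaming pattern (hypothetical, condensed from the snippets above): the opening `[` is written before the first batch, each record becomes one line, and the last record appends the closing `]`:

```ts
import { EOL } from 'node:os';
import { Transform } from 'node:stream';

const toJsonArray = (totalRecords: number): Transform => {
  let written = 0;
  return new Transform({
    objectMode: true,
    transform(chunk, _encoding, callback) {
      // the last record closes the array; every other record ends with a comma
      const suffix = written === totalRecords - 1 ? `${EOL}]` : ',';
      written++;
      callback(null, ` ${JSON.stringify(chunk)}${suffix}${EOL}`);
    },
  });
};
```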
```ts
await pipeline(
  locator
    ? [
        Readable.from(res.body.slice(res.body.indexOf(EOL) + 1)),
```
If `locator` is set, we already wrote the first batch, so we skip the first line (the CSV header) of each subsequent batch so that everything merges into one CSV file.
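A tiny demonstration of that slice, assuming LF line endings (the Bulk API 2.0 default):

```ts
const batch = 'Id,Name\n003xx0000001,Jane\n003xx0000002,John\n';
// drop everything up to and including the first newline (the header row)
const withoutHeader = batch.slice(batch.indexOf('\n') + 1);
// => '003xx0000001,Jane\n003xx0000002,John\n'
```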
```ts
{
  name: 'result-format',
  // eslint-disable-next-line @typescript-eslint/require-await
  when: async (flags): Promise<boolean> => flags['result-format'] === 'csv',
```
Allow `--column-delimiter` only if exporting as CSV.
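In context, that predicate plausibly sits inside an oclif flag relationship on the `--column-delimiter` flag, something like this (illustrative, not the verbatim PR code):

```ts
import { Flags } from '@oclif/core';

const columnDelimiter = Flags.string({
  summary: 'Column delimiter to use when writing CSV output.',
  relationships: [
    {
      // --column-delimiter requires result-format to resolve to csv
      type: 'all',
      flags: [
        {
          name: 'result-format',
          // eslint-disable-next-line @typescript-eslint/require-await
          when: async (flags): Promise<boolean> => flags['result-format'] === 'csv',
        },
      ],
    },
  ],
});
```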
```ts
{
  name: 'result-format',
  // eslint-disable-next-line @typescript-eslint/require-await
  when: async (flags): Promise<boolean> => flags['result-format'] === 'csv',
```
ditto
```ts
export type DataExportBulkResult = {
  jobId?: string;
  totalSize?: number;
  filePath: string;
```
Async export returns `filePath` and `jobId`; sync export returns `filePath` and `totalSize`.
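So, schematically (field values are made up):

```ts
type DataExportBulkResult = {
  jobId?: string;
  totalSize?: number;
  filePath: string;
};

// async export (`--async`): resume later with `data export resume`
const asyncResult: DataExportBulkResult = { jobId: '750xx0000000001', filePath: 'accounts.csv' };

// sync export: the command polled until the job finished
const syncResult: DataExportBulkResult = { totalSize: 150_000, filePath: 'accounts.csv' };
```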
```ts
);

expect(totalQty).to.equal(recordCount);
}
```
Test helper to ensure:
- a valid CSV file was written (it parses it)
- the total number of records processed by the job was written
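A sketch of what such a helper can look like (hypothetical implementation; parsing proves the file is valid CSV, counting rows proves nothing was dropped):

```ts
import { readFileSync } from 'node:fs';
import { parse } from 'csv-parse/sync';
import { expect } from 'chai';

export function validateCsvFile(filePath: string, recordCount: number): void {
  // throws if the file isn't valid CSV
  const records = parse(readFileSync(filePath, 'utf8'), { columns: true }) as unknown[];
  expect(records.length).to.equal(recordCount);
}
```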
```ts
const lengthRes = await exec(`jq length ${filePath}`, { shell: 'pwsh' });

expect(parseInt(lengthRes.stdout.trim(), 10)).equal(totalqty);
}
```
Test helper to ensure:
- a valid JSON file was written (`jq` fails if not)
- the total number of records processed by the job was written
- queried fields were written on each record
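A sketch of the jq-based checks (hypothetical helper; `has("Id")` stands in for whichever queried field the test asserts on):

```ts
import { exec as execCb } from 'node:child_process';
import { promisify } from 'node:util';
import { expect } from 'chai';

const exec = promisify(execCb);

export async function validateJsonFile(filePath: string, totalQty: number): Promise<void> {
  // `jq length` exits non-zero on invalid JSON, otherwise prints the array length
  const lengthRes = await exec(`jq length ${filePath}`);
  expect(parseInt(lengthRes.stdout.trim(), 10)).to.equal(totalQty);

  // every record must carry the queried field
  const fieldRes = await exec(`jq 'map(has("Id")) | all' ${filePath}`);
  expect(fieldRes.stdout.trim()).to.equal('true');
}
```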
```ts
} catch (err) {
  const error = err as Error;
  ms.stop(error);
  throw err;
```
Maybe an enhancement to `MultiStageOutput.stop()` would be to accept an `unknown` so this pattern wouldn't be repeated all over the commands; it promotes an anti-pattern with the `err as Error` coercion.
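A sketch of the suggested signature (not the current `MultiStageOutput` API; the narrowing happens once, inside the library):

```ts
class MultiStageOutput {
  // ...existing members elided...
  public stop(error?: unknown): void {
    const err =
      error === undefined || error instanceof Error ? error : new Error(String(error));
    // ...finalize the output, rendering err?.message when present
  }
}

// call sites could then drop the coercion:
// } catch (err) {
//   ms.stop(err);
//   throw err;
// }
```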
```ts
pollingOptions: { pollTimeout: 0, pollInterval: 0 },
} satisfies Pick<ResumeOptions['options'], 'operation' | 'query' | 'pollingOptions'>;

if (typeof jobIdOrMostRecent === 'boolean') {
```
So I was doing overloads for this but eslint suggested it was overkill 😛
https://typescript-eslint.io/rules/unified-signatures/
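The difference the rule flags, in miniature (illustrative):

```ts
// what overloads would look like: two signatures differing in one parameter's type
// function resume(jobId: string): Promise<void>;
// function resume(useMostRecent: boolean): Promise<void>;

// what unified-signatures prefers: one union parameter plus a runtime narrow,
// exactly like the `typeof jobIdOrMostRecent === 'boolean'` check above
async function resume(jobIdOrMostRecent: string | boolean): Promise<void> {
  if (typeof jobIdOrMostRecent === 'boolean') {
    // look the job id up in the most-recent cache entry
  } else {
    // use the job id that was passed in
  }
}
```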
What does this PR do?
This PR adds 2 new commands:

- `data export bulk`: bulk-exports records as CSV or JSON. Unlike `data query --bulk`, it can handle millions of records by using Node streams to export to a file. forcedotcom/cli#1995
- `data export resume`: resumes an export operation started by `data export bulk`.

Testing notes
Check out the PR, build, and `sf plugins link`. You can query the `ScratchOrgInfo` object in our na40 hub (it has ~1.5M records); try exporting multiple fields in different formats and monitor memory usage.

What issues does this PR fix or reference?
@W-16486240@