---
---

## Advanced Topics

### Record Stream Pipeline

A record stream is a stream framework that deals in records rather than raw bytes or strings, similar to Node.js's standard readable/writable streams.

A query object - usually returned by the `Connection#query(soql)` / `SObject#find(conditions, fields)` methods -
acts as an `InputRecordStream`, which emits a `record` event each time a record is received from the server.

A batch object - usually returned by the `Bulk-Job#createBatch()` / `Bulk#load(sobjectType, operation, input)` / `SObject#bulkload(operation, input)` methods -
acts as an `OutputRecordStream` and has `send()` and `end()` methods to accept incoming records.

You can use `InputRecordStream#pipe(outputRecordStream)` to pipe records from one stream to another.

A `RecordStream` can be converted to a regular Node.js stream object by calling the `RecordStream#stream()` method.

Currently, records are always serialized to CSV strings.

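As a sketch of the `OutputRecordStream` interface described above, records can also be fed to a batch by hand with `send()` and `end()` instead of piping. This assumes `conn` is an already-established jsforce connection; the record values are illustrative only.

```javascript
// Sketch: feeding records to a batch manually via send()/end().
// Assumes `conn` is an established jsforce Connection.
var job = conn.bulk.createJob('Account', 'insert');
var batch = job.createBatch();
batch.on('response', function(rets) {
  // Per-record results arrive here once the batch completes.
});
// send() accepts one record at a time; end() closes the stream.
batch.send({ Name: 'Acme, Inc.' });
batch.send({ Name: 'Global Media' });
batch.end();
```

Piping, as shown in the following sections, is usually preferable because it handles backpressure and stream termination for you.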
#### Piping Query Record Stream to Batch Record Stream

The record stream pipeline is the basis of bulk operations on queried records.
For example, the same process as `Query#destroy()` can be expressed as follows:

```javascript
//
// This is a more verbose version of Query#destroy().
//
var Account = conn.sobject('Account');
Account.find({ CreatedDate: { $lt: jsforce.Date.LAST_YEAR }})
  .pipe(Account.deleteBulk())
  .on('response', function(rets){
    // ...
  })
  .on('error', function(err) {
    // ...
  });
```

And `Query#update(mapping)` can be expressed as follows:

```javascript
//
// This is a more verbose version of Query#update().
//
var Opp = conn.sobject('Opportunity');
Opp.find({ "Account.Id" : accId },
         { Id: 1, Name: 1, "Account.Name": 1 })
  .pipe(jsforce.RecordStream.map(function(r) {
    return { Id: r.Id,
             Name: r.Account.Name + ' - ' + r.Name };
  }))
  .pipe(Opp.updateBulk())
  .on('response', function(rets) {
    // ...
  })
  .on('error', function(err) {
    // ...
  });
```

The following example uses `Query#stream()` (inherited from `RecordStream#stream()`) to convert a record stream to a Node.js stream,
in order to export all queried records to a CSV file.

```javascript
var csvFileOut = require('fs').createWriteStream('path/to/Account.csv');
conn.query("SELECT Id, Name, Type, BillingState, BillingCity, BillingStreet FROM Account")
  .stream() // Convert to Node.js's usual readable stream.
  .pipe(csvFileOut);
```
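Because records are serialized to CSV, the reverse direction also works: an existing CSV file can be piped into a bulk batch as input. A minimal sketch, assuming `conn` is an established connection and `path/to/Account.csv` already exists:

```javascript
// Sketch: load records from a CSV file into a bulk insert batch.
// Batch#stream() exposes the batch as a writable Node.js stream
// that accepts CSV input.
var fs = require('fs');
var job = conn.bulk.createJob('Account', 'insert');
var batch = job.createBatch();
batch.on('response', function(rets) {
  // Per-record results arrive here once the batch completes.
});
fs.createReadStream('path/to/Account.csv').pipe(batch.stream());
```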

#### Record Stream Filtering / Mapping

You can also filter or map queried records into an output record stream.
Methods like `InputRecordStream#map(mappingFn)` and `InputRecordStream#filter(filterFn)` create a record stream
which accepts records from upstream and passes them downstream, applying the given mapping / filtering function.

```javascript
var fs = require('fs');
//
// Write Contact records to a CSV file, with header names converted.
//
conn.sobject('Contact')
  .find({}, { Id: 1, Name: 1 })
  .map(function(r) {
    return { ID: r.Id, FULL_NAME: r.Name };
  })
  .stream().pipe(fs.createWriteStream("Contact.csv"));
//
// Write Lead records to a CSV file,
// eliminating duplicate entries with the same email address.
//
var emails = {};
conn.sobject('Lead')
  .find({}, { Id: 1, Name: 1, Company: 1, Email: 1 })
  .filter(function(r) {
    var dup = emails[r.Email];
    if (!dup) { emails[r.Email] = true; }
    return !dup;
  })
  .stream().pipe(fs.createWriteStream("Lead.csv"));
```

Here is lower-level code that achieves the same result using `InputRecordStream#pipe()`.

```javascript
var fs = require('fs');
//
// Write Contact records to a CSV file, with header names converted.
//
conn.sobject('Contact')
  .find({}, { Id: 1, Name: 1 })
  .pipe(jsforce.RecordStream.map(function(r) {
    return { ID: r.Id, FULL_NAME: r.Name };
  }))
  .stream().pipe(fs.createWriteStream("Contact.csv"));
//
// Write Lead records to a CSV file,
// eliminating duplicate entries with the same email address.
//
var emails = {};
conn.sobject('Lead')
  .find({}, { Id: 1, Name: 1, Company: 1, Email: 1 })
  .pipe(jsforce.RecordStream.filter(function(r) {
    var dup = emails[r.Email];
    if (!dup) { emails[r.Email] = true; }
    return !dup;
  }))
  .stream().pipe(fs.createWriteStream("Lead.csv"));
```

#### Example: Data Migration

Using a record stream pipeline, you can achieve data migration with very simple code.

```javascript
//
// Connection for the org to migrate data from
//
var conn1 = new jsforce.Connection({
  // ...
});
//
// Connection for the org to migrate data to
//
var conn2 = new jsforce.Connection({
  // ...
});
//
// Get a query record stream from Connection #1
// and pipe it to a batch record stream from Connection #2.
//
var query = conn1.query("SELECT Id, Name, Type, BillingState, BillingCity, BillingStreet FROM Account");
var job = conn2.bulk.createJob("Account", "insert");
var batch = job.createBatch();
query.pipe(batch);
batch.on('queue', function() {
  var jobId = job.id;
  var batchId = batch.id;
  //...
});
```
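
Once the batch is queued, you typically poll for completion and then inspect the per-record results. A sketch of that follow-up, assuming the `job` and `batch` objects from the example above; the polling interval and timeout values are illustrative:

```javascript
// Sketch: after the batch is queued, poll until it completes.
batch.on('queue', function() {
  // Poll every 5 seconds, give up after 10 minutes.
  batch.poll(5 * 1000, 600 * 1000);
});
batch.on('response', function(rets) {
  // `rets` holds one result entry per migrated record.
  for (var i = 0; i < rets.length; i++) {
    if (!rets[i].success) {
      console.error('Record #' + (i + 1) + ' failed: ' + rets[i].errors.join(', '));
    }
  }
});
batch.on('error', function(err) {
  console.error('Batch error:', err);
});
```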