Skip to content

Commit e0333f8

Browse files
perf(api): stream wallet address import process
1 parent 0eac2ab commit e0333f8

File tree

2 files changed

+150
-46
lines changed

2 files changed

+150
-46
lines changed

packages/bitcore-node/src/models/walletAddress.ts

Lines changed: 149 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import { BaseModel } from './base';
55
import { IWallet } from './wallet';
66
import { TransactionStorage } from './transaction';
77
import { StorageService } from '../services/storage';
8+
import { partition } from '../utils/partition';
9+
import { Readable, Transform, Writable } from 'stream';
810

911
export type IWalletAddress = {
1012
wallet: ObjectID;
@@ -38,60 +40,162 @@ export class WalletAddressModel extends BaseModel<IWalletAddress> {
3840
const { wallet, addresses } = params;
3941
const { chain, network } = wallet;
4042

41-
const unprocessedAddresses: Array<string> = [];
42-
for (let address of addresses) {
43-
const updatedAddress = await this.collection.findOneAndUpdate(
44-
{
45-
wallet: wallet._id,
46-
address: address,
47-
chain,
48-
network
49-
},
50-
{ $setOnInsert: { wallet: wallet._id, address: address, chain, network } },
51-
{ returnOriginal: false, upsert: true }
52-
);
53-
if (!updatedAddress.value!.processed) {
54-
unprocessedAddresses.push(address);
55-
await CoinStorage.collection.updateMany({ chain, network, address }, { $addToSet: { wallets: wallet._id } });
43+
class AddressInputStream extends Readable {
44+
addressBatches: string[][];
45+
index: number;
46+
constructor() {
47+
super({ objectMode: true});
48+
this.addressBatches = partition(addresses, 1000);
49+
this.index = 0;
50+
}
51+
_read() {
52+
if (this.index < this.addressBatches.length) {
53+
this.push(this.addressBatches[this.index]);
54+
this.index++;
55+
}
56+
else {
57+
this.push(null);
58+
}
5659
}
5760
}
5861

59-
let coinStream = CoinStorage.collection
60-
.find({ wallets: wallet._id, 'wallets.0': { $exists: true } })
61-
.project({ spentTxid: 1, mintTxid: 1, address: 1 })
62-
.addCursorFlag('noCursorTimeout', true);
63-
let txids = {};
64-
coinStream.on('data', (coin: ICoin) => {
65-
coinStream.pause();
66-
if (!unprocessedAddresses.includes(coin.address)) {
67-
return coinStream.resume();
62+
class FilterExistingAddressesStream extends Transform {
63+
constructor() {
64+
super({objectMode: true});
6865
}
69-
if (!txids[coin.mintTxid]) {
70-
TransactionStorage.collection.updateMany(
71-
{ txid: coin.mintTxid, network, chain },
72-
{ $addToSet: { wallets: wallet._id } }
73-
);
66+
async _transform(addressBatch, _, callback) {
67+
let exists = (await WalletAddressStorage.collection.find({ wallet: wallet._id, address: { $in: addressBatch } }).toArray())
68+
.filter(walletAddress => walletAddress.processed)
69+
.map(walletAddress => walletAddress.address);
70+
callback(null, addressBatch.filter(address => {
71+
return !exists.includes(address);
72+
}));
7473
}
75-
txids[coin.mintTxid] = true;
76-
if (coin.spentTxid && !txids[coin.spentTxid]) {
77-
TransactionStorage.collection.updateMany(
78-
{ txid: coin.spentTxid, network, chain },
74+
}
75+
76+
class AddNewAddressesStream extends Transform {
77+
constructor() {
78+
super({objectMode: true});
79+
}
80+
async _transform(addressBatch, _, callback) {
81+
if (!addressBatch.length) {
82+
return callback();
83+
}
84+
await WalletAddressStorage.collection.bulkWrite(addressBatch.map(address => {
85+
return {
86+
updateOne: {
87+
filter: { chain, network, wallet: wallet._id, address },
88+
update: { $setOnInsert: { chain, network, wallet: wallet._id, address }},
89+
upsert: true
90+
}
91+
}
92+
})), { ordered: false};
93+
callback(null, addressBatch);
94+
}
95+
}
96+
97+
class UpdateCoinsStream extends Transform {
98+
constructor() {
99+
super({objectMode: true});
100+
}
101+
async _transform(addressBatch, _, callback) {
102+
if (!addressBatch.length) {
103+
return callback();
104+
}
105+
await WalletAddressStorage.collection.bulkWrite(addressBatch.map(address => {
106+
return {
107+
updateMany: {
108+
filter: { chain, network, address },
109+
update: { $addToSet: { wallets: wallet._id } }
110+
}
111+
};
112+
}), { ordered: false });
113+
callback(null, addressBatch);
114+
}
115+
}
116+
117+
class UpdatedTxidsStream extends Transform {
118+
txids: { [key: string]: boolean };
119+
constructor() {
120+
super({ objectMode: true });
121+
this.txids = {};
122+
}
123+
async _transform(addressBatch, _, callback) {
124+
if (!addressBatch.length) {
125+
return callback();
126+
}
127+
const coinStream = CoinStorage.collection.find({chain, network, address: {$in: addressBatch}});
128+
coinStream.on('data', (coin: ICoin) => {
129+
if (!this.txids[coin.mintTxid]){
130+
this.txids[coin.mintTxid] = true;
131+
this.push({ txid: coin.mintTxid});
132+
}
133+
if (!this.txids[coin.spentTxid]) {
134+
this.txids[coin.spentTxid] = true;
135+
this.push({ txid: coin.spentTxid});
136+
}
137+
});
138+
coinStream.on('end', () => {
139+
callback(null, {addressBatch})
140+
});
141+
}
142+
}
143+
144+
class TxUpdaterStream extends Transform {
145+
constructor() {
146+
super({ objectMode: true });
147+
}
148+
async _transform(data, _, callback) {
149+
const { txid, addressBatch } = data;
150+
if (addressBatch){
151+
return callback(null, addressBatch);
152+
}
153+
await TransactionStorage.collection.updateMany(
154+
{ chain, network, txid },
79155
{ $addToSet: { wallets: wallet._id } }
80156
);
157+
callback();
81158
}
82-
txids[coin.spentTxid] = true;
83-
return coinStream.resume();
84-
});
85-
return new Promise(resolve => {
86-
coinStream.on('end', async () => {
87-
for (const address of unprocessedAddresses) {
88-
await this.collection.updateOne(
89-
{ chain, network, address, wallet: wallet._id },
90-
{ $set: { processed: true } }
91-
);
159+
}
160+
161+
class MarkProcessedStream extends Writable {
162+
constructor() {
163+
super({ objectMode: true });
164+
}
165+
async _write(addressBatch, _, callback) {
166+
if (!addressBatch.length) {
167+
return callback();
92168
}
93-
resolve();
94-
});
169+
await WalletAddressStorage.collection.bulkWrite(addressBatch.map(address => {
170+
return {
171+
updateOne: {
172+
filter: { chain, network, address, wallet: wallet._id },
173+
update: { $set: { processed: true } }
174+
}
175+
}
176+
}));
177+
callback();
178+
}
179+
}
180+
181+
const addressInputStream = new AddressInputStream();
182+
const filterExistingAddressesStream = new FilterExistingAddressesStream();
183+
const addNewAddressesStream = new AddNewAddressesStream();
184+
const updateCoinsStream = new UpdateCoinsStream();
185+
const updatedTxidsStream = new UpdatedTxidsStream();
186+
const txUpdaterStream = new TxUpdaterStream();
187+
const markProcessedStream = new MarkProcessedStream();
188+
189+
addressInputStream
190+
.pipe(filterExistingAddressesStream)
191+
.pipe(addNewAddressesStream)
192+
.pipe(updateCoinsStream)
193+
.pipe(updatedTxidsStream)
194+
.pipe(txUpdaterStream)
195+
.pipe(markProcessedStream);
196+
197+
markProcessedStream.on('end', () => {
198+
return Promise.resolve();
95199
});
96200
}
97201
}

packages/bitcore-node/src/types/namespaces/ChainStateProvider.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ export declare namespace CSP {
122122
broadcastTransaction(params: BroadcastTransactionParams): Promise<any>;
123123
createWallet(params: CreateWalletParams): Promise<IWallet>;
124124
getWallet(params: GetWalletParams): Promise<IWallet | null>;
125-
updateWallet(params: UpdateWalletParams): Promise<{}>;
125+
updateWallet(params: UpdateWalletParams): Promise<void>;
126126
getWalletBalance(params: GetWalletBalanceParams): Promise<{ confirmed: number, unconfirmed: number, balance: number }>;
127127
streamAddressUtxos(params: StreamAddressUtxosParams): any;
128128
streamAddressTransactions(params: StreamAddressUtxosParams): any;

0 commit comments

Comments
 (0)