This repository has been archived by the owner on Jun 21, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Provider.js
104 lines (99 loc) · 3.29 KB
/
Provider.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
const EventEmitter = require('events').EventEmitter;
const CIDUtil = require('../../../common/CIDUtil');
const EngCID = require('../../../common/EngCID');
const parallel = require('async/parallel');
const pull = require('pull-stream');
const streams = require('../streams');
const constants = require('../../../common/constants');
/**
 * Provider side of the node: announces content identifiers (EngCIDs) through the
 * underlying enigma node, forwards database requests to listeners as 'notify'
 * events, and wires the pull-stream pipeline that answers state-sync requests.
 */
class Provider extends EventEmitter {
  /**
   * @param {Object} enigmaNode - node wrapper; this class only calls its
   *   `provideContent(ecid, callback)` method
   * @param {Object} logger - stored on the instance and handed to the streams module
   */
  constructor(enigmaNode, logger) {
    super();
    this._enigmaNode = enigmaNode;
    this._logger = logger;
    // Hand the logger and this instance to the streams module.
    // NOTE(review): presumably so the stream stages can call back into this
    // provider (e.g. dbRequest) - confirm in ../streams.
    streams.setGlobalState({logger: this._logger, providerContext: this});
  }

  /**
   * Provide content for a batch of CIDs. Every CID is attempted; a single
   * failure does not stop the rest of the batch.
   * @param {Array} descriptorsList - when `withEngCid` is true: ready EngCIDs;
   *   when false: raw descriptors (currently secret contract addresses) that are
   *   keccak256-hashed and converted to EngCIDs first
   * TODO:: remove withEngCid and default it to true, kept for compatibility
   * @param {Boolean} withEngCid - true if the list already holds EngCIDs,
   *   false to generate them from the descriptors
   * @param {Function} callback - (isError, listOfFailedEngCIDs) => {}
   *   `isError` is a boolean flag (not an Error object); the list is empty on success
   */
  provideContentsBatch(descriptorsList, withEngCid, callback) {
    const engCIDs = withEngCid ?
      descriptorsList :
      descriptorsList.map((desc) => EngCID.createFromKeccack256(CIDUtil.hashKeccack256(desc)));

    const jobs = engCIDs.map((requestedEcid) => (done) => {
      this._enigmaNode.provideContent(requestedEcid, (err, providedEcid) => {
        // Never fail the job itself: `parallel` would abort the whole batch on
        // the first error. The outcome is recorded in the result instead.
        // Fall back to the requested CID so a failure is still identifiable
        // when the node does not echo the CID back alongside the error.
        done(null, {error: err, ecid: providedEcid || requestedEcid});
      });
    });

    parallel(jobs, (err, results) => {
      const failedCids = (results || [])
          .filter((result) => result.error)
          .map((result) => result.ecid);
      callback(Boolean(err) || failedCids.length > 0, failedCids);
    });
  }

  /**
   * Promise version of provideContentsBatch (the input must already be EngCIDs).
   * @param {Array} engCids - EngCIDs to provide
   * @return {Promise<Array>} resolves with the (empty) list of failed EngCIDs when
   *   everything was provided; rejects with an Error whose `failedCids` property
   *   lists the EngCIDs that could not be provided
   */
  asyncProvideContentsBatch(engCids) {
    return new Promise((resolve, reject) => {
      this.provideContentsBatch(engCids, true, (isError, failedCids) => {
        if (isError) {
          // Reject with a real Error (stack + message) and keep the failed list
          // attached instead of rejecting with a bare boolean.
          const error = new Error(
              `Failed to provide ${failedCids.length} of ${engCids.length} EngCIDs`
          );
          error.failedCids = failedCids;
          return reject(error);
        }
        resolve(failedCids);
      });
    });
  }

  /**
   * Stream related methods.
   * Emits a 'notify' event with the given params.
   * @param {Object} params - MUST CONTAIN a "notification" field
   *   specifying the concrete Action
   */
  notify(params) {
    this.emit('notify', params);
  }

  /**
   * Calls the worker/DbRequestAction by emitting the request as a notification.
   * Requests missing either required field (or a null request) are ignored.
   * The `notification` field is set on the passed-in object itself.
   * @param {JSON} request - must contain the fields:
   *   - onResponse(err, response) => {}
   *   - dbQueryType, describes the query type needed (one of constants.CORE_REQUESTS)
   */
  dbRequest(request) {
    // Borrow hasOwnProperty from Object.prototype so that null-prototype
    // objects (and objects shadowing `hasOwnProperty`) are handled as well.
    const hasField = (field) => Object.prototype.hasOwnProperty.call(request, field);
    if (request == null || !hasField('onResponse') || !hasField('dbQueryType')) {
      return;
    }
    if (request.dbQueryType === constants.CORE_REQUESTS.GetDeltas) {
      request.notification = constants.NODE_NOTIFICATIONS.GET_DELTAS;
    } else if (request.dbQueryType === constants.CORE_REQUESTS.GetContract) {
      request.notification = constants.NODE_NOTIFICATIONS.GET_CONTRACT_BCODE;
    }
    // Any other query type is forwarded with whatever `notification` the
    // caller already set on the request (possibly none).
    this.notify(request);
  }

  /**
   * Wire the request/response pipeline for a state-sync connection: the
   * connection is both the source of requests and the sink for responses.
   * @param {Object} connectionStream - duplex pull-stream of the connection
   */
  startStateSyncResponse(connectionStream) {
    pull(
        // read msg requests one-by-one
        connectionStream,
        // parse the message
        streams.requestParserStream,
        // get the requested data from db (i.e array of deltas)
        streams.fromDbStream,
        // serialize the database result into a network stream
        streams.toNetworkParser,
        // send the result to the msg request back to the receiver
        connectionStream
    );
  }
}
module.exports = Provider;