This repository has been archived by the owner on Dec 29, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
listener.js
172 lines (165 loc) · 5.08 KB
/
listener.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
/**
 * Log to the console with every message prefixed by `[listener] `.
 *
 * @param {...*} messages - Values forwarded to `console.log` after the prefix.
 * @returns {void}
 */
const lLog = function(...messages) {
  // Rest parameters replace the former undeclared `args` accumulator, which
  // leaked a global in sloppy mode and threw a ReferenceError in strict mode.
  console.log('[listener] ', ...messages);
};
/**
 * Start keeping track of a specific query (or file request) on a topic.
 *
 * Adds `query` to the watchlist for `topic` and creates an empty result list
 * for it. If the query is already being watched, an error is logged and
 * nothing else changes.
 *
 * @param {string} topic - Topic suffix (without the `ipfsearch.topic` prefix).
 * @param {string} query - Query (or file request) to watch for.
 * @param {boolean} [fileReq=false] - Use the file-request lists (`.f`)
 *   instead of the query lists (`.q`).
 * @returns {undefined} Always undefined (the duplicate path returns lLog's result).
 */
function listenFor(topic, query, fileReq=false) {
  const kind = fileReq ? 'f' : 'q';
  const watched = ipfsearch.watchlist[kind];
  const collected = ipfsearch.results[kind];

  if (watched[topic] === undefined) {
    watched[topic] = [];
  }
  const queries = watched[topic];

  if (queries.indexOf(query) !== -1) {
    return lLog(`Error: Already listening for ${query} on ${ipfsearch.topic}${topic}`);
  }

  // Register the query, then make sure there is a bucket to collect results in.
  queries.push(query);
  if (collected[topic] === undefined) {
    collected[topic] = {};
  }
  collected[topic][query] = [];
}
/**
 * Stop keeping track of a specific query (or file request) and hand back
 * whatever results were collected for it.
 *
 * Safe to call for a topic/query that is not (or no longer) being watched:
 * the watchlist is left untouched and an empty array is returned.
 *
 * @param {string} topic - Topic suffix (without the `ipfsearch.topic` prefix).
 * @param {string} query - Query (or file request) to stop watching.
 * @param {boolean} [fileReq=false] - Use the file-request lists (`.f`)
 *   instead of the query lists (`.q`).
 * @returns {Array} A copy of the results caught for `query` (possibly empty).
 */
function stopListening(topic, query, fileReq=false) {
  let list = ipfsearch.watchlist.q;
  let rlist = ipfsearch.results.q;
  if (fileReq) {
    list = ipfsearch.watchlist.f;
    rlist = ipfsearch.results.f;
  }

  // Remove from the watchlist. Only splice when the query is really there:
  // splice(-1, 1) would silently drop the last (unrelated) query instead.
  const watched = list[topic];
  const position = watched === undefined ? -1 : watched.indexOf(query);
  if (position > -1) {
    watched.splice(position, 1);
  }

  // Return caught messages, tolerating a missing topic or query slot.
  const bucket = rlist[topic];
  const caught = bucket === undefined ? undefined : bucket[query];
  if (caught === undefined) {
    return [];
  }
  const r = caught.slice();
  // The slot is cleared (set to undefined), not deleted.
  bucket[query] = undefined;
  return r;
}
/**
 * Pubsub handler called for every received message.
 *
 * Messages published by this node itself are ignored. The payload is parsed
 * as JSON and dispatched on its `event` field:
 *  - `query`:   if owner 0 is registered for the topic, run `searchLocal` and
 *               publish any non-empty result via `Publisher.pubAnswer`.
 *  - `answer`:  if the query is on the query watchlist, merge the payload
 *               into the collected results (deduplicated by `util.uniquify`).
 *  - `fileReq`: if owner 0 is NOT registered for `data.query`, call
 *               `offerFiles` and publish any non-empty result via
 *               `Publisher.pubFileRes`.
 *  - `fileRes`: if the request is on the file watchlist, merge the payload
 *               into the collected file results.
 * Messages that do not pass these checks, and payloads that are not a JSON
 * object, are counted in `stats.droppedMsg`.
 *
 * @param {object} msg - Pubsub message; `from`, `data` and `topicIDs` are read.
 * @returns {Promise<void>}
 */
const receiveMsg = async (msg) => {
  const us = await ipfs.id();
  if (msg.from === us.id) {
    return; // Ignoring our own messages
  }

  // Payloads come from arbitrary peers: a parse failure must not reject this
  // handler (an unhandled rejection), so malformed messages are dropped.
  let data;
  try {
    data = JSON.parse(msg.data.toString());
  } catch (err) {
    stats.droppedMsg += 1;
    lLog(`Error: Dropping malformed message: ${err.message}`);
    return;
  }
  if (data === null || typeof data !== 'object') {
    // Valid JSON but not an object (e.g. `null`, a number): nothing to dispatch.
    stats.droppedMsg += 1;
    return;
  }

  // Strip the common channel prefix to get the bare topic name.
  const topic = msg.topicIDs[0].slice(ipfsearch.topic.length);

  if (data.event === 'query') {
    stats.searchReceived += 1;
    if (ipfsearch.subOwners[topic] !== undefined && ipfsearch.subOwners[topic].indexOf(0) > -1) {
      const searchResults = await searchLocal(topic, data.query);
      if (searchResults.length > 0) {
        Publisher.pubAnswer(topic, data.query, searchResults);
      }
    } else {
      stats.droppedMsg += 1;
    }
    return;
  }

  if (data.event === 'answer') {
    stats.searchReceived += 1;
    if (ipfsearch.watchlist.q[topic] !== undefined && ipfsearch.watchlist.q[topic].indexOf(data.query) > -1) {
      const rlist = ipfsearch.results.q;
      rlist[topic][data.query] = util.uniquify(rlist[topic][data.query].concat(data.payload));
    } else {
      stats.droppedMsg += 1;
    }
    return;
  }

  if (data.event === 'fileReq') {
    stats.soReceived += 1;
    // NOTE(review): unlike 'query', this branch acts when owner 0 is absent
    // for data.query — kept as in the original; confirm this is intended.
    if (ipfsearch.subOwners[data.query] === undefined || ipfsearch.subOwners[data.query].indexOf(0) === -1) {
      const files = await offerFiles(data.query);
      if (files.length > 0) {
        Publisher.pubFileRes(topic, data.query, files);
      }
    } else {
      stats.droppedMsg += 1;
    }
    return;
  }

  if (data.event === 'fileRes') {
    stats.soReceived += 1;
    if (ipfsearch.watchlist.f[topic] !== undefined && ipfsearch.watchlist.f[topic].indexOf(data.query) > -1) {
      const rlist = ipfsearch.results.f;
      rlist[topic][data.query] = util.uniquify(rlist[topic][data.query].concat(data.payload));
    } else {
      stats.droppedMsg += 1;
    }
    return;
  }
};
/**
 * Subscribe to a topic channel (`ipfsearch.topic` + topic).
 *
 * The first subscriber of a topic is registered as owner 0 and triggers the
 * actual pubsub subscription. Later calls for an already-subscribed topic
 * only register an additional owner id (last id + 1) and return it, so that
 * temporary users of a channel can be tracked.
 *
 * If the pubsub subscription fails, the error is logged and the bookkeeping
 * for the topic is rolled back, so a later call can retry instead of
 * reporting "Already listening". For backward compatibility the function
 * still resolves to 0 in that case rather than throwing.
 *
 * @param {*} topic - Topic suffix; coerced to a string.
 * @returns {Promise<number>} The owner id registered for this call.
 */
async function sub(topic) {
  const name = String(topic);
  const channel = `${ipfsearch.topic}${name}`;

  if (ipfsearch.subbedTopics.indexOf(name) > -1) {
    const tOwners = ipfsearch.subOwners[name];
    // NOTE(review): if the owner list is empty at this point the computed id
    // is NaN (undefined + 1) — confirm whether that state can occur.
    const id = tOwners[tOwners.length - 1] + 1;
    tOwners.push(id);
    lLog(`Already listening on channel ${channel}, id: ${id}`);
    return id;
  }

  // Register before awaiting so concurrent calls see the topic as taken.
  ipfsearch.subbedTopics.push(name);
  ipfsearch.subOwners[name] = [0];
  try {
    await ipfs.pubsub.subscribe(channel, receiveMsg);
    lLog(`Listening on channel ${channel}`);
  } catch (err) {
    lLog(`Error: Failed to subscribe to ${channel}`);
    lLog(err);
    // Undo the registration: we are not actually listening on this channel.
    const position = ipfsearch.subbedTopics.indexOf(name);
    if (position > -1) {
      ipfsearch.subbedTopics.splice(position, 1);
    }
    delete ipfsearch.subOwners[name];
  }
  return 0;
}
/**
 * Release an owner's hold on a topic and unsubscribe from the channel once
 * no owners remain.
 *
 * @param {*} topic - Topic suffix; coerced to a string.
 * @param {number} owner - Owner id as returned by `sub`, or -1 to remove
 *   every owner of the topic.
 * @returns {Promise<void>|undefined} A promise for the pubsub unsubscribe
 *   when one was started (failures are logged, not rethrown); otherwise
 *   undefined, including when the topic has no owner list at all.
 */
function unsub(topic, owner) {
  const name = String(topic);
  const tOwners = ipfsearch.subOwners[name];
  if (tOwners === undefined) {
    // Nothing registered for this topic; previously owner === -1 threw here.
    return undefined;
  }

  if (owner === -1) {
    tOwners.length = 0;
  } else {
    const ownerIndex = tOwners.indexOf(owner);
    if (ownerIndex >= 0) {
      tOwners.splice(ownerIndex, 1);
    }
  }

  if (tOwners.length < 1 && ipfsearch.subbedTopics.indexOf(name) >= 0) {
    return ipfs.pubsub.unsubscribe(`${ipfsearch.topic}${name}`)
      .then(() => {
        // Look the index up again: an overlapping unsub may already have
        // removed the topic, and splice(-1, 1) would drop a different one.
        const topicIndex = ipfsearch.subbedTopics.indexOf(name);
        if (topicIndex >= 0) {
          ipfsearch.subbedTopics.splice(topicIndex, 1);
        }
        lLog(`Unsubscibed from channel ${ipfsearch.topic}${name}`);
      })
      .catch(e => { lLog(`Couldn't unsub: ${e}`); });
  }
  return undefined;
}
/**
 * Unsubscribe from every channel currently listed in `ipfsearch.subbedTopics`.
 *
 * Calls `unsub(topic, -1)` for each topic, waits for all of them, and logs an
 * error if any topic is still listed afterwards.
 *
 * @returns {Promise<void>}
 */
async function unsubAll() {
  lLog('Unsubscribing all channels...');

  const pending = ipfsearch.subbedTopics.map((name) => unsub(name, -1));
  await Promise.all(pending);

  if (ipfsearch.subbedTopics.length !== 0) {
    lLog('Error: Did not unsubscribe from all channels.');
  }
}
// Public API of the listener module.
Object.assign(module.exports, {
  listenFor,
  stopListening,
  sub,
  unsub,
  unsubAll,
});