@@ -7,17 +7,29 @@ const Readable = require('stream').Readable
7
7
const OFFLINE_ERROR = require ( '../utils' ) . OFFLINE_ERROR
8
8
const FSUB_ERROR = new Error ( `FloodSub is not started.` )
9
9
10
- module . exports = function floodsub ( self ) {
11
- return {
12
- start : promisify ( ( callback ) => {
13
- if ( ! self . isOnline ( ) ) {
14
- throw OFFLINE_ERROR
15
- }
10
+ /* Internal subscriptions state and functions */
11
+ let subscriptions = { }
16
12
17
- self . _floodsub = new FloodSub ( self . _libp2pNode )
18
- return callback ( null , self . _floodsub )
19
- } ) ,
13
+ const addSubscription = ( topic , request , stream ) => {
14
+ subscriptions [ topic ] = { request : request , stream : stream }
15
+ }
16
+
17
+ const removeSubscription = promisify ( ( topic , callback ) => {
18
+ if ( ! subscriptions [ topic ] ) {
19
+ return callback ( new Error ( `Not subscribed to ${ topic } ` ) )
20
+ }
20
21
22
+ // subscriptions[topic].request.abort()
23
+ // subscriptions[topic].stream.end()
24
+ delete subscriptions [ topic ]
25
+
26
+ if ( callback ) {
27
+ callback ( null )
28
+ }
29
+ } )
30
+
31
+ module . exports = function floodsub ( self ) {
32
+ return {
21
33
subscribe : promisify ( ( topic , options , callback ) => {
22
34
// TODO: Clarify with @diasdavid what to do with the `options.discover` param
23
35
// Ref: https://github.com/ipfs/js-ipfs-api/pull/377/files#diff-f0c61c06fd5dc36b6f760b7ea97b1862R50
@@ -34,11 +46,12 @@ module.exports = function floodsub (self) {
34
46
throw FSUB_ERROR
35
47
}
36
48
37
- let rs = new Readable ( )
38
- rs . cancel = ( ) => self . _floodsub . unsubscribe ( topic )
39
-
49
+ let stream = new Readable ( { objectMode : true } )
50
+ stream . _read = ( ) => { }
51
+
40
52
self . _floodsub . on ( topic , ( data ) => {
41
- rs . emit ( 'data' , {
53
+ console . log ( "DATA" , data . toString ( ) )
54
+ stream . emit ( 'data' , {
42
55
data : data . toString ( ) ,
43
56
topicIDs : [ topic ]
44
57
} )
@@ -50,7 +63,14 @@ module.exports = function floodsub (self) {
50
63
return callback ( err )
51
64
}
52
65
53
- callback ( null , rs )
66
+ stream . cancel = promisify ( ( cb ) => {
67
+ self . _floodsub . unsubscribe ( topic )
68
+ removeSubscription ( topic , cb )
69
+ } )
70
+
71
+ // Add the request to the active subscriptions and return the stream
72
+ addSubscription ( topic , null , stream )
73
+ callback ( null , stream )
54
74
} ) ,
55
75
56
76
publish : promisify ( ( topic , data , callback ) => {
0 commit comments