@@ -29,7 +29,7 @@ export interface PubSubComponents {
29
29
* PubSubBaseProtocol handles the peers and connections logic for pubsub routers
30
30
* and specifies the API that pubsub routers should have.
31
31
*/
32
- export abstract class PubSubBaseProtocol < Events extends { [ s : string ] : any } = PubSubEvents > extends EventEmitter < Events > implements PubSub < Events > {
32
+ export abstract class PubSubBaseProtocol < Events extends Record < string , any > = PubSubEvents > extends EventEmitter < Events > implements PubSub < Events > {
33
33
public started : boolean
34
34
/**
35
35
* Map of topics to which peers are subscribed to
@@ -108,10 +108,8 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
108
108
109
109
/**
110
110
* Register the pubsub protocol onto the libp2p node.
111
- *
112
- * @returns {void }
113
111
*/
114
- async start ( ) {
112
+ async start ( ) : Promise < void > {
115
113
if ( this . started || ! this . enabled ) {
116
114
return
117
115
}
@@ -121,10 +119,12 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
121
119
const registrar = this . components . registrar
122
120
// Incoming streams
123
121
// Called after a peer dials us
124
- await Promise . all ( this . multicodecs . map ( async multicodec => await registrar . handle ( multicodec , this . _onIncomingStream , {
125
- maxInboundStreams : this . maxInboundStreams ,
126
- maxOutboundStreams : this . maxOutboundStreams
127
- } ) ) )
122
+ await Promise . all ( this . multicodecs . map ( async multicodec => {
123
+ await registrar . handle ( multicodec , this . _onIncomingStream , {
124
+ maxInboundStreams : this . maxInboundStreams ,
125
+ maxOutboundStreams : this . maxOutboundStreams
126
+ } )
127
+ } ) )
128
128
129
129
// register protocol with topology
130
130
// Topology callbacks called on connection manager changes
@@ -141,7 +141,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
141
141
/**
142
142
* Unregister the pubsub protocol and the streams with other peers will be closed.
143
143
*/
144
- async stop ( ) {
144
+ async stop ( ) : Promise < void > {
145
145
if ( ! this . started || ! this . enabled ) {
146
146
return
147
147
}
@@ -150,10 +150,14 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
150
150
151
151
// unregister protocol and handlers
152
152
if ( this . _registrarTopologyIds != null ) {
153
- this . _registrarTopologyIds ?. map ( id => registrar . unregister ( id ) )
153
+ this . _registrarTopologyIds ?. forEach ( id => {
154
+ registrar . unregister ( id )
155
+ } )
154
156
}
155
157
156
- await Promise . all ( this . multicodecs . map ( async multicodec => await registrar . unhandle ( multicodec ) ) )
158
+ await Promise . all ( this . multicodecs . map ( async multicodec => {
159
+ await registrar . unhandle ( multicodec )
160
+ } ) )
157
161
158
162
log ( 'stopping' )
159
163
for ( const peerStreams of this . peers . values ( ) ) {
@@ -166,14 +170,14 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
166
170
log ( 'stopped' )
167
171
}
168
172
169
- isStarted ( ) {
173
+ isStarted ( ) : boolean {
170
174
return this . started
171
175
}
172
176
173
177
/**
174
178
* On an inbound stream opened
175
179
*/
176
- protected _onIncomingStream ( data : IncomingStreamData ) {
180
+ protected _onIncomingStream ( data : IncomingStreamData ) : void {
177
181
const { stream, connection } = data
178
182
const peerId = connection . remotePeer
179
183
@@ -186,13 +190,13 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
186
190
const inboundStream = peer . attachInboundStream ( stream )
187
191
188
192
this . processMessages ( peerId , inboundStream , peer )
189
- . catch ( err => log ( err ) )
193
+ . catch ( err => { log ( err ) } )
190
194
}
191
195
192
196
/**
193
197
* Registrar notifies an established connection with pubsub protocol
194
198
*/
195
- protected _onPeerConnected ( peerId : PeerId , conn : Connection ) {
199
+ protected _onPeerConnected ( peerId : PeerId , conn : Connection ) : void {
196
200
log ( 'connected %p' , peerId )
197
201
198
202
void Promise . resolve ( ) . then ( async ( ) => {
@@ -221,7 +225,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
221
225
/**
222
226
* Registrar notifies a closing connection with pubsub protocol
223
227
*/
224
- protected _onPeerDisconnected ( peerId : PeerId , conn ?: Connection ) {
228
+ protected _onPeerDisconnected ( peerId : PeerId , conn ?: Connection ) : void {
225
229
const idB58Str = peerId . toString ( )
226
230
227
231
log ( 'connection ended' , idB58Str )
@@ -258,7 +262,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
258
262
/**
259
263
* Notifies the router that a peer has been disconnected
260
264
*/
261
- protected _removePeer ( peerId : PeerId ) {
265
+ protected _removePeer ( peerId : PeerId ) : PeerStreams | undefined {
262
266
const peerStreams = this . peers . get ( peerId )
263
267
if ( peerStreams == null ) {
264
268
return
@@ -284,7 +288,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
284
288
/**
285
289
* Responsible for processing each RPC message received by other peers.
286
290
*/
287
- async processMessages ( peerId : PeerId , stream : AsyncIterable < Uint8ArrayList > , peerStreams : PeerStreams ) {
291
+ async processMessages ( peerId : PeerId , stream : AsyncIterable < Uint8ArrayList > , peerStreams : PeerStreams ) : Promise < void > {
288
292
try {
289
293
await pipe (
290
294
stream ,
@@ -320,7 +324,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
320
324
} ) ) ,
321
325
messages
322
326
} )
323
- . catch ( err => log ( err ) )
327
+ . catch ( err => { log ( err ) } )
324
328
}
325
329
}
326
330
)
@@ -378,7 +382,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
378
382
log . error ( err )
379
383
}
380
384
} ) )
381
- . catch ( err => log ( err ) )
385
+ . catch ( err => { log ( err ) } )
382
386
}
383
387
384
388
return true
@@ -387,7 +391,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
387
391
/**
388
392
* Handles a subscription change from a peer
389
393
*/
390
- processRpcSubOpt ( id : PeerId , subOpt : PubSubRPCSubscription ) {
394
+ processRpcSubOpt ( id : PeerId , subOpt : PubSubRPCSubscription ) : void {
391
395
const t = subOpt . topic
392
396
393
397
if ( t == null ) {
@@ -412,7 +416,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
412
416
/**
413
417
* Handles a message from a peer
414
418
*/
415
- async processMessage ( from : PeerId , msg : Message ) {
419
+ async processMessage ( from : PeerId , msg : Message ) : Promise < void > {
416
420
if ( this . components . peerId . equals ( from ) && ! this . emitSelf ) {
417
421
return
418
422
}
@@ -442,7 +446,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
442
446
* The default msgID implementation
443
447
* Child class can override this.
444
448
*/
445
- getMsgId ( msg : Message ) {
449
+ getMsgId ( msg : Message ) : Promise < Uint8Array > | Uint8Array {
446
450
const signaturePolicy = this . globalSignaturePolicy
447
451
switch ( signaturePolicy ) {
448
452
case 'StrictSign' :
@@ -470,7 +474,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
470
474
* Whether to accept a message from a peer
471
475
* Override to create a graylist
472
476
*/
473
- acceptFrom ( id : PeerId ) {
477
+ acceptFrom ( id : PeerId ) : boolean {
474
478
return true
475
479
}
476
480
@@ -495,10 +499,10 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
495
499
/**
496
500
* Send an rpc object to a peer
497
501
*/
498
- send ( peer : PeerId , data : { messages ?: Message [ ] , subscriptions ?: string [ ] , subscribe ?: boolean } ) {
502
+ send ( peer : PeerId , data : { messages ?: Message [ ] , subscriptions ?: string [ ] , subscribe ?: boolean } ) : void {
499
503
const { messages, subscriptions, subscribe } = data
500
504
501
- return this . sendRpc ( peer , {
505
+ this . sendRpc ( peer , {
502
506
subscriptions : ( subscriptions ?? [ ] ) . map ( str => ( { topic : str , subscribe : Boolean ( subscribe ) } ) ) ,
503
507
messages : ( messages ?? [ ] ) . map ( toRpcMessage )
504
508
} )
@@ -507,7 +511,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
507
511
/**
508
512
* Send an rpc object to a peer
509
513
*/
510
- sendRpc ( peer : PeerId , rpc : PubSubRPC ) {
514
+ sendRpc ( peer : PeerId , rpc : PubSubRPC ) : void {
511
515
const peerStreams = this . peers . get ( peer )
512
516
513
517
if ( peerStreams == null || ! peerStreams . isWritable ) {
@@ -523,7 +527,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
523
527
* Validates the given message. The signature will be checked for authenticity.
524
528
* Throws an error on invalid messages
525
529
*/
526
- async validate ( from : PeerId , message : Message ) { // eslint-disable-line require-await
530
+ async validate ( from : PeerId , message : Message ) : Promise < void > { // eslint-disable-line require-await
527
531
const signaturePolicy = this . globalSignaturePolicy
528
532
switch ( signaturePolicy ) {
529
533
case 'StrictNoSign' :
@@ -671,7 +675,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
671
675
/**
672
676
* Subscribes to a given topic.
673
677
*/
674
- subscribe ( topic : string ) {
678
+ subscribe ( topic : string ) : void {
675
679
if ( ! this . started ) {
676
680
throw new Error ( 'Pubsub has not started' )
677
681
}
@@ -690,7 +694,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
690
694
/**
691
695
* Unsubscribe from the given topic
692
696
*/
693
- unsubscribe ( topic : string ) {
697
+ unsubscribe ( topic : string ) : void {
694
698
if ( ! this . started ) {
695
699
throw new Error ( 'Pubsub is not started' )
696
700
}
@@ -713,7 +717,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
713
717
/**
714
718
* Get the list of topics which the peer is subscribed to.
715
719
*/
716
- getTopics ( ) {
720
+ getTopics ( ) : string [ ] {
717
721
if ( ! this . started ) {
718
722
throw new Error ( 'Pubsub is not started' )
719
723
}
0 commit comments