@@ -12,20 +12,24 @@ import {
12
12
verifySignature
13
13
} from './sign.js'
14
14
import type { PeerId } from '@libp2p/interface-peer-id'
15
- import type { IncomingStreamData } from '@libp2p/interface-registrar'
15
+ import type { IncomingStreamData , Registrar } from '@libp2p/interface-registrar'
16
16
import type { Connection } from '@libp2p/interface-connection'
17
17
import { PubSub , Message , StrictNoSign , StrictSign , PubSubInit , PubSubEvents , PeerStreams , PubSubRPCMessage , PubSubRPC , PubSubRPCSubscription , SubscriptionChangeData , PublishResult , TopicValidatorFn , TopicValidatorResult } from '@libp2p/interface-pubsub'
18
18
import { PeerMap , PeerSet } from '@libp2p/peer-collections'
19
- import { Components , Initializable } from '@libp2p/components'
20
19
import type { Uint8ArrayList } from 'uint8arraylist'
21
20
22
21
const log = logger ( 'libp2p:pubsub' )
23
22
23
+ export interface PubSubComponents {
24
+ peerId : PeerId
25
+ registrar : Registrar
26
+ }
27
+
24
28
/**
25
29
* PubSubBaseProtocol handles the peers and connections logic for pubsub routers
26
30
* and specifies the API that pubsub routers should have.
27
31
*/
28
- export abstract class PubSubBaseProtocol < Events extends { [ s : string ] : any } = PubSubEvents > extends EventEmitter < Events > implements PubSub < Events > , Initializable {
32
+ export abstract class PubSubBaseProtocol < Events extends { [ s : string ] : any } = PubSubEvents > extends EventEmitter < Events > implements PubSub < Events > {
29
33
public started : boolean
30
34
/**
31
35
* Map of topics to which peers are subscribed to
@@ -60,14 +64,14 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
60
64
public topicValidators : Map < string , TopicValidatorFn >
61
65
public queue : Queue
62
66
public multicodecs : string [ ]
63
- public components : Components = new Components ( )
67
+ public components : PubSubComponents
64
68
65
69
private _registrarTopologyIds : string [ ] | undefined
66
70
protected enabled : boolean
67
71
private readonly maxInboundStreams : number
68
72
private readonly maxOutboundStreams : number
69
73
70
- constructor ( props : PubSubInit ) {
74
+ constructor ( components : PubSubComponents , props : PubSubInit ) {
71
75
super ( )
72
76
73
77
const {
@@ -80,6 +84,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
80
84
maxOutboundStreams = 1
81
85
} = props
82
86
87
+ this . components = components
83
88
this . multicodecs = ensureArray ( multicodecs )
84
89
this . enabled = props . enabled !== false
85
90
this . started = false
@@ -99,10 +104,6 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
99
104
this . _onPeerDisconnected = this . _onPeerDisconnected . bind ( this )
100
105
}
101
106
102
- init ( components : Components ) {
103
- this . components = components
104
- }
105
-
106
107
// LIFECYCLE METHODS
107
108
108
109
/**
@@ -117,7 +118,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
117
118
118
119
log ( 'starting' )
119
120
120
- const registrar = this . components . getRegistrar ( )
121
+ const registrar = this . components . registrar
121
122
// Incoming streams
122
123
// Called after a peer dials us
123
124
await Promise . all ( this . multicodecs . map ( async multicodec => await registrar . handle ( multicodec , this . _onIncomingStream , {
@@ -145,7 +146,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
145
146
return
146
147
}
147
148
148
- const registrar = this . components . getRegistrar ( )
149
+ const registrar = this . components . registrar
149
150
150
151
// unregister protocol and handlers
151
152
if ( this . _registrarTopologyIds != null ) {
@@ -412,7 +413,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
412
413
* Handles a message from a peer
413
414
*/
414
415
async processMessage ( from : PeerId , msg : Message ) {
415
- if ( this . components . getPeerId ( ) . equals ( from ) && ! this . emitSelf ) {
416
+ if ( this . components . peerId . equals ( from ) && ! this . emitSelf ) {
416
417
return
417
418
}
418
419
@@ -425,7 +426,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
425
426
}
426
427
427
428
if ( this . subscriptions . has ( msg . topic ) ) {
428
- const isFromSelf = this . components . getPeerId ( ) . equals ( from )
429
+ const isFromSelf = this . components . peerId . equals ( from )
429
430
430
431
if ( ! isFromSelf || this . emitSelf ) {
431
432
super . dispatchEvent ( new CustomEvent < Message > ( 'message' , {
@@ -584,7 +585,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
584
585
const signaturePolicy = this . globalSignaturePolicy
585
586
switch ( signaturePolicy ) {
586
587
case 'StrictSign' :
587
- return await signMessage ( this . components . getPeerId ( ) , message , this . encodeMessage . bind ( this ) )
588
+ return await signMessage ( this . components . peerId , message , this . encodeMessage . bind ( this ) )
588
589
case 'StrictNoSign' :
589
590
return await Promise . resolve ( {
590
591
type : 'unsigned' ,
@@ -627,7 +628,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
627
628
}
628
629
629
630
const message = {
630
- from : this . components . getPeerId ( ) ,
631
+ from : this . components . peerId ,
631
632
topic,
632
633
data : data ?? new Uint8Array ( 0 ) ,
633
634
sequenceNumber : randomSeqno ( )
@@ -649,10 +650,10 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
649
650
}
650
651
651
652
// send to all the other peers
652
- const result = await this . publishMessage ( this . components . getPeerId ( ) , rpcMessage )
653
+ const result = await this . publishMessage ( this . components . peerId , rpcMessage )
653
654
654
655
if ( emittedToSelf ) {
655
- result . recipients = [ ...result . recipients , this . components . getPeerId ( ) ]
656
+ result . recipients = [ ...result . recipients , this . components . peerId ]
656
657
}
657
658
658
659
return result
0 commit comments