diff --git a/go.mod b/go.mod index 960c2f3e..898a7e0e 100644 --- a/go.mod +++ b/go.mod @@ -13,4 +13,5 @@ require ( github.com/libp2p/go-msgio v0.2.0 github.com/multiformats/go-multiaddr v0.5.0 github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee + go.uber.org/atomic v1.7.0 ) diff --git a/pubsub.go b/pubsub.go index e44c7f1c..4c627c68 100644 --- a/pubsub.go +++ b/pubsub.go @@ -221,7 +221,7 @@ type Message struct { ID string ReceivedFrom peer.ID ValidatorData interface{} - Local bool + Local bool } func (m *Message) GetFrom() peer.ID { diff --git a/topic.go b/topic.go index 901edcff..d84e52f8 100644 --- a/topic.go +++ b/topic.go @@ -11,6 +11,7 @@ import ( pb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/libp2p/go-libp2p-core/peer" + "go.uber.org/atomic" ) // ErrTopicClosed is returned if a Topic is utilized after it has been closed @@ -30,8 +31,7 @@ type Topic struct { evtHandlerMux sync.RWMutex evtHandlers map[*TopicEventHandler]struct{} - mux sync.RWMutex - closed bool + closed atomic.Bool } // String returns the topic associated with t @@ -47,10 +47,7 @@ func (t *Topic) SetScoreParams(p *TopicScoreParams) error { return fmt.Errorf("invalid topic score parameters: %w", err) } - t.mux.Lock() - defer t.mux.Unlock() - - if t.closed { + if t.closed.Load() { return ErrTopicClosed } @@ -84,9 +81,7 @@ func (t *Topic) SetScoreParams(p *TopicScoreParams) error { // EventHandler creates a handle for topic specific events // Multiple event handlers may be created and will operate independently of each other func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler, error) { - t.mux.RLock() - defer t.mux.RUnlock() - if t.closed { + if t.closed.Load() { return nil, ErrTopicClosed } @@ -141,9 +136,7 @@ func (t *Topic) sendNotification(evt PeerEvent) { // Note that subscription is not an instantaneous operation. It may take some time // before the subscription is processed by the pubsub main loop and propagated to our peers. func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) { - t.mux.RLock() - defer t.mux.RUnlock() - if t.closed { + if t.closed.Load() { return nil, ErrTopicClosed } @@ -184,9 +177,7 @@ func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) { // cancel function. Subsequent calls increase the reference counter. // To completely disable the relay, all references must be cancelled. func (t *Topic) Relay() (RelayCancelFunc, error) { - t.mux.RLock() - defer t.mux.RUnlock() - if t.closed { + if t.closed.Load() { return nil, ErrTopicClosed } @@ -215,16 +206,14 @@ type ProvideKey func() (crypto.PrivKey, peer.ID) type PublishOptions struct { ready RouterReady customKey ProvideKey - local bool + local bool } type PubOpt func(pub *PublishOptions) error // Publish publishes data to topic. func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error { - t.mux.RLock() - defer t.mux.RUnlock() - if t.closed { + if t.closed.Load() { return ErrTopicClosed } @@ -347,9 +336,7 @@ func WithSecretKeyAndPeerId(key crypto.PrivKey, pid peer.ID) PubOpt { // Close closes down the topic. Will return an error unless there are no active event handlers or subscriptions. // Does not error if the topic is already closed. func (t *Topic) Close() error { - t.mux.Lock() - defer t.mux.Unlock() - if t.closed { + if t.closed.Load() { return nil } @@ -364,7 +351,7 @@ func (t *Topic) Close() error { err := <-req.resp if err == nil { - t.closed = true + t.closed.Swap(true) } return err @@ -372,9 +359,7 @@ func (t *Topic) Close() error { // ListPeers returns a list of peers we are connected to in the given topic. func (t *Topic) ListPeers() []peer.ID { - t.mux.RLock() - defer t.mux.RUnlock() - if t.closed { + if t.closed.Load() { return []peer.ID{} }