Skip to content

Commit

Permalink
Fix polaris Subscriber #2026
Browse files Browse the repository at this point in the history
  • Loading branch information
chickenlj authored Aug 25, 2022
2 parents 19ddecf + f5a933d commit c134397
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 3 deletions.
6 changes: 3 additions & 3 deletions registry/polaris/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ import (
"dubbo.apache.org/dubbo-go/v3/remoting"
)

type subscriber func(remoting.EventType, []model.Instance)
type item func(remoting.EventType, []model.Instance)

type PolarisServiceWatcher struct {
consumer api.ConsumerAPI
subscribeParam *api.WatchServiceRequest
lock *sync.RWMutex
subscribers []subscriber
subscribers []item
execOnce *sync.Once
}

Expand All @@ -48,7 +48,7 @@ func newPolarisWatcher(param *api.WatchServiceRequest, consumer api.ConsumerAPI)
subscribeParam: param,
consumer: consumer,
lock: &sync.RWMutex{},
subscribers: make([]subscriber, 0),
subscribers: make([]item, 0),
execOnce: &sync.Once{},
}
return watcher, nil
Expand Down
1 change: 1 addition & 0 deletions registry/polaris/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func NewPolarisListener(url *common.URL) (*polarisListener, error) {
events: gxchan.NewUnboundedChan(32),
closeCh: make(chan struct{}),
}

return listener, nil
}

Expand Down
15 changes: 15 additions & 0 deletions registry/polaris/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ func (pr *polarisRegistry) UnRegister(conf *common.URL) error {

// Subscribe returns nil if subscribing registry successfully. If not returns an error.
func (pr *polarisRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) error {
var (
newParam api.WatchServiceRequest
newConsumer api.ConsumerAPI
)

role, _ := strconv.Atoi(url.GetParam(constant.RegistryRoleKey, ""))
if role != common.CONSUMER {
return nil
Expand All @@ -163,15 +168,25 @@ func (pr *polarisRegistry) Subscribe(url *common.URL, notifyListener registry.No
continue
}

watcher, err := newPolarisWatcher(&newParam, newConsumer)
if err != nil {
logger.Warnf("getwatcher() = err:%v", perrors.WithStack(err))
timer := time.NewTimer(time.Duration(RegistryConnDelay) * time.Second)
timer.Reset(time.Duration(RegistryConnDelay) * time.Second)
continue
}
for {

serviceEvent, err := listener.Next()

if err != nil {
logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
listener.Close()
return err
}
logger.Infof("update begin, service event: %v", serviceEvent.String())
notifyListener.Notify(serviceEvent)
watcher.startWatch()
}
}
}
Expand Down

0 comments on commit c134397

Please sign in to comment.