Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix polaris Subscriber #2026

Merged
merged 24 commits into from
Aug 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
08af2cc
fix(proto): fix getting attributes issue (#1968)
justxuewei Jul 12, 2022
7017726
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Aug 17, 2022
6747f90
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Aug 17, 2022
20a56d5
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Aug 17, 2022
43cd167
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Aug 17, 2022
c854fba
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Aug 17, 2022
83174a2
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Aug 17, 2022
a8513bd
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Aug 18, 2022
5d7c857
Merge branch 'apache:3.0' into 3.0
jasondeng1997 Aug 18, 2022
511d8e8
Merge branch 'apache:3.0' into 3.0
jasondeng1997 Aug 18, 2022
9543bd9
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Aug 18, 2022
268b46e
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Aug 18, 2022
22aa477
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Aug 20, 2022
833da65
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Aug 20, 2022
b281847
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Aug 22, 2022
54692c3
Merge branch 'apache:3.0' into 3.0
jasondeng1997 Aug 22, 2022
888fee9
Merge branch 'apache:3.0' into 3.0
jasondeng1997 Aug 22, 2022
7fa2a40
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Aug 22, 2022
e304b72
Merge branch 'apache:3.0' into 3.0
jasondeng1997 Aug 22, 2022
3a3335f
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Aug 22, 2022
472d6ef
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Aug 22, 2022
84cf509
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Aug 22, 2022
f6615cc
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Aug 22, 2022
f5a933d
Merge remote-tracking branch 'origin/3.0' into 3.0
jasondeng1997 Aug 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions registry/polaris/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ import (
"dubbo.apache.org/dubbo-go/v3/remoting"
)

type subscriber func(remoting.EventType, []model.Instance)
type item func(remoting.EventType, []model.Instance)

type PolarisServiceWatcher struct {
consumer api.ConsumerAPI
subscribeParam *api.WatchServiceRequest
lock *sync.RWMutex
subscribers []subscriber
subscribers []item
execOnce *sync.Once
}

Expand All @@ -48,7 +48,7 @@ func newPolarisWatcher(param *api.WatchServiceRequest, consumer api.ConsumerAPI)
subscribeParam: param,
consumer: consumer,
lock: &sync.RWMutex{},
subscribers: make([]subscriber, 0),
subscribers: make([]item, 0),
execOnce: &sync.Once{},
}
return watcher, nil
Expand Down
1 change: 1 addition & 0 deletions registry/polaris/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func NewPolarisListener(url *common.URL) (*polarisListener, error) {
events: gxchan.NewUnboundedChan(32),
closeCh: make(chan struct{}),
}

return listener, nil
}

Expand Down
15 changes: 15 additions & 0 deletions registry/polaris/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ func (pr *polarisRegistry) UnRegister(conf *common.URL) error {

// Subscribe returns nil if subscribing registry successfully. If not returns an error.
func (pr *polarisRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) error {
var (
newParam api.WatchServiceRequest
newConsumer api.ConsumerAPI
)

role, _ := strconv.Atoi(url.GetParam(constant.RegistryRoleKey, ""))
if role != common.CONSUMER {
return nil
Expand All @@ -163,15 +168,25 @@ func (pr *polarisRegistry) Subscribe(url *common.URL, notifyListener registry.No
continue
}

watcher, err := newPolarisWatcher(&newParam, newConsumer)
if err != nil {
logger.Warnf("getwatcher() = err:%v", perrors.WithStack(err))
timer := time.NewTimer(time.Duration(RegistryConnDelay) * time.Second)
timer.Reset(time.Duration(RegistryConnDelay) * time.Second)
continue
}
for {

serviceEvent, err := listener.Next()

if err != nil {
logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
listener.Close()
return err
}
logger.Infof("update begin, service event: %v", serviceEvent.String())
notifyListener.Notify(serviceEvent)
watcher.startWatch()
}
}
}
Expand Down