Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:add client && fix etcd config watcher #2

Merged
merged 14 commits into from
Nov 17, 2023
Merged
2 changes: 1 addition & 1 deletion client/circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func WithCircuitBreaker(dest, src string, etcdClient etcd.Client, uniqueID int64
for _, f := range opts.EtcdCustomFunctions {
f(&param)
}
key := param.Prefix + "/" + param.Path
key := "/" + param.Prefix + "/" + param.Path
felix021 marked this conversation as resolved.
Show resolved Hide resolved
cbSuite := initCircuitBreaker(key, dest, src, etcdClient, uniqueID)

return []client.Option{
Expand Down
2 changes: 1 addition & 1 deletion client/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func WithRetryPolicy(dest, src string, etcdClient etcd.Client, uniqueID int64, o
for _, f := range opts.EtcdCustomFunctions {
f(&param)
}
key := param.Prefix + "/" + param.Path
key := "/" + param.Prefix + "/" + param.Path
rc := initRetryContainer(key, dest, etcdClient, uniqueID)
return []client.Option{
client.WithRetryContainer(rc),
Expand Down
2 changes: 1 addition & 1 deletion client/rpc_timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func WithRPCTimeout(dest, src string, etcdClient etcd.Client, uniqueID int64, op
for _, f := range opts.EtcdCustomFunctions {
f(&param)
}
key := param.Prefix + "/" + param.Path
key := "/" + param.Prefix + "/" + param.Path
return []client.Option{
client.WithTimeoutProvider(initRPCTimeoutContainer(key, dest, etcdClient, uniqueID)),
client.WithCloseCallbacks(func() error {
Expand Down
21 changes: 12 additions & 9 deletions etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ import (
"go.uber.org/zap"
)

var (
m sync.Mutex
ctxMap = make(map[string]context.CancelFunc)
)
var m sync.Mutex

type Key struct {
Prefix string
Expand All @@ -56,6 +53,7 @@ type client struct {
prefixTemplate *template.Template
serverPathTemplate *template.Template
clientPathTemplate *template.Template
cancelMap map[string]context.CancelFunc
felix021 marked this conversation as resolved.
Show resolved Hide resolved
}

// Options etcd config options. All the fields have default value.
Expand Down Expand Up @@ -117,6 +115,7 @@ func NewClient(opts Options) (Client, error) {
prefixTemplate: prefixTemplate,
serverPathTemplate: serverNameTemplate,
clientPathTemplate: clientNameTemplate,
cancelMap: make(map[string]context.CancelFunc),
}
return c, nil
}
Expand Down Expand Up @@ -170,10 +169,7 @@ func (c *client) render(cpc *ConfigParamConfig, t *template.Template) (string, e
func (c *client) RegisterConfigCallback(ctx context.Context, key string, uniqueID int64, callback func(bool, string, ConfigParser)) {
go func() {
clientCtx, cancel := context.WithCancel(context.Background())
m.Lock()
clientKey := key + "/" + strconv.FormatInt(uniqueID, 10)
ctxMap[clientKey] = cancel
m.Unlock()
c.register(key, uniqueID, cancel)
watchChan := c.ecli.Watch(ctx, key)
for {
select {
Expand Down Expand Up @@ -214,7 +210,14 @@ func (c *client) RegisterConfigCallback(ctx context.Context, key string, uniqueI
func (c *client) DeregisterConfig(key string, uniqueID int64) {
m.Lock()
clientKey := key + "/" + strconv.FormatInt(uniqueID, 10)
cancel := ctxMap[clientKey]
cancel := c.cancelMap[clientKey]
cancel()
m.Unlock()
}

func (c *client) register(key string, uniqueID int64, cancel context.CancelFunc) {
felix021 marked this conversation as resolved.
Show resolved Hide resolved
m.Lock()
clientKey := key + "/" + strconv.FormatInt(uniqueID, 10)
c.cancelMap[clientKey] = cancel
felix021 marked this conversation as resolved.
Show resolved Hide resolved
m.Unlock()
}
2 changes: 1 addition & 1 deletion server/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func WithLimiter(dest string, etcdClient etcd.Client, uniqueID int64, opts utils
for _, f := range opts.EtcdCustomFunctions {
f(&param)
}
key := param.Prefix + "/" + param.Path
key := "/" + param.Prefix + "/" + param.Path
server.RegisterShutdownHook(func() {
etcdClient.DeregisterConfig(key, uniqueID)
})
Expand Down
Loading