Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:add etcd && server #1

Merged
merged 27 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
7fdf43d
feat:add etcd && server
ViolaPioggia Oct 28, 2023
d43b57b
style:gofumpt
ViolaPioggia Oct 28, 2023
4cb6f62
feat:add license
ViolaPioggia Oct 28, 2023
7eedb6d
feat:add go.sum
ViolaPioggia Oct 28, 2023
7922c71
feat:update go.mod
ViolaPioggia Oct 28, 2023
7f8de1c
feat:update go.mod
ViolaPioggia Oct 28, 2023
f594492
feat:update go.mod
ViolaPioggia Oct 28, 2023
df0d6af
feat:update go.mod
ViolaPioggia Oct 28, 2023
584d8ad
feat:update go.mod
ViolaPioggia Oct 28, 2023
f969f03
feat:update go.mod
ViolaPioggia Oct 28, 2023
c748514
feat:update go.mod
ViolaPioggia Oct 28, 2023
df189d9
feat:add client deregister callback
ViolaPioggia Oct 29, 2023
da4d3cd
feat:update server
ViolaPioggia Oct 30, 2023
edf800a
feat:update go.mod
ViolaPioggia Oct 30, 2023
a9b6ed0
feat:update go.mod
ViolaPioggia Oct 30, 2023
d65b107
style:change variable name
ViolaPioggia Oct 31, 2023
257ebae
fix:fix slice
ViolaPioggia Oct 31, 2023
d906d71
style:change global variable place
ViolaPioggia Oct 31, 2023
e62351e
feat:update go.mod
ViolaPioggia Oct 31, 2023
c71f23d
feat:add example && fix bugs
ViolaPioggia Oct 31, 2023
5ad73c6
feat:add license header
ViolaPioggia Oct 31, 2023
5082aaf
style:update style
ViolaPioggia Nov 1, 2023
cc19d34
feat:add license header
ViolaPioggia Nov 1, 2023
12ff580
fix:fix etcd client initialize block
ViolaPioggia Nov 2, 2023
1295c97
style:gofumpt
ViolaPioggia Nov 2, 2023
1553dcd
style:change code description
ViolaPioggia Nov 2, 2023
7a3e65e
style:change DialTimeout into Timeout
ViolaPioggia Nov 2, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 206 additions & 0 deletions etcd/etcd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
// Copyright 2023 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcd

import (
"bytes"
"context"
"strconv"
"sync"
"text/template"

"github.com/cloudwego/kitex/pkg/klog"
"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
)

var (
m sync.Mutex
ctxMap map[string]context.CancelFunc
)

type Key struct {
Prefix string
Path string
}

type Client interface {
SetParser(ConfigParser)
ClientConfigParam(cpc *ConfigParamConfig, cfs ...CustomFunction) (Key, error)
ServerConfigParam(cpc *ConfigParamConfig, cfs ...CustomFunction) (Key, error)
RegisterConfigCallback(ctx context.Context, key string, clientId int64, callback func(string, ConfigParser))
DeregisterConfig(key string, clientId int64)
}

type client struct {
ecli *clientv3.Client
// support customise parser
parser ConfigParser
prefixTemplate *template.Template
serverPathTemplate *template.Template
clientPathTemplate *template.Template
}

// Options etcd config options. All the fields have default value.
type Options struct {
Node []string
Prefix string
ServerPathFormat string
clientPathFormat string
LoggerConfig *zap.Config
ConfigParser ConfigParser
}

// New Create a default etcd client
// It can create a client with default config by env variable.
// See: env.go
func New(opts Options) (Client, error) {
if opts.Node == nil {
opts.Node = []string{EtcdDefaultNode}
}
if opts.ConfigParser == nil {
opts.ConfigParser = defaultConfigParse()
}
if opts.Prefix == "" {
opts.Prefix = EtcdDefaultConfigPrefix
}
if opts.ServerPathFormat == "" {
opts.ServerPathFormat = EtcdDefaultServerPath
}
if opts.clientPathFormat == "" {
opts.clientPathFormat = EtcdDefaultClientPath
}
etcdClient, err := clientv3.New(clientv3.Config{
Endpoints: opts.Node,
LogConfig: opts.LoggerConfig,
})
if err != nil {
return nil, err
}
prefixTemplate, err := template.New("prefix").Parse(opts.Prefix)
if err != nil {
return nil, err
}
serverNameTemplate, err := template.New("serverName").Parse(opts.ServerPathFormat)
if err != nil {
return nil, err
}
clientNameTemplate, err := template.New("clientName").Parse(opts.clientPathFormat)
if err != nil {
return nil, err
}
c := &client{
ecli: etcdClient,
parser: opts.ConfigParser,
prefixTemplate: prefixTemplate,
serverPathTemplate: serverNameTemplate,
clientPathTemplate: clientNameTemplate,
}
return c, nil
}

func (c *client) SetParser(parser ConfigParser) {
c.parser = parser
}

func (c *client) ClientConfigParam(cpc *ConfigParamConfig, cfs ...CustomFunction) (Key, error) {
return c.configParam(cpc, c.clientPathTemplate, cfs...)
}

func (c *client) ServerConfigParam(cpc *ConfigParamConfig, cfs ...CustomFunction) (Key, error) {
return c.configParam(cpc, c.serverPathTemplate, cfs...)
}

// configParam render config parameters. All the parameters can be customized with CustomFunction.
// ConfigParam explain:
// 1. Prefix: KitexConfig by default.
// 2. ServerPath: {{.ServerServiceName}}/{{.Category}} by default.
// ClientPath: {{.ClientServiceName}}/{{.ServerServiceName}}/{{.Category}} by default.
func (c *client) configParam(cpc *ConfigParamConfig, t *template.Template, cfs ...CustomFunction) (Key, error) {
param := Key{}

var err error
param.Path, err = c.render(cpc, t)
if err != nil {
return param, err
}
param.Prefix, err = c.render(cpc, c.prefixTemplate)
if err != nil {
return param, err
}

for _, cf := range cfs {
cf(&param)
}
return param, nil
}

func (c *client) render(cpc *ConfigParamConfig, t *template.Template) (string, error) {
var tpl bytes.Buffer
err := t.Execute(&tpl, cpc)
if err != nil {
return "", err
}
return tpl.String(), nil
}

// RegisterConfigCallback register the callback function to etcd client.
func (c *client) RegisterConfigCallback(ctx context.Context, key string, clientId int64, callback func(string, ConfigParser)) {
clientCtx, cancel := context.WithCancel(context.Background())
go func() {
select {
case <-clientCtx.Done():
felix021 marked this conversation as resolved.
Show resolved Hide resolved
return
default:
m.Lock()
tmp := key + "/" + strconv.FormatInt(clientId, 10)
felix021 marked this conversation as resolved.
Show resolved Hide resolved
ctxMap[tmp] = cancel
m.Unlock()
watchChan := c.ecli.Watch(ctx, key)
felix021 marked this conversation as resolved.
Show resolved Hide resolved
for watchResp := range watchChan {
for _, event := range watchResp.Events {
eventType := mvccpb.Event_EventType(event.Type)
// 检查事件类型
felix021 marked this conversation as resolved.
Show resolved Hide resolved
if eventType == mvccpb.PUT {
// 配置被更新
value := string(event.Kv.Value)
klog.Debugf("[etcd] config key: %s updated,value is %s", key, value)
callback(value, c.parser)
} else if eventType == mvccpb.DELETE {
// 配置被删除
klog.Debugf("[etcd] config key: %s deleted", key)
callback("", c.parser)
}
}
}
}
}()

data, err := c.ecli.Get(context.Background(), key)
// the etcd client has handled the not exist error.
if err != nil {
klog.Debugf("[etcd] key: %s config get value failed", key)
return
}

callback(string(data.Kvs[0].Value), c.parser)
}

func (c *client) DeregisterConfig(key string, clientId int64) {
tmp := key + "/" + strconv.FormatInt(clientId, 10)
cancel := ctxMap[tmp]
felix021 marked this conversation as resolved.
Show resolved Hide resolved
cancel()
}
56 changes: 56 additions & 0 deletions etcd/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2023 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcd

import (
"encoding/json"
)

const (
EtcdDefaultNode = "http://127.0.0.1:2379"
EtcdDefaultConfigPrefix = "KitexConfig"
EtcdDefaultClientPath = "{{.ClientServiceName}}/{{.ServerServiceName}}/{{.Category}}"
EtcdDefaultServerPath = "{{.ServerServiceName}}/{{.Category}}"
)

var _ ConfigParser = &parser{}

// CustomFunction use for customize the config parameters.
type CustomFunction func(*Key)

// ConfigParamConfig use for render the path or prefix info by go template, ref: https://pkg.go.dev/text/template
// The fixed key shows as below.
type ConfigParamConfig struct {
Category string
ClientServiceName string
ServerServiceName string
}

// ConfigParser the parser for etcd config.
type ConfigParser interface {
Decode(data string, config interface{}) error
}

type parser struct{}

// Decode decodes the data to struct in specified format.
func (p *parser) Decode(data string, config interface{}) error {
return json.Unmarshal([]byte(data), config)
}

// DefaultConfigParse default etcd config parser.
func defaultConfigParse() ConfigParser {
return &parser{}
}
11 changes: 11 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module config-etcd
felix021 marked this conversation as resolved.
Show resolved Hide resolved

go 1.16

require (
github.com/cloudwego/kitex v0.7.3
go.etcd.io/etcd v3.3.27+incompatible
go.etcd.io/etcd/client/v3 v3.5.10
go.uber.org/zap v1.17.0
google.golang.org/grpc v1.59.0 // indirect
)
Loading