Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:add etcd && server #1

Merged
merged 27 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
7fdf43d
feat:add etcd && server
ViolaPioggia Oct 28, 2023
d43b57b
style:gofumpt
ViolaPioggia Oct 28, 2023
4cb6f62
feat:add license
ViolaPioggia Oct 28, 2023
7eedb6d
feat:add go.sum
ViolaPioggia Oct 28, 2023
7922c71
feat:update go.mod
ViolaPioggia Oct 28, 2023
7f8de1c
feat:update go.mod
ViolaPioggia Oct 28, 2023
f594492
feat:update go.mod
ViolaPioggia Oct 28, 2023
df0d6af
feat:update go.mod
ViolaPioggia Oct 28, 2023
584d8ad
feat:update go.mod
ViolaPioggia Oct 28, 2023
f969f03
feat:update go.mod
ViolaPioggia Oct 28, 2023
c748514
feat:update go.mod
ViolaPioggia Oct 28, 2023
df189d9
feat:add client deregister callback
ViolaPioggia Oct 29, 2023
da4d3cd
feat:update server
ViolaPioggia Oct 30, 2023
edf800a
feat:update go.mod
ViolaPioggia Oct 30, 2023
a9b6ed0
feat:update go.mod
ViolaPioggia Oct 30, 2023
d65b107
style:change variable name
ViolaPioggia Oct 31, 2023
257ebae
fix:fix slice
ViolaPioggia Oct 31, 2023
d906d71
style:change global variable place
ViolaPioggia Oct 31, 2023
e62351e
feat:update go.mod
ViolaPioggia Oct 31, 2023
c71f23d
feat:add example && fix bugs
ViolaPioggia Oct 31, 2023
5ad73c6
feat:add license header
ViolaPioggia Oct 31, 2023
5082aaf
style:update style
ViolaPioggia Nov 1, 2023
cc19d34
feat:add license header
ViolaPioggia Nov 1, 2023
12ff580
fix:fix etcd client initialize block
ViolaPioggia Nov 2, 2023
1295c97
style:gofumpt
ViolaPioggia Nov 2, 2023
1553dcd
style:change code description
ViolaPioggia Nov 2, 2023
7a3e65e
style:change DialTimeout into Timeout
ViolaPioggia Nov 2, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 214 additions & 0 deletions etcd/etcd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
// Copyright 2023 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcd

import (
"bytes"
"context"
"strconv"
"sync"
"text/template"

"go.etcd.io/etcd/api/v3/mvccpb"

clientv3 "go.etcd.io/etcd/client/v3"

"github.com/cloudwego/kitex/pkg/klog"
"go.uber.org/zap"
)

var (
m sync.Mutex
ctxMap = make(map[string]context.CancelFunc)
Num int64
felix021 marked this conversation as resolved.
Show resolved Hide resolved
)

type Key struct {
Prefix string
Path string
}

type Client interface {
SetParser(ConfigParser)
ClientConfigParam(cpc *ConfigParamConfig, cfs ...CustomFunction) (Key, error)
ServerConfigParam(cpc *ConfigParamConfig, cfs ...CustomFunction) (Key, error)
RegisterConfigCallback(ctx context.Context, key string, clientId int64, callback func(string, ConfigParser))
DeregisterConfig(key string, uniqueId int64)
}

type client struct {
ecli *clientv3.Client
// support customise parser
parser ConfigParser
prefixTemplate *template.Template
serverPathTemplate *template.Template
clientPathTemplate *template.Template
}

// Options etcd config options. All the fields have default value.
type Options struct {
Node []string
Prefix string
ServerPathFormat string
clientPathFormat string
LoggerConfig *zap.Config
ConfigParser ConfigParser
}

// New Create a default etcd client
// It can create a client with default config by env variable.
// See: env.go
func New(opts Options) (Client, error) {
if opts.Node == nil {
opts.Node = []string{EtcdDefaultNode}
}
if opts.ConfigParser == nil {
opts.ConfigParser = defaultConfigParse()
}
if opts.Prefix == "" {
opts.Prefix = EtcdDefaultConfigPrefix
}
if opts.ServerPathFormat == "" {
opts.ServerPathFormat = EtcdDefaultServerPath
}
if opts.clientPathFormat == "" {
opts.clientPathFormat = EtcdDefaultClientPath
}
etcdClient, err := clientv3.New(clientv3.Config{
Endpoints: opts.Node,
LogConfig: opts.LoggerConfig,
})
if err != nil {
return nil, err
}
prefixTemplate, err := template.New("prefix").Parse(opts.Prefix)
if err != nil {
return nil, err
}
serverNameTemplate, err := template.New("serverName").Parse(opts.ServerPathFormat)
if err != nil {
return nil, err
}
clientNameTemplate, err := template.New("clientName").Parse(opts.clientPathFormat)
if err != nil {
return nil, err
}
c := &client{
ecli: etcdClient,
parser: opts.ConfigParser,
prefixTemplate: prefixTemplate,
serverPathTemplate: serverNameTemplate,
clientPathTemplate: clientNameTemplate,
}
return c, nil
}

func (c *client) SetParser(parser ConfigParser) {
c.parser = parser
}

func (c *client) ClientConfigParam(cpc *ConfigParamConfig, cfs ...CustomFunction) (Key, error) {
return c.configParam(cpc, c.clientPathTemplate, cfs...)
}

func (c *client) ServerConfigParam(cpc *ConfigParamConfig, cfs ...CustomFunction) (Key, error) {
return c.configParam(cpc, c.serverPathTemplate, cfs...)
}

// configParam render config parameters. All the parameters can be customized with CustomFunction.
// ConfigParam explain:
// 1. Prefix: KitexConfig by default.
// 2. ServerPath: {{.ServerServiceName}}/{{.Category}} by default.
// ClientPath: {{.ClientServiceName}}/{{.ServerServiceName}}/{{.Category}} by default.
func (c *client) configParam(cpc *ConfigParamConfig, t *template.Template, cfs ...CustomFunction) (Key, error) {
param := Key{}

var err error
param.Path, err = c.render(cpc, t)
if err != nil {
return param, err
}
param.Prefix, err = c.render(cpc, c.prefixTemplate)
if err != nil {
return param, err
}

for _, cf := range cfs {
cf(&param)
}
return param, nil
}

func (c *client) render(cpc *ConfigParamConfig, t *template.Template) (string, error) {
var tpl bytes.Buffer
err := t.Execute(&tpl, cpc)
if err != nil {
return "", err
}
return tpl.String(), nil
}

// RegisterConfigCallback register the callback function to etcd client.
func (c *client) RegisterConfigCallback(ctx context.Context, key string, uniqueID int64, callback func(string, ConfigParser)) {
clientCtx, cancel := context.WithCancel(context.Background())
go func() {
m.Lock()
tmp := key + "/" + strconv.FormatInt(uniqueID, 10)
ctxMap[tmp] = cancel
m.Unlock()
watchChan := c.ecli.Watch(ctx, key)
for {
select {
case <-clientCtx.Done():
return
case watchResp := <-watchChan:
for _, event := range watchResp.Events {
eventType := event.Type
// 检查事件类型
felix021 marked this conversation as resolved.
Show resolved Hide resolved
if eventType == mvccpb.PUT {
// 配置被更新
value := string(event.Kv.Value)
klog.Debugf("[etcd] config key: %s updated,value is %s", key, value)
callback(value, c.parser)
} else if eventType == mvccpb.DELETE {
// 配置被删除
klog.Debugf("[etcd] config key: %s deleted", key)
callback("", c.parser)
}
}
}
}
}()

data, err := c.ecli.Get(context.Background(), key)
// the etcd client has handled the not exist error.
if err != nil {
klog.Debugf("[etcd] key: %s config get value failed", key)
return
}
if data.Kvs == nil {
callback("", c.parser)
return
}
callback(string(data.Kvs[0].Value), c.parser)
}

func (c *client) DeregisterConfig(key string, uniqueID int64) {
m.Lock()
clientKey := key + "/" + strconv.FormatInt(uniqueID, 10)
cancel := ctxMap[clientKey]
cancel()
m.Unlock()
}
56 changes: 56 additions & 0 deletions etcd/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2023 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcd

import (
"encoding/json"
)

const (
EtcdDefaultNode = "http://127.0.0.1:2379"
EtcdDefaultConfigPrefix = "KitexConfig"
EtcdDefaultClientPath = "{{.ClientServiceName}}/{{.ServerServiceName}}/{{.Category}}"
EtcdDefaultServerPath = "{{.ServerServiceName}}/{{.Category}}"
)

var _ ConfigParser = &parser{}

// CustomFunction use for customize the config parameters.
type CustomFunction func(*Key)

// ConfigParamConfig use for render the path or prefix info by go template, ref: https://pkg.go.dev/text/template
// The fixed key shows as below.
type ConfigParamConfig struct {
Category string
ClientServiceName string
ServerServiceName string
}

// ConfigParser the parser for etcd config.
type ConfigParser interface {
Decode(data string, config interface{}) error
}

type parser struct{}

// Decode decodes the data to struct in specified format.
func (p *parser) Decode(data string, config interface{}) error {
return json.Unmarshal([]byte(data), config)
}

// DefaultConfigParse default etcd config parser.
func defaultConfigParse() ConfigParser {
return &parser{}
}
54 changes: 54 additions & 0 deletions example/server/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2023 CloudWeGo Authors
felix021 marked this conversation as resolved.
Show resolved Hide resolved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main

import (
"context"
"log"

"github.com/cloudwego/kitex-examples/kitex_gen/api"
"github.com/cloudwego/kitex-examples/kitex_gen/api/echo"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/server"
"github.com/kitex-contrib/config-etcd/etcd"
etcdServer "github.com/kitex-contrib/config-etcd/server"
)

var _ api.Echo = &EchoImpl{}

// EchoImpl implements the last service interface defined in the IDL.
type EchoImpl struct{}

// Echo implements the Echo interface.
func (s *EchoImpl) Echo(ctx context.Context, req *api.Request) (resp *api.Response, err error) {
klog.Info("echo called")
return &api.Response{Message: req.Message}, nil
}

func main() {
klog.SetLevel(klog.LevelDebug)
serviceName := "ServiceName"
etcdClient, _ := etcd.New(etcd.Options{})
svr := echo.NewServer(
new(EchoImpl),
server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: serviceName}),
server.WithSuite(etcdServer.NewSuite(serviceName, etcdClient)),
)
if err := svr.Run(); err != nil {
log.Println("server stopped with error:", err)
} else {
log.Println("server stopped")
}
}
66 changes: 66 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
module github.com/kitex-contrib/config-etcd

go 1.19
felix021 marked this conversation as resolved.
Show resolved Hide resolved

require (
github.com/cloudwego/kitex v0.7.3
github.com/cloudwego/kitex-examples v0.2.2
go.etcd.io/etcd v3.3.27+incompatible
go.etcd.io/etcd/client/v3 v3.5.10
go.uber.org/zap v1.26.0
)

require (
github.com/apache/thrift v0.19.0 // indirect
github.com/bytedance/gopkg v0.0.0-20230728082804-614d0af6619b // indirect
github.com/bytedance/sonic v1.9.1 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/chenzhuoyu/iasm v0.9.0 // indirect
github.com/choleraehyq/pid v0.0.17 // indirect
github.com/cloudwego/configmanager v0.2.0 // indirect
github.com/cloudwego/dynamicgo v0.1.3 // indirect
github.com/cloudwego/fastpb v0.0.4 // indirect
github.com/cloudwego/frugal v0.1.8 // indirect
github.com/cloudwego/localsession v0.0.2 // indirect
github.com/cloudwego/netpoll v0.5.1 // indirect
github.com/cloudwego/thriftgo v0.3.2-0.20230828085742-edaddf2c17af // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/fatih/structtag v1.2.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/pprof v0.0.0-20230509042627-b1315fad0c5a // indirect
github.com/iancoleman/strcase v0.2.0 // indirect
github.com/jhump/protoreflect v1.8.2 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/gls v0.0.0-20220109145502-612d0167dce5 // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/oleiade/lane v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/testify v1.8.4 // indirect
github.com/tidwall/gjson v1.9.3 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/arch v0.2.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/genproto v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/grpc v1.58.3 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace (
github.com/apache/thrift v0.19.0 => github.com/apache/thrift v0.13.0
)
Loading
Loading