Skip to content

Commit

Permalink
Merge pull request #1 from ViolaPioggia/main
Browse files Browse the repository at this point in the history
feat:add etcd && server
  • Loading branch information
Felix021 authored Nov 2, 2023
2 parents 9427239 + 7a3e65e commit 4074b43
Show file tree
Hide file tree
Showing 12 changed files with 2,929 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ jobs:
unit-benchmark-test:
strategy:
matrix:
go: [ 1.17, 1.18, 1.19 ]
go: [ 1.19 ]
os: [ X64, ARM64 ]
runs-on: ${{ matrix.os }}
steps:
Expand Down
221 changes: 221 additions & 0 deletions etcd/etcd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
// Copyright 2023 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcd

import (
"bytes"
"context"
"strconv"
"sync"
"text/template"
"time"

"go.etcd.io/etcd/api/v3/mvccpb"

clientv3 "go.etcd.io/etcd/client/v3"

"github.com/cloudwego/kitex/pkg/klog"
"go.uber.org/zap"
)

var (
m sync.Mutex
ctxMap = make(map[string]context.CancelFunc)
)

type Key struct {
Prefix string
Path string
}

type Client interface {
SetParser(ConfigParser)
ClientConfigParam(cpc *ConfigParamConfig, cfs ...CustomFunction) (Key, error)
ServerConfigParam(cpc *ConfigParamConfig, cfs ...CustomFunction) (Key, error)
RegisterConfigCallback(ctx context.Context, key string, clientId int64, callback func(string, ConfigParser))
DeregisterConfig(key string, uniqueId int64)
}

type client struct {
ecli *clientv3.Client
// support customise parser
parser ConfigParser
etcdTimeout time.Duration
prefixTemplate *template.Template
serverPathTemplate *template.Template
clientPathTemplate *template.Template
}

// Options etcd config options. All the fields have default value.
type Options struct {
Node []string
Prefix string
ServerPathFormat string
clientPathFormat string
Timeout time.Duration
LoggerConfig *zap.Config
ConfigParser ConfigParser
}

// New Create a default etcd client
// It can create a client with default config by env variable.
// See: env.go
func New(opts Options) (Client, error) {
if opts.Node == nil {
opts.Node = []string{EtcdDefaultNode}
}
if opts.ConfigParser == nil {
opts.ConfigParser = defaultConfigParse()
}
if opts.Prefix == "" {
opts.Prefix = EtcdDefaultConfigPrefix
}
if opts.Timeout == 0 {
opts.Timeout = EtcdDefaultTimeout
}
if opts.ServerPathFormat == "" {
opts.ServerPathFormat = EtcdDefaultServerPath
}
if opts.clientPathFormat == "" {
opts.clientPathFormat = EtcdDefaultClientPath
}
etcdClient, err := clientv3.New(clientv3.Config{
Endpoints: opts.Node,
LogConfig: opts.LoggerConfig,
})
if err != nil {
return nil, err
}
prefixTemplate, err := template.New("prefix").Parse(opts.Prefix)
if err != nil {
return nil, err
}
serverNameTemplate, err := template.New("serverName").Parse(opts.ServerPathFormat)
if err != nil {
return nil, err
}
clientNameTemplate, err := template.New("clientName").Parse(opts.clientPathFormat)
if err != nil {
return nil, err
}
c := &client{
ecli: etcdClient,
parser: opts.ConfigParser,
etcdTimeout: opts.Timeout,
prefixTemplate: prefixTemplate,
serverPathTemplate: serverNameTemplate,
clientPathTemplate: clientNameTemplate,
}
return c, nil
}

func (c *client) SetParser(parser ConfigParser) {
c.parser = parser
}

func (c *client) ClientConfigParam(cpc *ConfigParamConfig, cfs ...CustomFunction) (Key, error) {
return c.configParam(cpc, c.clientPathTemplate, cfs...)
}

func (c *client) ServerConfigParam(cpc *ConfigParamConfig, cfs ...CustomFunction) (Key, error) {
return c.configParam(cpc, c.serverPathTemplate, cfs...)
}

// configParam render config parameters. All the parameters can be customized with CustomFunction.
// ConfigParam explain:
// 1. Prefix: KitexConfig by default.
// 2. ServerPath: {{.ServerServiceName}}/{{.Category}} by default.
// ClientPath: {{.ClientServiceName}}/{{.ServerServiceName}}/{{.Category}} by default.
func (c *client) configParam(cpc *ConfigParamConfig, t *template.Template, cfs ...CustomFunction) (Key, error) {
param := Key{}

var err error
param.Path, err = c.render(cpc, t)
if err != nil {
return param, err
}
param.Prefix, err = c.render(cpc, c.prefixTemplate)
if err != nil {
return param, err
}

for _, cf := range cfs {
cf(&param)
}
return param, nil
}

func (c *client) render(cpc *ConfigParamConfig, t *template.Template) (string, error) {
var tpl bytes.Buffer
err := t.Execute(&tpl, cpc)
if err != nil {
return "", err
}
return tpl.String(), nil
}

// RegisterConfigCallback register the callback function to etcd client.
func (c *client) RegisterConfigCallback(ctx context.Context, key string, uniqueID int64, callback func(string, ConfigParser)) {
clientCtx, cancel := context.WithCancel(context.Background())
go func() {
m.Lock()
clientKey := key + "/" + strconv.FormatInt(uniqueID, 10)
ctxMap[clientKey] = cancel
m.Unlock()
watchChan := c.ecli.Watch(ctx, key)
for {
select {
case <-clientCtx.Done():
return
case watchResp := <-watchChan:
for _, event := range watchResp.Events {
eventType := event.Type
// check the event type
if eventType == mvccpb.PUT {
// config is updated
value := string(event.Kv.Value)
klog.Debugf("[etcd] config key: %s updated,value is %s", key, value)
callback(value, c.parser)
} else if eventType == mvccpb.DELETE {
// config is deleted
klog.Debugf("[etcd] config key: %s deleted", key)
callback("", c.parser)
}
}
}
}
}()
ctx, cancel = context.WithTimeout(context.Background(), c.etcdTimeout)
defer cancel()
data, err := c.ecli.Get(ctx, key)
// the etcd client has handled the not exist error.
if err != nil {
klog.Debugf("[etcd] key: %s config get value failed", key)
return
}
if data.Kvs == nil {
callback("", c.parser)
return
}
callback(string(data.Kvs[0].Value), c.parser)
}

func (c *client) DeregisterConfig(key string, uniqueID int64) {
m.Lock()
clientKey := key + "/" + strconv.FormatInt(uniqueID, 10)
cancel := ctxMap[clientKey]
cancel()
m.Unlock()
}
24 changes: 24 additions & 0 deletions etcd/global.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2023 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcd

import "sync/atomic"

var globalNum int64

func AllocateUniqueID() int64 {
atomic.AddInt64(&globalNum, 1)
return atomic.LoadInt64(&globalNum)
}
58 changes: 58 additions & 0 deletions etcd/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2023 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcd

import (
"encoding/json"
"time"
)

const (
EtcdDefaultNode = "http://127.0.0.1:2379"
EtcdDefaultConfigPrefix = "KitexConfig"
EtcdDefaultTimeout = 5 * time.Second
EtcdDefaultClientPath = "{{.ClientServiceName}}/{{.ServerServiceName}}/{{.Category}}"
EtcdDefaultServerPath = "{{.ServerServiceName}}/{{.Category}}"
)

var _ ConfigParser = &parser{}

// CustomFunction use for customize the config parameters.
type CustomFunction func(*Key)

// ConfigParamConfig use for render the path or prefix info by go template, ref: https://pkg.go.dev/text/template
// The fixed key shows as below.
type ConfigParamConfig struct {
Category string
ClientServiceName string
ServerServiceName string
}

// ConfigParser the parser for etcd config.
type ConfigParser interface {
Decode(data string, config interface{}) error
}

type parser struct{}

// Decode decodes the data to struct in specified format.
func (p *parser) Decode(data string, config interface{}) error {
return json.Unmarshal([]byte(data), config)
}

// DefaultConfigParse default etcd config parser.
func defaultConfigParse() ConfigParser {
return &parser{}
}
66 changes: 66 additions & 0 deletions example/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
module example

go 1.19

require (
github.com/cloudwego/kitex v0.7.3
github.com/cloudwego/kitex-examples v0.2.2
github.com/kitex-contrib/config-etcd v0.0.0-00010101000000-000000000000
)

require (
github.com/apache/thrift v0.19.0 // indirect
github.com/bytedance/gopkg v0.0.0-20230728082804-614d0af6619b // indirect
github.com/bytedance/sonic v1.9.1 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/chenzhuoyu/iasm v0.9.0 // indirect
github.com/choleraehyq/pid v0.0.17 // indirect
github.com/cloudwego/configmanager v0.2.0 // indirect
github.com/cloudwego/dynamicgo v0.1.3 // indirect
github.com/cloudwego/fastpb v0.0.4 // indirect
github.com/cloudwego/frugal v0.1.8 // indirect
github.com/cloudwego/localsession v0.0.2 // indirect
github.com/cloudwego/netpoll v0.5.1 // indirect
github.com/cloudwego/thriftgo v0.3.2-0.20230828085742-edaddf2c17af // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/fatih/structtag v1.2.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/pprof v0.0.0-20230509042627-b1315fad0c5a // indirect
github.com/iancoleman/strcase v0.2.0 // indirect
github.com/jhump/protoreflect v1.8.2 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/gls v0.0.0-20220109145502-612d0167dce5 // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/oleiade/lane v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/testify v1.8.4 // indirect
github.com/tidwall/gjson v1.9.3 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/v3 v3.5.10 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/arch v0.2.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/genproto v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/grpc v1.58.3 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/kitex-contrib/config-etcd => ../

replace github.com/apache/thrift v0.19.0 => github.com/apache/thrift v0.13.0
Loading

0 comments on commit 4074b43

Please sign in to comment.