Skip to content

Commit

Permalink
optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
dobyte committed Oct 20, 2023
1 parent 4ed902b commit 4db415f
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 36 deletions.
8 changes: 4 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ func Match(patterns ...string) configurator.Matcher {
}

// Watch 设置监听回调
func Watch(cb configurator.WatchCallbackFunc) {
func Watch(cb configurator.WatchCallbackFunc, names ...string) {
if globalConfigurator == nil {
log.Warn("the configurator component is not injected, and the watch operation will be ignored.")
return
}

globalConfigurator.Watch(cb)
globalConfigurator.Watch(cb, names...)
}

// Load 加载配置项
Expand All @@ -98,13 +98,13 @@ func Load(ctx context.Context, source string, file ...string) ([]*configurator.C
}

// Store 保存配置项
func Store(ctx context.Context, source string, name string, content interface{}) error {
func Store(ctx context.Context, source string, file string, content interface{}, override ...bool) error {
if globalConfigurator == nil {
log.Warn("the configurator component is not injected, and the store operation will be ignored.")
return nil
}

return globalConfigurator.Store(ctx, source, name, content)
return globalConfigurator.Store(ctx, source, file, content, override...)
}

// Close 关闭配置监听
Expand Down
17 changes: 15 additions & 2 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func TestWatch(t *testing.T) {
select {
case <-ticker1.C:
t.Log(config.Get("config.timezone").String())
t.Log(config.Get("config.pid").String())
case <-ticker2:
config.Close()
return
Expand All @@ -45,11 +46,23 @@ func TestLoad(t *testing.T) {
func TestStore(t *testing.T) {
ctx := context.Background()
file := "config.json"
content := map[string]interface{}{
content1 := map[string]interface{}{
"timezone": "Local",
}

err := config.Store(ctx, etcd.Name, file, content)
content2 := map[string]interface{}{
"timezone": "UTC",
"pid": "./run/gate.pid",
}

err := config.Store(ctx, etcd.Name, file, content1, true)
if err != nil {
t.Fatal(err)
}

time.Sleep(5 * time.Second)

err = config.Store(ctx, etcd.Name, file, content2)
if err != nil {
t.Fatal(err)
}
Expand Down
129 changes: 99 additions & 30 deletions config/configurator/configurator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import (
"github.com/dobyte/due/v2/core/value"
"github.com/dobyte/due/v2/errors"
"github.com/dobyte/due/v2/utils/xconv"
"github.com/dobyte/due/v2/utils/xreflect"
"github.com/imdario/mergo"
"github.com/jinzhu/copier"
"log"
"math"
"path/filepath"
"reflect"
"strconv"
"strings"
"sync"
Expand All @@ -32,11 +35,11 @@ type Configurator interface {
// Match 匹配多个规则
Match(patterns ...string) Matcher
// Watch 设置监听回调
Watch(cb WatchCallbackFunc)
Watch(cb WatchCallbackFunc, names ...string)
// Load 加载配置项
Load(ctx context.Context, source string, file ...string) ([]*Configuration, error)
// Store 保存配置项
Store(ctx context.Context, source string, file string, content interface{}) error
Store(ctx context.Context, source string, file string, content interface{}, override ...bool) error
// Close 关闭配置监听
Close()
}
Expand All @@ -59,14 +62,19 @@ type Watcher interface {
Stop() error
}

type watcher struct {
names map[string]struct{}
callback WatchCallbackFunc
}

type WatchCallbackFunc func(names ...string)

// Configuration 配置项
type Configuration struct {
decoder Decoder // 解码器
scanner Scanner // 扫描器
Path string // 文件路径
File string // 文件名称
File string // 文件全称
Name string // 文件名称
Format string // 文件格式
Content []byte // 文件内容
Expand All @@ -92,15 +100,15 @@ func (c *Configuration) Scan(dest interface{}) error {
}

type defaultConfigurator struct {
opts *options
ctx context.Context
cancel context.CancelFunc
sources map[string]Source
mu sync.Mutex
idx int64
values [2]map[string]interface{}
rw sync.RWMutex
callbacks []WatchCallbackFunc
opts *options
ctx context.Context
cancel context.CancelFunc
sources map[string]Source
mu sync.Mutex
idx int64
values [2]map[string]interface{}
rw sync.RWMutex
watchers []*watcher
}

var _ Configurator = &defaultConfigurator{}
Expand All @@ -114,7 +122,7 @@ func NewConfigurator(opts ...Option) Configurator {
r := &defaultConfigurator{}
r.opts = o
r.ctx, r.cancel = context.WithCancel(o.ctx)
r.callbacks = make([]WatchCallbackFunc, 0)
r.watchers = make([]*watcher, 0)
r.init()
r.watch()

Expand Down Expand Up @@ -239,17 +247,36 @@ func (c *defaultConfigurator) watch() {
}()

if len(names) > 0 {
c.rw.RLock()
for _, cb := range c.callbacks {
cb(names...)
}
c.rw.RUnlock()
go c.notify(names...)
}
}
}()
}
}

// 通知给监听器
func (c *defaultConfigurator) notify(names ...string) {
c.rw.RLock()
defer c.rw.RUnlock()

for _, w := range c.watchers {
if len(w.names) == 0 {
w.callback(names...)
} else {
validNames := make([]string, 0, int(math.Min(float64(len(w.names)), float64(len(names)))))
for _, name := range names {
if _, ok := w.names[name]; ok {
validNames = append(validNames, name)
}
}

if len(validNames) > 0 {
w.callback(validNames...)
}
}
}
}

// Close 关闭配置监听
func (c *defaultConfigurator) Close() {
c.cancel()
Expand Down Expand Up @@ -468,9 +495,17 @@ func (c *defaultConfigurator) Set(pattern string, value interface{}) error {
}

// Watch 设置监听回调
func (c *defaultConfigurator) Watch(cb WatchCallbackFunc) {
func (c *defaultConfigurator) Watch(cb WatchCallbackFunc, names ...string) {
w := &watcher{}
w.names = make(map[string]struct{}, len(names))
w.callback = cb

for _, name := range names {
w.names[name] = struct{}{}
}

c.rw.Lock()
c.callbacks = append(c.callbacks, cb)
c.watchers = append(c.watchers, w)
c.rw.Unlock()
}

Expand All @@ -494,7 +529,7 @@ func (c *defaultConfigurator) Load(ctx context.Context, source string, file ...s
}

// Store 保存配置项
func (c *defaultConfigurator) Store(ctx context.Context, source string, file string, content interface{}) error {
func (c *defaultConfigurator) Store(ctx context.Context, source string, file string, content interface{}, override ...bool) error {
if content == nil {
return ErrInvalidConfigContent
}
Expand All @@ -505,24 +540,58 @@ func (c *defaultConfigurator) Store(ctx context.Context, source string, file str
}

var (
val []byte
err error
format = strings.TrimPrefix(filepath.Ext(file), ".")
buf []byte
ext = filepath.Ext(file)
format = strings.TrimPrefix(ext, ".")
)

switch content.(type) {
case map[string]interface{}:
val, err = c.opts.encoder(format, content)
case []interface{}:
val, err = c.opts.encoder(format, content)
switch rk, _ := xreflect.Value(content); rk {
case reflect.Map, reflect.Struct:
if len(override) > 0 && override[0] {
buf, err = c.opts.encoder(format, content)
} else {
dest, err := c.copy()
if err != nil {
return err
}

name := strings.TrimSuffix(filepath.Base(file), ext)

val, ok := dest[name]
if !ok {
buf, err = c.opts.encoder(format, content)
} else if v, ok := val.(map[string]interface{}); ok {
buf, err = c.opts.encoder(format, content)
if err != nil {
return err
}

maps, err := c.opts.decoder(format, buf)
if err != nil {
return err
}

err = mergo.Merge(&v, maps, mergo.WithOverride)
if err != nil {
return err
}

buf, err = c.opts.encoder(format, v)
} else {
buf, err = c.opts.encoder(format, content)
}
}
case reflect.Array, reflect.Slice:
buf, err = c.opts.encoder(format, content)
default:
val = xconv.Bytes(content)
buf = xconv.Bytes(xconv.String(content))
}
if err != nil {
return err
}

return s.Store(ctx, file, val)
return s.Store(ctx, file, buf)
}

func reviseKeys(keys []string, values map[string]interface{}) []string {
Expand Down
17 changes: 17 additions & 0 deletions utils/xreflect/reflect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package xreflect

import "reflect"

func Value(i any) (reflect.Kind, reflect.Value) {
var (
rv = reflect.ValueOf(i)
rk = rv.Kind()
)

for rk == reflect.Ptr {
rv = rv.Elem()
rk = rv.Kind()
}

return rk, rv
}

0 comments on commit 4db415f

Please sign in to comment.