Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notification Addition #73

Merged
merged 9 commits into from
Jan 4, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ The `Harvester` builder pattern is used to create a `Harvester` instance. The bu

The above snippet set's up a `Harvester` instance with consul seed and monitor.

## Notification support

In order to be able to monitor the changes in the configuration we can provide

## Consul

Consul has support for versioning (`ModifyIndex`) which allows us to change the value only if the version is higher than the one currently.
Expand Down
18 changes: 15 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,18 @@ type Field struct {
version uint64
structField CfgType
sources map[Source]string
chNotify chan<- string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having the ability to track changes in configuration is awesome, but I think sending just a string doesn't give too much value.
It would be better to send a notification struct with more information like the previous and next values along with the field (at least its name) so that some kind of calculation can be done from the user.
For example if there is a bigger change than x% or maybe send these values to another service (ex. auditing, event message etc.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. On it.

}

// newField constructor.
func newField(prefix string, fld reflect.StructField, val reflect.Value) *Field {
func newField(prefix string, fld reflect.StructField, val reflect.Value, chNotify chan<- string) *Field {
f := &Field{
name: prefix + fld.Name,
tp: fld.Type.Name(),
version: 0,
structField: val.Addr().Interface().(CfgType),
sources: make(map[Source]string),
chNotify: chNotify,
}

for _, tag := range sourceTags {
Expand Down Expand Up @@ -94,27 +96,37 @@ func (f *Field) Set(value string, version uint64) error {
return nil
}

prevValue := f.structField.String()

if err := f.structField.SetString(value); err != nil {
return err
}

f.version = version
log.Infof("field %q updated with value %q, version: %d", f.name, f, version)
f.sendNotification(prevValue, value)
return nil
}

func (f *Field) sendNotification(prev string, current string) {
if f.chNotify == nil {
return
}
f.chNotify <- fmt.Sprintf("field [%s] of type [%s] changed from [%s] to [%s]", f.name, f.tp, prev, current)
}

// Config manages configuration and handles updates on the values.
type Config struct {
Fields []*Field
}

// New creates a new monitor.
func New(cfg interface{}) (*Config, error) {
func New(cfg interface{}, chNotify chan<- string) (*Config, error) {
if cfg == nil {
return nil, errors.New("configuration is nil")
}

ff, err := newParser().ParseCfg(cfg)
ff, err := newParser().ParseCfg(cfg, chNotify)
if err != nil {
return nil, err
}
Expand Down
57 changes: 31 additions & 26 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,29 @@ import (

func TestField_Set(t *testing.T) {
c := testConfig{}
cfg, err := New(&c)
cfg, err := New(&c, nil)
require.NoError(t, err)
cfg.Fields[0].version = 2
type args struct {
value string
version uint64
}
tests := []struct {
name string
tests := map[string]struct {
field Field
args args
wantErr bool
}{
{name: "success String", field: *cfg.Fields[0], args: args{value: "John Doe", version: 3}, wantErr: false},
{name: "success Int64", field: *cfg.Fields[1], args: args{value: "18", version: 1}, wantErr: false},
{name: "success Float64", field: *cfg.Fields[2], args: args{value: "99.9", version: 1}, wantErr: false},
{name: "success Bool", field: *cfg.Fields[3], args: args{value: "true", version: 1}, wantErr: false},
{name: "failure Int64", field: *cfg.Fields[1], args: args{value: "XXX", version: 1}, wantErr: true},
{name: "failure Float64", field: *cfg.Fields[2], args: args{value: "XXX", version: 1}, wantErr: true},
{name: "failure Bool", field: *cfg.Fields[3], args: args{value: "XXX", version: 1}, wantErr: true},
{name: "warn String version older", field: *cfg.Fields[0], args: args{value: "John Doe", version: 2}, wantErr: false},
"success String": {field: *cfg.Fields[0], args: args{value: "John Doe", version: 3}, wantErr: false},
"success Int64": {field: *cfg.Fields[1], args: args{value: "18", version: 1}, wantErr: false},
"success Float64": {field: *cfg.Fields[2], args: args{value: "99.9", version: 1}, wantErr: false},
"success Bool": {field: *cfg.Fields[3], args: args{value: "true", version: 1}, wantErr: false},
"failure Int64": {field: *cfg.Fields[1], args: args{value: "XXX", version: 1}, wantErr: true},
"failure Float64": {field: *cfg.Fields[2], args: args{value: "XXX", version: 1}, wantErr: true},
"failure Bool": {field: *cfg.Fields[3], args: args{value: "XXX", version: 1}, wantErr: true},
"warn String version older": {field: *cfg.Fields[0], args: args{value: "John Doe", version: 2}, wantErr: false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
err := tt.field.Set(tt.args.value, tt.args.version)
if tt.wantErr {
assert.Error(t, err)
Expand All @@ -48,22 +47,21 @@ func TestNew(t *testing.T) {
type args struct {
cfg interface{}
}
tests := []struct {
name string
tests := map[string]struct {
args args
wantErr bool
}{
{name: "success", args: args{cfg: &testConfig{}}, wantErr: false},
{name: "cfg is nil", args: args{cfg: nil}, wantErr: true},
{name: "cfg is not pointer", args: args{cfg: testConfig{}}, wantErr: true},
{name: "cfg field not supported", args: args{cfg: &testInvalidTypeConfig{}}, wantErr: true},
{name: "cfg duplicate consul key", args: args{cfg: &testDuplicateConfig{}}, wantErr: true},
{name: "cfg tagged struct not supported", args: args{cfg: &testInvalidNestedStructWithTags{}}, wantErr: true},
{name: "cfg nested duplicate consul key", args: args{cfg: &testDuplicateNestedConsulConfig{}}, wantErr: true},
"success": {args: args{cfg: &testConfig{}}, wantErr: false},
"cfg is nil": {args: args{cfg: nil}, wantErr: true},
"cfg is not pointer": {args: args{cfg: testConfig{}}, wantErr: true},
"cfg field not supported": {args: args{cfg: &testInvalidTypeConfig{}}, wantErr: true},
"cfg duplicate consul key": {args: args{cfg: &testDuplicateConfig{}}, wantErr: true},
"cfg tagged struct not supported": {args: args{cfg: &testInvalidNestedStructWithTags{}}, wantErr: true},
"cfg nested duplicate consul key": {args: args{cfg: &testDuplicateNestedConsulConfig{}}, wantErr: true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := New(tt.args.cfg)
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
got, err := New(tt.args.cfg, nil)
if tt.wantErr {
assert.Error(t, err)
assert.Nil(t, got)
Expand Down Expand Up @@ -97,20 +95,27 @@ func assertField(t *testing.T, fld *Field, name, typ string, sources map[Source]

func TestConfig_Set(t *testing.T) {
c := testConfig{}
cfg, err := New(&c)
chNotify := make(chan string, 1)
cfg, err := New(&c, chNotify)
require.NoError(t, err)
err = cfg.Fields[0].Set("John Doe", 1)
assert.NoError(t, err)
assert.Equal(t, "field [Name] of type [String] changed from [] to [John Doe]", <-chNotify)
err = cfg.Fields[1].Set("18", 1)
assert.NoError(t, err)
assert.Equal(t, "field [Age] of type [Int64] changed from [0] to [18]", <-chNotify)
err = cfg.Fields[2].Set("99.9", 1)
assert.NoError(t, err)
assert.Equal(t, "field [Balance] of type [Float64] changed from [0.000000] to [99.9]", <-chNotify)
err = cfg.Fields[3].Set("true", 1)
assert.NoError(t, err)
assert.Equal(t, "field [HasJob] of type [Bool] changed from [false] to [true]", <-chNotify)
err = cfg.Fields[4].Set("6000", 1)
assert.NoError(t, err)
assert.Equal(t, "field [PositionSalary] of type [Int64] changed from [0] to [6000]", <-chNotify)
err = cfg.Fields[5].Set("baz", 1)
assert.NoError(t, err)
assert.Equal(t, "field [LevelOneLevelTwoDeepField] of type [String] changed from [] to [baz]", <-chNotify)
assert.Equal(t, "John Doe", c.Name.Get())
assert.Equal(t, int64(18), c.Age.Get())
assert.Equal(t, 99.9, c.Balance.Get())
Expand Down
4 changes: 2 additions & 2 deletions config/custom_type_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

func TestCustomField(t *testing.T) {
c := &testConfig{}
cfg, err := config.New(c)
cfg, err := config.New(c, nil)
assert.NoError(t, err)
err = cfg.Fields[0].Set("expected", 1)
assert.NoError(t, err)
Expand All @@ -24,7 +24,7 @@ func TestCustomField(t *testing.T) {

func TestErrorValidationOnCustomField(t *testing.T) {
c := &testConfig{}
cfg, err := config.New(c)
cfg, err := config.New(c, nil)
assert.NoError(t, err)
err = cfg.Fields[0].Set("not_expected", 1)
assert.Error(t, err)
Expand Down
14 changes: 7 additions & 7 deletions config/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@ func newParser() *parser {
return &parser{}
}

func (p *parser) ParseCfg(cfg interface{}) ([]*Field, error) {
func (p *parser) ParseCfg(cfg interface{}, chNotify chan<- string) ([]*Field, error) {
p.dups = make(map[Source]string)

tp := reflect.TypeOf(cfg)
if tp.Kind() != reflect.Ptr {
return nil, errors.New("configuration should be a pointer type")
}

return p.getFields("", tp.Elem(), reflect.ValueOf(cfg).Elem())
return p.getFields("", tp.Elem(), reflect.ValueOf(cfg).Elem(), chNotify)
}

func (p *parser) getFields(prefix string, tp reflect.Type, val reflect.Value) ([]*Field, error) {
func (p *parser) getFields(prefix string, tp reflect.Type, val reflect.Value, chNotify chan<- string) ([]*Field, error) {
var ff []*Field

for i := 0; i < tp.NumField(); i++ {
Expand All @@ -46,13 +46,13 @@ func (p *parser) getFields(prefix string, tp reflect.Type, val reflect.Value) ([

switch typ {
case typeField:
fld, err := p.createField(prefix, f, val.Field(i))
fld, err := p.createField(prefix, f, val.Field(i), chNotify)
if err != nil {
return nil, err
}
ff = append(ff, fld)
case typeStruct:
nested, err := p.getFields(prefix+f.Name, f.Type, val.Field(i))
nested, err := p.getFields(prefix+f.Name, f.Type, val.Field(i), chNotify)
if err != nil {
return nil, err
}
Expand All @@ -62,8 +62,8 @@ func (p *parser) getFields(prefix string, tp reflect.Type, val reflect.Value) ([
return ff, nil
}

func (p *parser) createField(prefix string, f reflect.StructField, val reflect.Value) (*Field, error) {
fld := newField(prefix, f, val)
func (p *parser) createField(prefix string, f reflect.StructField, val reflect.Value, chNotify chan<- string) (*Field, error) {
fld := newField(prefix, f, val, chNotify)

value, ok := fld.Sources()[SourceConsul]
if ok {
Expand Down
53 changes: 53 additions & 0 deletions examples/06_notification/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package main

import (
"context"
"log"
"os"
"sync"

"github.com/beatlabs/harvester"
harvestersync "github.com/beatlabs/harvester/sync"
)

type config struct {
IndexName harvestersync.String `seed:"customers-v1"`
CacheRetention harvestersync.Int64 `seed:"43200" env:"ENV_CACHE_RETENTION_SECONDS"`
LogLevel harvestersync.String `seed:"DEBUG" flag:"loglevel"`
}

func main() {
ctx, cnl := context.WithCancel(context.Background())
defer cnl()

err := os.Setenv("ENV_CACHE_RETENTION_SECONDS", "86400")
if err != nil {
log.Fatalf("failed to set env var: %v", err)
}

cfg := config{}
chNotify := make(chan string)
wg := sync.WaitGroup{}
wg.Add(1)

go func() {
for change := range chNotify {
log.Printf("notification: " + change)
}
wg.Done()
}()

h, err := harvester.New(&cfg).WithNotification(chNotify).Create()
if err != nil {
log.Fatalf("failed to create harvester: %v", err)
}

err = h.Harvest(ctx)
if err != nil {
log.Fatalf("failed to harvest configuration: %v", err)
}

log.Printf("Config : IndexName: %s, CacheRetention: %d, LogLevel: %s\n", cfg.IndexName.Get(), cfg.CacheRetention.Get(), cfg.LogLevel.Get())
close(chNotify)
wg.Wait()
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
module github.com/beatlabs/harvester

go 1.13
go 1.15

require (
github.com/hashicorp/go-hclog v0.15.0
github.com/hashicorp/consul/api v1.8.1
github.com/hashicorp/go-hclog v0.15.0
github.com/stretchr/testify v1.6.1
)
Loading