From 20d42200ad452950d79af94d82e3c3f49164207e Mon Sep 17 00:00:00 2001 From: Wim Date: Thu, 7 Oct 2021 16:38:43 +0200 Subject: [PATCH 1/4] added dynamic prefix for watcher of consul keys --- harvester.go | 22 +++++++++++++++++++--- harvester_integration_test.go | 2 +- monitor/consul/watcher.go | 19 +++++++++++++------ monitor/consul/watcher_integration_test.go | 4 ++-- monitor/redis/watcher_integration_test.go | 2 +- seed/consul/getter_integration_test.go | 2 +- seed/redis/getter_integration_test.go | 2 +- 7 files changed, 38 insertions(+), 15 deletions(-) diff --git a/harvester.go b/harvester.go index 235bc1ae..84b4724a 100644 --- a/harvester.go +++ b/harvester.go @@ -50,8 +50,8 @@ func (h *harvester) Harvest(ctx context.Context) error { } type consulConfig struct { - addr, dataCenter, token string - timeout time.Duration + addr, dataCenter, token, folderPrefix string + timeout time.Duration } // Builder of a harvester instance. @@ -98,6 +98,22 @@ func (b *Builder) WithConsulSeed(addr, dataCenter, token string, timeout time.Du return b } +// WithConsulFolderPrefixMonitor enables support for monitoring key/prefixes on ConsulLogger. It automatically parses the config +// and monitors every field found tagged with ConsulLogger. +func (b *Builder) WithConsulFolderPrefixMonitor(addr, dataCenter, token, folderPrefix string, timeout time.Duration) *Builder { + if b.err != nil { + return b + } + b.monitorConsulCfg = &consulConfig{ + addr: addr, + dataCenter: dataCenter, + token: token, + folderPrefix: folderPrefix, + timeout: timeout, + } + return b +} + // WithConsulMonitor enables support for monitoring key/prefixes on ConsulLogger. It automatically parses the config // and monitors every field found tagged with ConsulLogger. func (b *Builder) WithConsulMonitor(addr, dataCenter, token string, timeout time.Duration) *Builder { @@ -254,7 +270,7 @@ func (b *Builder) setupConsulMonitoring(cfg *config.Config) (*consul.Watcher, er continue } log.Debugf(`automatically monitoring consul key "%s"`, consulKey) - items = append(items, consul.NewKeyItem(consulKey)) + items = append(items, consul.NewKeyItemWithPrefix(consulKey, b.monitorConsulCfg.folderPrefix)) } return consul.New(b.monitorConsulCfg.addr, b.monitorConsulCfg.dataCenter, b.monitorConsulCfg.token, b.monitorConsulCfg.timeout, items...) diff --git a/harvester_integration_test.go b/harvester_integration_test.go index a2d105dd..c336511f 100644 --- a/harvester_integration_test.go +++ b/harvester_integration_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration package harvester diff --git a/monitor/consul/watcher.go b/monitor/consul/watcher.go index c4584992..e5d3d468 100644 --- a/monitor/consul/watcher.go +++ b/monitor/consul/watcher.go @@ -4,6 +4,7 @@ package consul import ( "context" "errors" + "path" "time" "github.com/beatlabs/harvester/change" @@ -15,8 +16,9 @@ import ( // Item definition. type Item struct { - tp string - key string + tp string + key string + prefix string } // NewKeyItem creates a new key watch item for the watcher. @@ -24,6 +26,11 @@ func NewKeyItem(key string) Item { return Item{tp: "key", key: key} } +// NewKeyItemWithPrefix creates a new key item for a given key and prefix. +func NewKeyItemWithPrefix(key, prefix string) Item { + return Item{tp: "key", key: key, prefix: prefix} +} + // NewPrefixItem creates a prefix key watch item for the watcher. func NewPrefixItem(key string) Item { return Item{tp: "keyprefix", key: key} @@ -72,7 +79,7 @@ func (w *Watcher) Watch(ctx context.Context, ch chan<- []*change.Change) error { var err error switch i.tp { case "key": - pl, err = w.createKeyPlan(i.key, ch) + pl, err = w.createKeyPlanWithPrefix(i.key, i.prefix, ch) case "keyprefix": pl, err = w.createKeyPrefixPlan(i.key, ch) } @@ -100,8 +107,8 @@ func (w *Watcher) Watch(ctx context.Context, ch chan<- []*change.Change) error { return nil } -func (w *Watcher) createKeyPlan(key string, ch chan<- []*change.Change) (*watch.Plan, error) { - pl, err := w.getPlan("key", key) +func (w *Watcher) createKeyPlanWithPrefix(key, prefix string, ch chan<- []*change.Change) (*watch.Plan, error) { + pl, err := w.getPlan("key", path.Join(prefix, key)) if err != nil { return nil, err } @@ -113,7 +120,7 @@ func (w *Watcher) createKeyPlan(key string, ch chan<- []*change.Change) (*watch. if !ok { log.Errorf("data is not kv pair: %v", data) } else { - ch <- []*change.Change{change.New(config.SourceConsul, pair.Key, string(pair.Value), pair.ModifyIndex)} + ch <- []*change.Change{change.New(config.SourceConsul, key, string(pair.Value), pair.ModifyIndex)} } } log.Debugf("plan for key %s created", key) diff --git a/monitor/consul/watcher_integration_test.go b/monitor/consul/watcher_integration_test.go index f84f9dbe..e05be1d9 100644 --- a/monitor/consul/watcher_integration_test.go +++ b/monitor/consul/watcher_integration_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration package consul @@ -43,7 +43,7 @@ func TestMain(m *testing.M) { func TestWatch(t *testing.T) { ch := make(chan []*change.Change) - w, err := New(addr, "", "", 0, NewKeyItem("key1"), NewPrefixItem("prefix1")) + w, err := New(addr, "", "", 0, NewKeyItemWithPrefix("key1", ""), NewPrefixItem("prefix1")) require.NoError(t, err) require.NotNil(t, w) ctx, cnl := context.WithCancel(context.Background()) diff --git a/monitor/redis/watcher_integration_test.go b/monitor/redis/watcher_integration_test.go index 080b31a6..422f7aa3 100644 --- a/monitor/redis/watcher_integration_test.go +++ b/monitor/redis/watcher_integration_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration package redis diff --git a/seed/consul/getter_integration_test.go b/seed/consul/getter_integration_test.go index 197d72b5..a232925c 100644 --- a/seed/consul/getter_integration_test.go +++ b/seed/consul/getter_integration_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration package consul diff --git a/seed/redis/getter_integration_test.go b/seed/redis/getter_integration_test.go index 2dc323cf..ee6fa861 100644 --- a/seed/redis/getter_integration_test.go +++ b/seed/redis/getter_integration_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration package redis From 469c8615b627a7737c8a3f96840cce37c9ac304d Mon Sep 17 00:00:00 2001 From: Wim Date: Fri, 8 Oct 2021 10:47:09 +0200 Subject: [PATCH 2/4] add integration test case for key with folder prefix --- monitor/consul/watcher_integration_test.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/monitor/consul/watcher_integration_test.go b/monitor/consul/watcher_integration_test.go index e05be1d9..b9b564cc 100644 --- a/monitor/consul/watcher_integration_test.go +++ b/monitor/consul/watcher_integration_test.go @@ -4,6 +4,7 @@ package consul import ( "context" + "fmt" "log" "os" "testing" @@ -43,7 +44,7 @@ func TestMain(m *testing.M) { func TestWatch(t *testing.T) { ch := make(chan []*change.Change) - w, err := New(addr, "", "", 0, NewKeyItemWithPrefix("key1", ""), NewPrefixItem("prefix1")) + w, err := New(addr, "", "", 0, NewKeyItemWithPrefix("key4", "consul/folder"), NewKeyItemWithPrefix("key1", ""), NewPrefixItem("prefix")) require.NoError(t, err) require.NotNil(t, w) ctx, cnl := context.WithCancel(context.Background()) @@ -61,6 +62,8 @@ func TestWatch(t *testing.T) { assert.Equal(t, "3", cng.Value()) case "key1": assert.Equal(t, "1", cng.Value()) + case "key4": + assert.Equal(t, "42", cng.Value()) default: assert.Fail(t, "key invalid", cng.Key()) } @@ -82,7 +85,11 @@ func cleanup(consul *api.Client) error { } func setup(consul *api.Client) error { - _, err := consul.KV().Put(&api.KVPair{Key: "key1", Value: []byte("1")}, nil) + _, err := consul.KV().Put(&api.KVPair{Key: "consul/folder/key4", Value: []byte("42")}, nil) + if err != nil { + return err + } + _, err = consul.KV().Put(&api.KVPair{Key: "key1", Value: []byte("1")}, nil) if err != nil { return err } From a02495d5cf7e78128cca5bced4469e2e9558f708 Mon Sep 17 00:00:00 2001 From: Wim Date: Fri, 8 Oct 2021 11:06:25 +0200 Subject: [PATCH 3/4] revert to build tags < go 1.17 --- harvester.go | 21 ++++++--------------- harvester_integration_test.go | 2 +- monitor/consul/watcher_integration_test.go | 3 +-- monitor/redis/watcher_integration_test.go | 2 +- seed/consul/getter_integration_test.go | 2 +- seed/redis/getter_integration_test.go | 2 +- 6 files changed, 11 insertions(+), 21 deletions(-) diff --git a/harvester.go b/harvester.go index 84b4724a..284e632e 100644 --- a/harvester.go +++ b/harvester.go @@ -98,6 +98,12 @@ func (b *Builder) WithConsulSeed(addr, dataCenter, token string, timeout time.Du return b } +// WithConsulMonitor enables support for monitoring key/prefixes on ConsulLogger. It automatically parses the config +// and monitors every field found tagged with ConsulLogger. +func (b *Builder) WithConsulMonitor(addr, dataCenter, token string, timeout time.Duration) *Builder { + return b.WithConsulFolderPrefixMonitor(addr, dataCenter, token, "", timeout) +} + // WithConsulFolderPrefixMonitor enables support for monitoring key/prefixes on ConsulLogger. It automatically parses the config // and monitors every field found tagged with ConsulLogger. func (b *Builder) WithConsulFolderPrefixMonitor(addr, dataCenter, token, folderPrefix string, timeout time.Duration) *Builder { @@ -114,21 +120,6 @@ func (b *Builder) WithConsulFolderPrefixMonitor(addr, dataCenter, token, folderP return b } -// WithConsulMonitor enables support for monitoring key/prefixes on ConsulLogger. It automatically parses the config -// and monitors every field found tagged with ConsulLogger. -func (b *Builder) WithConsulMonitor(addr, dataCenter, token string, timeout time.Duration) *Builder { - if b.err != nil { - return b - } - b.monitorConsulCfg = &consulConfig{ - addr: addr, - dataCenter: dataCenter, - token: token, - timeout: timeout, - } - return b -} - // WithRedisSeed enables support for seeding values with redis. func (b *Builder) WithRedisSeed(client redis.UniversalClient) *Builder { if b.err != nil { diff --git a/harvester_integration_test.go b/harvester_integration_test.go index c336511f..a2d105dd 100644 --- a/harvester_integration_test.go +++ b/harvester_integration_test.go @@ -1,4 +1,4 @@ -//go:build integration +// +build integration package harvester diff --git a/monitor/consul/watcher_integration_test.go b/monitor/consul/watcher_integration_test.go index b9b564cc..7e822d61 100644 --- a/monitor/consul/watcher_integration_test.go +++ b/monitor/consul/watcher_integration_test.go @@ -1,10 +1,9 @@ -//go:build integration +// +build integration package consul import ( "context" - "fmt" "log" "os" "testing" diff --git a/monitor/redis/watcher_integration_test.go b/monitor/redis/watcher_integration_test.go index 422f7aa3..080b31a6 100644 --- a/monitor/redis/watcher_integration_test.go +++ b/monitor/redis/watcher_integration_test.go @@ -1,4 +1,4 @@ -//go:build integration +// +build integration package redis diff --git a/seed/consul/getter_integration_test.go b/seed/consul/getter_integration_test.go index a232925c..197d72b5 100644 --- a/seed/consul/getter_integration_test.go +++ b/seed/consul/getter_integration_test.go @@ -1,4 +1,4 @@ -//go:build integration +// +build integration package consul diff --git a/seed/redis/getter_integration_test.go b/seed/redis/getter_integration_test.go index ee6fa861..2dc323cf 100644 --- a/seed/redis/getter_integration_test.go +++ b/seed/redis/getter_integration_test.go @@ -1,4 +1,4 @@ -//go:build integration +// +build integration package redis From 75a5de25150a20552d534eea0aa6419820f861aa Mon Sep 17 00:00:00 2001 From: Wim Date: Fri, 8 Oct 2021 11:30:43 +0200 Subject: [PATCH 4/4] add unit test verifying monitorConsulCfg created by WithConsulFolderPrefixMonitor --- harvester_test.go | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/harvester_test.go b/harvester_test.go index a27e8140..27f23f72 100644 --- a/harvester_test.go +++ b/harvester_test.go @@ -2,6 +2,7 @@ package harvester import ( "context" + "path/filepath" "testing" "time" @@ -88,6 +89,7 @@ func TestCreateWithConsulAndRedis(t *testing.T) { got, err := New(tt.args.cfg). WithConsulSeed(tt.args.consulAddress, "", "", 0). WithConsulMonitor(tt.args.consulAddress, "", "", 0). + WithConsulFolderPrefixMonitor(tt.args.consulAddress, "", "", "", 0). WithRedisSeed(tt.args.seedRedisClient). WithRedisMonitor(tt.args.monitorRedisClient, tt.args.monitoringPollInterval). Create() @@ -128,6 +130,43 @@ func TestWithNotification(t *testing.T) { } } +func TestWithConsulFolderPrefixMonitor(t *testing.T) { + tests := []struct { + Name string + InputFolderPrefix string + ExpectedKeyLocation string + }{ + { + Name: "Setup Consul with folder prefix", + InputFolderPrefix: "folder/prefix", + ExpectedKeyLocation: "folder/prefix/key1", + }, + { + Name: "Setup Consul with empty folder prefix", + ExpectedKeyLocation: "key1", + }, + { + Name: "Setup Consul with folder prefix trailing /", + InputFolderPrefix: "folder/prefix/", + ExpectedKeyLocation: "folder/prefix/key1", + }, + } + + for _, test := range tests { + t.Run(test.Name, func(t *testing.T) { + builder := New(testConfig{}) + builder.WithConsulFolderPrefixMonitor("addr", "data-center", "token", test.InputFolderPrefix, time.Second*42) + + assert.Equal(t, "addr", builder.monitorConsulCfg.addr) + assert.Equal(t, "data-center", builder.monitorConsulCfg.dataCenter) + assert.Equal(t, "token", builder.monitorConsulCfg.token) + assert.Equal(t, time.Second*42, builder.monitorConsulCfg.timeout) + assert.Equal(t, test.ExpectedKeyLocation, filepath.Join(builder.monitorConsulCfg.folderPrefix, "key1")) + }) + } + +} + func TestCreate_NoConsulOrRedis(t *testing.T) { cfg := &testConfigNoConsul{} got, err := New(cfg).Create()