diff --git a/harvester.go b/harvester.go index 235bc1ae..284e632e 100644 --- a/harvester.go +++ b/harvester.go @@ -50,8 +50,8 @@ func (h *harvester) Harvest(ctx context.Context) error { } type consulConfig struct { - addr, dataCenter, token string - timeout time.Duration + addr, dataCenter, token, folderPrefix string + timeout time.Duration } // Builder of a harvester instance. @@ -101,14 +101,21 @@ func (b *Builder) WithConsulSeed(addr, dataCenter, token string, timeout time.Du // WithConsulMonitor enables support for monitoring key/prefixes on ConsulLogger. It automatically parses the config // and monitors every field found tagged with ConsulLogger. func (b *Builder) WithConsulMonitor(addr, dataCenter, token string, timeout time.Duration) *Builder { + return b.WithConsulFolderPrefixMonitor(addr, dataCenter, token, "", timeout) +} + +// WithConsulFolderPrefixMonitor enables support for monitoring key/prefixes on ConsulLogger. It automatically parses the config +// and monitors every field found tagged with ConsulLogger. +func (b *Builder) WithConsulFolderPrefixMonitor(addr, dataCenter, token, folderPrefix string, timeout time.Duration) *Builder { if b.err != nil { return b } b.monitorConsulCfg = &consulConfig{ - addr: addr, - dataCenter: dataCenter, - token: token, - timeout: timeout, + addr: addr, + dataCenter: dataCenter, + token: token, + folderPrefix: folderPrefix, + timeout: timeout, } return b } @@ -254,7 +261,7 @@ func (b *Builder) setupConsulMonitoring(cfg *config.Config) (*consul.Watcher, er continue } log.Debugf(`automatically monitoring consul key "%s"`, consulKey) - items = append(items, consul.NewKeyItem(consulKey)) + items = append(items, consul.NewKeyItemWithPrefix(consulKey, b.monitorConsulCfg.folderPrefix)) } return consul.New(b.monitorConsulCfg.addr, b.monitorConsulCfg.dataCenter, b.monitorConsulCfg.token, b.monitorConsulCfg.timeout, items...) diff --git a/harvester_test.go b/harvester_test.go index a27e8140..27f23f72 100644 --- a/harvester_test.go +++ b/harvester_test.go @@ -2,6 +2,7 @@ package harvester import ( "context" + "path/filepath" "testing" "time" @@ -88,6 +89,7 @@ func TestCreateWithConsulAndRedis(t *testing.T) { got, err := New(tt.args.cfg). WithConsulSeed(tt.args.consulAddress, "", "", 0). WithConsulMonitor(tt.args.consulAddress, "", "", 0). + WithConsulFolderPrefixMonitor(tt.args.consulAddress, "", "", "", 0). WithRedisSeed(tt.args.seedRedisClient). WithRedisMonitor(tt.args.monitorRedisClient, tt.args.monitoringPollInterval). Create() @@ -128,6 +130,43 @@ func TestWithNotification(t *testing.T) { } } +func TestWithConsulFolderPrefixMonitor(t *testing.T) { + tests := []struct { + Name string + InputFolderPrefix string + ExpectedKeyLocation string + }{ + { + Name: "Setup Consul with folder prefix", + InputFolderPrefix: "folder/prefix", + ExpectedKeyLocation: "folder/prefix/key1", + }, + { + Name: "Setup Consul with empty folder prefix", + ExpectedKeyLocation: "key1", + }, + { + Name: "Setup Consul with folder prefix trailing /", + InputFolderPrefix: "folder/prefix/", + ExpectedKeyLocation: "folder/prefix/key1", + }, + } + + for _, test := range tests { + t.Run(test.Name, func(t *testing.T) { + builder := New(testConfig{}) + builder.WithConsulFolderPrefixMonitor("addr", "data-center", "token", test.InputFolderPrefix, time.Second*42) + + assert.Equal(t, "addr", builder.monitorConsulCfg.addr) + assert.Equal(t, "data-center", builder.monitorConsulCfg.dataCenter) + assert.Equal(t, "token", builder.monitorConsulCfg.token) + assert.Equal(t, time.Second*42, builder.monitorConsulCfg.timeout) + assert.Equal(t, test.ExpectedKeyLocation, filepath.Join(builder.monitorConsulCfg.folderPrefix, "key1")) + }) + } + +} + func TestCreate_NoConsulOrRedis(t *testing.T) { cfg := &testConfigNoConsul{} got, err := New(cfg).Create() diff --git a/monitor/consul/watcher.go b/monitor/consul/watcher.go index c4584992..e5d3d468 100644 --- a/monitor/consul/watcher.go +++ b/monitor/consul/watcher.go @@ -4,6 +4,7 @@ package consul import ( "context" "errors" + "path" "time" "github.com/beatlabs/harvester/change" @@ -15,8 +16,9 @@ import ( // Item definition. type Item struct { - tp string - key string + tp string + key string + prefix string } // NewKeyItem creates a new key watch item for the watcher. @@ -24,6 +26,11 @@ func NewKeyItem(key string) Item { return Item{tp: "key", key: key} } +// NewKeyItemWithPrefix creates a new key item for a given key and prefix. +func NewKeyItemWithPrefix(key, prefix string) Item { + return Item{tp: "key", key: key, prefix: prefix} +} + // NewPrefixItem creates a prefix key watch item for the watcher. func NewPrefixItem(key string) Item { return Item{tp: "keyprefix", key: key} @@ -72,7 +79,7 @@ func (w *Watcher) Watch(ctx context.Context, ch chan<- []*change.Change) error { var err error switch i.tp { case "key": - pl, err = w.createKeyPlan(i.key, ch) + pl, err = w.createKeyPlanWithPrefix(i.key, i.prefix, ch) case "keyprefix": pl, err = w.createKeyPrefixPlan(i.key, ch) } @@ -100,8 +107,8 @@ func (w *Watcher) Watch(ctx context.Context, ch chan<- []*change.Change) error { return nil } -func (w *Watcher) createKeyPlan(key string, ch chan<- []*change.Change) (*watch.Plan, error) { - pl, err := w.getPlan("key", key) +func (w *Watcher) createKeyPlanWithPrefix(key, prefix string, ch chan<- []*change.Change) (*watch.Plan, error) { + pl, err := w.getPlan("key", path.Join(prefix, key)) if err != nil { return nil, err } @@ -113,7 +120,7 @@ func (w *Watcher) createKeyPlan(key string, ch chan<- []*change.Change) (*watch. if !ok { log.Errorf("data is not kv pair: %v", data) } else { - ch <- []*change.Change{change.New(config.SourceConsul, pair.Key, string(pair.Value), pair.ModifyIndex)} + ch <- []*change.Change{change.New(config.SourceConsul, key, string(pair.Value), pair.ModifyIndex)} } } log.Debugf("plan for key %s created", key) diff --git a/monitor/consul/watcher_integration_test.go b/monitor/consul/watcher_integration_test.go index f84f9dbe..7e822d61 100644 --- a/monitor/consul/watcher_integration_test.go +++ b/monitor/consul/watcher_integration_test.go @@ -43,7 +43,7 @@ func TestMain(m *testing.M) { func TestWatch(t *testing.T) { ch := make(chan []*change.Change) - w, err := New(addr, "", "", 0, NewKeyItem("key1"), NewPrefixItem("prefix1")) + w, err := New(addr, "", "", 0, NewKeyItemWithPrefix("key4", "consul/folder"), NewKeyItemWithPrefix("key1", ""), NewPrefixItem("prefix")) require.NoError(t, err) require.NotNil(t, w) ctx, cnl := context.WithCancel(context.Background()) @@ -61,6 +61,8 @@ func TestWatch(t *testing.T) { assert.Equal(t, "3", cng.Value()) case "key1": assert.Equal(t, "1", cng.Value()) + case "key4": + assert.Equal(t, "42", cng.Value()) default: assert.Fail(t, "key invalid", cng.Key()) } @@ -82,7 +84,11 @@ func cleanup(consul *api.Client) error { } func setup(consul *api.Client) error { - _, err := consul.KV().Put(&api.KVPair{Key: "key1", Value: []byte("1")}, nil) + _, err := consul.KV().Put(&api.KVPair{Key: "consul/folder/key4", Value: []byte("42")}, nil) + if err != nil { + return err + } + _, err = consul.KV().Put(&api.KVPair{Key: "key1", Value: []byte("1")}, nil) if err != nil { return err }