Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PS-827: Enable harvester to use dynamic prefix for keys #103

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func (h *harvester) Harvest(ctx context.Context) error {
}

type consulConfig struct {
addr, dataCenter, token string
timeout time.Duration
addr, dataCenter, token, folderPrefix string
timeout time.Duration
}

// Builder of a harvester instance.
Expand Down Expand Up @@ -101,14 +101,21 @@ func (b *Builder) WithConsulSeed(addr, dataCenter, token string, timeout time.Du
// WithConsulMonitor enables support for monitoring key/prefixes on ConsulLogger. It automatically parses the config
// and monitors every field found tagged with ConsulLogger.
func (b *Builder) WithConsulMonitor(addr, dataCenter, token string, timeout time.Duration) *Builder {
return b.WithConsulFolderPrefixMonitor(addr, dataCenter, token, "", timeout)
}

// WithConsulFolderPrefixMonitor enables support for monitoring key/prefixes on ConsulLogger. It automatically parses the config
// and monitors every field found tagged with ConsulLogger.
func (b *Builder) WithConsulFolderPrefixMonitor(addr, dataCenter, token, folderPrefix string, timeout time.Duration) *Builder {
if b.err != nil {
return b
}
b.monitorConsulCfg = &consulConfig{
addr: addr,
dataCenter: dataCenter,
token: token,
timeout: timeout,
addr: addr,
dataCenter: dataCenter,
token: token,
folderPrefix: folderPrefix,
timeout: timeout,
}
return b
}
Expand Down Expand Up @@ -254,7 +261,7 @@ func (b *Builder) setupConsulMonitoring(cfg *config.Config) (*consul.Watcher, er
continue
}
log.Debugf(`automatically monitoring consul key "%s"`, consulKey)
items = append(items, consul.NewKeyItem(consulKey))
items = append(items, consul.NewKeyItemWithPrefix(consulKey, b.monitorConsulCfg.folderPrefix))
}
return consul.New(b.monitorConsulCfg.addr, b.monitorConsulCfg.dataCenter, b.monitorConsulCfg.token,
b.monitorConsulCfg.timeout, items...)
Expand Down
39 changes: 39 additions & 0 deletions harvester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package harvester

import (
"context"
"path/filepath"
"testing"
"time"

Expand Down Expand Up @@ -88,6 +89,7 @@ func TestCreateWithConsulAndRedis(t *testing.T) {
got, err := New(tt.args.cfg).
WithConsulSeed(tt.args.consulAddress, "", "", 0).
WithConsulMonitor(tt.args.consulAddress, "", "", 0).
WithConsulFolderPrefixMonitor(tt.args.consulAddress, "", "", "", 0).
WithRedisSeed(tt.args.seedRedisClient).
WithRedisMonitor(tt.args.monitorRedisClient, tt.args.monitoringPollInterval).
Create()
Expand Down Expand Up @@ -128,6 +130,43 @@ func TestWithNotification(t *testing.T) {
}
}

func TestWithConsulFolderPrefixMonitor(t *testing.T) {
tests := []struct {
Name string
InputFolderPrefix string
ExpectedKeyLocation string
}{
{
Name: "Setup Consul with folder prefix",
InputFolderPrefix: "folder/prefix",
ExpectedKeyLocation: "folder/prefix/key1",
},
{
Name: "Setup Consul with empty folder prefix",
ExpectedKeyLocation: "key1",
},
{
Name: "Setup Consul with folder prefix trailing /",
InputFolderPrefix: "folder/prefix/",
ExpectedKeyLocation: "folder/prefix/key1",
},
}

for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
builder := New(testConfig{})
builder.WithConsulFolderPrefixMonitor("addr", "data-center", "token", test.InputFolderPrefix, time.Second*42)

assert.Equal(t, "addr", builder.monitorConsulCfg.addr)
assert.Equal(t, "data-center", builder.monitorConsulCfg.dataCenter)
assert.Equal(t, "token", builder.monitorConsulCfg.token)
assert.Equal(t, time.Second*42, builder.monitorConsulCfg.timeout)
assert.Equal(t, test.ExpectedKeyLocation, filepath.Join(builder.monitorConsulCfg.folderPrefix, "key1"))
})
}

}

func TestCreate_NoConsulOrRedis(t *testing.T) {
cfg := &testConfigNoConsul{}
got, err := New(cfg).Create()
Expand Down
19 changes: 13 additions & 6 deletions monitor/consul/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package consul
import (
"context"
"errors"
"path"
"time"

"github.com/beatlabs/harvester/change"
Expand All @@ -15,15 +16,21 @@ import (

// Item definition.
type Item struct {
tp string
key string
tp string
key string
prefix string
}

// NewKeyItem creates a new key watch item for the watcher.
func NewKeyItem(key string) Item {
return Item{tp: "key", key: key}
}

// NewKeyItemWithPrefix creates a new key item for a given key and prefix.
func NewKeyItemWithPrefix(key, prefix string) Item {
return Item{tp: "key", key: key, prefix: prefix}
}

// NewPrefixItem creates a prefix key watch item for the watcher.
func NewPrefixItem(key string) Item {
return Item{tp: "keyprefix", key: key}
Expand Down Expand Up @@ -72,7 +79,7 @@ func (w *Watcher) Watch(ctx context.Context, ch chan<- []*change.Change) error {
var err error
switch i.tp {
case "key":
pl, err = w.createKeyPlan(i.key, ch)
pl, err = w.createKeyPlanWithPrefix(i.key, i.prefix, ch)
case "keyprefix":
pl, err = w.createKeyPrefixPlan(i.key, ch)
}
Expand Down Expand Up @@ -100,8 +107,8 @@ func (w *Watcher) Watch(ctx context.Context, ch chan<- []*change.Change) error {
return nil
}

func (w *Watcher) createKeyPlan(key string, ch chan<- []*change.Change) (*watch.Plan, error) {
pl, err := w.getPlan("key", key)
func (w *Watcher) createKeyPlanWithPrefix(key, prefix string, ch chan<- []*change.Change) (*watch.Plan, error) {
pl, err := w.getPlan("key", path.Join(prefix, key))
if err != nil {
return nil, err
}
Expand All @@ -113,7 +120,7 @@ func (w *Watcher) createKeyPlan(key string, ch chan<- []*change.Change) (*watch.
if !ok {
log.Errorf("data is not kv pair: %v", data)
} else {
ch <- []*change.Change{change.New(config.SourceConsul, pair.Key, string(pair.Value), pair.ModifyIndex)}
ch <- []*change.Change{change.New(config.SourceConsul, key, string(pair.Value), pair.ModifyIndex)}
}
}
log.Debugf("plan for key %s created", key)
Expand Down
10 changes: 8 additions & 2 deletions monitor/consul/watcher_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestMain(m *testing.M) {

func TestWatch(t *testing.T) {
ch := make(chan []*change.Change)
w, err := New(addr, "", "", 0, NewKeyItem("key1"), NewPrefixItem("prefix1"))
w, err := New(addr, "", "", 0, NewKeyItemWithPrefix("key4", "consul/folder"), NewKeyItemWithPrefix("key1", ""), NewPrefixItem("prefix"))
require.NoError(t, err)
require.NotNil(t, w)
ctx, cnl := context.WithCancel(context.Background())
Expand All @@ -61,6 +61,8 @@ func TestWatch(t *testing.T) {
assert.Equal(t, "3", cng.Value())
case "key1":
assert.Equal(t, "1", cng.Value())
case "key4":
assert.Equal(t, "42", cng.Value())
default:
assert.Fail(t, "key invalid", cng.Key())
}
Expand All @@ -82,7 +84,11 @@ func cleanup(consul *api.Client) error {
}

func setup(consul *api.Client) error {
_, err := consul.KV().Put(&api.KVPair{Key: "key1", Value: []byte("1")}, nil)
_, err := consul.KV().Put(&api.KVPair{Key: "consul/folder/key4", Value: []byte("42")}, nil)
if err != nil {
return err
}
_, err = consul.KV().Put(&api.KVPair{Key: "key1", Value: []byte("1")}, nil)
if err != nil {
return err
}
Expand Down