Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(watcherx): allow requesting current data #243

Merged
merged 9 commits into from
Dec 8, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/windows_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ jobs:
steps:
- uses: actions/checkout@v2
- run: |
go test -failfast -timeout=20m $(go list ./... | grep -v watcherx | grep -v sqlcon)
go test -failfast -timeout=20m $(go list ./... | grep -v sqlcon)
shell: bash
2 changes: 1 addition & 1 deletion configx/koanf_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,6 @@ func (f *KoanfFile) Read() (map[string]interface{}, error) {

// WatchChannel watches the file and triggers a callback when it changes. It is a
// blocking function that internally spawns a goroutine to watch for changes.
func (f *KoanfFile) WatchChannel(c watcherx.EventChannel) error {
func (f *KoanfFile) WatchChannel(c watcherx.EventChannel) (watcherx.Watcher, error) {
return watcherx.WatchFile(f.ctx, f.path, c)
}
3 changes: 1 addition & 2 deletions configx/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,14 +235,13 @@ func (p *Provider) addConfigFile(ctx context.Context, path string, k *koanf.Koan
cancel()
cancel = cancelInner
p.runOnChanges(e, nil)
close(c)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This caused a panic in some new test case because the receiver closed the channel instead of the sender.

span.Finish()
return
}
}
}(c)

if err := fp.WatchChannel(c); err != nil {
if _, err := fp.WatchChannel(c); err != nil {
close(c)
return err
}
Expand Down
1 change: 1 addition & 0 deletions configx/provider_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func TestReload(t *testing.T) {
modifiers = append(modifiers,
WithLogrusWatcher(l),
AttachWatcher(func(event watcherx.Event, err error) {
t.Log("going to call done")
wg.Done()
}),
WithContext(ctx),
Expand Down
34 changes: 29 additions & 5 deletions watcherx/definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,19 @@ type (
scheme string
}
EventChannel chan Event
Watcher interface {
DispatchNow() error
}
dispatcher struct {
trigger chan struct{}
}
)

// ErrSchemeUnknown is just for checking with errors.Is()
var ErrSchemeUnknown = &errSchemeUnknown{}
var (
// ErrSchemeUnknown is just for checking with errors.Is()
ErrSchemeUnknown = &errSchemeUnknown{}
ErrWatcherNotRunning = fmt.Errorf("watcher is not running")
)

func (e *errSchemeUnknown) Is(other error) bool {
_, ok := other.(*errSchemeUnknown)
Expand All @@ -25,12 +34,27 @@ func (e *errSchemeUnknown) Error() string {
return fmt.Sprintf("unknown scheme '%s' to watch", e.scheme)
}

func Watch(ctx context.Context, u *url.URL, c EventChannel) error {
func newDispatcher() *dispatcher {
return &dispatcher{
trigger: make(chan struct{}),
}
}

func (d *dispatcher) DispatchNow() error {
if d.trigger == nil {
return ErrWatcherNotRunning
}
d.trigger <- struct{}{}
return nil
}

func Watch(ctx context.Context, u *url.URL, c EventChannel) (Watcher, error) {
switch u.Scheme {
case "file":
// see urlx.Parse for why the empty string is also file
case "file", "":
return WatchFile(ctx, u.Path, c)
case "ws":
return WatchWebsocket(ctx, u, c)
}
return &errSchemeUnknown{u.Scheme}
return nil, &errSchemeUnknown{u.Scheme}
}
45 changes: 36 additions & 9 deletions watcherx/directory.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package watcherx

import (
"bytes"
"context"
"io/ioutil"
"os"
Expand All @@ -12,10 +11,10 @@ import (
"github.com/pkg/errors"
)

func WatchDirectory(ctx context.Context, dir string, c EventChannel) error {
func WatchDirectory(ctx context.Context, dir string, c EventChannel) (Watcher, error) {
w, err := fsnotify.NewWatcher()
if err != nil {
return errors.WithStack(err)
return nil, errors.WithStack(err)
}
var subDirs []string
if err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
Expand All @@ -27,15 +26,17 @@ func WatchDirectory(ctx context.Context, dir string, c EventChannel) error {
}
return nil
}); err != nil {
return errors.WithStack(err)
return nil, errors.WithStack(err)
}
for _, d := range append(subDirs, dir) {
if err := w.Add(d); err != nil {
return errors.WithStack(err)
return nil, errors.WithStack(err)
}
}
go streamDirectoryEvents(ctx, w, c)
return nil

d := newDispatcher()
go streamDirectoryEvents(ctx, w, c, d.trigger, dir)
return d, nil
}

func handleEvent(e fsnotify.Event, w *fsnotify.Watcher, c EventChannel) {
Expand Down Expand Up @@ -86,21 +87,47 @@ func handleEvent(e fsnotify.Event, w *fsnotify.Watcher, c EventChannel) {
}
} else {
c <- &ChangeEvent{
data: bytes.NewBuffer(data),
data: data,
source: source(e.Name),
}
}
}
}

func streamDirectoryEvents(ctx context.Context, w *fsnotify.Watcher, c EventChannel) {
func streamDirectoryEvents(ctx context.Context, w *fsnotify.Watcher, c EventChannel, sendNow <-chan struct{}, dir string) {
for {
select {
case <-ctx.Done():
_ = w.Close()
return
case e := <-w.Events:
handleEvent(e, w, c)
case <-sendNow:
if err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
data, err := ioutil.ReadFile(path)
if err != nil {
c <- &ErrorEvent{
error: err,
source: source(path),
}
} else {
c <- &ChangeEvent{
data: data,
source: source(path),
}
}
}
return nil
}); err != nil {
c <- &ErrorEvent{
error: err,
source: source(dir),
}
}
}
}
}
75 changes: 55 additions & 20 deletions watcherx/directory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package watcherx

import (
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"testing"
"time"

Expand All @@ -15,8 +16,9 @@ func TestWatchDirectory(t *testing.T) {
ctx, c, dir, cancel := setup(t)
defer cancel()

require.NoError(t, WatchDirectory(ctx, dir, c))
fileName := path.Join(dir, "example")
_, err := WatchDirectory(ctx, dir, c)
require.NoError(t, err)
fileName := filepath.Join(dir, "example")
f, err := os.Create(fileName)
require.NoError(t, err)
require.NoError(t, f.Close())
Expand All @@ -28,10 +30,11 @@ func TestWatchDirectory(t *testing.T) {
ctx, c, dir, cancel := setup(t)
defer cancel()

fileName := path.Join(dir, "example")
fileName := filepath.Join(dir, "example")
f, err := os.Create(fileName)
require.NoError(t, err)
require.NoError(t, WatchDirectory(ctx, dir, c))
_, err = WatchDirectory(ctx, dir, c)
require.NoError(t, err)

_, err = fmt.Fprintf(f, "content")
require.NoError(t, f.Close())
Expand All @@ -43,12 +46,13 @@ func TestWatchDirectory(t *testing.T) {
ctx, c, dir, cancel := setup(t)
defer cancel()

fileName := path.Join(dir, "example")
fileName := filepath.Join(dir, "example")
f, err := os.Create(fileName)
require.NoError(t, err)
require.NoError(t, f.Close())

require.NoError(t, WatchDirectory(ctx, dir, c))
_, err = WatchDirectory(ctx, dir, c)
require.NoError(t, err)
require.NoError(t, os.Remove(fileName))

assertRemove(t, <-c, fileName)
Expand All @@ -58,12 +62,13 @@ func TestWatchDirectory(t *testing.T) {
ctx, c, dir, cancel := setup(t)
defer cancel()

childDir := path.Join(dir, "child")
childDir := filepath.Join(dir, "child")
require.NoError(t, os.Mkdir(childDir, 0777))

require.NoError(t, WatchDirectory(ctx, dir, c))
_, err := WatchDirectory(ctx, dir, c)
require.NoError(t, err)

fileName := path.Join(childDir, "example")
fileName := filepath.Join(childDir, "example")
f, err := os.Create(fileName)
require.NoError(t, err)
require.NoError(t, f.Close())
Expand All @@ -75,11 +80,12 @@ func TestWatchDirectory(t *testing.T) {
ctx, c, dir, cancel := setup(t)
defer cancel()

require.NoError(t, WatchDirectory(ctx, dir, c))
_, err := WatchDirectory(ctx, dir, c)
require.NoError(t, err)

childDir := path.Join(dir, "child")
childDir := filepath.Join(dir, "child")
require.NoError(t, os.Mkdir(childDir, 0777))
fileName := path.Join(childDir, "example")
fileName := filepath.Join(childDir, "example")
// there's not much we can do about this timeout as it takes some time until the new watcher is created
time.Sleep(time.Millisecond)
f, err := os.Create(fileName)
Expand All @@ -93,10 +99,11 @@ func TestWatchDirectory(t *testing.T) {
ctx, c, dir, cancel := setup(t)
defer cancel()

childDir := path.Join(dir, "child")
childDir := filepath.Join(dir, "child")
require.NoError(t, os.Mkdir(childDir, 0777))

require.NoError(t, WatchDirectory(ctx, dir, c))
_, err := WatchDirectory(ctx, dir, c)
require.NoError(t, err)

require.NoError(t, os.Remove(childDir))

Expand All @@ -113,19 +120,20 @@ func TestWatchDirectory(t *testing.T) {
ctx, c, dir, cancel := setup(t)
defer cancel()

childDir := path.Join(dir, "child")
subChildDir := path.Join(childDir, "subchild")
childDir := filepath.Join(dir, "child")
subChildDir := filepath.Join(childDir, "subchild")
require.NoError(t, os.MkdirAll(subChildDir, 0777))
f1 := path.Join(subChildDir, "f1")
f1 := filepath.Join(subChildDir, "f1")
f, err := os.Create(f1)
require.NoError(t, err)
require.NoError(t, f.Close())
f2 := path.Join(childDir, "f2")
f2 := filepath.Join(childDir, "f2")
f, err = os.Create(f2)
require.NoError(t, err)
require.NoError(t, f.Close())

require.NoError(t, WatchDirectory(ctx, dir, c))
_, err = WatchDirectory(ctx, dir, c)
require.NoError(t, err)

require.NoError(t, os.RemoveAll(childDir))

Expand All @@ -136,4 +144,31 @@ func TestWatchDirectory(t *testing.T) {
assertRemove(t, events[0], f2)
assertRemove(t, events[1], f1)
})

t.Run("case=sends event when requested", func(t *testing.T) {
ctx, c, dir, cancel := setup(t)
defer cancel()

files := map[string]string{
"a": "foo",
"b": "bar",
"c": "baz",
filepath.Join("d", "a"): "sub dir content",
}
for fn, fc := range files {
fp := filepath.Join(dir, fn)
require.NoError(t, os.MkdirAll(filepath.Dir(fp), 0700))
require.NoError(t, ioutil.WriteFile(fp, []byte(fc), 0600))
}

d, err := WatchDirectory(ctx, dir, c)
require.NoError(t, err)
require.NoError(t, d.DispatchNow())

// because filepath.WalkDir walks lexicographically, we can assume the events come in lex order
assertChange(t, <-c, files["a"], filepath.Join(dir, "a"))
assertChange(t, <-c, files["b"], filepath.Join(dir, "b"))
assertChange(t, <-c, files["c"], filepath.Join(dir, "c"))
assertChange(t, <-c, files[filepath.Join("d", "a")], filepath.Join(dir, "d", "a"))
})
}
Loading