From 1db3f11d1a38179eb360ab6772f746d12ec81455 Mon Sep 17 00:00:00 2001 From: Jiyong Huang Date: Tue, 17 Dec 2024 11:44:14 +0800 Subject: [PATCH 1/3] feat(file): support inotify subscribe Signed-off-by: Jiyong Huang --- internal/io/file/source.go | 15 ++++++----- internal/io/file/watch.go | 54 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 7 deletions(-) create mode 100644 internal/io/file/watch.go diff --git a/internal/io/file/source.go b/internal/io/file/source.go index c4f9ec61a5..8c13c5861b 100644 --- a/internal/io/file/source.go +++ b/internal/io/file/source.go @@ -164,6 +164,10 @@ func (fs *Source) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) er // For batch source, it ingest the whole file, thus it need a reader node to coordinate and read the content into lines/array func (fs *Source) Pull(ctx api.StreamContext, _ time.Time, ingest api.TupleIngest, ingestError api.ErrorIngest) { fs.Load(ctx, ingest, ingestError) + if fs.config.Interval == 0 && fs.eof != nil { + fs.eof(ctx) + ctx.GetLogger().Debug("All tuples sent") + } } func (fs *Source) SetEofIngest(eof api.EOFIngest) { @@ -172,7 +176,6 @@ func (fs *Source) SetEofIngest(eof api.EOFIngest) { func (fs *Source) Close(ctx api.StreamContext) error { ctx.GetLogger().Infof("Close file source") - // do nothing return nil } @@ -215,10 +218,6 @@ func (fs *Source) Load(ctx api.StreamContext, ingest api.TupleIngest, ingestErro } else { fs.parseFile(ctx, fs.file, ingest, ingestError) } - if fs.config.Interval == 0 && fs.eof != nil { - fs.eof(ctx) - ctx.GetLogger().Debug("All tuples sent") - } } func (fs *Source) parseFile(ctx api.StreamContext, file string, ingest api.TupleIngest, ingestError api.ErrorIngest) { @@ -390,6 +389,8 @@ func GetSource() api.Source { var ( // ingest possibly []byte and tuple _ api.PullTupleSource = &Source{} - _ api.Bounded = &Source{} - _ model.InfoNode = &Source{} + // if interval is not set, it uses inotify + _ api.TupleSource = &Source{} + _ api.Bounded = &Source{} + _ model.InfoNode = &Source{} ) diff --git a/internal/io/file/watch.go b/internal/io/file/watch.go new file mode 100644 index 0000000000..613e28113f --- /dev/null +++ b/internal/io/file/watch.go @@ -0,0 +1,54 @@ +// Copyright 2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package file + +import ( + "github.com/fsnotify/fsnotify" + "github.com/lf-edge/ekuiper/contract/v2/api" +) + +func (fs *Source) Subscribe(ctx api.StreamContext, ingest api.TupleIngest, ingestError api.ErrorIngest) error { + fs.Load(ctx, ingest, ingestError) + if fs.isDir { + watcher, err := fsnotify.NewWatcher() + if err != nil { + return err + } + err = watcher.Add(fs.file) + if err != nil { + return err + } + go func() { + defer watcher.Close() + for { + select { + case <-ctx.Done(): + return + case event := <-watcher.Events: + switch { + //case event.Has(fsnotify.Write): + case event.Has(fsnotify.Create): + fs.parseFile(ctx, event.Name, ingest, ingestError) + } + case err = <-watcher.Errors: + ctx.GetLogger().Errorf("file watch err:%v", err.Error()) + } + } + }() + } else { + fs.eof(ctx) + } + return nil +} From 1e8d683f2bea7f7fbaa245eaae714f327bbe58a1 Mon Sep 17 00:00:00 2001 From: Jiyong Huang Date: Tue, 17 Dec 2024 16:01:50 +0800 Subject: [PATCH 2/3] feat(file): add watch subscribe Signed-off-by: Jiyong Huang --- internal/io/file/watch.go | 3 + internal/io/file/watch_test.go | 131 +++++++++++++++++++++++++++++++++ pkg/mock/test_source.go | 11 +-- 3 files changed, 140 insertions(+), 5 deletions(-) create mode 100644 internal/io/file/watch_test.go diff --git a/internal/io/file/watch.go b/internal/io/file/watch.go index 613e28113f..eacbe29c27 100644 --- a/internal/io/file/watch.go +++ b/internal/io/file/watch.go @@ -21,6 +21,7 @@ import ( func (fs *Source) Subscribe(ctx api.StreamContext, ingest api.TupleIngest, ingestError api.ErrorIngest) error { fs.Load(ctx, ingest, ingestError) + ctx.GetLogger().Infof("file watch loaded initially") if fs.isDir { watcher, err := fsnotify.NewWatcher() if err != nil { @@ -40,6 +41,7 @@ func (fs *Source) Subscribe(ctx api.StreamContext, ingest api.TupleIngest, inges switch { //case event.Has(fsnotify.Write): case event.Has(fsnotify.Create): + ctx.GetLogger().Debugf("file watch receive creat event") fs.parseFile(ctx, event.Name, ingest, ingestError) } case err = <-watcher.Errors: @@ -48,6 +50,7 @@ func (fs *Source) Subscribe(ctx api.StreamContext, ingest api.TupleIngest, inges } }() } else { + ctx.GetLogger().Infof("file watch exit") fs.eof(ctx) } return nil diff --git a/internal/io/file/watch_test.go b/internal/io/file/watch_test.go new file mode 100644 index 0000000000..f4b2e7a3c9 --- /dev/null +++ b/internal/io/file/watch_test.go @@ -0,0 +1,131 @@ +// Copyright 2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package file + +import ( + "io" + "os" + "path/filepath" + "testing" + "time" + + "github.com/lf-edge/ekuiper/contract/v2/api" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/lf-edge/ekuiper/v2/pkg/mock" + "github.com/lf-edge/ekuiper/v2/pkg/model" + "github.com/lf-edge/ekuiper/v2/pkg/timex" +) + +type FileWrapper struct { + f *Source +} + +func (f *FileWrapper) Provision(ctx api.StreamContext, configs map[string]any) error { + return f.f.Provision(ctx, configs) +} + +func (f FileWrapper) Close(ctx api.StreamContext) error { + return f.f.Close(ctx) +} + +func (f FileWrapper) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) error { + return f.f.Connect(ctx, sch) +} + +func (f FileWrapper) Subscribe(ctx api.StreamContext, ingest api.TupleIngest, ingestError api.ErrorIngest) error { + return f.f.Subscribe(ctx, ingest, ingestError) +} + +var _ api.TupleSource = &FileWrapper{} + +func TestWatchLinesFile(t *testing.T) { + path, err := os.Getwd() + if err != nil { + t.Fatal(err) + } + path = filepath.Join(path, "test") + func() { + src, err := os.Open(filepath.Join(path, "test.lines")) + require.NoError(t, err) + defer src.Close() + dest, err := os.Create(filepath.Join(path, "test.lines.copy")) + assert.NoError(t, err) + defer dest.Close() + _, err = io.Copy(dest, src) + assert.NoError(t, err) + }() + + meta := map[string]any{ + "file": filepath.Join(path, "test.lines.copy"), + } + mc := timex.Clock + exp := []api.MessageTuple{ + model.NewDefaultRawTuple([]byte("{\"id\": 1,\"name\": \"John Doe\"}"), meta, mc.Now()), + model.NewDefaultRawTuple([]byte("{\"id\": 2,\"name\": \"Jane Doe\"}"), meta, mc.Now()), + model.NewDefaultRawTuple([]byte("{\"id\": 3,\"name\": \"John Smith\"}"), meta, mc.Now()), + model.NewDefaultRawTuple([]byte("[{\"id\": 4,\"name\": \"John Smith\"},{\"id\": 5,\"name\": \"John Smith\"}]"), meta, mc.Now()), + } + r := &FileWrapper{f: &Source{}} + mock.TestSourceConnector(t, r, map[string]any{ + "path": path, + "fileType": "lines", + "datasource": "test.lines.copy", + "actionAfterRead": 1, + }, exp, func() { + // do nothing + }) +} + +func TestWatchDir(t *testing.T) { + path, err := os.Getwd() + if err != nil { + t.Fatal(err) + } + wpath := filepath.Join(path, "watch") + err = os.MkdirAll(wpath, os.ModePerm) + require.NoError(t, err) + defer os.RemoveAll(wpath) + meta := map[string]any{ + "file": filepath.Join(wpath, "test.lines.copy"), + } + mc := timex.Clock + exp := []api.MessageTuple{ + model.NewDefaultRawTuple([]byte("{\"id\": 1,\"name\": \"John Doe\"}"), meta, mc.Now()), + model.NewDefaultRawTuple([]byte("{\"id\": 2,\"name\": \"Jane Doe\"}"), meta, mc.Now()), + model.NewDefaultRawTuple([]byte("{\"id\": 3,\"name\": \"John Smith\"}"), meta, mc.Now()), + model.NewDefaultRawTuple([]byte("[{\"id\": 4,\"name\": \"John Smith\"},{\"id\": 5,\"name\": \"John Smith\"}]"), meta, mc.Now()), + } + r := &FileWrapper{f: &Source{}} + go func() { + time.Sleep(100 * time.Millisecond) + src, err := os.Open(filepath.Join(path, "test", "test.lines")) + require.NoError(t, err) + defer src.Close() + dest, err := os.Create(filepath.Join(wpath, "test.lines.copy")) + assert.NoError(t, err) + defer dest.Close() + _, err = io.Copy(dest, src) + assert.NoError(t, err) + }() + mock.TestSourceConnector(t, r, map[string]any{ + "path": wpath, + "fileType": "lines", + "actionAfterRead": 1, + }, exp, func() { + // do nothing + }) +} diff --git a/pkg/mock/test_source.go b/pkg/mock/test_source.go index 528f7fadfa..ec46ed762f 100644 --- a/pkg/mock/test_source.go +++ b/pkg/mock/test_source.go @@ -124,10 +124,6 @@ func TestSourceConnectorCompare(t *testing.T, r api.Source, props map[string]any } go func() { switch ss := r.(type) { - case api.BytesSource: - err = ss.Subscribe(ctx, ingestBytes, ingestErr) - case api.TupleSource: - err = ss.Subscribe(ctx, ingestTuples, ingestErr) case api.PullBytesSource, api.PullTupleSource: switch ss := r.(type) { case api.PullBytesSource: @@ -153,6 +149,10 @@ func TestSourceConnectorCompare(t *testing.T, r api.Source, props map[string]any } } }() + case api.BytesSource: + err = ss.Subscribe(ctx, ingestBytes, ingestErr) + case api.TupleSource: + err = ss.Subscribe(ctx, ingestTuples, ingestErr) default: panic("wrong source type") } @@ -168,10 +168,11 @@ func TestSourceConnectorCompare(t *testing.T, r api.Source, props map[string]any wg.Wait() close(finished) }() + time.Sleep(5 * time.Second) select { case <-ctx.Done(): case <-finished: - cancel() + //cancel() case <-ticker: cancel() assert.Fail(t, "timeout") From ca1cb9102560aa68a6d0b9fe856ca028c1aa0f15 Mon Sep 17 00:00:00 2001 From: Jiyong Huang Date: Tue, 17 Dec 2024 16:15:29 +0800 Subject: [PATCH 3/3] feat(planner): plan for file watch Signed-off-by: Jiyong Huang --- internal/io/file/source.go | 14 ++++++++-- internal/io/file/watch.go | 41 ++++++++++++++++++++++----- internal/io/file/watch_test.go | 46 ++++++++----------------------- internal/topo/node/source_node.go | 4 +++ pkg/mock/test_source.go | 3 +- pkg/model/io.go | 1 + 6 files changed, 63 insertions(+), 46 deletions(-) diff --git a/internal/io/file/source.go b/internal/io/file/source.go index 8c13c5861b..e28833af3f 100644 --- a/internal/io/file/source.go +++ b/internal/io/file/source.go @@ -382,6 +382,15 @@ func (fs *Source) Info() (i model.NodeInfo) { return } +// TransformType must call after provision +func (fs *Source) TransformType() api.Source { + // If interval is not set, use watch source + if fs.config.Interval == 0 { + return &WatchWrapper{f: fs} + } + return fs +} + func GetSource() api.Source { return &Source{} } @@ -390,7 +399,6 @@ var ( // ingest possibly []byte and tuple _ api.PullTupleSource = &Source{} // if interval is not set, it uses inotify - _ api.TupleSource = &Source{} - _ api.Bounded = &Source{} - _ model.InfoNode = &Source{} + _ api.Bounded = &Source{} + _ model.InfoNode = &Source{} ) diff --git a/internal/io/file/watch.go b/internal/io/file/watch.go index eacbe29c27..2d6c51fd5f 100644 --- a/internal/io/file/watch.go +++ b/internal/io/file/watch.go @@ -19,15 +19,35 @@ import ( "github.com/lf-edge/ekuiper/contract/v2/api" ) -func (fs *Source) Subscribe(ctx api.StreamContext, ingest api.TupleIngest, ingestError api.ErrorIngest) error { - fs.Load(ctx, ingest, ingestError) +type WatchWrapper struct { + f *Source +} + +func (f *WatchWrapper) SetEofIngest(eof api.EOFIngest) { + f.f.SetEofIngest(eof) +} + +func (f *WatchWrapper) Provision(ctx api.StreamContext, configs map[string]any) error { + return f.f.Provision(ctx, configs) +} + +func (f *WatchWrapper) Close(ctx api.StreamContext) error { + return f.f.Close(ctx) +} + +func (f *WatchWrapper) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) error { + return f.f.Connect(ctx, sch) +} + +func (f *WatchWrapper) Subscribe(ctx api.StreamContext, ingest api.TupleIngest, ingestError api.ErrorIngest) error { + f.f.Load(ctx, ingest, ingestError) ctx.GetLogger().Infof("file watch loaded initially") - if fs.isDir { + if f.f.isDir { watcher, err := fsnotify.NewWatcher() if err != nil { return err } - err = watcher.Add(fs.file) + err = watcher.Add(f.f.file) if err != nil { return err } @@ -39,10 +59,10 @@ func (fs *Source) Subscribe(ctx api.StreamContext, ingest api.TupleIngest, inges return case event := <-watcher.Events: switch { - //case event.Has(fsnotify.Write): + // case event.Has(fsnotify.Write): case event.Has(fsnotify.Create): ctx.GetLogger().Debugf("file watch receive creat event") - fs.parseFile(ctx, event.Name, ingest, ingestError) + f.f.parseFile(ctx, event.Name, ingest, ingestError) } case err = <-watcher.Errors: ctx.GetLogger().Errorf("file watch err:%v", err.Error()) @@ -51,7 +71,14 @@ func (fs *Source) Subscribe(ctx api.StreamContext, ingest api.TupleIngest, inges }() } else { ctx.GetLogger().Infof("file watch exit") - fs.eof(ctx) + if f.f != nil && f.f.eof != nil { + f.f.eof(ctx) + } } return nil } + +var ( + _ api.TupleSource = &WatchWrapper{} + _ api.Bounded = &WatchWrapper{} +) diff --git a/internal/io/file/watch_test.go b/internal/io/file/watch_test.go index f4b2e7a3c9..1896b5463b 100644 --- a/internal/io/file/watch_test.go +++ b/internal/io/file/watch_test.go @@ -30,28 +30,6 @@ import ( "github.com/lf-edge/ekuiper/v2/pkg/timex" ) -type FileWrapper struct { - f *Source -} - -func (f *FileWrapper) Provision(ctx api.StreamContext, configs map[string]any) error { - return f.f.Provision(ctx, configs) -} - -func (f FileWrapper) Close(ctx api.StreamContext) error { - return f.f.Close(ctx) -} - -func (f FileWrapper) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) error { - return f.f.Connect(ctx, sch) -} - -func (f FileWrapper) Subscribe(ctx api.StreamContext, ingest api.TupleIngest, ingestError api.ErrorIngest) error { - return f.f.Subscribe(ctx, ingest, ingestError) -} - -var _ api.TupleSource = &FileWrapper{} - func TestWatchLinesFile(t *testing.T) { path, err := os.Getwd() if err != nil { @@ -72,14 +50,14 @@ func TestWatchLinesFile(t *testing.T) { meta := map[string]any{ "file": filepath.Join(path, "test.lines.copy"), } - mc := timex.Clock + timex.Set(123456) exp := []api.MessageTuple{ - model.NewDefaultRawTuple([]byte("{\"id\": 1,\"name\": \"John Doe\"}"), meta, mc.Now()), - model.NewDefaultRawTuple([]byte("{\"id\": 2,\"name\": \"Jane Doe\"}"), meta, mc.Now()), - model.NewDefaultRawTuple([]byte("{\"id\": 3,\"name\": \"John Smith\"}"), meta, mc.Now()), - model.NewDefaultRawTuple([]byte("[{\"id\": 4,\"name\": \"John Smith\"},{\"id\": 5,\"name\": \"John Smith\"}]"), meta, mc.Now()), + model.NewDefaultRawTuple([]byte("{\"id\": 1,\"name\": \"John Doe\"}"), meta, timex.GetNow()), + model.NewDefaultRawTuple([]byte("{\"id\": 2,\"name\": \"Jane Doe\"}"), meta, timex.GetNow()), + model.NewDefaultRawTuple([]byte("{\"id\": 3,\"name\": \"John Smith\"}"), meta, timex.GetNow()), + model.NewDefaultRawTuple([]byte("[{\"id\": 4,\"name\": \"John Smith\"},{\"id\": 5,\"name\": \"John Smith\"}]"), meta, timex.GetNow()), } - r := &FileWrapper{f: &Source{}} + r := &WatchWrapper{f: &Source{}} mock.TestSourceConnector(t, r, map[string]any{ "path": path, "fileType": "lines", @@ -102,14 +80,14 @@ func TestWatchDir(t *testing.T) { meta := map[string]any{ "file": filepath.Join(wpath, "test.lines.copy"), } - mc := timex.Clock + timex.Set(654321) exp := []api.MessageTuple{ - model.NewDefaultRawTuple([]byte("{\"id\": 1,\"name\": \"John Doe\"}"), meta, mc.Now()), - model.NewDefaultRawTuple([]byte("{\"id\": 2,\"name\": \"Jane Doe\"}"), meta, mc.Now()), - model.NewDefaultRawTuple([]byte("{\"id\": 3,\"name\": \"John Smith\"}"), meta, mc.Now()), - model.NewDefaultRawTuple([]byte("[{\"id\": 4,\"name\": \"John Smith\"},{\"id\": 5,\"name\": \"John Smith\"}]"), meta, mc.Now()), + model.NewDefaultRawTuple([]byte("{\"id\": 1,\"name\": \"John Doe\"}"), meta, timex.GetNow()), + model.NewDefaultRawTuple([]byte("{\"id\": 2,\"name\": \"Jane Doe\"}"), meta, timex.GetNow()), + model.NewDefaultRawTuple([]byte("{\"id\": 3,\"name\": \"John Smith\"}"), meta, timex.GetNow()), + model.NewDefaultRawTuple([]byte("[{\"id\": 4,\"name\": \"John Smith\"},{\"id\": 5,\"name\": \"John Smith\"}]"), meta, timex.GetNow()), } - r := &FileWrapper{f: &Source{}} + r := &WatchWrapper{f: &Source{}} go func() { time.Sleep(100 * time.Millisecond) src, err := os.Open(filepath.Join(path, "test", "test.lines")) diff --git a/internal/topo/node/source_node.go b/internal/topo/node/source_node.go index 08a801259d..8f86b7623e 100644 --- a/internal/topo/node/source_node.go +++ b/internal/topo/node/source_node.go @@ -30,6 +30,7 @@ import ( "github.com/lf-edge/ekuiper/v2/internal/xsql" "github.com/lf-edge/ekuiper/v2/pkg/cast" "github.com/lf-edge/ekuiper/v2/pkg/infra" + "github.com/lf-edge/ekuiper/v2/pkg/model" "github.com/lf-edge/ekuiper/v2/pkg/timex" ) @@ -55,6 +56,9 @@ func NewSourceNode(ctx api.StreamContext, name string, ss api.Source, props map[ return nil, err } ctx.GetLogger().Infof("provision source %s with props %+v", name, props) + if sit, ok := ss.(model.InfoNode); ok { + ss = sit.TransformType() + } cc := &sourceConf{} err = cast.MapToStruct(props, cc) if err != nil { diff --git a/pkg/mock/test_source.go b/pkg/mock/test_source.go index ec46ed762f..841c01389a 100644 --- a/pkg/mock/test_source.go +++ b/pkg/mock/test_source.go @@ -168,11 +168,10 @@ func TestSourceConnectorCompare(t *testing.T, r api.Source, props map[string]any wg.Wait() close(finished) }() - time.Sleep(5 * time.Second) select { case <-ctx.Done(): case <-finished: - //cancel() + cancel() case <-ticker: cancel() assert.Fail(t, "timeout") diff --git a/pkg/model/io.go b/pkg/model/io.go index f522254b6e..1dc247dbee 100644 --- a/pkg/model/io.go +++ b/pkg/model/io.go @@ -33,6 +33,7 @@ type StreamReader interface { // InfoNode explain the node itself. Mainly used for planner to decide the split of source/sink type InfoNode interface { Info() NodeInfo + TransformType() api.Source } type NodeInfo struct {