Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(io): file source support inotify subscribe #3448

Merged
merged 3 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions internal/io/file/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ func (fs *Source) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) er
// For batch source, it ingest the whole file, thus it need a reader node to coordinate and read the content into lines/array
func (fs *Source) Pull(ctx api.StreamContext, _ time.Time, ingest api.TupleIngest, ingestError api.ErrorIngest) {
fs.Load(ctx, ingest, ingestError)
if fs.config.Interval == 0 && fs.eof != nil {
fs.eof(ctx)
ctx.GetLogger().Debug("All tuples sent")
}
}

func (fs *Source) SetEofIngest(eof api.EOFIngest) {
Expand All @@ -172,7 +176,6 @@ func (fs *Source) SetEofIngest(eof api.EOFIngest) {

func (fs *Source) Close(ctx api.StreamContext) error {
ctx.GetLogger().Infof("Close file source")
// do nothing
return nil
}

Expand Down Expand Up @@ -215,10 +218,6 @@ func (fs *Source) Load(ctx api.StreamContext, ingest api.TupleIngest, ingestErro
} else {
fs.parseFile(ctx, fs.file, ingest, ingestError)
}
if fs.config.Interval == 0 && fs.eof != nil {
fs.eof(ctx)
ctx.GetLogger().Debug("All tuples sent")
}
}

func (fs *Source) parseFile(ctx api.StreamContext, file string, ingest api.TupleIngest, ingestError api.ErrorIngest) {
Expand Down Expand Up @@ -383,13 +382,23 @@ func (fs *Source) Info() (i model.NodeInfo) {
return
}

// TransformType must call after provision
func (fs *Source) TransformType() api.Source {
// If interval is not set, use watch source
if fs.config.Interval == 0 {
return &WatchWrapper{f: fs}
}
return fs
}

func GetSource() api.Source {
return &Source{}
}

var (
// ingest possibly []byte and tuple
_ api.PullTupleSource = &Source{}
_ api.Bounded = &Source{}
_ model.InfoNode = &Source{}
// if interval is not set, it uses inotify
_ api.Bounded = &Source{}
_ model.InfoNode = &Source{}
)
84 changes: 84 additions & 0 deletions internal/io/file/watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package file

import (
"github.com/fsnotify/fsnotify"
"github.com/lf-edge/ekuiper/contract/v2/api"
)

type WatchWrapper struct {
f *Source
}

func (f *WatchWrapper) SetEofIngest(eof api.EOFIngest) {
f.f.SetEofIngest(eof)
}

func (f *WatchWrapper) Provision(ctx api.StreamContext, configs map[string]any) error {
return f.f.Provision(ctx, configs)
}

func (f *WatchWrapper) Close(ctx api.StreamContext) error {
return f.f.Close(ctx)
}

func (f *WatchWrapper) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) error {
return f.f.Connect(ctx, sch)
}

func (f *WatchWrapper) Subscribe(ctx api.StreamContext, ingest api.TupleIngest, ingestError api.ErrorIngest) error {
f.f.Load(ctx, ingest, ingestError)
ctx.GetLogger().Infof("file watch loaded initially")
if f.f.isDir {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return err
}

Check warning on line 49 in internal/io/file/watch.go

View check run for this annotation

Codecov / codecov/patch

internal/io/file/watch.go#L48-L49

Added lines #L48 - L49 were not covered by tests
err = watcher.Add(f.f.file)
if err != nil {
return err
}

Check warning on line 53 in internal/io/file/watch.go

View check run for this annotation

Codecov / codecov/patch

internal/io/file/watch.go#L52-L53

Added lines #L52 - L53 were not covered by tests
go func() {
defer watcher.Close()
for {
select {
case <-ctx.Done():
return
case event := <-watcher.Events:
switch {
// case event.Has(fsnotify.Write):
case event.Has(fsnotify.Create):
ctx.GetLogger().Debugf("file watch receive creat event")
f.f.parseFile(ctx, event.Name, ingest, ingestError)
}
case err = <-watcher.Errors:
ctx.GetLogger().Errorf("file watch err:%v", err.Error())

Check warning on line 68 in internal/io/file/watch.go

View check run for this annotation

Codecov / codecov/patch

internal/io/file/watch.go#L67-L68

Added lines #L67 - L68 were not covered by tests
}
}
}()
} else {
ctx.GetLogger().Infof("file watch exit")
if f.f != nil && f.f.eof != nil {
f.f.eof(ctx)
}
}
return nil
}

var (
_ api.TupleSource = &WatchWrapper{}
_ api.Bounded = &WatchWrapper{}
)
109 changes: 109 additions & 0 deletions internal/io/file/watch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package file

import (
"io"
"os"
"path/filepath"
"testing"
"time"

"github.com/lf-edge/ekuiper/contract/v2/api"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/lf-edge/ekuiper/v2/pkg/mock"
"github.com/lf-edge/ekuiper/v2/pkg/model"
"github.com/lf-edge/ekuiper/v2/pkg/timex"
)

func TestWatchLinesFile(t *testing.T) {
path, err := os.Getwd()
if err != nil {
t.Fatal(err)
}
path = filepath.Join(path, "test")
func() {
src, err := os.Open(filepath.Join(path, "test.lines"))
require.NoError(t, err)
defer src.Close()
dest, err := os.Create(filepath.Join(path, "test.lines.copy"))
assert.NoError(t, err)
defer dest.Close()
_, err = io.Copy(dest, src)
assert.NoError(t, err)
}()

meta := map[string]any{
"file": filepath.Join(path, "test.lines.copy"),
}
timex.Set(123456)
exp := []api.MessageTuple{
model.NewDefaultRawTuple([]byte("{\"id\": 1,\"name\": \"John Doe\"}"), meta, timex.GetNow()),
model.NewDefaultRawTuple([]byte("{\"id\": 2,\"name\": \"Jane Doe\"}"), meta, timex.GetNow()),
model.NewDefaultRawTuple([]byte("{\"id\": 3,\"name\": \"John Smith\"}"), meta, timex.GetNow()),
model.NewDefaultRawTuple([]byte("[{\"id\": 4,\"name\": \"John Smith\"},{\"id\": 5,\"name\": \"John Smith\"}]"), meta, timex.GetNow()),
}
r := &WatchWrapper{f: &Source{}}
mock.TestSourceConnector(t, r, map[string]any{
"path": path,
"fileType": "lines",
"datasource": "test.lines.copy",
"actionAfterRead": 1,
}, exp, func() {
// do nothing
})
}

func TestWatchDir(t *testing.T) {
path, err := os.Getwd()
if err != nil {
t.Fatal(err)
}
wpath := filepath.Join(path, "watch")
err = os.MkdirAll(wpath, os.ModePerm)
require.NoError(t, err)
defer os.RemoveAll(wpath)
meta := map[string]any{
"file": filepath.Join(wpath, "test.lines.copy"),
}
timex.Set(654321)
exp := []api.MessageTuple{
model.NewDefaultRawTuple([]byte("{\"id\": 1,\"name\": \"John Doe\"}"), meta, timex.GetNow()),
model.NewDefaultRawTuple([]byte("{\"id\": 2,\"name\": \"Jane Doe\"}"), meta, timex.GetNow()),
model.NewDefaultRawTuple([]byte("{\"id\": 3,\"name\": \"John Smith\"}"), meta, timex.GetNow()),
model.NewDefaultRawTuple([]byte("[{\"id\": 4,\"name\": \"John Smith\"},{\"id\": 5,\"name\": \"John Smith\"}]"), meta, timex.GetNow()),
}
r := &WatchWrapper{f: &Source{}}
go func() {
time.Sleep(100 * time.Millisecond)
src, err := os.Open(filepath.Join(path, "test", "test.lines"))
require.NoError(t, err)
defer src.Close()
dest, err := os.Create(filepath.Join(wpath, "test.lines.copy"))
assert.NoError(t, err)
defer dest.Close()
_, err = io.Copy(dest, src)
assert.NoError(t, err)
}()
mock.TestSourceConnector(t, r, map[string]any{
"path": wpath,
"fileType": "lines",
"actionAfterRead": 1,
}, exp, func() {
// do nothing
})
}
4 changes: 4 additions & 0 deletions internal/topo/node/source_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/lf-edge/ekuiper/v2/internal/xsql"
"github.com/lf-edge/ekuiper/v2/pkg/cast"
"github.com/lf-edge/ekuiper/v2/pkg/infra"
"github.com/lf-edge/ekuiper/v2/pkg/model"
"github.com/lf-edge/ekuiper/v2/pkg/timex"
)

Expand All @@ -55,6 +56,9 @@ func NewSourceNode(ctx api.StreamContext, name string, ss api.Source, props map[
return nil, err
}
ctx.GetLogger().Infof("provision source %s with props %+v", name, props)
if sit, ok := ss.(model.InfoNode); ok {
ss = sit.TransformType()
}
cc := &sourceConf{}
err = cast.MapToStruct(props, cc)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/mock/test_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,6 @@ func TestSourceConnectorCompare(t *testing.T, r api.Source, props map[string]any
}
go func() {
switch ss := r.(type) {
case api.BytesSource:
err = ss.Subscribe(ctx, ingestBytes, ingestErr)
case api.TupleSource:
err = ss.Subscribe(ctx, ingestTuples, ingestErr)
case api.PullBytesSource, api.PullTupleSource:
switch ss := r.(type) {
case api.PullBytesSource:
Expand All @@ -153,6 +149,10 @@ func TestSourceConnectorCompare(t *testing.T, r api.Source, props map[string]any
}
}
}()
case api.BytesSource:
err = ss.Subscribe(ctx, ingestBytes, ingestErr)
case api.TupleSource:
err = ss.Subscribe(ctx, ingestTuples, ingestErr)
default:
panic("wrong source type")
}
Expand Down
1 change: 1 addition & 0 deletions pkg/model/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type StreamReader interface {
// InfoNode explain the node itself. Mainly used for planner to decide the split of source/sink
type InfoNode interface {
Info() NodeInfo
TransformType() api.Source
}

type NodeInfo struct {
Expand Down
Loading