Skip to content

Commit

Permalink
feat: support fileDir notify source (#3380)
Browse files Browse the repository at this point in the history
Signed-off-by: Song Gao <disxiaofei@163.com>
  • Loading branch information
Yisaer authored Nov 26, 2024
1 parent 6152fcf commit 1d93d7e
Show file tree
Hide file tree
Showing 7 changed files with 417 additions and 1 deletion.
4 changes: 4 additions & 0 deletions etc/sources/dirwatch.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
default:
  path: /example
  allowedExtension:
    - txt
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ require (
github.com/envoyproxy/go-control-plane v0.12.1-0.20240621013728-1eb8caab5155 // indirect
github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/fsnotify/fsnotify v1.8.0 // indirect
github.com/fullsailor/pkcs7 v0.0.0-20190404230743-d7302db945fa // indirect
github.com/gabriel-vasile/mimetype v1.4.5 // indirect
github.com/go-faster/city v1.0.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,8 @@ github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M=
github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/fullsailor/pkcs7 v0.0.0-20190404230743-d7302db945fa h1:RDBNVkRviHZtvDvId8XSGPu3rmpmSe+wKRcEWNgsfWU=
github.com/fullsailor/pkcs7 v0.0.0-20190404230743-d7302db945fa/go.mod h1:KnogPXtdwXqoenmZCw6S+25EAm2MkxbG0deNDu4cbSA=
github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E=
Expand Down
2 changes: 2 additions & 0 deletions internal/binder/io/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/lf-edge/ekuiper/contract/v2/api"

"github.com/lf-edge/ekuiper/v2/internal/binder"
"github.com/lf-edge/ekuiper/v2/internal/io/dirwatch"
"github.com/lf-edge/ekuiper/v2/internal/io/file"
"github.com/lf-edge/ekuiper/v2/internal/io/http"
"github.com/lf-edge/ekuiper/v2/internal/io/http/httpserver"
Expand All @@ -37,6 +38,7 @@ func init() {
modules.RegisterSource("httppull", func() api.Source { return &http.HttpPullSource{} })
modules.RegisterSource("httppush", func() api.Source { return &http.HttpPushSource{} })
modules.RegisterSource("file", file.GetSource)
modules.RegisterSource("dirwatch", dirwatch.GetSource)
modules.RegisterSource("memory", func() api.Source { return memory.GetSource() })
modules.RegisterSource("neuron", neuron.GetSource)
modules.RegisterSource("websocket", func() api.Source { return websocket.GetSource() })
Expand Down
274 changes: 274 additions & 0 deletions internal/io/dirwatch/source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dirwatch

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"

"github.com/fsnotify/fsnotify"
"github.com/lf-edge/ekuiper/contract/v2/api"

"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/pkg/cast"
)

// FileDirSource is the "dirwatch" source: it watches one directory with
// fsnotify and ingests the content of files that are created or written there.
type FileDirSource struct {
// config is the decoded source configuration, set by Provision.
config *FileDirSourceConfig
// taskCh queues files to be read; it is filled by the notify loop and the
// initial directory scan, and drained by startHandleTask.
taskCh chan *FileSourceTask
// fileContentCh is allocated in Provision but is not read or written
// anywhere else in the visible code.
fileContentCh chan []byte

// watcher delivers the file-system events for config.Path.
watcher *fsnotify.Watcher
// rewindMeta is the offset state (latest modification time already ingested).
rewindMeta *FileDirSourceRewindMeta
// wg tracks the two goroutines started by Subscribe so Close can wait for them.
wg *sync.WaitGroup
}

// Subscribe starts the task consumer and the fsnotify event loop, then queues
// every regular file that already exists in the watched directory.
// It returns the error of that initial directory scan, if any.
func (f *FileDirSource) Subscribe(ctx api.StreamContext, ingest api.TupleIngest, ingestError api.ErrorIngest) error {
	// The consumer must be running before the scan so queued tasks are drained.
	f.wg.Add(1)
	go f.startHandleTask(ctx, ingest, ingestError)

	f.wg.Add(1)
	go f.handleFileDirNotify(ctx)

	scanErr := f.readDirFile()
	return scanErr
}

// FileDirSourceConfig is the user-facing configuration of the dirwatch source.
type FileDirSourceConfig struct {
// Path is the directory that is watched and scanned on subscribe.
Path string `json:"path"`
// AllowedExtension lists the file extensions (compared without the leading
// dot) that may be ingested; an empty list allows every file.
AllowedExtension []string `json:"allowedExtension"`
}

// Provision decodes configs into a FileDirSourceConfig, creates the fsnotify
// watcher for the configured path and allocates the internal channels.
//
// The receiver is only populated once every step has succeeded. If the path
// cannot be watched, the freshly created watcher is closed before the error is
// returned so its OS resources are not leaked.
func (f *FileDirSource) Provision(ctx api.StreamContext, configs map[string]any) error {
	c := &FileDirSourceConfig{}
	if err := cast.MapToStruct(configs, c); err != nil {
		return err
	}
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return err
	}
	if err := watcher.Add(c.Path); err != nil {
		// Nothing else holds a reference to the watcher on this path, so it
		// must be released here; the Add error is the one worth reporting.
		_ = watcher.Close()
		return err
	}

	f.config = c
	f.taskCh = make(chan *FileSourceTask, 1024)
	f.fileContentCh = make(chan []byte, 1024)
	f.watcher = watcher
	f.rewindMeta = &FileDirSourceRewindMeta{}
	f.wg = &sync.WaitGroup{}
	conf.Log.Infof("start to watch %v, rule:%v", f.config.Path, ctx.GetRuleId())
	return nil
}

// Close shuts down the watcher and waits for the goroutines started by
// Subscribe to finish. It returns the error reported by the watcher, if any.
//
// It is safe to call on a source whose Provision failed or never ran: a nil
// watcher or wait group is skipped instead of dereferenced.
//
// Note that startHandleTask only returns when ctx is done, so the context
// passed to Subscribe must be cancelled for the wait to complete.
func (f *FileDirSource) Close(ctx api.StreamContext) error {
	var err error
	if f.watcher != nil {
		err = f.watcher.Close()
	}
	if f.wg != nil {
		f.wg.Wait()
	}
	return err
}

// Connect is a no-op for this source and always returns nil; the watcher is
// already set up in Provision.
func (f *FileDirSource) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) error {
return nil
}

// handleFileDirNotify is the fsnotify event loop. It turns Write and Create
// events into tasks on taskCh and logs watcher errors. It returns when ctx is
// done or when either watcher channel is closed.
//
// Enqueueing also observes ctx.Done(): with a plain blocking send, a full
// taskCh at shutdown would leave this goroutine stuck and Close would never
// return from wg.Wait.
func (f *FileDirSource) handleFileDirNotify(ctx api.StreamContext) {
	defer f.wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case event, ok := <-f.watcher.Events:
			if !ok {
				return
			}
			var taskType FileTaskType
			switch {
			// Write is checked first, so an event carrying both bits is
			// queued as a WriteFile task.
			case event.Has(fsnotify.Write):
				taskType = WriteFile
			case event.Has(fsnotify.Create):
				taskType = CreateFile
			default:
				// Other operations (remove, rename, chmod, ...) are ignored.
				continue
			}
			select {
			case f.taskCh <- &FileSourceTask{name: event.Name, taskType: taskType}:
			case <-ctx.Done():
				return
			}
		case err, ok := <-f.watcher.Errors:
			if !ok {
				return
			}
			ctx.GetLogger().Errorf("dirwatch err:%v", err.Error())
		}
	}
}

// startHandleTask drains taskCh until ctx is done, reading and ingesting the
// file named by each task. Create and write tasks are handled identically.
func (f *FileDirSource) startHandleTask(ctx api.StreamContext, ingest api.TupleIngest, ingestError api.ErrorIngest) {
	defer f.wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case pending := <-f.taskCh:
			if pending.taskType == WriteFile || pending.taskType == CreateFile {
				f.ingestFileContent(ctx, pending.name, ingest, ingestError)
			}
		}
	}
}

// ingestFileContent reads fileName and emits one tuple with the keys
// "filename" (base name), "modifyTime" (Unix seconds) and "content" (raw bytes).
//
// Files whose extension is not allowed are skipped silently, as are files
// whose modification time is not after the recorded offset. Stat and read
// failures are reported through ingestError; the read error is wrapped with
// %w so callers can still inspect the underlying cause.
func (f *FileDirSource) ingestFileContent(ctx api.StreamContext, fileName string, ingest api.TupleIngest, ingestError api.ErrorIngest) {
	if !checkFileExtension(fileName, f.config.AllowedExtension) {
		return
	}
	willRead, modifyTime, err := f.checkFileRead(fileName)
	if err != nil {
		ingestError(ctx, err)
		return
	}
	if !willRead {
		return
	}
	content, err := os.ReadFile(fileName)
	if err != nil {
		ingestError(ctx, fmt.Errorf("read file %s err: %w", fileName, err))
		return
	}
	message := map[string]any{
		"filename":   filepath.Base(fileName),
		"modifyTime": modifyTime.Unix(),
		"content":    content,
	}
	// Advance the offset before emitting, matching the order the tuple and
	// the recorded modification time are produced in.
	f.updateRewindMeta(fileName, modifyTime)
	ingest(ctx, message, nil, time.Now())
}

// checkFileRead stats fileName and reports whether it should be ingested.
// It returns true together with the file's modification time only when that
// time is strictly after the recorded LastModifyTime; a file with an equal
// timestamp is therefore not read. A stat failure or a directory yields an error.
func (f *FileDirSource) checkFileRead(fileName string) (bool, time.Time, error) {
	var none time.Time
	info, err := os.Stat(fileName)
	switch {
	case err != nil:
		return false, none, err
	case info.IsDir():
		return false, none, fmt.Errorf("%s is a directory", fileName)
	}
	if modified := info.ModTime(); modified.After(f.rewindMeta.LastModifyTime) {
		return true, modified, nil
	}
	return false, none, nil
}

// updateRewindMeta moves the recorded offset forward to modifyTime. The offset
// never moves backwards; the file name argument is currently unused.
func (f *FileDirSource) updateRewindMeta(_ string, modifyTime time.Time) {
	meta := f.rewindMeta
	if !modifyTime.After(meta.LastModifyTime) {
		return
	}
	meta.LastModifyTime = modifyTime
}

// GetOffset returns the rewind metadata encoded as a JSON string, which is
// the format Rewind accepts.
func (f *FileDirSource) GetOffset() (any, error) {
	encoded, err := json.Marshal(f.rewindMeta)
	if err != nil {
		return "", err
	}
	return string(encoded), nil
}

// Rewind restores the offset from a value previously produced by GetOffset.
// offset must be a JSON string encoding a FileDirSourceRewindMeta; any other
// type, or invalid JSON, yields an error.
//
// The offset is decoded into a scratch value first so that a failed rewind
// leaves the current state untouched instead of resetting it.
func (f *FileDirSource) Rewind(offset any) error {
	c, ok := offset.(string)
	if !ok {
		return fmt.Errorf("fileDirSource rewind failed")
	}
	meta := &FileDirSourceRewindMeta{}
	if err := json.Unmarshal([]byte(c), meta); err != nil {
		return err
	}
	f.rewindMeta = meta
	return nil
}

// ResetOffset is not supported by this source; it ignores input and always
// returns an error.
func (f *FileDirSource) ResetOffset(input map[string]any) error {
return fmt.Errorf("FileDirSource ResetOffset not supported")
}

// readDirFile queues every regular file already present in the watched
// directory, oldest modification time first, so existing files are ingested
// in chronological order. Subdirectories are skipped.
//
// A file that disappears between the directory listing and its stat is
// skipped rather than failing the whole scan. The sort is stable, so files
// sharing a modification time keep the filename order produced by os.ReadDir.
func (f *FileDirSource) readDirFile() error {
	entries, err := os.ReadDir(f.config.Path)
	if err != nil {
		return err
	}
	files := make(FileWithTimeSlice, 0, len(entries))
	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}
		info, err := entry.Info()
		if err != nil {
			if os.IsNotExist(err) {
				// Removed or renamed since the listing; nothing to ingest.
				continue
			}
			return err
		}
		files = append(files, FileWithTime{name: entry.Name(), modifyTime: info.ModTime()})
	}
	sort.Stable(files)
	for _, file := range files {
		f.taskCh <- &FileSourceTask{name: filepath.Join(f.config.Path, file.name), taskType: CreateFile}
	}
	return nil
}

// FileWithTime pairs a file name with its modification time.
type FileWithTime struct {
	name       string
	modifyTime time.Time
}

// FileWithTimeSlice implements sort.Interface, ordering files by ascending
// modification time.
type FileWithTimeSlice []FileWithTime

// Len returns the number of files in the slice.
func (s FileWithTimeSlice) Len() int {
	count := len(s)
	return count
}

// Less reports whether the file at index i was modified before the one at j.
func (s FileWithTimeSlice) Less(i, j int) bool {
	return s[j].modifyTime.After(s[i].modifyTime)
}

// Swap exchanges the files at indexes i and j.
func (s FileWithTimeSlice) Swap(i, j int) {
	held := s[i]
	s[i] = s[j]
	s[j] = held
}

// FileSourceTask describes one unit of work queued on FileDirSource.taskCh.
type FileSourceTask struct {
// name is the path of the file to read.
name string
// previousName is not set or read anywhere in the visible code.
previousName string
// taskType records which kind of event produced the task.
taskType FileTaskType
}

// FileTaskType identifies the kind of file event behind a FileSourceTask.
type FileTaskType int

const (
// CreateFile marks a newly created file, or a file found by the initial
// directory scan.
CreateFile FileTaskType = iota
// WriteFile marks a file that received a write event.
WriteFile
)

// FileDirSourceRewindMeta is the offset state of the source, serialized to
// JSON by GetOffset and restored by Rewind.
type FileDirSourceRewindMeta struct {
// LastModifyTime is the latest file modification time already ingested;
// only files modified strictly after it are read.
LastModifyTime time.Time `json:"lastModifyTime"`
}

// checkFileExtension reports whether name may be ingested given the list of
// allowed extensions. An empty list allows every file.
//
// The comparison is case-sensitive and ignores the leading dot on both sides,
// so an entry may be written as either "txt" or ".txt". Only the final
// extension of name is considered ("a.tar.gz" has extension "gz").
func checkFileExtension(name string, allowedExtension []string) bool {
	if len(allowedExtension) == 0 {
		return true
	}
	fileExt := strings.TrimPrefix(filepath.Ext(name), ".")
	for _, ext := range allowedExtension {
		if fileExt == strings.TrimPrefix(ext, ".") {
			return true
		}
	}
	return false
}

// GetSource returns a new, unprovisioned FileDirSource; it is the factory
// registered for the "dirwatch" source type.
func GetSource() api.Source {
return &FileDirSource{}
}
77 changes: 77 additions & 0 deletions internal/io/dirwatch/source_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dirwatch

import (
"encoding/json"
"os"
"testing"
"time"

"github.com/lf-edge/ekuiper/contract/v2/api"
"github.com/stretchr/testify/require"

mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context"
)

func TestFileDirSource(t *testing.T) {
path, err := os.Getwd()
require.NoError(t, err)
fileDirSource := &FileDirSource{}
c := map[string]interface{}{
"path": path,
"allowedExtension": []string{"txt"},
}
ctx, cancel := mockContext.NewMockContext("1", "2").WithCancel()
require.NoError(t, fileDirSource.Provision(ctx, c))
require.NoError(t, fileDirSource.Connect(ctx, nil))
output := make(chan any, 10)
require.NoError(t, fileDirSource.Subscribe(ctx, func(ctx api.StreamContext, data any, meta map[string]any, ts time.Time) {
output <- data
}, func(ctx api.StreamContext, err error) {}))
time.Sleep(10 * time.Millisecond)
f, err := os.Create("./test.txt")
require.NoError(t, err)
_, err = f.Write([]byte("123"))
require.NoError(t, err)
f.Close()
got := <-output
gotM, ok := got.(map[string]interface{})
require.True(t, ok)
require.Equal(t, []byte("123"), gotM["content"])
offset, err := fileDirSource.GetOffset()
require.NoError(t, err)
meta := &FileDirSourceRewindMeta{}
require.NoError(t, json.Unmarshal([]byte(offset.(string)), meta))
require.True(t, meta.LastModifyTime.After(time.Time{}))
require.Error(t, fileDirSource.ResetOffset(nil))
require.NoError(t, fileDirSource.Rewind(offset))
require.NoError(t, os.Remove("./test.txt"))
time.Sleep(10 * time.Millisecond)
cancel()
fileDirSource.Close(ctx)
}

// TestCheckFileExtension checks the extension filter against an empty list,
// a matching extension and a non-matching extension.
func TestCheckFileExtension(t *testing.T) {
	cases := []struct {
		file    string
		allowed []string
		want    bool
	}{
		{"test.txt", []string{}, true},
		{"test.txt", []string{"txt", "jpg"}, true},
		{"test.md", []string{"txt", "jpg"}, false},
	}
	for _, tc := range cases {
		require.Equal(t, tc.want, checkFileExtension(tc.file, tc.allowed))
	}
}

// TestRewind verifies that Rewind rejects both a non-string offset and a
// string that does not decode into the rewind metadata.
func TestRewind(t *testing.T) {
	src := &FileDirSource{}
	for _, badOffset := range []any{nil, "123"} {
		require.Error(t, src.Rewind(badOffset))
	}
}
Loading

0 comments on commit 1d93d7e

Please sign in to comment.