Skip to content
This repository has been archived by the owner on Jun 28, 2023. It is now read-only.

Commit

Permalink
add multiple watch dir
Browse files Browse the repository at this point in the history
  • Loading branch information
ezotrank committed Oct 18, 2014
1 parent f0bd02c commit 293d39b
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 45 deletions.
111 changes: 72 additions & 39 deletions logsend/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,46 @@ package logsend
import (
"github.com/ActiveState/tail"
"github.com/howeyc/fsnotify"
"io/ioutil"
"os"
"path/filepath"
)

func WatchFiles(dir, configFile string) {
func walkLogDir(dir string) (files []string, err error) {
if string(dir[len(dir)-1]) != "/" {
dir = dir + "/"
}
visit := func(path string, f os.FileInfo, err error) error {
if f.IsDir() {
return nil
}
abs, err := filepath.Abs(path)
if err != nil {
Conf.Logger.Fatalln(err)
}
files = append(files, abs)
return nil
}
err = filepath.Walk(dir, visit)
return
}

func WatchFiles(dirs []string, configFile string) {
// load config
groups, err := LoadConfigFromFile(configFile)
if err != nil {
Conf.Logger.Fatalln("can't load config", err)
}

// get list of all files in watch dir
files, err := ioutil.ReadDir(dir)
if err != nil {
Conf.Logger.Fatalln("can't read logs dir", err)
files := make([]string, 0)
for _, dir := range dirs {
fs, err := walkLogDir(dir)
if err != nil {
panic(err)
}
for _, f := range fs {
files = append(files, f)
}
}

// assign file per group
Expand All @@ -27,7 +51,7 @@ func WatchFiles(dir, configFile string) {
Conf.Logger.Fatalln("can't assign file per group", err)
}

doneCh := make(chan bool)
doneCh := make(chan string)
assignedFilesCount := len(assignedFiles)

for _, file := range assignedFiles {
Expand All @@ -36,21 +60,28 @@ func WatchFiles(dir, configFile string) {
}

if Conf.ContinueWatch {
go continueWatch(&dir, groups)
for _, dir := range dirs {
go continueWatch(&dir, groups)
}
}

select {
case done := <-doneCh:
assignedFilesCount = -1
if assignedFilesCount == 0 {
debug(done)
Conf.Logger.Println("done")
}
for {
select {
case fpath := <-doneCh:
assignedFilesCount = assignedFilesCount - 1
if assignedFilesCount == 0 {
Conf.Logger.Printf("finished reading file %+v", fpath)
if Conf.ReadOnce {
return
}
}

}
}

}

func assignFiles(files []os.FileInfo, groups []*Group) (outFiles []*File, err error) {
func assignFiles(files []string, groups []*Group) (outFiles []*File, err error) {
for _, group := range groups {
var assignedFiles []*File
if assignedFiles, err = getFilesByGroup(files, group); err == nil {
Expand All @@ -64,6 +95,23 @@ func assignFiles(files []os.FileInfo, groups []*Group) (outFiles []*File, err er
return
}

func getFilesByGroup(allFiles []string, group *Group) ([]*File, error) {
files := make([]*File, 0)
regex := *group.Mask
for _, f := range allFiles {
if !regex.MatchString(filepath.Base(f)) {
continue
}
file, err := NewFile(f)
if err != nil {
return files, err
}
file.group = group
files = append(files, file)
}
return files, nil
}

func continueWatch(dir *string, groups []*Group) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
Expand All @@ -78,8 +126,8 @@ func continueWatch(dir *string, groups []*Group) {
select {
case ev := <-watcher.Event:
if ev.IsCreate() {
files := make([]os.FileInfo, 0)
file, err := os.Stat(ev.Name)
files := make([]string, 0)
file, err := filepath.Abs(ev.Name)
if err != nil {
Conf.Logger.Printf("can't get file %+v", err)
continue
Expand All @@ -104,30 +152,15 @@ func continueWatch(dir *string, groups []*Group) {
watcher.Close()
}

func getFilesByGroup(allFiles []os.FileInfo, group *Group) ([]*File, error) {
files := make([]*File, 0)
regex := *group.Mask
for _, f := range allFiles {
if !regex.MatchString(f.Name()) {
continue
}
filepath := filepath.Join(Conf.WatchDir, f.Name())
file, err := NewFile(filepath)
if err != nil {
return files, err
}
file.group = group
files = append(files, file)
}
return files, nil
}

func NewFile(fpath string) (*File, error) {
file := &File{}
var err error
if Conf.ReadWholeLog {
Conf.Logger.Println("read whole logs")
if Conf.ReadWholeLog && Conf.ReadOnce {
Conf.Logger.Printf("read whole file once %+v", fpath)
file.Tail, err = tail.TailFile(fpath, tail.Config{})
} else if Conf.ReadWholeLog {
Conf.Logger.Printf("read whole file and continue %+v", fpath)
file.Tail, err = tail.TailFile(fpath, tail.Config{Follow: true, ReOpen: true})
} else {
seekInfo := &tail.SeekInfo{Offset: 0, Whence: 2}
file.Tail, err = tail.TailFile(fpath, tail.Config{Follow: true, ReOpen: true, Location: seekInfo})
Expand All @@ -138,12 +171,12 @@ func NewFile(fpath string) (*File, error) {
type File struct {
Tail *tail.Tail
group *Group
doneCh chan bool
doneCh chan string
}

func (self *File) tail() {
Conf.Logger.Printf("start tailing %+v", self.Tail.Filename)
defer func() { self.doneCh <- true }()
defer func() { self.doneCh <- self.Tail.Filename }()
for line := range self.Tail.Lines {
checkLineRules(&line.Text, self.group.Rules)
}
Expand Down
11 changes: 9 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

var (
watchDir = flag.String("watch-dir", "./tmp", "log directories")
watchDir = flag.String("watch-dir", "", "deprecated, simply add the directory as an argument, in the end")
config = flag.String("config", "", "path to config.json file")
check = flag.Bool("check", false, "check config.json")
debug = flag.Bool("debug", false, "turn on debug messages")
Expand Down Expand Up @@ -58,8 +58,15 @@ func main() {
panic(err)
}

logDirs := make([]string, 0)
if len(flag.Args()) > 0 {
logDirs = flag.Args()
} else {
logDirs = append(logDirs, *watchDir)
}

if fi.Mode()&os.ModeNamedPipe == 0 {
logsend.WatchFiles(*watchDir, *config)
logsend.WatchFiles(logDirs, *config)
} else {
flag.VisitAll(logsend.LoadRawConfig)
logsend.ProcessStdin()
Expand Down
10 changes: 6 additions & 4 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ curl -L http://logsend.io/get|bash -s /tmp
As daemon, watching file in directory:

```
logsend -watch-dir=/logs -config=config.json
logsend -config=config.json /logs
```

Using PIPE:
Expand All @@ -62,7 +62,7 @@ tail -F /logs/*.log |logsend -influx-dbname test -influx-host 'hosta:4444' -rege
Daemonize:

```
logsend -watch-dir=/logs -config=config.json 2> error.log &
logsend -config=config.json /logs 2> error.log &
```

## Benchmarks
Expand Down Expand Up @@ -157,7 +157,7 @@ logsend -watch-dir=/logs -config=config.json 2> error.log &
## Starting

```
logsend -watch-dir=~/some_logs_folder
logsend -config config.json /logs
```

## Configuration
Expand Down Expand Up @@ -563,4 +563,6 @@ tail -F some.log| logsend -config config.json
## Tips

* use flag `-debug` for more info
* use flag `-dry-run` for processing logs but not send to destination
* use flag `-dry-run` for processing logs but not send to destination
* use flag `-read-whole-log` for reading whole log file and continue reading
* use flag `-read-once` better for use with -read-whole-log, just read whole log and exit

0 comments on commit 293d39b

Please sign in to comment.