Skip to content

Commit

Permalink
exit or continue when receive nil event
Browse files Browse the repository at this point in the history
  • Loading branch information
childe committed Apr 8, 2021
1 parent 42b36f0 commit 5fd98ff
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 21 deletions.
11 changes: 8 additions & 3 deletions input/input_box.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,16 @@ func (box *InputBox) beat(workerIdx int) {
for !box.stop {
event = box.input.ReadOneEvent()
if event == nil {
if !box.stop && box.shutdownWhenNil {
glog.Info("receive nil message. shutdown...")
glog.V(5).Info("received nil message.")
if box.stop {
break
}
if box.shutdownWhenNil {
glog.Info("received nil message. shutdown...")
box.mainThreadExitChan <- struct{}{}
} else {
continue
}
return
}
for fs, v := range box.addFields {
event = fs.SetField(event, v.Render(event), "", false)
Expand Down
29 changes: 11 additions & 18 deletions input/stdin_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package input
import (
"bufio"
"os"
"time"

"github.com/childe/gohangout/codec"
"github.com/childe/gohangout/topology"
Expand Down Expand Up @@ -36,29 +37,21 @@ func newStdinInput(config map[interface{}]interface{}) topology.Input {
messages: make(chan []byte, 10),
}

go func() {
for p.scanner.Scan() && !p.stop {
t := p.scanner.Bytes()
msg := make([]byte, len(t))
copy(msg, t)
p.messages <- msg
}
if err := p.scanner.Err(); err != nil {
glog.Errorf("%s", err)
}

// trigger shutdown
close(p.messages)
}()
return p
}

func (p *StdinInput) ReadOneEvent() map[string]interface{} {
text, more := <-p.messages
if !more || text == nil {
return nil
if p.scanner.Scan() {
t := p.scanner.Bytes()
msg := make([]byte, len(t))
copy(msg, t)
return p.decoder.Decode(msg)
}
if err := p.scanner.Err(); err != nil {
glog.Errorf("stdin scan error: %v", err)
}
return p.decoder.Decode(text)
time.Sleep(time.Millisecond * 100)
return nil
}

func (p *StdinInput) Shutdown() {
Expand Down

0 comments on commit 5fd98ff

Please sign in to comment.