Skip to content

Commit

Permalink
Make implementing Close required by reader.Readers
Browse files Browse the repository at this point in the history
  • Loading branch information
kvch committed Aug 11, 2020
1 parent 8fccc74 commit c0c9844
Show file tree
Hide file tree
Showing 13 changed files with 62 additions and 8 deletions.
2 changes: 1 addition & 1 deletion filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func (h *Harvester) Run() error {
}

h.stop()
h.log.Close()
h.reader.Close()
}(h.state.Source)

logp.Info("Harvester started for file: %s", h.state.Source)
Expand Down
3 changes: 2 additions & 1 deletion filebeat/input/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ func (f *Log) wait() {
}

// Close closes the done channel but no th the file handler
func (f *Log) Close() {
func (f *Log) Close() error {
close(f.done)
// Note: File reader is not closed here because that leads to race conditions
return nil
}
10 changes: 7 additions & 3 deletions libbeat/reader/debug/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type CheckFunc func(offset int64, buf []byte) bool
// Is is useful is you want to detect if you have received garbage from a network volume.
type Reader struct {
log *logp.Logger
reader io.Reader
reader io.ReadCloser
buffer bytes.Buffer
minBufferSize int
maxFailures int
Expand All @@ -59,7 +59,7 @@ type Reader struct {
// NewReader returns a debug reader.
func NewReader(
log *logp.Logger,
reader io.Reader,
reader io.ReadCloser,
minBufferSize int,
maxFailures int,
predicate CheckFunc,
Expand Down Expand Up @@ -115,6 +115,10 @@ func (r *Reader) Read(p []byte) (int, error) {
return n, err
}

func (r *Reader) Close() error {
return r.reader.Close()
}

func makeNullCheck(log *logp.Logger, minSize int) CheckFunc {
// create a slice with null bytes to match on the buffer.
pattern := make([]byte, minSize, minSize)
Expand Down Expand Up @@ -159,7 +163,7 @@ func summarizeBufferInfo(idx int, buf []byte) (int, []byte) {

// AppendReaders look into the current enabled log selector and will add any debug reader that match
// the selectors.
func AppendReaders(reader io.Reader) (io.Reader, error) {
func AppendReaders(reader io.ReadCloser) (io.ReadCloser, error) {
var err error

if logp.HasSelector("detect_null_bytes") || logp.HasSelector("*") {
Expand Down
4 changes: 4 additions & 0 deletions libbeat/reader/multiline/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,7 @@ func (cr *counterReader) resetState() {
func (cr *counterReader) setState(next func(cr *counterReader) (reader.Message, error)) {
cr.state = next
}

func (cr *counterReader) Close() error {
return cr.reader.Close()
}
4 changes: 4 additions & 0 deletions libbeat/reader/multiline/pattern.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ func (pr *patternReader) setState(next func(pr *patternReader) (reader.Message,
pr.state = next
}

func (pr *patternReader) Close() error {
return pr.reader.Close()
}

// matchers
func afterMatcher(pat match.Matcher) (matcher, error) {
return genPatternMatcher(pat, func(last, current []byte) []byte {
Expand Down
2 changes: 2 additions & 0 deletions libbeat/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ package reader

import (
"errors"
"io"
)

// Reader is the interface that wraps the basic Next method for
// getting a new message.
// Next returns the message being read or and error. EOF is returned
// if reader will not return any new message on subsequent calls.
type Reader interface {
io.Closer
Next() (Message, error)
}

Expand Down
6 changes: 5 additions & 1 deletion libbeat/reader/readfile/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Config struct {

// New creates a new Encode reader from input reader by applying
// the given codec.
func NewEncodeReader(r io.Reader, config Config) (EncoderReader, error) {
func NewEncodeReader(r io.ReadCloser, config Config) (EncoderReader, error) {
eReader, err := NewLineReader(r, config)
return EncoderReader{eReader}, err
}
Expand All @@ -59,3 +59,7 @@ func (r EncoderReader) Next() (reader.Message, error) {
Bytes: sz,
}, err
}

func (r EncoderReader) Close() error {
return r.reader.Close()
}
4 changes: 4 additions & 0 deletions libbeat/reader/readfile/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,7 @@ func (r *LimitReader) Next() (reader.Message, error) {
}
return message, err
}

func (r *LimitReader) Close() error {
return r.reader.Close()
}
8 changes: 6 additions & 2 deletions libbeat/reader/readfile/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const unlimited = 0
// using the configured codec. The reader keeps track of bytes consumed
// from raw input stream for every decoded line.
type LineReader struct {
reader io.Reader
reader io.ReadCloser
bufferSize int
maxBytes int // max bytes per line limit to avoid OOM with malformatted files
nl []byte
Expand All @@ -48,7 +48,7 @@ type LineReader struct {
}

// New creates a new reader object
func NewLineReader(input io.Reader, config Config) (*LineReader, error) {
func NewLineReader(input io.ReadCloser, config Config) (*LineReader, error) {
encoder := config.Codec.NewEncoder()

// Create newline char based on encoding
Expand Down Expand Up @@ -271,3 +271,7 @@ func (r *LineReader) decode(end int) (int, error) {
r.byteCount += start
return start, err
}

func (r *LineReader) Close() error {
return r.reader.Close()
}
4 changes: 4 additions & 0 deletions libbeat/reader/readfile/strip_newline.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,7 @@ func (p *StripNewline) autoLineEndingChars(l []byte) int {
}
return 1
}

func (p *StripNewline) Close() error {
return p.reader.Close()
}
15 changes: 15 additions & 0 deletions libbeat/reader/readfile/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type TimeoutReader struct {
signal error
running bool
ch chan lineMessage
done chan struct{}
}

type lineMessage struct {
Expand All @@ -54,6 +55,7 @@ func NewTimeoutReader(reader reader.Reader, signal error, t time.Duration) *Time
signal: signal,
timeout: t,
ch: make(chan lineMessage, 1),
done: make(chan struct{}),
}
}

Expand All @@ -67,6 +69,11 @@ func (r *TimeoutReader) Next() (reader.Message, error) {
r.running = true
go func() {
for {
select {
case <-r.done:
return
default:
}
message, err := r.reader.Next()
r.ch <- lineMessage{message, err}
if err != nil {
Expand All @@ -85,5 +92,13 @@ func (r *TimeoutReader) Next() (reader.Message, error) {
return msg.line, msg.err
case <-timer.C:
return reader.Message{}, r.signal
case <-r.done:
return reader.Message{}, nil
}
}

func (r *TimeoutReader) Close() error {
close(r.done)

return r.reader.Close()
}
4 changes: 4 additions & 0 deletions libbeat/reader/readjson/docker_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,3 +244,7 @@ func stripNewLineWin(msg *reader.Message) {
return r == '\n' || r == '\r'
})
}

func (p *DockerJSONReader) Close() error {
return p.reader.Close()
}
4 changes: 4 additions & 0 deletions libbeat/reader/readjson/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ func (r *JSONReader) Next() (reader.Message, error) {
return message, nil
}

func (r *JSONReader) Close() error {
return r.reader.Close()
}

func createJSONError(message string) common.MapStr {
return common.MapStr{"message": message, "type": "json"}
}
Expand Down

0 comments on commit c0c9844

Please sign in to comment.