diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 7a9746fa51a..51ad3eaea8b 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -196,6 +196,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix `netflow` module to support 7 bytepad for IPFIX template. {issue}18098[18098] - Fix Cisco ASA dissect pattern for 313008 & 313009 messages. {pull}19149[19149] - Fix date and timestamp formats for fortigate module {pull}19316[19316] +- Fix memory leak in tcp and unix input sources. {pull}19459[19459] *Heartbeat* diff --git a/filebeat/inputsource/common/closeref.go b/filebeat/inputsource/common/closeref.go deleted file mode 100644 index ec53d2639a4..00000000000 --- a/filebeat/inputsource/common/closeref.go +++ /dev/null @@ -1,137 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package common - -import ( - "sync" - - "github.com/pkg/errors" -) - -// CloserFunc is the function called by the Closer on `Close()`. -type CloserFunc func() - -// ErrClosed is returned when the Closer is closed. -var ErrClosed = errors.New("closer is closed") - -// CloseRef implements a subset of the context.Context interface and it's use to synchronize -// the shutdown of multiple go-routines. -type CloseRef interface { - Done() <-chan struct{} - Err() error -} - -// Closer implements a shutdown strategy when dealing with multiples go-routines, it creates a tree -// of Closer, when you call `Close()` on a parent the `Close()` method will be called on the current -// closer and any of the childs it may have and will remove the current node from the parent. -// -// NOTE: The `Close()` is reentrant but will propage the close only once. -type Closer struct { - mu sync.Mutex - done chan struct{} - err error - parent *Closer - children map[*Closer]struct{} - callback CloserFunc -} - -// Close closes the closes and propagates the close to any child, on close the close callback will -// be called, this can be used for custom cleanup like closing a socket. -func (c *Closer) Close() { - c.mu.Lock() - if c.err != nil { - c.mu.Unlock() - return - } - - // Close the channel first so that all processing in Handle() ends first - close(c.done) - - if c.callback != nil { - c.callback() - } - - // propagate close to children. - if c.children != nil { - for child := range c.children { - child.Close() - } - c.children = nil - } - - c.err = ErrClosed - c.mu.Unlock() - - if c.parent != nil { - c.removeChild(c) - } -} - -// Done returns the synchronization channel, the channel will be closed if `Close()` was called on -// the current node or any parent it may have. -func (c *Closer) Done() <-chan struct{} { - return c.done -} - -// SetCallback sets the underlying callback function invoked -// when the Closer is Closed. -func (c *Closer) SetCallback(callback CloserFunc) { - c.callback = callback -} - -// Err returns an error if the Closer was already closed. -func (c *Closer) Err() error { - c.mu.Lock() - err := c.err - c.mu.Unlock() - return err -} - -func (c *Closer) removeChild(child *Closer) { - c.mu.Lock() - delete(c.children, child) - c.mu.Unlock() -} - -func (c *Closer) addChild(child *Closer) { - c.mu.Lock() - if c.children == nil { - c.children = make(map[*Closer]struct{}) - } - c.children[child] = struct{}{} - c.mu.Unlock() -} - -// WithCloser wraps a new closer into a child of an existing closer. -func WithCloser(parent *Closer, fn CloserFunc) *Closer { - child := &Closer{ - done: make(chan struct{}), - parent: parent, - callback: fn, - } - parent.addChild(child) - return child -} - -// NewCloser creates a new Closer. -func NewCloser(fn CloserFunc) *Closer { - return &Closer{ - done: make(chan struct{}), - callback: fn, - } -} diff --git a/filebeat/inputsource/common/handler.go b/filebeat/inputsource/common/handler.go index 84786086f4e..a55ee1755d5 100644 --- a/filebeat/inputsource/common/handler.go +++ b/filebeat/inputsource/common/handler.go @@ -19,6 +19,7 @@ package common import ( "bufio" + "context" "net" "github.com/pkg/errors" @@ -31,7 +32,7 @@ import ( type HandlerFactory func(config ListenerConfig) ConnectionHandler // ConnectionHandler interface provides mechanisms for handling of incoming connections -type ConnectionHandler func(CloseRef, net.Conn) error +type ConnectionHandler func(context.Context, net.Conn) error // MetadataFunc defines callback executed when a line is read from the split handler. type MetadataFunc func(net.Conn) inputsource.NetworkMetadata @@ -39,7 +40,7 @@ type MetadataFunc func(net.Conn) inputsource.NetworkMetadata // SplitHandlerFactory allows creation of a handler that has splitting capabilities. func SplitHandlerFactory(family Family, logger *logp.Logger, metadataCallback MetadataFunc, callback inputsource.NetworkFunc, splitFunc bufio.SplitFunc) HandlerFactory { return func(config ListenerConfig) ConnectionHandler { - return ConnectionHandler(func(closer CloseRef, conn net.Conn) error { + return ConnectionHandler(func(ctx context.Context, conn net.Conn) error { metadata := metadataCallback(conn) maxMessageSize := uint64(config.MaxMessageSize) @@ -60,16 +61,11 @@ func SplitHandlerFactory(family Family, logger *logp.Logger, metadataCallback Me scanner.Buffer(buffer, int(maxMessageSize)) for { select { - case <-closer.Done(): + case <-ctx.Done(): break default: } - // Ensure that if the Conn is already closed then dont attempt to scan again - if closer.Err() == ErrClosed { - break - } - if !scanner.Scan() { break } diff --git a/filebeat/inputsource/common/listener.go b/filebeat/inputsource/common/listener.go index 9d686f922a6..c3ed0362887 100644 --- a/filebeat/inputsource/common/listener.go +++ b/filebeat/inputsource/common/listener.go @@ -20,12 +20,14 @@ package common import ( "bufio" "bytes" + "context" "net" "strings" "sync" "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/go-concert/ctxtool" ) // Family represents the type of connection we're handling @@ -51,9 +53,9 @@ type Listener struct { config *ListenerConfig family Family wg sync.WaitGroup - done chan struct{} log *logp.Logger - closer *Closer + ctx context.Context + cancel context.CancelFunc clientsCount atomic.Int handlerFactory HandlerFactory listenerFactory ListenerFactory @@ -63,10 +65,8 @@ type Listener struct { func NewListener(family Family, location string, handlerFactory HandlerFactory, listenerFactory ListenerFactory, config *ListenerConfig) *Listener { return &Listener{ config: config, - done: make(chan struct{}), family: family, log: logp.NewLogger(string(family)).With("address", location), - closer: NewCloser(nil), handlerFactory: handlerFactory, listenerFactory: listenerFactory, } @@ -80,7 +80,12 @@ func (l *Listener) Start() error { return err } - l.closer.SetCallback(func() { l.Listener.Close() }) + l.ctx, l.cancel = context.WithCancel(context.Background()) + go func() { + <-l.ctx.Done() + l.Listener.Close() + }() + l.log.Info("Started listening for " + l.family.String() + " connection") l.wg.Add(1) @@ -101,7 +106,7 @@ func (l *Listener) run() { conn, err := l.Listener.Accept() if err != nil { select { - case <-l.closer.Done(): + case <-l.ctx.Done(): return default: l.log.Debugw("Can not accept the connection", "error", err) @@ -109,14 +114,13 @@ func (l *Listener) run() { } } - handler := l.handlerFactory(*l.config) - closer := WithCloser(l.closer, func() { conn.Close() }) - l.wg.Add(1) go func() { defer logp.Recover("recovering from a " + l.family.String() + " client crash") defer l.wg.Done() - defer closer.Close() + + ctx, cancel := ctxtool.WithFunc(l.ctx, func() { conn.Close() }) + defer cancel() l.registerHandler() defer l.unregisterHandler() @@ -128,7 +132,8 @@ func (l *Listener) run() { l.log.Debugw("New client", "remote_address", conn.RemoteAddr(), "total", l.clientsCount.Load()) } - err := handler(closer, conn) + handler := l.handlerFactory(*l.config) + err := handler(ctx, conn) if err != nil { l.log.Debugw("client error", "error", err) } @@ -148,7 +153,7 @@ func (l *Listener) run() { // Stop stops accepting new incoming connections and Close any active clients func (l *Listener) Stop() { l.log.Info("Stopping" + l.family.String() + "server") - l.closer.Close() + l.cancel() l.wg.Wait() l.log.Info(l.family.String() + " server stopped") } diff --git a/go.sum b/go.sum index 86a09edca2e..e1ff648fc32 100644 --- a/go.sum +++ b/go.sum @@ -727,6 +727,7 @@ go.opencensus.io v0.22.2 h1:75k/FF0Q2YM8QYo07VPddOLBslDt1MZOdEslOHvmzAs= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.uber.org/atomic v1.5.0 h1:OI5t8sDa1Or+q8AeE+yKeB/SDYioSHAgcVljj9JIETY= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/goleak v1.0.0 h1:qsup4IcBdlmsnGfqyLl4Ntn3C2XCCuKAE7DwHpScyUo= go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/multierr v1.3.0 h1:sFPn2GLc3poCkfrpIXGhBD2X0CMIo4Q/zSULXrj/+uc= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=