Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try to close HTTP connections and wait for completion on output reload/close #10599

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package elasticsearch

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -49,6 +50,7 @@ type Client struct {
timeout time.Duration

// buffered bulk requests
ctx clientContext
bulkRequ *bulkRequest

// buffered json response reader
Expand All @@ -61,6 +63,11 @@ type Client struct {
observer outputs.Observer
}

type clientContext struct {
ctx context.Context
cancel context.CancelFunc
}

// ClientSettings contains the settings for a client.
type ClientSettings struct {
URL string
Expand Down Expand Up @@ -669,7 +676,14 @@ func (client *Client) String() string {
// the configured host, updates the known Elasticsearch version and calls
// globally configured handlers.
func (client *Client) Connect() error {
return client.Connection.Connect()
err := client.Connection.Connect()
if err != nil {
return err
}

client.ctx.init()
client.bulkRequ.requ = client.bulkRequ.requ.WithContext(client.ctx.Context())
return nil
}

// Connect connects the client. It runs a GET request against the root URL of
Expand Down Expand Up @@ -725,8 +739,19 @@ func (conn *Connection) Ping() (string, error) {
return response.Version.Number, nil
}

// Close closes idle connections and can cancel active requests.
func (client *Client) Close() error {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method Client.Close should have comment or be unexported

client.ctx.close() // cancel active requests using the client context
client.Connection.Close()
return nil
}

// Close closes a connection.
func (conn *Connection) Close() error {
t := conn.http.Transport
if ci, ok := t.(interface{ CloseIdleConnections() }); ok {
ci.CloseIdleConnections()
}
return nil
}

Expand Down Expand Up @@ -826,3 +851,16 @@ func closing(c io.Closer) {
logp.Warn("Close failed with: %v", err)
}
}

func (c *clientContext) init() {
c.ctx, c.cancel = context.WithCancel(context.Background())
}

func (c *clientContext) close() {
c.cancel()
c.cancel = func() {}
}

func (c *clientContext) Context() context.Context {
return c.ctx
}
Loading