Skip to content

Commit

Permalink
Merge pull request elastic#235 from urso/bug/192-check-bulk-status-codes
Browse files Browse the repository at this point in the history
fail bulk request if an item index failed
  • Loading branch information
andrewkroh committed Nov 3, 2015
2 parents 2ac5744 + b461724 commit 64784fe
Show file tree
Hide file tree
Showing 37 changed files with 576 additions and 267 deletions.
10 changes: 8 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ All notable changes to this project will be documented in this file based on the
### Bugfixes
- Use stderr for console log output. #219
- Handle empty event array in publisher. #207
- Limit number of workers for Elasticsearch output to 1 per configured host. packetbeat#226
- Respect '*' debug selector in IsDebug. #226 (elastic/packetbeat#339)
- Limit number of workers for Elasticsearch output. elastic/packetbeat#226
- On Windows, remove service related error message when running in the console. #242
- Fix waitRetry not configured in single output mode configuration. elastic/filebeat#144
- Use http as the default scheme in the elasticsearch hosts #253
- Respect max bulk size if bulk publisher (collector) is disabled or sync flag is set.
- Always evaluate status code from Elasticsearch responses when indexing events. #192

### Added
- Add Console output plugin. #218
Expand All @@ -27,6 +28,10 @@ All notable changes to this project will be documented in this file based on the
- Add logging messages for bulk publishing in case of error #229
- Add option to configure number of parallel workers publishing to Elasticsearch
or Logstash.
- Set default bulk size for Elasticsearch output to 50.
- Set default http timeout for Elasticsearch to 90s.
- Improve publish retry if sync flag is set by retrying only up to max bulk size
events instead of all events to be published.

### Deprecated

Expand All @@ -40,8 +45,9 @@ All notable changes to this project will be documented in this file based on the
### Bugfixes
- Determine Elasticsearch index for an event based on UTC time #81
- Fixing ES output's defaultDeadTimeout so that it is 60 seconds #103
- Es outputer: fix timestamp conversion #91
- ES outputer: fix timestamp conversion #91
- Fix TLS insecure config option #239
- ES outputer: check bulk API per item status code for retransmit on failure.

### Added
- Add logstash output plugin #151
Expand Down
9 changes: 5 additions & 4 deletions docs/configuration.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,6 @@ The host of the Elasticsearch server. This option is deprecated as it is replace

The port of the Elasticsearch server. This option is deprecated as it is replaced by <<hosts-option>>.


===== username

Basic authentication username for connecting to Elasticsearch.
Expand All @@ -327,7 +326,6 @@ An HTTP path prefix that is prepended to the HTTP API calls. This is useful for
the cases where Elasticsearch listens behind an HTTP reverse proxy that exports
the API under a custom prefix.


===== index

The index root name where to write events to. The default is `packetbeat` and
Expand All @@ -336,7 +334,6 @@ generates `[packetbeat-]YYYY.MM.DD` indexes (e.g. `packetbeat-2015.04.26`).
The index root name where to write events to. The default is the beat's name.
For example `packetbeat` generates `[packetbeat-]YYYY.MM.DD` indexes (e.g. `packetbeat-2015.04.26`).


===== max_retries

The number of times a particular Elasticsearch index operation is attempted. If
Expand All @@ -346,7 +343,11 @@ dropped. The default is 3.
===== bulk_max_size

The maximum number of events to bulk in a single Elasticsearch bulk API index request.
The default is 10000.
The default is 50.

===== timeout

The HTTP request timeout in seconds for Elasticsearch requests. The default is 90.

===== flush_interval

Expand Down
7 changes: 5 additions & 2 deletions etc/libbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,11 @@ output:
#max_retries: 3

# The maximum number of events to bulk in a single Elasticsearch bulk API index request.
# The default is 10000.
#bulk_max_size: 10000
# The default is 50.
#bulk_max_size: 50

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

# The number of seconds to wait for new events between two bulk API index requests.
# If `bulk_max_size` is reached before this interval expires, additional bulk index
Expand Down
64 changes: 35 additions & 29 deletions outputs/elasticsearch/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,84 +83,90 @@ func readCountResult(obj []byte) (*CountResults, error) {
// searchable. In case id is empty, a new id is created over a HTTP POST request.
// Otherwise, a HTTP PUT request is issued.
// Implements: http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html
func (es *Client) Index(
func (es *Connection) Index(
index, docType, id string,
params map[string]string,
body interface{},
) (*QueryResult, error) {
) (int, *QueryResult, error) {
method := "PUT"
if id == "" {
method = "POST"
}

resp, err := es.apiCall(method, index, docType, id, params, body)
status, resp, err := es.apiCall(method, index, docType, id, params, body)
if err != nil {
return nil, err
return status, nil, err
}
return readQueryResult(resp)
result, err := readQueryResult(resp)
return status, result, err
}

// Refresh an index. Call this after doing inserts or creating/deleting
// indexes in unit tests.
func (es *Client) Refresh(index string) (*QueryResult, error) {
resp, err := es.apiCall("POST", index, "", "_refresh", nil, nil)
func (es *Connection) Refresh(index string) (int, *QueryResult, error) {
status, resp, err := es.apiCall("POST", index, "", "_refresh", nil, nil)
if err != nil {
return nil, err
return status, nil, err
}
return readQueryResult(resp)
result, err := readQueryResult(resp)
return status, result, err
}

// CreateIndex creates a new index, optionally with settings and mappings passed in
// the body.
// Implements: https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html
//
func (es *Client) CreateIndex(index string, body interface{}) (*QueryResult, error) {
resp, err := es.apiCall("PUT", index, "", "", nil, body)
func (es *Connection) CreateIndex(index string, body interface{}) (int, *QueryResult, error) {
status, resp, err := es.apiCall("PUT", index, "", "", nil, body)
if err != nil {
return nil, err
return status, nil, err
}
return readQueryResult(resp)
result, err := readQueryResult(resp)
return status, result, err
}

// Delete deletes a typed JSON document from a specific index based on its id.
// Implements: http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete.html
func (es *Client) Delete(index string, docType string, id string, params map[string]string) (*QueryResult, error) {
resp, err := es.apiCall("DELETE", index, docType, id, params, nil)
func (es *Connection) Delete(index string, docType string, id string, params map[string]string) (int, *QueryResult, error) {
status, resp, err := es.apiCall("DELETE", index, docType, id, params, nil)
if err != nil {
return nil, err
return status, nil, err
}
return readQueryResult(resp)
result, err := readQueryResult(resp)
return status, result, err
}

// A search request can be executed purely using a URI by providing request parameters.
// Implements: http://www.elastic.co/guide/en/elasticsearch/reference/current/search-uri-request.html
func (es *Client) SearchURI(index string, docType string, params map[string]string) (*SearchResults, error) {
resp, err := es.apiCall("GET", index, docType, "_search", params, nil)
func (es *Connection) SearchURI(index string, docType string, params map[string]string) (int, *SearchResults, error) {
status, resp, err := es.apiCall("GET", index, docType, "_search", params, nil)
if err != nil {
return nil, err
return status, nil, err
}
return readSearchResult(resp)
result, err := readSearchResult(resp)
return status, result, err
}

func (es *Client) CountSearchURI(
func (es *Connection) CountSearchURI(
index string, docType string,
params map[string]string,
) (*CountResults, error) {
resp, err := es.apiCall("GET", index, docType, "_count", params, nil)
) (int, *CountResults, error) {
status, resp, err := es.apiCall("GET", index, docType, "_count", params, nil)
if err != nil {
return nil, err
return status, nil, err
}
return readCountResult(resp)
result, err := readCountResult(resp)
return status, result, err
}

func (es *Client) apiCall(
func (es *Connection) apiCall(
method, index, docType, id string,
params map[string]string,
body interface{},
) ([]byte, error) {
) (int, []byte, error) {
path, err := makePath(index, docType, id)
if err != nil {
return nil, err
return 0, nil, err
}
return es.request(method, path, params, body)
}
6 changes: 3 additions & 3 deletions outputs/elasticsearch/api_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestIndex(t *testing.T) {
params := map[string]string{
"refresh": "true",
}
resp, err := client.Index(index, "test", "1", params, body)
_, resp, err := client.Index(index, "test", "1", params, body)
if err != nil {
t.Errorf("Index() returns error: %s", err)
}
Expand All @@ -41,15 +41,15 @@ func TestIndex(t *testing.T) {
params = map[string]string{
"q": "user:test",
}
result, err := client.SearchURI(index, "test", params)
_, result, err := client.SearchURI(index, "test", params)
if err != nil {
t.Errorf("SearchUri() returns an error: %s", err)
}
if result.Hits.Total != 1 {
t.Errorf("Wrong number of search results: %d", result.Hits.Total)
}

resp, err = client.Delete(index, "test", "1", nil)
_, resp, err = client.Delete(index, "test", "1", nil)
if err != nil {
t.Errorf("Delete() returns error: %s", err)
}
Expand Down
6 changes: 3 additions & 3 deletions outputs/elasticsearch/api_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestOneHostSuccessResp(t *testing.T) {
params := map[string]string{
"refresh": "true",
}
resp, err := client.Index(index, "test", "1", params, body)
_, resp, err := client.Index(index, "test", "1", params, body)
if err != nil {
t.Errorf("Index() returns error: %s", err)
}
Expand Down Expand Up @@ -86,7 +86,7 @@ func TestOneHost500Resp(t *testing.T) {
params := map[string]string{
"refresh": "true",
}
_, err = client.Index(index, "test", "1", params, body)
_, _, err = client.Index(index, "test", "1", params, body)

if err == nil {
t.Errorf("Index() should return error.")
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestOneHost503Resp(t *testing.T) {
params := map[string]string{
"refresh": "true",
}
_, err := client.Index(index, "test", "1", params, body)
_, _, err := client.Index(index, "test", "1", params, body)
if err == nil {
t.Errorf("Index() should return error.")
}
Expand Down
32 changes: 25 additions & 7 deletions outputs/elasticsearch/bulkapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ type bulkMetaIndex struct {
DocType string `json:"_type"`
}

// BulkResult holds the decoded body of a bulk API response.
type BulkResult struct {
// Items contains the entries of the response's "items" array. Each entry
// is kept as undecoded raw JSON.
Items []json.RawMessage `json:"items"`
}

func (r *bulkRequest) Send(meta, obj interface{}) error {
var err error

Expand All @@ -40,19 +44,20 @@ func (r *bulkRequest) Send(meta, obj interface{}) error {
return err
}

func (r *bulkRequest) Flush() (*QueryResult, error) {
func (r *bulkRequest) Flush() (int, *BulkResult, error) {
if r.buf.Len() == 0 {
logp.Debug("elasticsearch", "Empty channel. Wait for more data.")
return nil, nil
return 0, nil, nil
}

resp, err := r.es.sendBulkRequest("POST", r.path, r.params, &r.buf)
status, resp, err := r.es.sendBulkRequest("POST", r.path, r.params, &r.buf)
if err != nil {
return nil, err
return status, nil, err
}
r.buf.Truncate(0)

return readQueryResult(resp)
result, err := readBulkResult(resp)
return status, result, err
}

// Bulk performs many index/delete operations in a single API call.
Expand Down Expand Up @@ -90,7 +95,7 @@ func (conn *Connection) BulkWith(
return nil, nil
}

resp, err := conn.sendBulkRequest("POST", path, params, &buf)
_, resp, err := conn.sendBulkRequest("POST", path, params, &buf)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -120,7 +125,7 @@ func (conn *Connection) sendBulkRequest(
method, path string,
params map[string]string,
buf *bytes.Buffer,
) ([]byte, error) {
) (int, []byte, error) {
url := makeURL(conn.URL, path, params)
logp.Debug("elasticsearch", "Sending bulk request to %s", url)

Expand Down Expand Up @@ -154,3 +159,16 @@ func bulkEncode(metaBuilder MetaBuilder, body []interface{}) bytes.Buffer {
}
return buf
}

// readBulkResult decodes the raw JSON body of a bulk API response into a
// BulkResult.
//
// A nil body yields a nil result and a nil error. A body that cannot be
// unmarshaled yields a nil result together with the decoding error.
func readBulkResult(obj []byte) (*BulkResult, error) {
	if obj == nil {
		return nil, nil
	}

	decoded := new(BulkResult)
	if err := json.Unmarshal(obj, decoded); err != nil {
		return nil, err
	}
	return decoded, nil
}
10 changes: 5 additions & 5 deletions outputs/elasticsearch/bulkapi_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ func TestBulk(t *testing.T) {
params = map[string]string{
"q": "field1:value1",
}
result, err := client.SearchURI(index, "type1", params)
_, result, err := client.SearchURI(index, "type1", params)
if err != nil {
t.Errorf("SearchUri() returns an error: %s", err)
}
if result.Hits.Total != 1 {
t.Errorf("Wrong number of search results: %d", result.Hits.Total)
}

_, err = client.Delete(index, "", "", nil)
_, _, err = client.Delete(index, "", "", nil)
if err != nil {
t.Errorf("Delete() returns error: %s", err)
}
Expand Down Expand Up @@ -157,7 +157,7 @@ func TestBulkMoreOperations(t *testing.T) {
params = map[string]string{
"q": "field1:value3",
}
result, err := client.SearchURI(index, "type1", params)
_, result, err := client.SearchURI(index, "type1", params)
if err != nil {
t.Errorf("SearchUri() returns an error: %s", err)
}
Expand All @@ -168,15 +168,15 @@ func TestBulkMoreOperations(t *testing.T) {
params = map[string]string{
"q": "field2:value2",
}
result, err = client.SearchURI(index, "type1", params)
_, result, err = client.SearchURI(index, "type1", params)
if err != nil {
t.Errorf("SearchUri() returns an error: %s", err)
}
if result.Hits.Total != 1 {
t.Errorf("Wrong number of search results: %d", result.Hits.Total)
}

_, err = client.Delete(index, "", "", nil)
_, _, err = client.Delete(index, "", "", nil)
if err != nil {
t.Errorf("Delete() returns error: %s", err)
}
Expand Down
Loading

0 comments on commit 64784fe

Please sign in to comment.