Skip to content

Commit

Permalink
[7.1] Add sleep to allow ES sufficient time for CCR (#11172) (#12437)
Browse files Browse the repository at this point in the history
* Add sleep to allow ES sufficient time for CCR (#11172)

After repeatedly running the Elasticsearch module integration test in Metricbeat, I found that Elasticsearch sometimes doesn't get enough time to perform CCR and generate CCR stats. This causes the following error, but only intermittently:

```
--- FAIL: TestFetch (2.44s)
    --- FAIL: TestFetch/ccr (0.08s)
        elasticsearch_integration_test.go:92:
                Error Trace:    elasticsearch_integration_test.go:92
                Error:          Should NOT be empty, but was []
                Test:           TestFetch/ccr
```

So this PR adds a 300ms sleep to give Elasticsearch enough time to perform CCR and generate CCR stats. After testing various sleep durations, I found that 300ms seemed to be the lowest (round) value I could use that consistently passed this test.

Possibly related: #10866

* Fixing formatting
  • Loading branch information
ycombinator authored Jun 6, 2019
1 parent 111bebe commit 31ddac1
Showing 1 changed file with 61 additions and 0 deletions.
61 changes: 61 additions & 0 deletions metricbeat/module/elasticsearch/elasticsearch_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import (
"net/http"
"os"
"testing"
"time"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/common"
Expand Down Expand Up @@ -252,9 +254,48 @@ func createCCRStats(host string) error {
return err
}

// Give ES sufficient time to do the replication and produce stats
checkCCRStats := func() (bool, error) {
return checkCCRStatsExists(host)
}

exists, err := waitForSuccess(checkCCRStats, 200, 5)
if err != nil {
return errors.Wrap(err, "error checking if CCR stats exist")
}

if !exists {
return fmt.Errorf("expected to find CCR stats but not found")
}

return nil
}

// checkCCRStatsExists issues a GET to http://<host>/_ccr/stats and reports
// whether the decoded response lists at least one entry under
// follow_stats.indices.
//
// It returns (false, err) when the request fails, the body cannot be read, or
// the body is not valid JSON for the expected shape. The HTTP status code is
// not inspected; only the decoded body determines the result.
func checkCCRStatsExists(host string) (bool, error) {
	// Only the part of the payload needed to count followed indices is modelled.
	type ccrStats struct {
		FollowStats struct {
			Indices []map[string]interface{} `json:"indices"`
		} `json:"follow_stats"`
	}

	url := "http://" + host + "/_ccr/stats"
	resp, err := http.Get(url)
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()

	raw, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return false, err
	}

	var stats ccrStats
	if err := json.Unmarshal(raw, &stats); err != nil {
		return false, err
	}

	if len(stats.FollowStats.Indices) == 0 {
		return false, nil
	}
	return true, nil
}

func setupCCRRemote(host string) error {
remoteSettings, err := ioutil.ReadFile("ccr/_meta/test/test_remote_settings.json")
if err != nil {
Expand Down Expand Up @@ -361,3 +402,23 @@ func httpPutJSON(host, path string, body []byte) ([]byte, *http.Response, error)

return body, resp, nil
}

// checkSuccessFunction is a single probe used by waitForSuccess. It returns
// true once the awaited condition holds, false if it does not hold yet, and a
// non-nil error if the probe itself failed.
type checkSuccessFunction func() (bool, error)

// waitForSuccess calls f up to numAttempts times, pausing between consecutive
// attempts, until f reports success.
//
// retryIntervalMs is a plain count of milliseconds: it is multiplied by
// time.Millisecond before sleeping, so callers pass a bare number such as 200
// rather than an already-scaled duration like 200*time.Millisecond.
//
// It returns (true, nil) as soon as f succeeds, (false, err) immediately if f
// returns an error, and (false, nil) if every attempt reported false. If
// numAttempts is zero or negative, f is never called and (false, nil) is
// returned.
func waitForSuccess(f checkSuccessFunction, retryIntervalMs time.Duration, numAttempts int) (bool, error) {
	for attempt := 1; attempt <= numAttempts; attempt++ {
		success, err := f()
		if err != nil {
			return false, err
		}

		if success {
			return true, nil
		}

		// Sleep only when another attempt follows; pausing after the final
		// failed attempt would just delay the (false, nil) result.
		if attempt < numAttempts {
			time.Sleep(retryIntervalMs * time.Millisecond)
		}
	}

	return false, nil
}

0 comments on commit 31ddac1

Please sign in to comment.