Skip to content

Commit

Permalink
[bugfix] add Promtail cloudflare close (#7394)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:
If the http server closes the connection, the client may also return a
non-empty reader io.ReadCloser.

**Which issue(s) this PR fixes**:
Fixes #6150

Co-authored-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
liguozhong and chaudum authored Nov 8, 2022
1 parent b3664ff commit fe49e66
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ Check the history of the branch FIXME.
* [7414](https://github.com/grafana/loki/pull/7414) **thepalbi**: Add basic tracing support

##### Fixes
* [7394](https://github.com/grafana/loki/pull/7394) **liguozhong**: Fix issue with the Cloudflare target that caused it to stop working after it received an error in the logpull request as explained in issue https://github.com/grafana/loki/issues/6150
* [6766](https://github.com/grafana/loki/pull/6766) **kavirajk**: fix(logql): Make `LabelSampleExtractor` ignore processing the line if it doesn't contain that specific label. Fixes unwrap behavior explained in the issue https://github.com/grafana/loki/issues/6713
* [7016](https://github.com/grafana/loki/pull/7016) **chodges15**: Fix issue with dropping logs when a file based SD target's labels are updated
* [7461](https://github.com/grafana/loki/pull/7461) **MarNicGit**: Promtail: Fix collecting userdata field from Windows Event Log
Expand Down
3 changes: 3 additions & 0 deletions clients/pkg/promtail/targets/cloudflare/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ func (t *Target) pull(ctx context.Context, start, end time.Time) error {
level.Warn(t.logger).Log("msg", "failed iterating over logs, out of cloudflare range, not retrying", "err", err, "start", start, "end", end, "retries", backoff.NumRetries())
return nil
} else if err != nil {
if it != nil {
it.Close()
}
errs.Add(err)
backoff.Wait()
continue
Expand Down
34 changes: 34 additions & 0 deletions clients/pkg/promtail/targets/cloudflare/target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,37 @@ func Test_CloudflareTarget(t *testing.T) {
require.Greater(t, newPos, end.UnixNano())
}

func Test_RetryErrorLogpullReceived(t *testing.T) {
var (
w = log.NewSyncWriter(os.Stderr)
logger = log.NewLogfmtLogger(w)
end = time.Unix(0, time.Hour.Nanoseconds())
start = time.Unix(0, end.Add(-30*time.Minute).UnixNano())
client = fake.New(func() {})
cfClient = newFakeCloudflareClient()
)
cfClient.On("LogpullReceived", mock.Anything, start, end).Return(&fakeLogIterator{
err: ErrorLogpullReceived,
}, nil).Times(2) // just retry once
// replace the client
getClient = func(apiKey, zoneID string, fields []string) (Client, error) {
return cfClient, nil
}
defaultBackoff.MinBackoff = 0
defaultBackoff.MaxBackoff = 5
ta := &Target{
logger: logger,
handler: client,
client: cfClient,
config: &scrapeconfig.CloudflareConfig{
Labels: make(model.LabelSet),
},
metrics: NewMetrics(nil),
}

require.NoError(t, ta.pull(context.Background(), start, end))
}

func Test_RetryErrorIterating(t *testing.T) {
var (
w = log.NewSyncWriter(os.Stderr)
Expand All @@ -124,6 +155,9 @@ func Test_RetryErrorIterating(t *testing.T) {
`{"EdgeStartTimestamp":3, "EdgeRequestHost":"foo.com"}`,
},
}, nil).Once()
cfClient.On("LogpullReceived", mock.Anything, start, end).Return(&fakeLogIterator{
err: ErrorLogpullReceived,
}, nil).Once()
// replace the client.
getClient = func(apiKey, zoneID string, fields []string) (Client, error) {
return cfClient, nil
Expand Down
15 changes: 13 additions & 2 deletions clients/pkg/promtail/targets/cloudflare/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/stretchr/testify/mock"
)

var ErrorLogpullReceived = errors.New("error logpull received")

type fakeCloudflareClient struct {
mock.Mock
}
Expand Down Expand Up @@ -45,7 +47,12 @@ func (f *fakeLogIterator) Next() bool {
func (f *fakeLogIterator) Err() error { return f.err }
func (f *fakeLogIterator) Line() []byte { return []byte(f.current) }
func (f *fakeLogIterator) Fields() (map[string]string, error) { return nil, nil }
func (f *fakeLogIterator) Close() error { return nil }
func (f *fakeLogIterator) Close() error {
if f.err == ErrorLogpullReceived {
f.err = nil
}
return nil
}

func newFakeCloudflareClient() *fakeCloudflareClient {
return &fakeCloudflareClient{}
Expand All @@ -54,7 +61,11 @@ func newFakeCloudflareClient() *fakeCloudflareClient {
func (f *fakeCloudflareClient) LogpullReceived(ctx context.Context, start, end time.Time) (cloudflare.LogpullReceivedIterator, error) {
r := f.Called(ctx, start, end)
if r.Get(0) != nil {
return r.Get(0).(cloudflare.LogpullReceivedIterator), nil
it := r.Get(0).(cloudflare.LogpullReceivedIterator)
if it.Err() == ErrorLogpullReceived {
return it, it.Err()
}
return it, nil
}
return nil, r.Error(1)
}

0 comments on commit fe49e66

Please sign in to comment.