forked from thanos-io/thanos
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
pkg/rules/proxy: fix hotlooping when receiving client errors
Currently, if we receive an error from the underlying client stream, we continue with trying to receive additional data. This causes a hotloop as we will receive the same error again. This fixes it by returning in the error case and adds a unit test for the proxy logic. Fixes thanos-io#3717 Signed-off-by: Sergiusz Urbaniak <sergiusz.urbaniak@gmail.com>
- Loading branch information
1 parent
4d91e94
commit 4b4ee62
Showing
2 changed files
with
259 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,253 @@ | ||
// Copyright (c) The Thanos Authors. | ||
// Licensed under the Apache License 2.0. | ||
|
||
package rules | ||
|
||
import ( | ||
"context" | ||
"io" | ||
"os" | ||
"reflect" | ||
"testing" | ||
|
||
"github.com/go-kit/kit/log" | ||
"github.com/pkg/errors" | ||
"github.com/thanos-io/thanos/pkg/rules/rulespb" | ||
"github.com/thanos-io/thanos/pkg/store/storepb" | ||
"google.golang.org/grpc" | ||
) | ||
|
||
type testRulesClient struct { | ||
grpc.ClientStream | ||
rulesErr, recvErr error | ||
response *rulespb.RulesResponse | ||
sentResponse bool | ||
} | ||
|
||
func (t *testRulesClient) String() string { | ||
return "test" | ||
} | ||
|
||
func (t *testRulesClient) Recv() (*rulespb.RulesResponse, error) { | ||
// A simulation of underlying grpc Recv behavior as per https://github.com/grpc/grpc-go/blob/7f2581f910fc21497091c4109b56d310276fc943/stream.go#L117-L125. | ||
if t.recvErr != nil { | ||
return nil, t.recvErr | ||
} | ||
|
||
if t.sentResponse { | ||
return nil, io.EOF | ||
} | ||
t.sentResponse = true | ||
|
||
return t.response, nil | ||
} | ||
|
||
func (t *testRulesClient) Rules(ctx context.Context, in *rulespb.RulesRequest, opts ...grpc.CallOption) (rulespb.Rules_RulesClient, error) { | ||
return t, t.rulesErr | ||
} | ||
|
||
var _ rulespb.RulesClient = &testRulesClient{} | ||
|
||
type testRulesServer struct { | ||
grpc.ServerStream | ||
sendErr error | ||
response *rulespb.RulesResponse | ||
} | ||
|
||
func (t *testRulesServer) String() string { | ||
return "test" | ||
} | ||
|
||
func (t *testRulesServer) Send(response *rulespb.RulesResponse) error { | ||
if t.sendErr != nil { | ||
return t.sendErr | ||
} | ||
t.response = response | ||
return nil | ||
} | ||
|
||
func (t *testRulesServer) Context() context.Context { | ||
return context.Background() | ||
} | ||
|
||
func TestProxy(t *testing.T) { | ||
logger := log.NewLogfmtLogger(os.Stderr) | ||
|
||
for _, tc := range []struct { | ||
name string | ||
request *rulespb.RulesRequest | ||
client rulespb.RulesClient | ||
server *testRulesServer | ||
wantResponse *rulespb.RulesResponse | ||
wantError error | ||
}{ | ||
{ | ||
name: "rule group proxy success", | ||
request: &rulespb.RulesRequest{ | ||
Type: rulespb.RulesRequest_ALL, | ||
PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, | ||
}, | ||
client: &testRulesClient{ | ||
response: rulespb.NewRuleGroupRulesResponse(&rulespb.RuleGroup{ | ||
Name: "foo", | ||
}), | ||
recvErr: nil, | ||
}, | ||
server: &testRulesServer{}, | ||
wantResponse: rulespb.NewRuleGroupRulesResponse(&rulespb.RuleGroup{ | ||
Name: "foo", | ||
}), | ||
}, | ||
{ | ||
name: "warning proxy success", | ||
request: &rulespb.RulesRequest{ | ||
Type: rulespb.RulesRequest_ALL, | ||
PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, | ||
}, | ||
client: &testRulesClient{ | ||
response: rulespb.NewWarningRulesResponse(errors.New("warning from client")), | ||
recvErr: nil, | ||
}, | ||
server: &testRulesServer{}, | ||
wantResponse: rulespb.NewWarningRulesResponse(errors.New("warning from client")), | ||
}, | ||
{ | ||
name: "warn: retreiving rules client failed", | ||
request: &rulespb.RulesRequest{ | ||
Type: rulespb.RulesRequest_ALL, | ||
PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, | ||
}, | ||
client: &testRulesClient{ | ||
response: nil, | ||
rulesErr: errors.New("retreiving rules failed"), | ||
}, | ||
server: &testRulesServer{}, | ||
wantResponse: rulespb.NewWarningRulesResponse(errors.New("fetching rules from rules client test: retreiving rules failed")), | ||
}, | ||
{ | ||
name: "warn: retreiving rules client failed, forward warning failed", | ||
request: &rulespb.RulesRequest{ | ||
Type: rulespb.RulesRequest_ALL, | ||
PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, | ||
}, | ||
client: &testRulesClient{ | ||
response: nil, | ||
rulesErr: errors.New("retreiving rules failed"), | ||
}, | ||
server: &testRulesServer{ | ||
sendErr: errors.New("forwarding warning response failed"), | ||
}, | ||
wantError: errors.New("forwarding warning response failed"), | ||
}, | ||
{ | ||
name: "abort: retreiving rules client failed", | ||
request: &rulespb.RulesRequest{ | ||
Type: rulespb.RulesRequest_ALL, | ||
PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, | ||
}, | ||
client: &testRulesClient{ | ||
response: nil, | ||
rulesErr: errors.New("retreiving rules failed"), | ||
}, | ||
server: &testRulesServer{}, | ||
wantError: errors.New("fetching rules from rules client test: retreiving rules failed"), | ||
}, | ||
{ | ||
name: "warn: receive failed", | ||
request: &rulespb.RulesRequest{ | ||
Type: rulespb.RulesRequest_ALL, | ||
PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, | ||
}, | ||
client: &testRulesClient{ | ||
response: nil, | ||
recvErr: errors.New("503 from Prometheus"), | ||
}, | ||
server: &testRulesServer{}, | ||
wantResponse: rulespb.NewWarningRulesResponse(errors.New("receiving rules from rules client test: 503 from Prometheus")), | ||
}, | ||
{ | ||
name: "warn: receive failed, forward warning failed", | ||
request: &rulespb.RulesRequest{ | ||
Type: rulespb.RulesRequest_ALL, | ||
PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, | ||
}, | ||
client: &testRulesClient{ | ||
response: nil, | ||
recvErr: errors.New("503 from Prometheus"), | ||
}, | ||
server: &testRulesServer{ | ||
sendErr: errors.New("forwarding warning response failed"), | ||
}, | ||
wantError: errors.New("sending rules error to server test: forwarding warning response failed"), | ||
}, | ||
{ | ||
name: "abort: receive failed", | ||
request: &rulespb.RulesRequest{ | ||
Type: rulespb.RulesRequest_ALL, | ||
PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, | ||
}, | ||
client: &testRulesClient{ | ||
response: nil, | ||
recvErr: errors.New("503 from Prometheus"), | ||
}, | ||
server: &testRulesServer{}, | ||
wantError: errors.New("receiving rules from rules client test: 503 from Prometheus"), | ||
}, | ||
{ | ||
name: "send failed", | ||
request: &rulespb.RulesRequest{ | ||
Type: rulespb.RulesRequest_ALL, | ||
PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, | ||
}, | ||
client: &testRulesClient{ | ||
response: rulespb.NewRuleGroupRulesResponse(&rulespb.RuleGroup{ | ||
Name: "foo", | ||
}), | ||
recvErr: nil, | ||
}, | ||
server: &testRulesServer{ | ||
sendErr: errors.New("sending message failed"), | ||
}, | ||
wantError: errors.New("rpc error: code = Unknown desc = send rules response: sending message failed"), | ||
}, | ||
{ | ||
name: "sending warning response failed", | ||
request: &rulespb.RulesRequest{ | ||
Type: rulespb.RulesRequest_ALL, | ||
PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, | ||
}, | ||
client: &testRulesClient{ | ||
response: rulespb.NewWarningRulesResponse(errors.New("warning from client")), | ||
recvErr: nil, | ||
}, | ||
server: &testRulesServer{ | ||
sendErr: errors.New("sending message failed"), | ||
}, | ||
wantError: errors.New("sending rules warning to server test: sending message failed"), | ||
}, | ||
} { | ||
t.Run(tc.name, func(t *testing.T) { | ||
p := NewProxy(logger, func() []rulespb.RulesClient { | ||
return []rulespb.RulesClient{tc.client} | ||
}) | ||
|
||
err := p.Rules(tc.request, tc.server) | ||
gotErr := "<nil>" | ||
if err != nil { | ||
gotErr = err.Error() | ||
} | ||
wantErr := "<nil>" | ||
if tc.wantError != nil { | ||
wantErr = tc.wantError.Error() | ||
} | ||
|
||
if gotErr != wantErr { | ||
t.Errorf("want error %q, got %q", wantErr, gotErr) | ||
} | ||
|
||
if !reflect.DeepEqual(tc.wantResponse, tc.server.response) { | ||
t.Errorf("want response %v, got %v", tc.wantResponse, tc.server.response) | ||
} | ||
}) | ||
} | ||
} |