-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
proxy.go
137 lines (113 loc) · 3.19 KB
/
proxy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.
package rules
import (
"context"
"io"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/rules/rulespb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Proxy implements rulespb.Rules gRPC that fanouts requests to given rulespb.Rules and deduplication on the way.
type Proxy struct {
logger log.Logger
rules func() []rulespb.RulesClient
}
func RegisterRulesServer(rulesSrv rulespb.RulesServer) func(*grpc.Server) {
return func(s *grpc.Server) {
rulespb.RegisterRulesServer(s, rulesSrv)
}
}
// NewProxy returns new rules.Proxy.
func NewProxy(logger log.Logger, rules func() []rulespb.RulesClient) *Proxy {
return &Proxy{
logger: logger,
rules: rules,
}
}
func (s *Proxy) Rules(req *rulespb.RulesRequest, srv rulespb.Rules_RulesServer) error {
var (
g, gctx = errgroup.WithContext(srv.Context())
respChan = make(chan *rulespb.RuleGroup, 10)
groups []*rulespb.RuleGroup
)
for _, rulesClient := range s.rules() {
rs := &rulesStream{
client: rulesClient,
request: req,
channel: respChan,
server: srv,
}
g.Go(func() error { return rs.receive(gctx) })
}
go func() {
_ = g.Wait()
close(respChan)
}()
for resp := range respChan {
groups = append(groups, resp)
}
if err := g.Wait(); err != nil {
level.Error(s.logger).Log("err", err)
return err
}
for _, g := range groups {
if err := srv.Send(rulespb.NewRuleGroupRulesResponse(g)); err != nil {
return status.Error(codes.Unknown, errors.Wrap(err, "send rules response").Error())
}
}
return nil
}
type rulesStream struct {
client rulespb.RulesClient
request *rulespb.RulesRequest
channel chan<- *rulespb.RuleGroup
server rulespb.Rules_RulesServer
}
func (stream *rulesStream) receive(ctx context.Context) error {
rules, err := stream.client.Rules(ctx, stream.request)
if err != nil {
err = errors.Wrapf(err, "fetching rules from rules client %v", stream.client)
if stream.request.PartialResponseStrategy == storepb.PartialResponseStrategy_ABORT {
return err
}
if serr := stream.server.Send(rulespb.NewWarningRulesResponse(err)); serr != nil {
return serr
}
// Not an error if response strategy is warning.
return nil
}
for {
rule, err := rules.Recv()
if err == io.EOF {
return nil
}
if err != nil {
err = errors.Wrapf(err, "receiving rules from rules client %v", stream.client)
if stream.request.PartialResponseStrategy == storepb.PartialResponseStrategy_ABORT {
return err
}
if err := stream.server.Send(rulespb.NewWarningRulesResponse(err)); err != nil {
return errors.Wrapf(err, "sending rules error to server %v", stream.server)
}
continue
}
if w := rule.GetWarning(); w != "" {
if err := stream.server.Send(rulespb.NewWarningRulesResponse(errors.New(w))); err != nil {
return errors.Wrapf(err, "sending rules warning to server %v", stream.server)
}
continue
}
select {
case stream.channel <- rule.GetGroup():
case <-ctx.Done():
return ctx.Err()
}
}
}