Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve store timeouts #1789

Merged
merged 5 commits into from
Feb 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 64 additions & 58 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,7 @@ func newRespCh(ctx context.Context, buffer int) (*ctxRespSender, <-chan *storepb
}

func (s ctxRespSender) send(r *storepb.SeriesResponse) {
select {
case <-s.ctx.Done():
return
case s.ch <- r:
return
}
s.ch <- r
}

// Series returns all series for a requested time range and label matcher. Requested series are taken from other
Expand Down Expand Up @@ -348,6 +343,21 @@ type streamSeriesSet struct {
closeSeries context.CancelFunc
}

// recvResponse bundles the two results of a single s.stream.Recv() call so that
// both can be handed over one channel to the goroutine that consumes them.
type recvResponse struct {
// r is the response returned by Recv.
r *storepb.SeriesResponse
// err is the error returned by the same Recv call; the consumer compares it
// against io.EOF and nil.
err error
}

// frameCtx builds the context used to bound the wait for a single response frame.
//
// When responseTimeout is zero no per-frame limit applies: the returned context
// is context.Background(), which never expires, and the returned cancel func is
// a no-op. For any other value the context is derived from context.Background()
// with the given timeout, so it is independent of the request context.
//
// The returned cancel func is never nil and should always be called to release
// the timer held by the timeout context.
func frameCtx(responseTimeout time.Duration) (context.Context, context.CancelFunc) {
	if responseTimeout == 0 {
		return context.Background(), func() {}
	}
	return context.WithTimeout(context.Background(), responseTimeout)
}

func startStreamSeriesSet(
ctx context.Context,
logger log.Logger,
Expand Down Expand Up @@ -383,78 +393,74 @@ func startStreamSeriesSet(
emptyStreamResponses.Inc()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want to increment this if the context was actually cancelled, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, which is why numResponses++ now happens only after a received response has been processed.

}
}()
for {
r, err := s.stream.Recv()

if err == io.EOF {
return
}

if err != nil {
wrapErr := errors.Wrapf(err, "receive series from %s", s.name)
if partialResponse {
s.warnCh.send(storepb.NewWarnSeriesResponse(wrapErr))
rCh := make(chan *recvResponse)
done := make(chan struct{})
go func() {
for {
r, err := s.stream.Recv()
select {
case <-done:
close(rCh)
return
case rCh <- &recvResponse{r: r, err: err}:
}
}
}()
for {
frameTimeoutCtx, cancel := frameCtx(s.responseTimeout)
defer cancel()
var rr *recvResponse
select {
case <-ctx.Done():
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
s.handleErr(errors.Wrapf(ctx.Err(), "failed to receive any data from %s", s.name), done)
return
case <-frameTimeoutCtx.Done():
s.handleErr(errors.Wrapf(ctx.Err(), "failed to receive any data in %s from %s", s.responseTimeout.String(), s.name), done)
return
case rr = <-rCh:
}

s.errMtx.Lock()
s.err = wrapErr
s.errMtx.Unlock()
if rr.err == io.EOF {
close(done)
return
}

if rr.err != nil {
wrapErr := errors.Wrapf(rr.err, "receive series from %s", s.name)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why another variable? (: We can inline this; not a blocker, though.

s.handleErr(wrapErr, done)
return
}
numResponses++

if w := r.GetWarning(); w != "" {
if w := rr.r.GetWarning(); w != "" {
s.warnCh.send(storepb.NewWarnSeriesResponse(errors.New(w)))
continue
}

select {
case s.recvCh <- r.GetSeries():
continue
case <-ctx.Done():
return
}

s.recvCh <- rr.r.GetSeries()
}
}()
return s
}

// Next blocks until new message is received or stream is closed or operation is timed out.
func (s *streamSeriesSet) Next() (ok bool) {
ctx := s.ctx
timeoutMsg := fmt.Sprintf("failed to receive any data from %s", s.name)

if s.responseTimeout != 0 {
timeoutMsg = fmt.Sprintf("failed to receive any data in %s from %s", s.responseTimeout.String(), s.name)
func (s *streamSeriesSet) handleErr(err error, done chan struct{}) {
defer close(done)
s.closeSeries()

timeoutCtx, done := context.WithTimeout(s.ctx, s.responseTimeout)
defer done()
ctx = timeoutCtx
if s.partialResponse {
level.Warn(s.logger).Log("err", err, "msg", "returning partial response")
s.warnCh.send(storepb.NewWarnSeriesResponse(err))
return
}
s.errMtx.Lock()
s.err = err
s.errMtx.Unlock()
}

select {
case s.currSeries, ok = <-s.recvCh:
return ok
case <-ctx.Done():
// closeSeries to shutdown a goroutine in startStreamSeriesSet.
s.closeSeries()

err := errors.Wrap(ctx.Err(), timeoutMsg)
if s.partialResponse {
level.Warn(s.logger).Log("err", err, "msg", "returning partial response")
s.warnCh.send(storepb.NewWarnSeriesResponse(err))
return false
}
s.errMtx.Lock()
s.err = err
s.errMtx.Unlock()

level.Warn(s.logger).Log("err", err, "msg", "partial response disabled; aborting request")
return false
}
// Next waits for the next value on s.recvCh and stores it in s.currSeries.
// It reports true when a value was received and false once the channel has
// been closed and drained.
func (s *streamSeriesSet) Next() (ok bool) {
	series, open := <-s.recvCh
	s.currSeries = series
	return open
}

func (s *streamSeriesSet) At() ([]storepb.Label, []storepb.AggrChunk) {
Expand Down
Loading