Skip to content

Commit

Permalink
Refactoring for PR.
Browse files Browse the repository at this point in the history
Signed-off-by: Aleksey Sin <asin@ozon.ru>
  • Loading branch information
Aleksey Sin committed Jan 13, 2020
1 parent c71cc72 commit ec64370
Showing 1 changed file with 17 additions and 8 deletions.
25 changes: 17 additions & 8 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,9 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
}

var storeTypeStr string
if st.StoreType() != nil {
storeTypeStr = st.StoreType().String()
storeType := st.StoreType()
if storeType != nil {
storeTypeStr = storeType.String()
}
metrics := &storeRequestMetrics{
withPayload: s.metrics.timeToFirstByte.WithLabelValues(st.LabelSetsString(), storeTypeStr, withPayloadLabel),
Expand All @@ -312,10 +313,10 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
queryTimeoutCount: s.metrics.queryTimeoutCount.WithLabelValues(st.LabelSetsString(), storeTypeStr),
}

seriesSet = append(seriesSet, startStreamSeriesSet(seriesCtx, s.logger, closeSeries,
wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout, metrics))
// Schedule streamSeriesSet that translates gRPC streamed response
// into seriesSet (if series) or respCh if warnings.
seriesSet = append(seriesSet, startStreamSeriesSet(seriesCtx, s.logger, closeSeries,
wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout, metrics))
}

level.Debug(s.logger).Log("msg", strings.Join(storeDebugMsgs, ";"))
Expand Down Expand Up @@ -380,6 +381,16 @@ type recvResponse struct {
err error
}

func startFrameCtx(responseTimeout time.Duration) (context.Context, context.CancelFunc) {
frameTimeoutCtx := context.Background()
var cancel context.CancelFunc
if responseTimeout != 0 {
frameTimeoutCtx, cancel = context.WithTimeout(frameTimeoutCtx, responseTimeout)
return frameTimeoutCtx, cancel
}
return frameTimeoutCtx, nil
}

func startStreamSeriesSet(
ctx context.Context,
logger log.Logger,
Expand Down Expand Up @@ -410,10 +421,8 @@ func startStreamSeriesSet(
defer wg.Done()
defer close(s.recvCh)
for {
frameTimeoutCtx := context.Background()
var cancel context.CancelFunc
if s.responseTimeout != 0 {
frameTimeoutCtx, cancel = context.WithTimeout(frameTimeoutCtx, s.responseTimeout)
frameTimeoutCtx, cancel := startFrameCtx(s.responseTimeout)
if cancel != nil {
defer cancel()
}
rCh := make(chan *recvResponse, 1)
Expand Down

0 comments on commit ec64370

Please sign in to comment.