From 99c835891ca9d43f508fb2b5dade528cc88c9ec0 Mon Sep 17 00:00:00 2001 From: YaoZengzeng Date: Mon, 5 Aug 2019 19:31:26 +0800 Subject: [PATCH] receiver: avoid race of hashring Signed-off-by: YaoZengzeng --- pkg/receive/handler.go | 45 +++++++++++++++++++++++++++++++----------- 1 file changed, 34 insertions(+), 11 deletions(-) diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 5e16bdd3ba..1962dd9ca3 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -9,6 +9,7 @@ import ( "net" "net/http" "strconv" + "sync" "sync/atomic" "github.com/go-kit/kit/log" @@ -46,10 +47,12 @@ type Handler struct { logger log.Logger receiver *Writer router *route.Router - hashring Hashring options *Options listener net.Listener + mtx sync.RWMutex + hashring Hashring + // Metrics requestDuration *prometheus.HistogramVec requestsTotal *prometheus.CounterVec @@ -57,8 +60,7 @@ type Handler struct { forwardRequestsTotal *prometheus.CounterVec // These fields are uint32 rather than boolean to be able to use atomic functions. - storageReady uint32 - hashringReady uint32 + storageReady uint32 } func NewHandler(logger log.Logger, o *Options) *Handler { @@ -140,20 +142,19 @@ func (h *Handler) StorageReady() { // Hashring sets the hashring for the handler and marks the hashring as ready. // If the hashring is nil, then the hashring is marked as not ready. func (h *Handler) Hashring(hashring Hashring) { - if hashring == nil { - atomic.StoreUint32(&h.hashringReady, 0) - h.hashring = nil - return - } + h.mtx.Lock() + defer h.mtx.Unlock() + h.hashring = hashring - atomic.StoreUint32(&h.hashringReady, 1) } // Verifies whether the server is ready or not. func (h *Handler) isReady() bool { sr := atomic.LoadUint32(&h.storageReady) - hr := atomic.LoadUint32(&h.hashringReady) - return sr > 0 && hr > 0 + h.mtx.RLock() + hr := h.hashring != nil + h.mtx.RUnlock() + return sr > 0 && hr } // Checks if server is ready, calls f if it is, returns 503 if it is not. @@ -275,6 +276,15 @@ func (h *Handler) receive(w http.ResponseWriter, r *http.Request) { func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *prompb.WriteRequest) error { wreqs := make(map[string]*prompb.WriteRequest) replicas := make(map[string]replica) + + // It is possible that hashring is ready in testReady() but unready now, + // so need to lock here. + h.mtx.RLock() + if h.hashring == nil { + h.mtx.RUnlock() + return errors.New("hashring is not ready") + } + // Batch all of the time series in the write request // into several smaller write requests that are // grouped by target endpoint. This ensures that @@ -285,6 +295,7 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p for i := range wreq.Timeseries { endpoint, err := h.hashring.GetN(tenant, &wreq.Timeseries[i], r.n) if err != nil { + h.mtx.RUnlock() return err } if _, ok := wreqs[endpoint]; !ok { @@ -294,6 +305,7 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p wr := wreqs[endpoint] wr.Timeseries = append(wr.Timeseries, wreq.Timeseries[i]) } + h.mtx.RUnlock() return h.parallelizeRequests(ctx, tenant, replicas, wreqs) } @@ -400,14 +412,25 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.Wri wreqs := make(map[string]*prompb.WriteRequest) replicas := make(map[string]replica) var i uint64 + + // It is possible that hashring is ready in testReady() but unready now, + // so need to lock here. + h.mtx.RLock() + if h.hashring == nil { + h.mtx.RUnlock() + return errors.New("hashring is not ready") + } + for i = 0; i < h.options.ReplicationFactor; i++ { endpoint, err := h.hashring.GetN(tenant, &wreq.Timeseries[0], i) if err != nil { + h.mtx.RUnlock() return err } wreqs[endpoint] = wreq replicas[endpoint] = replica{i, true} } + h.mtx.RUnlock() err := h.parallelizeRequests(ctx, tenant, replicas, wreqs) if errs, ok := err.(terrors.MultiError); ok {