diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index fa46b5c83c..281168599f 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -57,6 +57,10 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application, name stri tenantHeader := cmd.Flag("receive.tenant-header", "HTTP header to determine tenant for write requests.").Default("THANOS-TENANT").String() + replicaHeader := cmd.Flag("receive.replica-header", "HTTP header specifying the replica number of a write request.").Default("THANOS-REPLICA").String() + + replicationFactor := cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64() + m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { lset, err := parseFlagLabels(*labelStrs) if err != nil { @@ -101,6 +105,8 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application, name stri cw, *local, *tenantHeader, + *replicaHeader, + *replicationFactor, ) } } @@ -123,6 +129,8 @@ func runReceive( cw *receive.ConfigWatcher, endpoint string, tenantHeader string, + replicaHeader string, + replicationFactor uint64, ) error { logger = log.With(logger, "component", "receive") level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice") @@ -137,12 +145,14 @@ func runReceive( localStorage := &tsdb.ReadyStorage{} receiver := receive.NewWriter(log.With(logger, "component", "receive-writer"), localStorage) webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{ - Receiver: receiver, - ListenAddress: remoteWriteAddress, - Registry: reg, - ReadyStorage: localStorage, - Endpoint: endpoint, - TenantHeader: tenantHeader, + Receiver: receiver, + ListenAddress: remoteWriteAddress, + Registry: reg, + ReadyStorage: localStorage, + Endpoint: endpoint, + TenantHeader: tenantHeader, + ReplicaHeader: replicaHeader, + ReplicationFactor: replicationFactor, }) // Start all components while we wait for TSDB to open but only load diff --git a/docs/proposals/approved/201812_thanos-remote-receive.md b/docs/proposals/approved/201812_thanos-remote-receive.md index 4bf58557fb..f606895e7f 100644 --- a/docs/proposals/approved/201812_thanos-remote-receive.md +++ b/docs/proposals/approved/201812_thanos-remote-receive.md @@ -26,7 +26,7 @@ Prometheus has the remote write API to send samples collected by a Prometheus se ## Architecture -The Thanos receiver component seamlessly integrates into the rest of the Thanos components. It acts similarly to what is referred to in Thanos as a "source", in the current set of components, this is typically represented by the Thanos sidecar that is put next to Prometheus to ship tsdb blocks into object storage and reply to store API requests, however in the case of the Thanos receiver, the Thanos sidecar is not necessary anymore, as the data is replicated from the original Prometheus server to the Thanos receiver, and the Thanos receiver participates in the Thanos gossip mesh. The Prometheus server on a tenant’s infrastructure can therefore be completely vanilla and is just configured to replicate its time-series to the Thanos receiver. +The Thanos receiver component seamlessly integrates into the rest of the Thanos components. 
It acts similarly to what is referred to in Thanos as a "source". In the current set of components, this role is typically filled by the Thanos sidecar that is put next to Prometheus to ship tsdb blocks into object storage and reply to store API requests. In the case of the Thanos receiver, however, the Thanos sidecar is no longer necessary, as the data is replicated from the original Prometheus server to the Thanos receiver, and the Thanos receiver participates in the Thanos gossip mesh. The Prometheus server on a tenant's infrastructure can therefore be completely vanilla and is just configured to replicate its time-series to the Thanos receiver. Instead of directly scraping metrics, however, the Thanos receiver accepts Prometheus remote-write requests, and writes these into a local instance of the Prometheus tsdb. Once successfully committed to the tenant's tsdbs, the request returns successfully. To prevent data leaking at the database level, each tenant has an individual tsdb instance, meaning a single Thanos receiver may manage multiple tsdb instances. The receiver answers Thanos store API requests and uploads built blocks of the Prometheus tsdb. Implementation-wise, this just requires wiring up existing components. As tenants' data within object storage are separate objects, it may be enough separation to have a single bucket for all tenants; however, this architecture supports any setup of tenant to object storage bucket combination. @@ -76,18 +76,17 @@ For the Thanos receiver to work at scale there are some areas that need further ### Load distribution -In order to scale beyond a single machine, time-series are distributed among all receivers. In order to do this consistently, the Thanos receivers build a hash ring and use consistent hashing to distribute the time-series. Receivers are configured with their identity, and thus their position in the hash ring, by an external system (such as the configuration management system). The position in the hashring decides which time-series are accepted and stored by a Thanos receiver. +In order to scale beyond a single machine, time-series are distributed among all receivers. In order to do this consistently, the Thanos receivers build a hashring and use consistent hashing to distribute the time-series. Receivers are configured with their identity, and thus their position in the hashring, by an external system (such as the configuration management system). The position in the hashring decides which time-series are accepted and stored by a Thanos receiver. As the Thanos receivers route the requests to the responsible node, an edge load balancer is configured to randomly distribute load to all Thanos receivers available. -Time-series hashes are calculated from the entire label set of that time-series. To ensure that potentially large time-series with common labels do not all end up being ingested by the same node, the tenant’s ID should be included in the hash. The tenant's ID is obtained through a set of labels passed to the receiver via an HTTP header. The label keys defining a tenant must be pre-configured. For example, the receiver is configured with the following flag: +Time-series hashes are calculated from the entire label set of that time-series. To ensure that potentially large time-series with common labels do not all end up being ingested by the same node, the tenant’s ID should be included in the hash. The tenant's ID is passed to the receiver via an HTTP header. The header defining a tenant must be pre-configured. 
For example, the receiver is configured with the following flag: ``` ---tenant-labels=tenant ---tenant-header=X-THANOS-TENANT +--receive.tenant-header=THANOS-TENANT ``` -And a valid request would have the `X-THANOS-TENANT` header set to `{tenant="A"}`. When a tenant label is configured, it must be included in the header sent. +A valid request could have the `THANOS-TENANT` header set to `tenant-a`. If the header is not present in a request, then the request is interpreted as belonging to the empty string tenant (`""`). Using the tenant's ID in the hash will help to distribute the load across receivers. The hash is roughly calculated as follows. @@ -103,21 +102,30 @@ While the routing functionality could be a separate component, we choose to have In attempts to build similar systems a common fallacy has been to distribute all load from all tenants of the system onto a single set of ingestion nodes. This makes reasoning and management rather simple, however, has turned out to have stronger downsides than upsides. Companies using Cortex in production and offering it to clients have setup entirely separate clusters for different customers, because their load has caused incidents affecting other clients. In order to allow customers to send large amounts of data at irregular intervals the ingestion infrastructure needs to scale without impacting durability. Scaling the ingestion infrastructure can cause endpoints to not accept data temporarily, however, the write-ahead-log based replication can cope with this, as it backs off and continues sending its data once the ingestion infrastructure successfully processes those requests again. -Hard tenants in the Thanos receiver are configured in a configuration file, changes to this configuration must be orchestrated by a configuration management tool. When a remote write request is received by a Thanos receiver it goes through the list of configured hard tenants. For each hard tenant there is a separate hash ring respective to their ingestion nodes as described in the "Load distribution" section. A hard tenant also has the number of associated receive nodes belonging to it. A remote write request can be initially received by any receiver node, however, will only be dispatched to receiver nodes that correspond to that hard tenant. +Hard tenants in the Thanos receiver are configured in a configuration file. Changes to this configuration must be orchestrated by a configuration management tool. When a remote write request is received by a Thanos receiver, it goes through the list of configured hard tenants. For each hard tenant, there is a separate hashring for its ingestion nodes, as described in the "Load distribution" section. A hard tenant also has a number of receive endpoints associated with it. A remote write request can initially be received by any receiver node; however, it will only be dispatched to receiver endpoints that correspond to that hard tenant. 
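To make the tenant-to-hashring matching concrete, here is a rough Go sketch. It is illustrative only: the `hashringConfig` and `pickHashring` names are assumptions rather than part of this change, and the actual lookup is implemented by `multiHashring` in `pkg/receive/hashring.go`.

```go
package receive

import "errors"

// hashringConfig mirrors one entry of the hashrings configuration file;
// the field set shown here is assumed for illustration.
type hashringConfig struct {
	Hashring  string   `json:"hashring"`
	Endpoints []string `json:"endpoints"`
	Tenants   []string `json:"tenants,omitempty"`
}

// pickHashring chooses the hashring for a tenant: an exact tenant match wins,
// a hashring that lists no tenants acts as the soft-tenancy catch-all, and
// otherwise the request is rejected.
func pickHashring(tenant string, cfgs []hashringConfig) (*hashringConfig, error) {
	var soft *hashringConfig
	for i := range cfgs {
		if len(cfgs[i].Tenants) == 0 {
			if soft == nil {
				soft = &cfgs[i] // remember the first soft-tenancy hashring
			}
			continue
		}
		for _, t := range cfgs[i].Tenants {
			if t == tenant {
				return &cfgs[i], nil // exact hard-tenant match
			}
		}
	}
	if soft != nil {
		return soft, nil
	}
	return nil, errors.New("no matching hashring to handle tenant")
}
```

An exact tenant match always wins, a hashring that lists no tenants serves as the soft-tenancy catch-all, and a request that matches nothing is rejected, mirroring the behaviour described around the configuration sample that follows.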
A sample of the configuration of tenants and their respective infrastructure: -``` -tenants: -- match: tenant-a - nodes: - - tenant-a-1.metrics.local -- hashmod: 0 - nodes: - - soft-tenants-1.metrics.local +```json +[ + { + "hashring": "tenant-a", + "endpoints": ["tenant-a-1.metrics.local:19291/api/v1/receive", "tenant-a-2.metrics.local:19291/api/v1/receive"], + "tenants": ["tenant-a"] + }, + { + "hashring": "tenants-b-c", + "endpoints": ["tenant-b-c-1.metrics.local:19291/api/v1/receive", "tenant-b-c-2.metrics.local:19291/api/v1/receive"], + "tenants": ["tenant-b", "tenant-c"] + }, + { + "hashring": "soft-tenants", + "endpoints": ["http://soft-tenants-1.metrics.local:19291/api/v1/receive"] + } +] ``` -To start, exact matches of tenant IDs will used to distribute requests to receive nodes. Additionally a sharding mechanism performing `hashmod` on the tenant ID, in order to shard the tenants among pools of receivers. Should it be necessary, more sophisticated mechanisms can be added later. When a request is received, the specified tenant is tested against the configured tenant ID until an exact match is found. If the specified tenant is the empty string, then any tenant is considered a valid match. If no hard tenancy is configured, a tenant will automatically land in a soft tenancy hashring. +To start, exact matches of tenant IDs will be used to distribute requests to receive endpoints. Should it be necessary, more sophisticated mechanisms can be added later. When a request is received, the tenant specified in the request is tested against the configured allowed tenants for each hashring until an exact match is found. If a hashring specifies no explicit tenants, then any tenant is considered a valid match; this allows for a cluster to provide soft-tenancy. Requests whose tenant ID matches no other hashring explicitly will automatically land in this soft tenancy hashring. If no matching hashring is found and no soft tenancy is configured, the receiver responds with an error. ``` Soft tenant hashring @@ -157,11 +165,29 @@ To start, exact matches of tenant IDs will used to distribut +-----------------------+ ``` -The intention is that the load balancer can just distribute requests randomly to all Thanos receivers independent of the tenant. Should this turn out to cause problems, either an additional “distribution” layer can be inserted that performs this routing or a load balancer per hard tenant hashring can be introduced. The distribution layer would have the advantage of being able to keep a single endpoint for configuration of Prometheus servers for remote write, which could be convenient for users. +The intention is that the load balancer can distribute requests randomly to all Thanos receivers independent of the tenant. Should this turn out to cause problems, a distribution layer can be created from additional Thanos receivers whose names do not appear in the configuration file. These receivers will forward all requests to the correct receiver in each hashring without storing any data themselves. Alternatively, a load balancer per hard tenant hashring can be introduced. The distribution layer would have the advantage of being able to keep a single endpoint for configuration of Prometheus servers for remote write, which could be convenient for users. + +### Replication + +The Thanos receiver supports replication of received time-series to other receivers in the same hashring. 
The replication factor is controlled by setting a flag on the receivers and indicates the maximum number of copies of any time-series that should be stored in the hashring. If any time-series in a write request received by a Thanos receiver is not successfully written to at least `(REPLICATION_FACTOR + 1)/2` nodes, the receiver responds with an error. For example, to attempt to store 3 copies of every time-series and ensure that every time-series is successfully written to at least 2 Thanos receivers in the target hashring, all receivers should be configured with the following flag: + +``` +--receive.replication-factor=3 +``` + +Thanos receivers identify the replica number of a write request via a 0-indexed uint64 contained in an HTTP header. The header name can be configured via a flag: + +``` +--receive.replica-header=THANOS-REPLICA +``` + +If the header is present in a request, the receiver will look for the replica-th node of the hashring that should handle the request. If it is the receiver itself, then the request is stored locally; otherwise, it is forwarded to the correct endpoint. If the replica number of the request exceeds the configured replication factor or the total number of nodes in the target hashring, the receiver responds with an error. If the header is not present in a request, then the receiver will replicate the request to `REPLICATION_FACTOR` nodes, setting the replica header on each request to ensure it is not replicated further. + +Note that replicating write requests may require additional compaction and deduplication of object storage as well as significantly increase infrastructure cost. ### Rollout/scaling/failure of receiver nodes -As replication is based on the Prometheus write-ahead-log and retries when the remote write backend is not available, intermediate downtime is tolerable and expected for receivers. Prometheus remote write treats 503 as temporary failures and continues do retry until the remote write receiving end responds again. +Prometheus remote write will retry whenever the remote write backend is not available, thus intermediate downtime is tolerable and expected for receivers. Prometheus remote write treats 503 as temporary failures and continues to retry until the remote write receiving end responds again. If this ingestion downtime is not acceptable, then a replication factor of 3 or more should be specified, ensuring that a write request is accepted in its entirety by at least 2 replicas. This way we can ensure there is no downtime of ingestion. On rollouts, receivers do not need to re-shard data; instead, at shutdown in the case of a rollout or scaling event, they flush the write-ahead-log to a Prometheus tsdb block and upload it to object storage. Rollouts that include a soft tenant being promoted to a hard tenant do require all nodes of a hashring to upload their content as the hashring changes. When the nodes come back and accept remote write requests again, the tenant's local Prometheus server will continue where it left off. When scaling, all nodes need to perform the above operation as the hashring is resized, meaning all nodes will have a new distribution. In the case of a failure where the hashring is not resized, the receiver will load the write-ahead-log and resume where it left off. Partially succeeding requests return a 503, causing Prometheus to retry the full request. This works because identical timestamp-value matches are ignored by tsdb. 
Prometheus relies on this to de-duplicate federation request results, therefore it is safe to rely on this here as well. @@ -178,7 +204,6 @@ Decisions of the design have consequences some of which will show themselves in * This proposal describes the write-ahead-log based remote write, which is not (yet) merged in Prometheus: https://github.com/prometheus/prometheus/pull/4588. This landing may impact durability characteristics. - While there is work left the pull request seems to be close to completion * For compaction to work as described in this proposal, vertical compaction in tsdb needs to be possible. Implemented but not merged yet: https://github.com/prometheus/tsdb/pull/370 -* If downtime of ingestion, as in the described `503` for intermediate downtime, and Prometheus resuming when the backend becomes healthy again, turns out not to be an option, we could attempt to duplicate all write requests to 3 (or configurable amount) replicas, where a write request needs to be accepted by at least 2 replicas, this way we can ensure no downtime of ingestion. This may require additional compaction and deduplication of object storage as well as significantly increase infrastructure cost. * Additional safeguards may need to be put in place to ensure that hashring resizes do not occur on failed nodes, only once they have recovered and have successfully uploaded their blocks. [xxhash]: http://cyan4973.github.io/xxHash/ diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index f949a16d47..5acbaf2495 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -8,6 +8,7 @@ import ( stdlog "log" "net" "net/http" + "strconv" "sync/atomic" "github.com/go-kit/kit/log" @@ -24,41 +25,19 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/common/route" promtsdb "github.com/prometheus/prometheus/storage/tsdb" -) - -var ( - requestDuration = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Name: "thanos_http_request_duration_seconds", - Help: "Histogram of latencies for HTTP requests.", - Buckets: []float64{.1, .2, .4, 1, 3, 8, 20, 60, 120}, - }, - []string{"handler"}, - ) - requestsTotal = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "thanos_http_requests_total", - Help: "Tracks the number of HTTP requests.", - }, []string{"code", "handler", "method"}, - ) - responseSize = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Name: "thanos_http_response_size_bytes", - Help: "Histogram of response size for HTTP requests.", - Buckets: prometheus.ExponentialBuckets(100, 10, 8), - }, - []string{"handler"}, - ) + terrors "github.com/prometheus/tsdb/errors" ) // Options for the web Handler. type Options struct { - Receiver *Writer - ListenAddress string - Registry prometheus.Registerer - ReadyStorage *promtsdb.ReadyStorage - Endpoint string - TenantHeader string + Receiver *Writer + ListenAddress string + Registry prometheus.Registerer + ReadyStorage *promtsdb.ReadyStorage + Endpoint string + TenantHeader string + ReplicaHeader string + ReplicationFactor uint64 } // Handler serves a Prometheus remote write receiving HTTP endpoint. @@ -71,52 +50,88 @@ type Handler struct { options *Options listener net.Listener + // Metrics + requestDuration *prometheus.HistogramVec + requestsTotal *prometheus.CounterVec + responseSize *prometheus.HistogramVec + forwardRequestsTotal *prometheus.CounterVec + // These fields are uint32 rather than boolean to be able to use atomic functions. 
storageReady uint32 hashringReady uint32 } -func instrumentHandler(handlerName string, handler http.HandlerFunc) http.HandlerFunc { - return promhttp.InstrumentHandlerDuration( - requestDuration.MustCurryWith(prometheus.Labels{"handler": handlerName}), - promhttp.InstrumentHandlerResponseSize( - responseSize.MustCurryWith(prometheus.Labels{"handler": handlerName}), - promhttp.InstrumentHandlerCounter( - requestsTotal.MustCurryWith(prometheus.Labels{"handler": handlerName}), - handler, - ), - ), - ) -} - func NewHandler(logger log.Logger, o *Options) *Handler { - router := route.New().WithInstrumentation(instrumentHandler) if logger == nil { logger = log.NewNopLogger() } h := &Handler{ logger: logger, - router: router, readyStorage: o.ReadyStorage, receiver: o.Receiver, options: o, + requestDuration: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "thanos_http_request_duration_seconds", + Help: "Histogram of latencies for HTTP requests.", + Buckets: []float64{.1, .2, .4, 1, 3, 8, 20, 60, 120}, + }, + []string{"handler"}, + ), + requestsTotal: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "thanos_http_requests_total", + Help: "Tracks the number of HTTP requests.", + }, []string{"code", "handler", "method"}, + ), + responseSize: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "thanos_http_response_size_bytes", + Help: "Histogram of response size for HTTP requests.", + Buckets: prometheus.ExponentialBuckets(100, 10, 8), + }, + []string{"handler"}, + ), + forwardRequestsTotal: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "thanos_receive_forward_requests_total", + Help: "The number of forward requests.", + }, []string{"result"}, + ), } + router := route.New().WithInstrumentation(h.instrumentHandler) + h.router = router + readyf := h.testReady router.Post("/api/v1/receive", readyf(h.receive)) if o.Registry != nil { o.Registry.MustRegister( - requestDuration, - requestsTotal, - responseSize, + h.requestDuration, + h.requestsTotal, + h.responseSize, + h.forwardRequestsTotal, ) } return h } +func (h *Handler) instrumentHandler(handlerName string, handler http.HandlerFunc) http.HandlerFunc { + return promhttp.InstrumentHandlerDuration( + h.requestDuration.MustCurryWith(prometheus.Labels{"handler": handlerName}), + promhttp.InstrumentHandlerResponseSize( + h.responseSize.MustCurryWith(prometheus.Labels{"handler": handlerName}), + promhttp.InstrumentHandlerCounter( + h.requestsTotal.MustCurryWith(prometheus.Labels{"handler": handlerName}), + handler, + ), + ), + ) +} + // StorageReady marks the storage as ready. func (h *Handler) StorageReady() { atomic.StoreUint32(&h.storageReady, 1) @@ -159,7 +174,9 @@ func (h *Handler) testReady(f http.HandlerFunc) http.HandlerFunc { // Close stops the Handler. func (h *Handler) Close() { - runutil.CloseWithLogOnErr(h.logger, h.listener, "receive HTTP listener") + if h.listener != nil { + runutil.CloseWithLogOnErr(h.logger, h.listener, "receive HTTP listener") + } } // Run serves the HTTP endpoints. @@ -193,6 +210,13 @@ func (h *Handler) Run() error { return httpSrv.Serve(h.listener) } +// replica encapsulates the replica number of a request and if the request is +// already replicated. 
+type replica struct { + n uint64 + replicated bool +} + func (h *Handler) receive(w http.ResponseWriter, r *http.Request) { compressed, err := ioutil.ReadAll(r.Body) if err != nil { @@ -213,30 +237,44 @@ func (h *Handler) receive(w http.ResponseWriter, r *http.Request) { return } + var rep replica + replicaRaw := r.Header.Get(h.options.ReplicaHeader) + // If the header is empty, we assume the request is not yet replicated. + if replicaRaw != "" { + if rep.n, err = strconv.ParseUint(replicaRaw, 10, 64); err != nil { + http.Error(w, "could not parse replica header", http.StatusBadRequest) + return + } + rep.replicated = true + } + // The replica value in the header is zero-indexed, thus we need >=. + if rep.n >= h.options.ReplicationFactor { + http.Error(w, "replica count exceeds replication factor", http.StatusBadRequest) + return + } + tenant := r.Header.Get(h.options.TenantHeader) - local, err := h.forward(r.Context(), tenant, &wreq) - if err != nil { + + // Forward any time series as necessary. All time series + // destined for the local node will be written to the receiver. + // Time series will be replicated as necessary. + if err := h.forward(r.Context(), tenant, rep, &wreq); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } - // There may be no WriteRequest destined for the local node. - if local != nil { - if err := h.receiver.Receive(local); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - } } // forward accepts a write request, batches its time series by -// corresponding endpoint, and forwards them in parallel. It returns a write -// request containing only the time series that correspond to -// local handler. For a given write request, at most one outgoing -// write request will be made to every other node in the hashring. -// The function only returns when all requests have finished, +// corresponding endpoint, and forwards them in parallel to the +// correct endpoint. Requests destined for the local node are written +// to the local receiver. For a given write request, at most one outgoing +// write request will be made to every other node in the hashring, +// unless the request needs to be replicated. +// The function only returns when all requests have finished // or the context is canceled. -func (h *Handler) forward(ctx context.Context, tenant string, wreq *prompb.WriteRequest) (*prompb.WriteRequest, error) { +func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *prompb.WriteRequest) error { wreqs := make(map[string]*prompb.WriteRequest) + replicas := make(map[string]replica) // Batch all of the time series in the write request // into several smaller write requests that are // grouped by target endpoint. This ensures that @@ -245,17 +283,25 @@ func (h *Handler) forward(ctx context.Context, tenant string, wreq *prompb.Write // to every other node in the hashring, rather than // one request per time series. for i := range wreq.Timeseries { - endpoint, err := h.hashring.Get(tenant, &wreq.Timeseries[i]) + endpoint, err := h.hashring.GetN(tenant, &wreq.Timeseries[i], r.n) if err != nil { - return nil, err + return err } if _, ok := wreqs[endpoint]; !ok { wreqs[endpoint] = &prompb.WriteRequest{} + replicas[endpoint] = r } wr := wreqs[endpoint] wr.Timeseries = append(wr.Timeseries, wreq.Timeseries[i]) } + return h.parallelizeRequests(ctx, tenant, replicas, wreqs) +} + +// parallelizeRequests parallelizes a given set of write requests. 
+// The function only returns when all requests have finished +// or the context is canceled. +func (h *Handler) parallelizeRequests(ctx context.Context, tenant string, replicas map[string]replica, wreqs map[string]*prompb.WriteRequest) error { ec := make(chan error) defer close(ec) // We don't wan't to use a sync.WaitGroup here because that @@ -264,16 +310,28 @@ func (h *Handler) forward(ctx context.Context, tenant string, wreq *prompb.Write // as in order to collect errors while doing wg.Wait, we would // need a separate error collection goroutine. var n int - var local *prompb.WriteRequest for endpoint := range wreqs { + n++ + // If the request is not yet replicated, let's replicate it. + if !replicas[endpoint].replicated { + go func(endpoint string) { + ec <- h.replicate(ctx, tenant, wreqs[endpoint]) + }(endpoint) + continue + } // If the endpoint for the write request is the - // local node, then don't make a request. - // Save it for later so it can be returned. + // local node, then don't make a request but store locally. + // By handing replication to the local node in the same + // function as replication to other nodes, we can treat + // a failure to write locally as just another error that + // can be ignored if the replication factor is met. if endpoint == h.options.Endpoint { - local = wreqs[endpoint] + go func(endpoint string) { + ec <- h.receiver.Receive(wreqs[endpoint]) + }(endpoint) continue } - n++ + // Make a request to the specified endpoint. go func(endpoint string) { buf, err := proto.Marshal(wreqs[endpoint]) if err != nil { @@ -288,13 +346,31 @@ func (h *Handler) forward(ctx context.Context, tenant string, wreq *prompb.Write return } req.Header.Add(h.options.TenantHeader, tenant) + req.Header.Add(h.options.ReplicaHeader, strconv.FormatUint(replicas[endpoint].n, 10)) + + // Increment the counters as necessary now that + // the requests will go out. + defer func() { + if err != nil { + h.forwardRequestsTotal.WithLabelValues("error").Inc() + return + } + h.forwardRequestsTotal.WithLabelValues("success").Inc() + }() + // Actually make the request against the endpoint // we determined should handle these time series. - if _, err := http.DefaultClient.Do(req.WithContext(ctx)); err != nil { + var res *http.Response + res, err = http.DefaultClient.Do(req.WithContext(ctx)) + if err != nil { level.Error(h.logger).Log("msg", "forward request error", "err", err, "endpoint", endpoint) ec <- err return } + if res.StatusCode != http.StatusOK { + ec <- errors.New(res.Status) + return + } ec <- nil }(endpoint) } @@ -304,16 +380,38 @@ func (h *Handler) forward(ctx context.Context, tenant string, wreq *prompb.Write // for every error received on the chan. This simplifies // error collection and avoids data races with a separate // error collection goroutine. - var errs error + var errs terrors.MultiError for ; n > 0; n-- { if err := <-ec; err != nil { - if errs == nil { - errs = err - continue - } - errs = errors.Wrap(errs, err.Error()) + errs.Add(err) + } + } + + return errs.Err() +} + +// replicate replicates a write request to (replication-factor) nodes +// selected by the tenant and time series. +// The function only returns when all replication requests have finished +// or the context is canceled. 
+func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.WriteRequest) error { + wreqs := make(map[string]*prompb.WriteRequest) + replicas := make(map[string]replica) + var i uint64 + for i = 0; i < h.options.ReplicationFactor; i++ { + endpoint, err := h.hashring.GetN(tenant, &wreq.Timeseries[0], i) + if err != nil { + return err } + wreqs[endpoint] = wreq + replicas[endpoint] = replica{i, true} } - return local, errs + err := h.parallelizeRequests(ctx, tenant, replicas, wreqs) + if errs, ok := err.(terrors.MultiError); ok { + if uint64(len(errs)) >= (h.options.ReplicationFactor+1)/2 { + return errors.New("did not meet replication threshold") + } + } + return errors.Wrap(err, "could not replicate write request") } diff --git a/pkg/receive/hashring.go b/pkg/receive/hashring.go index 34f874ca9b..3ab0932d69 100644 --- a/pkg/receive/hashring.go +++ b/pkg/receive/hashring.go @@ -3,6 +3,7 @@ package receive import ( "context" "errors" + "fmt" "sort" "sync" @@ -13,11 +14,26 @@ import ( const sep = '\xff' +// insufficientNodesError is returned when a hashring does not +// have enough nodes to satisfy a request for a node. +type insufficientNodesError struct { + have uint64 + want uint64 +} + +// Error implements the error interface. +func (i *insufficientNodesError) Error() string { + return fmt.Sprintf("insufficient nodes; have %d, want %d", i.have, i.want) + } + // Hashring finds the correct node to handle a given time series // for a specified tenant. // It returns the node and any error encountered. type Hashring interface { + // Get returns the first node that should handle the given tenant and time series. Get(tenant string, timeSeries *prompb.TimeSeries) (string, error) + // GetN returns the nth node that should handle the given tenant and time series. + GetN(tenant string, timeSeries *prompb.TimeSeries, n uint64) (string, error) } // hash returns a hash for the given tenant and time series. @@ -41,7 +57,15 @@ func hash(tenant string, ts *prompb.TimeSeries) uint64 { type SingleNodeHashring string // Get implements the Hashring interface. -func (s SingleNodeHashring) Get(_ string, _ *prompb.TimeSeries) (string, error) { +func (s SingleNodeHashring) Get(tenant string, ts *prompb.TimeSeries) (string, error) { + return s.GetN(tenant, ts, 0) +} + +// GetN implements the Hashring interface. +func (s SingleNodeHashring) GetN(_ string, _ *prompb.TimeSeries, n uint64) (string, error) { + if n > 0 { + return "", &insufficientNodesError{have: 1, want: n + 1} + } return string(s), nil } @@ -50,8 +74,15 @@ type simpleHashring []string // Get returns a target to handle the given tenant and time series. func (s simpleHashring) Get(tenant string, ts *prompb.TimeSeries) (string, error) { - // Always return nil here to implement the Hashring interface. - return s[hash(tenant, ts)%uint64(len(s))], nil + return s.GetN(tenant, ts, 0) +} + +// GetN returns the nth target to handle the given tenant and time series. +func (s simpleHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (string, error) { + if n >= uint64(len(s)) { + return "", &insufficientNodesError{have: uint64(len(s)), want: n + 1} + } + return s[(hash(tenant, ts)+n)%uint64(len(s))], nil } // multiHashring represents a set of hashrings. @@ -70,11 +101,16 @@ type multiHashring struct { // Get returns a target to handle the given tenant and time series. 
func (m *multiHashring) Get(tenant string, ts *prompb.TimeSeries) (string, error) { + return m.GetN(tenant, ts, 0) +} + +// GetN returns the nth target to handle the given tenant and time series. +func (m *multiHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (string, error) { m.mu.RLock() h, ok := m.cache[tenant] m.mu.RUnlock() if ok { - return h.Get(tenant, ts) + return h.GetN(tenant, ts, n) } var found bool // If the tenant is not in the cache, then we need to check @@ -94,7 +130,7 @@ func (m *multiHashring) Get(tenant string, ts *prompb.TimeSeries) (string, error m.mu.Lock() m.cache[tenant] = m.hashrings[i] m.mu.Unlock() - return m.hashrings[i].Get(tenant, ts) + return m.hashrings[i].GetN(tenant, ts, n) } } return "", errors.New("no matching hashring to handle tenant") diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 643650eea2..90e095bc9d 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -30,7 +30,7 @@ var ( Add(scraper(3, defaultPromConfig("prom-ha", 1))). Add(querierWithStoreFlags(1, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3), remoteWriteReceiveGRPC(1))). Add(querierWithStoreFlags(2, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3), remoteWriteReceiveGRPC(1))). - Add(receiver(1, defaultPromRemoteWriteConfig(nodeExporterHTTP(1), remoteWriteEndpoint(1)))) + Add(receiver(1, defaultPromRemoteWriteConfig(nodeExporterHTTP(1), remoteWriteEndpoint(1)), 1)) queryFileSDSuite = newSpinupSuite(). Add(scraper(1, defaultPromConfig("prom-"+firstPromPort, 0))). @@ -38,7 +38,7 @@ var ( Add(scraper(3, defaultPromConfig("prom-ha", 1))). Add(querierWithFileSD(1, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3), remoteWriteReceiveGRPC(1))). Add(querierWithFileSD(2, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3), remoteWriteReceiveGRPC(1))). - Add(receiver(1, defaultPromRemoteWriteConfig(nodeExporterHTTP(1), remoteWriteEndpoint(1)))) + Add(receiver(1, defaultPromRemoteWriteConfig(nodeExporterHTTP(1), remoteWriteEndpoint(1)), 1)) ) func TestQuery(t *testing.T) { diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index e9e4bbed00..463bdda12c 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -15,18 +15,94 @@ import ( ) var ( + // The hashring suite creates three receivers, each with a Prometheus + // remote-writing data to it. However, due to the hashing of the labels, + // the time series from the Prometheus is forwarded to a different + // receiver in the hashring than the one handling the request. + // The querier queries all the receivers and the test verifies + // the time series are forwarded to the correct receive node. receiveHashringSuite = newSpinupSuite(). - Add(querierWithStoreFlags(1, "replica", remoteWriteReceiveGRPC(1), remoteWriteReceiveGRPC(2), remoteWriteReceiveGRPC(3))). - Add(receiver(1, defaultPromRemoteWriteConfig(nodeExporterHTTP(1), remoteWriteEndpoint(1)), remoteWriteEndpoint(1), remoteWriteEndpoint(2), remoteWriteEndpoint(3))). - Add(receiver(2, defaultPromRemoteWriteConfig(nodeExporterHTTP(2), remoteWriteEndpoint(2)), remoteWriteEndpoint(1), remoteWriteEndpoint(2), remoteWriteEndpoint(3))). - Add(receiver(3, defaultPromRemoteWriteConfig(nodeExporterHTTP(3), remoteWriteEndpoint(3)), remoteWriteEndpoint(1), remoteWriteEndpoint(2), remoteWriteEndpoint(3))) + Add(querierWithStoreFlags(1, "replica", remoteWriteReceiveGRPC(1), remoteWriteReceiveGRPC(2), remoteWriteReceiveGRPC(3))). 
+ Add(receiver(1, defaultPromRemoteWriteConfig(nodeExporterHTTP(1), remoteWriteEndpoint(1)), 1, remoteWriteEndpoint(1), remoteWriteEndpoint(2), remoteWriteEndpoint(3))). + Add(receiver(2, defaultPromRemoteWriteConfig(nodeExporterHTTP(2), remoteWriteEndpoint(2)), 1, remoteWriteEndpoint(1), remoteWriteEndpoint(2), remoteWriteEndpoint(3))). + Add(receiver(3, defaultPromRemoteWriteConfig(nodeExporterHTTP(3), remoteWriteEndpoint(3)), 1, remoteWriteEndpoint(1), remoteWriteEndpoint(2), remoteWriteEndpoint(3))) + receiveHashringMetrics = []model.Metric{ + { + "__name__": "up", + "instance": model.LabelValue(nodeExporterHTTP(1)), + "job": "node", + "receive": "true", + "replica": model.LabelValue("2"), + }, + { + "__name__": "up", + "instance": model.LabelValue(nodeExporterHTTP(2)), + "job": "node", + "receive": "true", + "replica": model.LabelValue("3"), + }, + { + "__name__": "up", + "instance": model.LabelValue(nodeExporterHTTP(3)), + "job": "node", + "receive": "true", + "replica": model.LabelValue("1"), + }, + } + // The replication suite creates three receivers but only one + // Prometheus that remote-writes data. The querier queries all + // receivers and the test verifies that the time series are + // replicated to all of the nodes. + receiveReplicationSuite = newSpinupSuite(). + Add(querierWithStoreFlags(1, "replica", remoteWriteReceiveGRPC(1), remoteWriteReceiveGRPC(2), remoteWriteReceiveGRPC(3))). + Add(receiver(1, defaultPromRemoteWriteConfig(nodeExporterHTTP(1), remoteWriteEndpoint(1)), 3, remoteWriteEndpoint(1), remoteWriteEndpoint(2), remoteWriteEndpoint(3))). + Add(receiver(2, defaultPromConfig("no-remote-write", 2), 3, remoteWriteEndpoint(1), remoteWriteEndpoint(2), remoteWriteEndpoint(3))). + Add(receiver(3, defaultPromConfig("no-remote-write", 3), 3, remoteWriteEndpoint(1), remoteWriteEndpoint(2), remoteWriteEndpoint(3))) + receiveReplicationMetrics = []model.Metric{ + { + "__name__": "up", + "instance": model.LabelValue(nodeExporterHTTP(1)), + "job": "node", + "receive": "true", + "replica": model.LabelValue("1"), + }, + { + "__name__": "up", + "instance": model.LabelValue(nodeExporterHTTP(1)), + "job": "node", + "receive": "true", + "replica": model.LabelValue("2"), + }, + { + "__name__": "up", + "instance": model.LabelValue(nodeExporterHTTP(1)), + "job": "node", + "receive": "true", + "replica": model.LabelValue("3"), + }, + } ) +type receiveTestConfig struct { + testConfig + metrics []model.Metric +} + func TestReceive(t *testing.T) { - for _, tt := range []testConfig{ + for _, tt := range []receiveTestConfig{ + { + testConfig{ + "hashring", + receiveHashringSuite, + }, + receiveHashringMetrics, + }, { - "hashring", - receiveHashringSuite, + testConfig{ + "replication", + receiveReplicationSuite, + }, + receiveReplicationMetrics, }, } { t.Run(tt.name, func(t *testing.T) { @@ -37,7 +113,7 @@ func TestReceive(t *testing.T) { // testReceive runs a setup of Prometheus servers, receive nodes, and query nodes and verifies that // queries return data from the Prometheus servers. Additionally it verifies that remote-writes were routed through the correct receive node. 
-func testReceive(t *testing.T, conf testConfig) { +func testReceive(t *testing.T, conf receiveTestConfig) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) exit, err := conf.suite.Exec(t, ctx, conf.name) @@ -84,7 +160,7 @@ func testReceive(t *testing.T, conf testConfig) { return errors.Errorf("unexpected warnings %s", warnings) } - expectedRes := 3 + expectedRes := len(conf.metrics) if len(res) != expectedRes { return errors.Errorf("unexpected result size %d, expected %d", len(res), expectedRes) } @@ -92,25 +168,7 @@ func testReceive(t *testing.T, conf testConfig) { return nil })) - testutil.Equals(t, model.Metric{ - "__name__": "up", - "instance": model.LabelValue(nodeExporterHTTP(1)), - "job": "node", - "receive": "true", - "replica": model.LabelValue("2"), - }, res[0].Metric) - testutil.Equals(t, model.Metric{ - "__name__": "up", - "instance": model.LabelValue(nodeExporterHTTP(2)), - "job": "node", - "receive": "true", - "replica": model.LabelValue("3"), - }, res[1].Metric) - testutil.Equals(t, model.Metric{ - "__name__": "up", - "instance": model.LabelValue(nodeExporterHTTP(3)), - "job": "node", - "receive": "true", - "replica": model.LabelValue("1"), - }, res[2].Metric) + for i, metric := range conf.metrics { + testutil.Equals(t, metric, res[i].Metric) + } } diff --git a/test/e2e/spinup_test.go b/test/e2e/spinup_test.go index abe98ebbad..96f3f2dae7 100644 --- a/test/e2e/spinup_test.go +++ b/test/e2e/spinup_test.go @@ -10,6 +10,7 @@ import ( "os" "os/exec" "path" + "strconv" "syscall" "testing" "time" @@ -125,7 +126,7 @@ func scraper(i int, config string) cmdScheduleFunc { } } -func receiver(i int, config string, receiveAddresses ...string) cmdScheduleFunc { +func receiver(i int, config string, replicationFactor int, receiveAddresses ...string) cmdScheduleFunc { if len(receiveAddresses) == 0 { receiveAddresses = []string{remoteWriteEndpoint(1)} } @@ -165,6 +166,7 @@ func receiver(i int, config string, receiveAddresses ...string) cmdScheduleFunc "--labels", fmt.Sprintf(`replica="%d"`, i), "--tsdb.path", promDir, "--log.level", "debug", + "--receive.replication-factor", strconv.Itoa(replicationFactor), "--receive.local-endpoint", remoteWriteEndpoint(i), "--receive.hashrings-file", path.Join(hashringsFileDir, "hashrings.json"), "--receive.hashrings-file-refresh-interval", "5s"))), nil
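For reference, the hashrings file passed to the receivers above via `--receive.hashrings-file` could look roughly like the following sketch. It follows the schema from the proposal; the hashring name and endpoint addresses are placeholders here, since the real values are generated by the test harness (`remoteWriteEndpoint(i)`).

```json
[
    {
        "hashring": "default",
        "endpoints": [
            "http://127.0.0.1:19291/api/v1/receive",
            "http://127.0.0.1:19391/api/v1/receive",
            "http://127.0.0.1:19491/api/v1/receive"
        ]
    }
]
```

Because this single hashring lists no tenants, it acts as the soft-tenancy catch-all, which matches the e2e suites above where no tenant header is configured explicitly.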