Skip to content

Commit

Permalink
Implement capnproto replication
Browse files Browse the repository at this point in the history
Our profiles from production show that a lot of CPU and memory in receivers
is used for unmarshaling protobuf messages. Although it is not possible to change
the remote-write format, we have the freedom to change the protocol used
for replicating timeseries data.

This commit introduces a new feature in receivers where replication can be done
using Cap'n Proto instead of gRPC + Protobuf. The advantage of the former protocol
is that deserialization is far cheaper and fields can be accessed directly from
the received message (byte slice) without allocating intermediate objects.
There is an additional cost for serialization because we have to convert from
Protobuf to the Cap'n Proto format, but in our setup this still results in a net
reduction in resource usage.

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed Sep 27, 2024
1 parent 6ff5e1b commit ee70d10
Show file tree
Hide file tree
Showing 25 changed files with 3,847 additions and 297 deletions.
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
include .bingo/Variables.mk
include .busybox-versions

FILES_TO_FMT ?= $(shell find . -path ./vendor -prune -o -path ./internal/cortex -prune -o -name '*.go' -print)
FILES_TO_FMT ?= $(shell find . -path ./vendor -prune -o -path ./internal/cortex -prune -o -path '*.capnp.go' -prune -o -name '*.go' -print)
MD_FILES_TO_FORMAT = $(shell find docs -name "*.md") $(shell find examples -name "*.md") $(filter-out mixin/runbook.md, $(shell find mixin -name "*.md")) $(shell ls *.md)
FAST_MD_FILES_TO_FORMAT = $(shell git diff --name-only | grep "\.md")

Expand Down Expand Up @@ -395,11 +395,15 @@ go-lint: check-git deps $(GOLANGCI_LINT) $(FAILLINT)
$(call require_clean_work_tree,'detected not clean work tree before running lint, previous job changed something?')
@echo ">> verifying modules being imported"
@# TODO(bwplotka): Add, Printf, DefaultRegisterer, NewGaugeFunc and MustRegister once exception are accepted. Add fmt.{Errorf}=github.com/pkg/errors.{Errorf} once https://github.com/fatih/faillint/issues/10 is addressed.
<<<<<<< HEAD
@$(FAILLINT) -paths "errors=github.com/pkg/errors,\
github.com/prometheus/tsdb=github.com/prometheus/prometheus/tsdb,\
github.com/prometheus/prometheus/prompb=github.com/thanos-io/thanos/pkg/store/storepb/prompb,\
github.com/gogo/protobuf=google.golang.org/protobuf,\
github.com/gogo/protobuf/proto=google.golang.org/protobuf/proto,\
=======
@$(FAILLINT) -paths "github.com/prometheus/tsdb=github.com/prometheus/prometheus/tsdb,\
>>>>>>> 729ad107 (Add tests)
github.com/prometheus/prometheus/pkg/testutils=github.com/thanos-io/thanos/pkg/testutil,\
github.com/prometheus/client_golang/prometheus.{DefaultGatherer,DefBuckets,NewUntypedFunc,UntypedFunc},\
github.com/prometheus/client_golang/prometheus.{NewCounter,NewCounterVec,NewCounterVec,NewGauge,NewGaugeVec,NewGaugeFunc,\
Expand Down
48 changes: 37 additions & 11 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package main

import (
"context"
"fmt"
"net"
"os"
"path"
"strings"
Expand Down Expand Up @@ -259,6 +261,7 @@ func runReceive(
Limiter: limiter,

AsyncForwardWorkerCount: conf.asyncForwardWorkerCount,
UseCapNProtoReplication: conf.useCapNProtoReplication,
})

grpcProbe := prober.NewGRPC()
Expand Down Expand Up @@ -452,6 +455,23 @@ func runReceive(
}
}

{
handler := receive.NewCapNProtoHandler(logger, writer)

Check failure on line 459 in cmd/thanos/receive.go

View workflow job for this annotation

GitHub Actions / Go build with -tags=stringlabels

cannot use writer (variable of type *receive.Writer) as *receive.CapNProtoWriter value in argument to receive.NewCapNProtoHandler
listener, err := net.Listen("tcp", conf.replicationAddr)
if err != nil {
return err
}
server := receive.NewCapNProtoServer(listener, handler)
g.Add(func() error {
return server.ListenAndServe()
}, func(err error) {
server.Shutdown()
if err := listener.Close(); err != nil {
level.Warn(logger).Log("msg", "Cap'n Proto server did not shut down gracefully", "err", err.Error())
}
})
}

level.Info(logger).Log("msg", "starting receiver")
return nil
}
Expand Down Expand Up @@ -782,6 +802,7 @@ type receiveConfig struct {

grpcConfig grpcConfig

replicationAddr string
rwAddress string
rwServerCert string
rwServerKey string
Expand All @@ -803,17 +824,18 @@ type receiveConfig struct {
hashringsFileContent string
hashringsAlgorithm string

refreshInterval *model.Duration
endpoint string
tenantHeader string
tenantField string
tenantLabelName string
defaultTenantID string
replicaHeader string
replicationFactor uint64
forwardTimeout *model.Duration
maxBackoff *model.Duration
compression string
refreshInterval *model.Duration
endpoint string
tenantHeader string
tenantField string
tenantLabelName string
defaultTenantID string
replicaHeader string
replicationFactor uint64
forwardTimeout *model.Duration
maxBackoff *model.Duration
compression string
useCapNProtoReplication bool

tsdbMinBlockDuration *model.Duration
tsdbMaxBlockDuration *model.Duration
Expand Down Expand Up @@ -914,6 +936,10 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64Var(&rc.replicationFactor)

cmd.Flag("receive.capnproto-replication", "Use Cap'n Proto for replication requests.").Default("false").BoolVar(&rc.useCapNProtoReplication)

cmd.Flag("receive.capnproto-address", "Address for the Cap'n Proto server.").Default(fmt.Sprintf("0.0.0.0:%s", receive.DefaultCapNProtoPort)).StringVar(&rc.replicationAddr)

rc.forwardTimeout = extkingpin.ModelDuration(cmd.Flag("receive-forward-timeout", "Timeout for each forward request.").Default("5s").Hidden())

rc.maxBackoff = extkingpin.ModelDuration(cmd.Flag("receive-forward-max-backoff", "Maximum backoff for each forward fan-out request").Default("5s").Hidden())
Expand Down
19 changes: 19 additions & 0 deletions docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,25 @@ If you are using the `hashmod` algorithm and wish to migrate to `ketama`, the si

This algorithm uses a `hashmod` function over all labels to decide which receiver is responsible for a given timeseries. This is the default algorithm due to historical reasons. However, its usage for new Receive installations is discouraged since adding new Receiver nodes leads to series churn and memory usage spikes.

### Replication protocols

By default, Receivers replicate data using Protobuf over gRPC. Deserializing Protobuf-encoded messages can be resource intensive and cause a lot of GC pressure.
It is possible to use [Cap'n Proto](https://capnproto.org/) as the replication encoding and RPC framework instead.

To enable this mode, set the `receive.capnproto-replication` flag on the receiver. Thanos will try to infer the Cap'n Proto address of each peer in
the hashring from its existing gRPC address. You can also set the Cap'n Proto address of each endpoint explicitly, as follows:
```json
[
{
"endpoints": [
{"address": "node-1:10901", "capnproto_address": "node-1:19391"},
{"address": "node-2:10901", "capnproto_address": "node-2:19391"},
{"address": "node-3:10901", "capnproto_address": "node-3:19391"}
]
}
]
```

### Hashring management and autoscaling in Kubernetes

The [Thanos Receive Controller](https://github.com/observatorium/thanos-receive-controller) project aims to automate hashring management when running Thanos in Kubernetes. In combination with the Ketama hashring algorithm, this controller can also be used to keep hashrings up to date when Receivers are scaled automatically using an HPA or [Keda](https://keda.sh/).
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ require (
)

require (
capnproto.org/go/capnp/v3 v3.0.0-alpha.30
github.com/cortexproject/promqlsmith v0.0.0-20240506042652-6cfdd9739a5e
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1
github.com/hashicorp/golang-lru/v2 v2.0.7
Expand Down Expand Up @@ -160,6 +161,7 @@ require (
k8s.io/client-go v0.30.2 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0 // indirect
zenhack.net/go/util v0.0.0-20230414204917-531d38494cf5 // indirect
)

require (
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
capnproto.org/go/capnp/v3 v3.0.0-alpha.30 h1:iABQan/YiHFCgSXym5aNj27osapnEgAk4WaWYqb4sQM=
capnproto.org/go/capnp/v3 v3.0.0-alpha.30/go.mod h1:+ysMHvOh1EWNOyorxJWs1omhRFiDoKxKkWQACp54jKM=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
Expand Down Expand Up @@ -1228,6 +1230,8 @@ github.com/ovh/go-ovh v1.5.1 h1:P8O+7H+NQuFK9P/j4sFW5C0fvSS2DnHYGPwdVCp45wI=
github.com/ovh/go-ovh v1.5.1/go.mod h1:cTVDnl94z4tl8pP1uZ/8jlVxntjSIf09bNcQ5TJSC7c=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y=
github.com/philhofer/fwd v1.1.1 h1:GdGcTjf5RNAxwS4QLsiMzJYj5KEvPJD3Abr261yRQXQ=
github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/phpdave11/gofpdf v1.4.2/go.mod h1:zpO6xFn9yxo3YLyMvW8HcKWVdbNqgIfOOp2dXMnm1mY=
github.com/phpdave11/gofpdi v1.0.12/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI=
github.com/phpdave11/gofpdi v1.0.13/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI=
Expand Down Expand Up @@ -1373,6 +1377,8 @@ github.com/thanos-io/promql-engine v0.0.0-20240921092401-37747eddbd31 h1:xPaP58g
github.com/thanos-io/promql-engine v0.0.0-20240921092401-37747eddbd31/go.mod h1:wx0JlRZtsB2S10JYUgeg5GqLfMxw31SzArP+28yyE00=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY=
github.com/tinylib/msgp v1.1.5 h1:2gXmtWueD2HefZHQe1QOy9HVzmFrLOVvsXwXBQ0ayy0=
github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg=
github.com/tklauser/go-sysconf v0.3.4/go.mod h1:Cl2c8ZRWfHD5IrfHo9VN+FX9kCFjIOyVklgXycLB6ek=
github.com/tklauser/go-sysconf v0.3.10 h1:IJ1AZGZRWbY8T5Vfk04D9WOA5WSejdflXxP03OUqALw=
github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk=
Expand Down Expand Up @@ -2250,3 +2256,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+s
sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08=
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=
zenhack.net/go/util v0.0.0-20230414204917-531d38494cf5 h1:yksDCGMVzyn3vlyf0GZ3huiF5FFaMGQpQ3UJvR0EoGA=
zenhack.net/go/util v0.0.0-20230414204917-531d38494cf5/go.mod h1:1LtNdPAs8WH+BTcQiZAOo2MIKD/5jyK/u7sZ9ZPe5SE=
133 changes: 133 additions & 0 deletions pkg/receive/capnp_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package receive

import (
"context"
"net"

"capnproto.org/go/capnp/v3"
"capnproto.org/go/capnp/v3/rpc"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"

"github.com/thanos-io/thanos/pkg/receive/writecapnp"
)

// CapNProtoServer exposes a writecapnp.Writer capability to every connection
// accepted from its listener.
type CapNProtoServer struct {
	// listener is the source of incoming connections used by ListenAndServe.
	listener net.Listener
	// server is the client-side handle handed to each connection as its
	// bootstrap capability; it is released by Shutdown.
	server writecapnp.Writer
}

// NewCapNProtoServer builds a CapNProtoServer that accepts connections from
// listener and answers them with handler, wrapped as a writecapnp.Writer.
func NewCapNProtoServer(listener net.Listener, handler *CapNProtoHandler) *CapNProtoServer {
	writerClient := writecapnp.Writer_ServerToClient(handler)
	return &CapNProtoServer{
		server:   writerClient,
		listener: listener,
	}
}

// ListenAndServe accepts connections until the listener returns an error and
// returns that error. Each accepted connection is served on its own goroutine
// for as long as its RPC connection stays open.
func (c *CapNProtoServer) ListenAndServe() error {
	serve := func(conn net.Conn) {
		defer conn.Close()

		transport := rpc.NewPackedStreamTransport(conn)
		opts := &rpc.Options{
			// The BootstrapClient is the RPC interface that will be made available
			// to the remote endpoint by default. Every connection gets its own
			// reference to the shared writer capability.
			BootstrapClient: capnp.Client(c.server).AddRef(),
		}

		// Block until the RPC connection reports that it is done.
		<-rpc.NewConn(transport, opts).Done()
	}

	for {
		conn, err := c.listener.Accept()
		if err != nil {
			return err
		}
		go serve(conn)
	}
}

// Shutdown releases the server's writer capability. It does not touch the
// listener; closing it is left to the caller.
func (c *CapNProtoServer) Shutdown() {
	c.server.Release()
}

// CapNProtoHandler handles Cap'n Proto write calls by passing each request to
// a CapNProtoWriter.
type CapNProtoHandler struct {
	// writer receives every decoded write request.
	writer *CapNProtoWriter
	// logger is used to report failed writes at debug level.
	logger log.Logger
}

// NewCapNProtoHandler returns a handler that forwards write requests to writer
// and logs failures with logger.
func NewCapNProtoHandler(logger log.Logger, writer *CapNProtoWriter) *CapNProtoHandler {
	h := new(CapNProtoHandler)
	h.writer = writer
	h.logger = logger
	return h
}

// Write serves a single Cap'n Proto write call.
//
// It extracts the write request and its tenant from the call arguments and
// hands them to the underlying writer. A failed write is not returned as an
// RPC error: it is logged at debug level and translated into a
// writecapnp.WriteError code on the call results. A non-nil error is returned
// only when the arguments cannot be decoded or the results cannot be allocated.
func (c CapNProtoHandler) Write(ctx context.Context, call writecapnp.Writer_write) error {
	// NOTE(review): presumably lets the RPC layer dispatch further calls while
	// this one is still running — confirm against the go-capnp documentation.
	call.Go()

	payload, err := call.Args().Wr()
	if err != nil {
		return err
	}
	tenant, err := payload.Tenant()
	if err != nil {
		return err
	}
	request, err := writecapnp.NewRequest(payload)
	if err != nil {
		return err
	}
	defer request.Close()

	var collected writeErrors
	collected.Add(c.writer.Write(ctx, tenant, request))
	writeErr := collected.ErrOrNil()
	if writeErr == nil {
		return nil
	}

	level.Debug(c.logger).Log("msg", "failed to handle request", "err", writeErr)
	result, err := call.AllocResults()
	if err != nil {
		return err
	}

	// Map the root cause of the failure onto the error codes of the wire protocol.
	switch errors.Cause(writeErr) {
	case nil:
		return nil
	case errNotReady, errUnavailable:
		result.SetError(writecapnp.WriteError_unavailable)
	case errConflict:
		result.SetError(writecapnp.WriteError_alreadyExists)
	case errBadReplica:
		result.SetError(writecapnp.WriteError_invalidArgument)
	default:
		result.SetError(writecapnp.WriteError_internal)
	}
	return nil
}

// BufferedListener is an in-memory net.Listener. Instead of accepting
// connections from a socket it hands out connections received on the conns
// channel, and it is closed by cancelling its context.
type BufferedListener struct {
	ctx    context.Context
	cancel context.CancelFunc

	conns chan net.Conn
}

// Accept blocks until a connection is available on conns or the listener is
// closed, in which case it returns the context's error.
//
// An Accept call that starts after Close has returned always fails, even if
// connections are still queued on conns.
func (b BufferedListener) Accept() (net.Conn, error) {
	// When several select cases are ready Go picks one at random, so a closed
	// listener could otherwise still hand out a queued connection. Checking the
	// context first makes the closed state take precedence.
	if err := b.ctx.Err(); err != nil {
		return nil, err
	}

	select {
	case <-b.ctx.Done():
		return nil, b.ctx.Err()
	case c := <-b.conns:
		return c, nil
	}
}

// Close cancels the listener's context, which unblocks pending Accept calls.
// It always returns nil.
func (b BufferedListener) Close() error {
	b.cancel()
	return nil
}

// Addr returns a placeholder address whose network and string form are both
// "bufconn".
func (b BufferedListener) Addr() net.Addr {
	return addr{}
}

// addr is the fixed net.Addr reported by BufferedListener.
type addr struct{}

// Network returns the placeholder network name "bufconn".
func (addr) Network() string { return "bufconn" }

// String returns the placeholder address "bufconn".
func (addr) String() string { return "bufconn" }
Loading

0 comments on commit ee70d10

Please sign in to comment.