Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

receive: allow custom connection pooling #1966

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel

## Unreleased

- [#1966](https://github.com/thanos-io/thanos/issues/1966) Receive: Allow custom connection pool sizing

### Fixed

- [#1919](https://github.com/thanos-io/thanos/issues/1919) Compactor: Fixed potential data loss when uploading older blocks, or upload taking long time while compactor is
Expand Down
16 changes: 15 additions & 1 deletion cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/exthttp"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/receive"
Expand Down Expand Up @@ -72,6 +73,9 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {

tsdbBlockDuration := modelDuration(cmd.Flag("tsdb.block-duration", "Duration for local TSDB blocks").Default("2h").Hidden())
blockloop marked this conversation as resolved.
Show resolved Hide resolved

connectionPoolSize := cmd.Flag("receive.connection-pool-size", "Controls the http MaxIdleConns. Default is 0, which is unlimited").Int()
connectionPoolSizePerHost := cmd.Flag("receive.connection-pool-size-per-host", "Controls the http MaxIdleConnsPerHost").Default("100").Int()

walCompression := cmd.Flag("tsdb.wal-compression", "Compress the tsdb WAL.").Default("true").Bool()

m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
Expand Down Expand Up @@ -138,6 +142,8 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {
*replicaHeader,
*replicationFactor,
comp,
*connectionPoolSize,
*connectionPoolSizePerHost,
)
}
}
Expand Down Expand Up @@ -172,6 +178,8 @@ func runReceive(
replicaHeader string,
replicationFactor uint64,
comp component.SourceStoreAPI,
connectionPoolSize int,
connectionPoolSizePerHost int,
) error {
logger = log.With(logger, "component", "receive")
level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice")
Expand All @@ -185,6 +193,12 @@ func runReceive(
if err != nil {
return err
}

transport := exthttp.NewTransport()
transport.MaxIdleConns = connectionPoolSize
transport.MaxIdleConnsPerHost = connectionPoolSizePerHost
transport.TLSClientConfig = rwTLSClientConfig

webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
ListenAddress: rwAddress,
Registry: reg,
Expand All @@ -194,7 +208,7 @@ func runReceive(
ReplicationFactor: replicationFactor,
Tracer: tracer,
TLSConfig: rwTLSConfig,
TLSClientConfig: rwTLSClientConfig,
Transport: transport,
})

statusProber := prober.New(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
Expand Down
24 changes: 24 additions & 0 deletions pkg/exthttp/transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package exthttp

import (
"net"
"net/http"
"time"
)

// NewTransport creates a new http.Transport with default settings
func NewTransport() *http.Transport {
return &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
}
11 changes: 7 additions & 4 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
terrors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/thanos-io/thanos/pkg/exthttp"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/tracing"
Expand All @@ -50,7 +51,7 @@ type Options struct {
ReplicationFactor uint64
Tracer opentracing.Tracer
TLSConfig *tls.Config
TLSClientConfig *tls.Config
Transport *http.Transport
}

// Handler serves a Prometheus remote write receiving HTTP endpoint.
Expand All @@ -74,9 +75,11 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
logger = log.NewNopLogger()
}

transport := http.DefaultTransport.(*http.Transport)
transport.TLSClientConfig = o.TLSClientConfig
client := &http.Client{Transport: transport}
if o.Transport == nil {
o.Transport = exthttp.NewTransport()
}

client := &http.Client{Transport: o.Transport}
if o.Tracer != nil {
client.Transport = tracing.HTTPTripperware(logger, client.Transport)
}
Expand Down