Skip to content

Commit

Permalink
receive: allow custom connection pooling
Browse files Browse the repository at this point in the history
  • Loading branch information
blockloop committed Jan 8, 2020
1 parent 9c84435 commit d313c74
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 11 deletions.
9 changes: 8 additions & 1 deletion cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {

replicationFactor := cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64()

tsdbBlockDuration := modelDuration(cmd.Flag("tsdb.block-duration", "Duration for local TSDB blocks").Default("2h").Hidden())
connectionPoolSize := cmd.Flag("receive.connection-pool-size", "Controls the http MaxIdleConns. Default is 0, which is unlimited").Int()
connectionPoolSizePerHost := cmd.Flag("receive.connection-pool-size-per-host", "Controls the http MaxIdleConnsPerHost").Default("100").Int()

m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
lset, err := parseFlagLabels(*labelStrs)
Expand Down Expand Up @@ -130,6 +131,8 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {
*replicationFactor,
*tsdbBlockDuration,
comp,
*connectionPoolSize,
*connectionPoolSizePerHost,
)
}
}
Expand Down Expand Up @@ -165,6 +168,8 @@ func runReceive(
replicationFactor uint64,
tsdbBlockDuration model.Duration,
comp component.SourceStoreAPI,
connectionPoolSize int,
connectionPoolSizePerHost int,
) error {
logger = log.With(logger, "component", "receive")
level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice")
Expand Down Expand Up @@ -196,6 +201,8 @@ func runReceive(
Tracer: tracer,
TLSConfig: rwTLSConfig,
TLSClientConfig: rwTLSClientConfig,
ConnectionPoolSize: connectionPoolSize,
ConnectionPoolSizePerHost: connectionPoolSizePerHost,
})

statusProber := prober.New(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
Expand Down
25 changes: 15 additions & 10 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,18 @@ var conflictErr = errors.New("conflict")

// Options for the web Handler.
type Options struct {
Writer *Writer
ListenAddress string
Registry prometheus.Registerer
Endpoint string
TenantHeader string
ReplicaHeader string
ReplicationFactor uint64
Tracer opentracing.Tracer
TLSConfig *tls.Config
TLSClientConfig *tls.Config
Writer *Writer
ListenAddress string
Registry prometheus.Registerer
Endpoint string
TenantHeader string
ReplicaHeader string
ReplicationFactor uint64
Tracer opentracing.Tracer
TLSConfig *tls.Config
TLSClientConfig *tls.Config
ConnectionPoolSize int
ConnectionPoolSizePerHost int
}

// Handler serves a Prometheus remote write receiving HTTP endpoint.
Expand All @@ -76,6 +78,9 @@ func NewHandler(logger log.Logger, o *Options) *Handler {

transport := http.DefaultTransport.(*http.Transport)
transport.TLSClientConfig = o.TLSClientConfig
transport.MaxIdleConnsPerHost = o.ConnectionPoolSizePerHost
transport.MaxIdleConns = o.ConnectionPoolSize

client := &http.Client{Transport: transport}
if o.Tracer != nil {
client.Transport = tracing.HTTPTripperware(logger, client.Transport)
Expand Down

0 comments on commit d313c74

Please sign in to comment.