Skip to content

Commit

Permalink
nsqd: configurable HTTP client timeouts
Browse files Browse the repository at this point in the history
Adds configuration options HTTPClientConnectTimeout and
HTTPClientRequestTimeout to control the connection and request timeout
repectively of the HTTP client.

Also added to the following app binaries:

- nsqadmin
- nsq_stat
- nsq_to_file
- nsq_to_http

Closes nsqio#715
Closes nsqio#680
  • Loading branch information
kenjones-cisco committed Jul 24, 2016
1 parent 22c8c64 commit d6c0ce9
Show file tree
Hide file tree
Showing 16 changed files with 104 additions and 36 deletions.
20 changes: 16 additions & 4 deletions apps/nsq_stat/nsq_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ var (
channel = flag.String("channel", "", "NSQ channel")
statusEvery = flag.Duration("status-every", -1, "(deprecated) duration of time between polling/printing output")
interval = flag.Duration("interval", 2*time.Second, "duration of time between polling/printing output")
httpConnectTimeout = flag.Duration("http-client-connect-timeout", 2*time.Second, "timeout for HTTP connect")
httpRequestTimeout = flag.Duration("http-client-request-timeout", 2*time.Second, "timeout for HTTP read/write (each)")
countNum = numValue{}
nsqdHTTPAddrs = app.StringArray{}
lookupdHTTPAddrs = app.StringArray{}
Expand Down Expand Up @@ -55,9 +57,9 @@ func init() {
flag.Var(&countNum, "count", "number of reports")
}

func statLoop(interval time.Duration, topic string, channel string,
nsqdTCPAddrs []string, lookupdHTTPAddrs []string) {
ci := clusterinfo.New(nil, http_api.NewClient(nil))
func statLoop(interval time.Duration, connectTimeout time.Duration, requestTimeout time.Duration,
topic string, channel string, nsqdTCPAddrs []string, lookupdHTTPAddrs []string) {
ci := clusterinfo.New(nil, http_api.NewClient(nil, connectTimeout, requestTimeout))
var o *clusterinfo.ChannelStats
for i := 0; !countNum.isSet || countNum.value >= i; i++ {
var producers clusterinfo.Producers
Expand Down Expand Up @@ -149,6 +151,16 @@ func main() {
log.Fatal("--interval should be positive")
}

connectTimeout := *httpConnectTimeout
if int64(connectTimeout) <= 0 {
log.Fatal("--http-client-connect-timeout should be positive")
}

requestTimeout := *httpRequestTimeout
if int64(requestTimeout) <= 0 {
log.Fatal("--http-client-request-timeout should be positive")
}

if countNum.isSet && countNum.value <= 0 {
log.Fatal("--count should be positive")
}
Expand All @@ -171,7 +183,7 @@ func main() {
termChan := make(chan os.Signal, 1)
signal.Notify(termChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)

go statLoop(intvl, *topic, *channel, nsqdHTTPAddrs, lookupdHTTPAddrs)
go statLoop(intvl, connectTimeout, requestTimeout, *topic, *channel, nsqdHTTPAddrs, lookupdHTTPAddrs)

<-termChan
}
27 changes: 21 additions & 6 deletions apps/nsq_to_file/nsq_to_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ var (
rotateSize = flag.Int64("rotate-size", 0, "rotate the file when it grows bigger than `rotate-size` bytes")
rotateInterval = flag.Duration("rotate-interval", 0*time.Second, "rotate the file every duration")

httpConnectTimeout = flag.Duration("http-client-connect-timeout", 2*time.Second, "timeout for HTTP connect")
httpRequestTimeout = flag.Duration("http-client-request-timeout", 2*time.Second, "timeout for HTTP read/write (each)")

nsqdTCPAddrs = app.StringArray{}
lookupdHTTPAddrs = app.StringArray{}
topics = app.StringArray{}
Expand Down Expand Up @@ -404,8 +407,9 @@ func (t *TopicDiscoverer) allowTopicName(pattern string, name string) bool {
return match
}

func (t *TopicDiscoverer) syncTopics(addrs []string, pattern string) {
newTopics, err := clusterinfo.New(nil, http_api.NewClient(nil)).GetLookupdTopics(addrs)
func (t *TopicDiscoverer) syncTopics(addrs []string, pattern string,
connectTimeout time.Duration, requestTimeout time.Duration) {
newTopics, err := clusterinfo.New(nil, http_api.NewClient(nil, connectTimeout, requestTimeout)).GetLookupdTopics(addrs)
if err != nil {
log.Printf("ERROR: could not retrieve topic list: %s", err)
}
Expand Down Expand Up @@ -438,13 +442,14 @@ func (t *TopicDiscoverer) hup() {
}
}

func (t *TopicDiscoverer) watch(addrs []string, sync bool, pattern string) {
func (t *TopicDiscoverer) watch(addrs []string, sync bool, pattern string,
connectTimeout time.Duration, requestTimeout time.Duration) {
ticker := time.Tick(*topicPollRate)
for {
select {
case <-ticker:
if sync {
t.syncTopics(addrs, pattern)
t.syncTopics(addrs, pattern, connectTimeout, requestTimeout)
}
case <-t.termChan:
t.stop()
Expand Down Expand Up @@ -474,6 +479,16 @@ func main() {
log.Fatal("--channel is required")
}

connectTimeout := *httpConnectTimeout
if int64(connectTimeout) <= 0 {
log.Fatal("--http-client-connect-timeout should be positive")
}

requestTimeout := *httpRequestTimeout
if int64(requestTimeout) <= 0 {
log.Fatal("--http-client-request-timeout should be positive")
}

var topicsFromNSQLookupd bool

if len(nsqdTCPAddrs) == 0 && len(lookupdHTTPAddrs) == 0 {
Expand Down Expand Up @@ -516,7 +531,7 @@ func main() {
}
topicsFromNSQLookupd = true
var err error
topics, err = clusterinfo.New(nil, http_api.NewClient(nil)).GetLookupdTopics(lookupdHTTPAddrs)
topics, err = clusterinfo.New(nil, http_api.NewClient(nil, connectTimeout, requestTimeout)).GetLookupdTopics(lookupdHTTPAddrs)
if err != nil {
log.Fatalf("ERROR: could not retrieve topic list: %s", err)
}
Expand All @@ -536,5 +551,5 @@ func main() {
go discoverer.startTopicRouter(logger)
}

discoverer.watch(lookupdHTTPAddrs, topicsFromNSQLookupd, *topicPattern)
discoverer.watch(lookupdHTTPAddrs, topicsFromNSQLookupd, *topicPattern, connectTimeout, requestTimeout)
}
2 changes: 1 addition & 1 deletion apps/nsq_to_http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var httpclient *http.Client
var userAgent string

func init() {
httpclient = &http.Client{Transport: http_api.NewDeadlineTransport(*httpTimeout)}
httpclient = &http.Client{Transport: http_api.NewDeadlineTransport(*httpConnectTimeout, *httpRequestTimeout)}
userAgent = fmt.Sprintf("nsq_to_http v%s", version.Binary)
}

Expand Down
10 changes: 10 additions & 0 deletions apps/nsq_to_http/nsq_to_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ var (
numPublishers = flag.Int("n", 100, "number of concurrent publishers")
mode = flag.String("mode", "hostpool", "the upstream request mode options: multicast, round-robin, hostpool (default), epsilon-greedy")
sample = flag.Float64("sample", 1.0, "% of messages to publish (float b/w 0 -> 1)")
// deprecate? in favor of http-client-connect-timeout, http-client-request-timeout
httpTimeout = flag.Duration("http-timeout", 20*time.Second, "timeout for HTTP connect/read/write (each)")
httpConnectTimeout = flag.Duration("http-client-connect-timeout", 20*time.Second, "timeout for HTTP connect")
httpRequestTimeout = flag.Duration("http-client-request-timeout", 20*time.Second, "timeout for HTTP read/write (each)")
statusEvery = flag.Int("status-every", 250, "the # of requests between logging status (per handler), 0 disables")
contentType = flag.String("content-type", "application/octet-stream", "the Content-Type used for POST requests")

Expand Down Expand Up @@ -249,6 +252,13 @@ func main() {
*httpTimeout = time.Duration(*httpTimeoutMs) * time.Millisecond
}

// TODO: remove, deprecated
if hasArg("http-timeout") {
log.Printf("WARNING: --http-timeout is deprecated in favor of --http-client-connect-timeout=X and --http-client-request-timeout=Y")
*httpConnectTimeout = *httpTimeout
*httpRequestTimeout = *httpTimeout
}

termChan := make(chan os.Signal, 1)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

Expand Down
3 changes: 3 additions & 0 deletions apps/nsqadmin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ var (

notificationHTTPEndpoint = flagSet.String("notification-http-endpoint", "", "HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sent")

httpConnectTimeout = flagSet.Duration("http-client-connect-timeout", 2*time.Second, "timeout for HTTP connect")
httpRequestTimeout = flagSet.Duration("http-client-request-timeout", 2*time.Second, "timeout for HTTP read/write (each)")

httpClientTLSInsecureSkipVerify = flagSet.Bool("http-client-tls-insecure-skip-verify", false, "configure the HTTP client to skip verification of TLS certificates")
httpClientTLSRootCAFile = flagSet.String("http-client-tls-root-ca-file", "", "path to CA file for the HTTP client")
httpClientTLSCert = flagSet.String("http-client-tls-cert", "", "path to certificate file for the HTTP client")
Expand Down
2 changes: 2 additions & 0 deletions apps/nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet {
flagSet.String("broadcast-address", opts.BroadcastAddress, "address that will be registered with lookupd (defaults to the OS hostname)")
lookupdTCPAddrs := app.StringArray{}
flagSet.Var(&lookupdTCPAddrs, "lookupd-tcp-address", "lookupd TCP address (may be given multiple times)")
flagSet.Duration("http-client-connect-timeout", opts.HTTPClientConnectTimeout, "timeout for HTTP connect")
flagSet.Duration("http-client-request-timeout", opts.HTTPClientRequestTimeout, "timeout for HTTP read/write (each)")

// diskqueue options
flagSet.String("data-path", opts.DataPath, "path to store disk-backed messages")
Expand Down
10 changes: 6 additions & 4 deletions internal/auth/authorizations.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,10 @@ func (a *State) IsExpired() bool {
return false
}

func QueryAnyAuthd(authd []string, remoteIP, tlsEnabled, authSecret string) (*State, error) {
func QueryAnyAuthd(authd []string, remoteIP, tlsEnabled, authSecret string,
clientTimeout time.Duration, requestTimeout time.Duration) (*State, error) {
for _, a := range authd {
authState, err := QueryAuthd(a, remoteIP, tlsEnabled, authSecret)
authState, err := QueryAuthd(a, remoteIP, tlsEnabled, authSecret, clientTimeout, requestTimeout)
if err != nil {
log.Printf("Error: failed auth against %s %s", a, err)
continue
Expand All @@ -88,7 +89,8 @@ func QueryAnyAuthd(authd []string, remoteIP, tlsEnabled, authSecret string) (*St
return nil, errors.New("Unable to access auth server")
}

func QueryAuthd(authd, remoteIP, tlsEnabled, authSecret string) (*State, error) {
func QueryAuthd(authd, remoteIP, tlsEnabled, authSecret string,
clientTimeout time.Duration, requestTimeout time.Duration) (*State, error) {
v := url.Values{}
v.Set("remote_ip", remoteIP)
v.Set("tls", tlsEnabled)
Expand All @@ -97,7 +99,7 @@ func QueryAuthd(authd, remoteIP, tlsEnabled, authSecret string) (*State, error)
endpoint := fmt.Sprintf("http://%s/auth?%s", authd, v.Encode())

var authState State
client := http_api.NewClient(nil)
client := http_api.NewClient(nil, clientTimeout, requestTimeout)
if err := client.GETV1(endpoint, &authState); err != nil {
return nil, err
}
Expand Down
9 changes: 5 additions & 4 deletions internal/http_api/api_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ type deadlinedConn struct {
}

func (c *deadlinedConn) Read(b []byte) (n int, err error) {
c.Conn.SetReadDeadline(time.Now().Add(c.Timeout))
return c.Conn.Read(b)
}

Expand All @@ -29,7 +28,7 @@ func (c *deadlinedConn) Write(b []byte) (n int, err error) {
}

// A custom http.Transport with support for deadline timeouts
func NewDeadlineTransport(timeout time.Duration) *http.Transport {
func NewDeadlineTransport(timeout time.Duration, requestTimeout time.Duration) *http.Transport {
transport := &http.Transport{
Dial: func(netw, addr string) (net.Conn, error) {
c, err := net.DialTimeout(netw, addr, timeout)
Expand All @@ -38,6 +37,7 @@ func NewDeadlineTransport(timeout time.Duration) *http.Transport {
}
return &deadlinedConn{timeout, c}, nil
},
ResponseHeaderTimeout: requestTimeout,
}
return transport
}
Expand All @@ -46,12 +46,13 @@ type Client struct {
c *http.Client
}

func NewClient(tlsConfig *tls.Config) *Client {
transport := NewDeadlineTransport(2 * time.Second)
func NewClient(tlsConfig *tls.Config, clientTimeout time.Duration, requestTimeout time.Duration) *Client {
transport := NewDeadlineTransport(clientTimeout, requestTimeout)
transport.TLSClientConfig = tlsConfig
return &Client{
c: &http.Client{
Transport: transport,
Timeout: requestTimeout,
},
}
}
Expand Down
10 changes: 6 additions & 4 deletions nsqadmin/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ func maybeWarnMsg(msgs []string) string {
}

// this is similar to httputil.NewSingleHostReverseProxy except it passes along basic auth
func NewSingleHostReverseProxy(target *url.URL, timeout time.Duration) *httputil.ReverseProxy {
func NewSingleHostReverseProxy(target *url.URL, clientTimeout time.Duration,
requestTimeout time.Duration) *httputil.ReverseProxy {
director := func(req *http.Request) {
req.URL.Scheme = target.Scheme
req.URL.Host = target.Host
Expand All @@ -41,7 +42,7 @@ func NewSingleHostReverseProxy(target *url.URL, timeout time.Duration) *httputil
}
return &httputil.ReverseProxy{
Director: director,
Transport: http_api.NewDeadlineTransport(timeout),
Transport: http_api.NewDeadlineTransport(clientTimeout, requestTimeout),
}
}

Expand All @@ -55,7 +56,8 @@ type httpServer struct {
func NewHTTPServer(ctx *Context) *httpServer {
log := http_api.Log(ctx.nsqadmin.getOpts().Logger)

client := http_api.NewClient(ctx.nsqadmin.httpClientTLSConfig)
client := http_api.NewClient(ctx.nsqadmin.httpClientTLSConfig, ctx.nsqadmin.getOpts().HTTPClientConnectTimeout,
ctx.nsqadmin.getOpts().HTTPClientRequestTimeout)

router := httprouter.New()
router.HandleMethodNotAllowed = true
Expand Down Expand Up @@ -83,7 +85,7 @@ func NewHTTPServer(ctx *Context) *httpServer {
router.Handle("GET", "/static/:asset", http_api.Decorate(s.staticAssetHandler, log, http_api.PlainText))
router.Handle("GET", "/fonts/:asset", http_api.Decorate(s.staticAssetHandler, log, http_api.PlainText))
if s.ctx.nsqadmin.getOpts().ProxyGraphite {
proxy := NewSingleHostReverseProxy(ctx.nsqadmin.graphiteURL, 20*time.Second)
proxy := NewSingleHostReverseProxy(ctx.nsqadmin.graphiteURL, 20*time.Second, 20*time.Second)
router.Handler("GET", "/render", proxy)
}

Expand Down
2 changes: 1 addition & 1 deletion nsqadmin/nsqadmin.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (n *NSQAdmin) handleAdminActions() {
if err != nil {
n.logf("ERROR: failed to serialize admin action - %s", err)
}
httpclient := &http.Client{Transport: http_api.NewDeadlineTransport(10 * time.Second)}
httpclient := &http.Client{Transport: http_api.NewDeadlineTransport(10 * time.Second, 10 * time.Second)}
n.logf("POSTing notification to %s", n.getOpts().NotificationHTTPEndpoint)
resp, err := httpclient.Post(n.getOpts().NotificationHTTPEndpoint,
"application/json", bytes.NewBuffer(content))
Expand Down
5 changes: 5 additions & 0 deletions nsqadmin/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type Options struct {
NSQLookupdHTTPAddresses []string `flag:"lookupd-http-address" cfg:"nsqlookupd_http_addresses"`
NSQDHTTPAddresses []string `flag:"nsqd-http-address" cfg:"nsqd_http_addresses"`

HTTPClientConnectTimeout time.Duration `flag:"http-client-connect-timeout"`
HTTPClientRequestTimeout time.Duration `flag:"http-client-request-timeout"`

HTTPClientTLSInsecureSkipVerify bool `flag:"http-client-tls-insecure-skip-verify"`
HTTPClientTLSRootCAFile string `flag:"http-client-tls-root-ca-file"`
HTTPClientTLSCert string `flag:"http-client-tls-cert"`
Expand All @@ -40,6 +43,8 @@ func NewOptions() *Options {
StatsdCounterFormat: "stats.counters.%s.count",
StatsdGaugeFormat: "stats.gauges.%s",
StatsdInterval: 60 * time.Second,
HTTPClientConnectTimeout: 2 * time.Second,
HTTPClientRequestTimeout: 2 * time.Second,
Logger: log.New(os.Stderr, "[nsqadmin] ", log.Ldate|log.Ltime|log.Lmicroseconds),
}
}
3 changes: 2 additions & 1 deletion nsqd/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,8 @@ func (c *clientV2) QueryAuthd() error {
}

authState, err := auth.QueryAnyAuthd(c.ctx.nsqd.getOpts().AuthHTTPAddresses,
remoteIP, tlsEnabled, c.AuthSecret)
remoteIP, tlsEnabled, c.AuthSecret, c.ctx.nsqd.getOpts().HTTPClientConnectTimeout,
c.ctx.nsqd.getOpts().HTTPClientRequestTimeout)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func New(opts *Options) *NSQD {
exitChan: make(chan int),
notifyChan: make(chan interface{}),
optsNotificationChan: make(chan struct{}, 1),
ci: clusterinfo.New(opts.Logger, http_api.NewClient(nil)),
ci: clusterinfo.New(opts.Logger, http_api.NewClient(nil, opts.HTTPClientConnectTimeout, opts.HTTPClientRequestTimeout)),
dl: dirlock.New(dataPath),
}
n.swapOpts(opts)
Expand Down
13 changes: 9 additions & 4 deletions nsqd/nsqd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ import (
"github.com/nsqio/nsq/nsqlookupd"
)

const (
ConnectTimeout = 2 * time.Second
RequestTimeout = 2 * time.Second
)

func assert(t *testing.T, condition bool, msg string, v ...interface{}) {
if !condition {
_, file, line, _ := runtime.Caller(1)
Expand Down Expand Up @@ -80,7 +85,7 @@ func getMetadata(n *NSQD) (*simplejson.Json, error) {

func API(endpoint string) (data *simplejson.Json, err error) {
d := make(map[string]interface{})
err = http_api.NewClient(nil).NegotiateV1(endpoint, &d)
err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).NegotiateV1(endpoint, &d)
data = simplejson.New()
data.SetPath(nil, d)
return
Expand Down Expand Up @@ -373,11 +378,11 @@ func TestCluster(t *testing.T) {
equal(t, err, nil)

url := fmt.Sprintf("http://%s/topic/create?topic=%s", nsqd.RealHTTPAddr(), topicName)
err = http_api.NewClient(nil).POSTV1(url)
err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).POSTV1(url)
equal(t, err, nil)

url = fmt.Sprintf("http://%s/channel/create?topic=%s&channel=ch", nsqd.RealHTTPAddr(), topicName)
err = http_api.NewClient(nil).POSTV1(url)
err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).POSTV1(url)
equal(t, err, nil)

// allow some time for nsqd to push info to nsqlookupd
Expand Down Expand Up @@ -425,7 +430,7 @@ func TestCluster(t *testing.T) {
equal(t, channel, "ch")

url = fmt.Sprintf("http://%s/topic/delete?topic=%s", nsqd.RealHTTPAddr(), topicName)
err = http_api.NewClient(nil).POSTV1(url)
err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).POSTV1(url)
equal(t, err, nil)

// allow some time for nsqd to push info to nsqlookupd
Expand Down
5 changes: 5 additions & 0 deletions nsqd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type Options struct {
BroadcastAddress string `flag:"broadcast-address"`
NSQLookupdTCPAddresses []string `flag:"lookupd-tcp-address" cfg:"nsqlookupd_tcp_addresses"`
AuthHTTPAddresses []string `flag:"auth-http-address" cfg:"auth_http_addresses"`
HTTPClientConnectTimeout time.Duration `flag:"http-client-connect-timeout"`
HTTPClientRequestTimeout time.Duration `flag:"http-client-request-timeout"`

// diskqueue options
DataPath string `flag:"data-path"`
Expand Down Expand Up @@ -95,6 +97,9 @@ func NewOptions() *Options {
NSQLookupdTCPAddresses: make([]string, 0),
AuthHTTPAddresses: make([]string, 0),

HTTPClientConnectTimeout: 2 * time.Second,
HTTPClientRequestTimeout: 2 * time.Second,

MemQueueSize: 10000,
MaxBytesPerFile: 100 * 1024 * 1024,
SyncEvery: 2500,
Expand Down
Loading

0 comments on commit d6c0ce9

Please sign in to comment.