From 2e07a9f25088566b65675d0acd894b09380d2aa5 Mon Sep 17 00:00:00 2001 From: kenjones-cisco Date: Sat, 23 Jul 2016 22:36:08 -0400 Subject: [PATCH] nsqd: configurable HTTP client timeouts Adds configuration options HTTPClientConnectTimeout and HTTPClientRequestTimeout to control the connection and request timeout repectively of the HTTP client. Also added to the following app binaries: - nsqadmin - nsq_stat - nsq_to_file - nsq_to_http Closes #715 Closes #680 --- apps/nsq_stat/nsq_stat.go | 36 +++++++++++++++-------- apps/nsq_to_file/nsq_to_file.go | 27 +++++++++++++---- apps/nsq_to_http/http.go | 2 +- apps/nsq_to_http/nsq_to_http.go | 16 ++++++++-- apps/nsqadmin/main.go | 3 ++ apps/nsqd/nsqd.go | 2 ++ contrib/nsqd.cfg.example | 5 ++++ internal/auth/authorizations.go | 10 ++++--- internal/http_api/api_request.go | 14 ++++----- nsqadmin/bindata.go | 50 ++++++++++++++++---------------- nsqadmin/http.go | 11 +++---- nsqadmin/nsqadmin.go | 2 +- nsqadmin/options.go | 19 +++++++----- nsqd/client_v2.go | 3 +- nsqd/nsqd.go | 2 +- nsqd/nsqd_test.go | 13 ++++++--- nsqd/options.go | 21 +++++++++----- nsqlookupd/nsqlookupd_test.go | 17 +++++++---- 18 files changed, 162 insertions(+), 91 deletions(-) diff --git a/apps/nsq_stat/nsq_stat.go b/apps/nsq_stat/nsq_stat.go index 10c48a52d..6d8c34abc 100644 --- a/apps/nsq_stat/nsq_stat.go +++ b/apps/nsq_stat/nsq_stat.go @@ -22,14 +22,16 @@ import ( ) var ( - showVersion = flag.Bool("version", false, "print version") - topic = flag.String("topic", "", "NSQ topic") - channel = flag.String("channel", "", "NSQ channel") - statusEvery = flag.Duration("status-every", -1, "(deprecated) duration of time between polling/printing output") - interval = flag.Duration("interval", 2*time.Second, "duration of time between polling/printing output") - countNum = numValue{} - nsqdHTTPAddrs = app.StringArray{} - lookupdHTTPAddrs = app.StringArray{} + showVersion = flag.Bool("version", false, "print version") + topic = flag.String("topic", "", "NSQ topic") + channel = flag.String("channel", "", "NSQ channel") + statusEvery = flag.Duration("status-every", -1, "(deprecated) duration of time between polling/printing output") + interval = flag.Duration("interval", 2*time.Second, "duration of time between polling/printing output") + httpConnectTimeout = flag.Duration("http-client-connect-timeout", 2*time.Second, "timeout for HTTP connect") + httpRequestTimeout = flag.Duration("http-client-request-timeout", 5*time.Second, "timeout for HTTP request") + countNum = numValue{} + nsqdHTTPAddrs = app.StringArray{} + lookupdHTTPAddrs = app.StringArray{} ) type numValue struct { @@ -55,9 +57,9 @@ func init() { flag.Var(&countNum, "count", "number of reports") } -func statLoop(interval time.Duration, topic string, channel string, - nsqdTCPAddrs []string, lookupdHTTPAddrs []string) { - ci := clusterinfo.New(nil, http_api.NewClient(nil)) +func statLoop(interval time.Duration, connectTimeout time.Duration, requestTimeout time.Duration, + topic string, channel string, nsqdTCPAddrs []string, lookupdHTTPAddrs []string) { + ci := clusterinfo.New(nil, http_api.NewClient(nil, connectTimeout, requestTimeout)) var o *clusterinfo.ChannelStats for i := 0; !countNum.isSet || countNum.value >= i; i++ { var producers clusterinfo.Producers @@ -149,6 +151,16 @@ func main() { log.Fatal("--interval should be positive") } + connectTimeout := *httpConnectTimeout + if int64(connectTimeout) <= 0 { + log.Fatal("--http-client-connect-timeout should be positive") + } + + requestTimeout := *httpRequestTimeout + if int64(requestTimeout) <= 0 { + log.Fatal("--http-client-request-timeout should be positive") + } + if countNum.isSet && countNum.value <= 0 { log.Fatal("--count should be positive") } @@ -171,7 +183,7 @@ func main() { termChan := make(chan os.Signal, 1) signal.Notify(termChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) - go statLoop(intvl, *topic, *channel, nsqdHTTPAddrs, lookupdHTTPAddrs) + go statLoop(intvl, connectTimeout, requestTimeout, *topic, *channel, nsqdHTTPAddrs, lookupdHTTPAddrs) <-termChan } diff --git a/apps/nsq_to_file/nsq_to_file.go b/apps/nsq_to_file/nsq_to_file.go index 4c76607ed..56a1defb5 100644 --- a/apps/nsq_to_file/nsq_to_file.go +++ b/apps/nsq_to_file/nsq_to_file.go @@ -45,6 +45,9 @@ var ( rotateSize = flag.Int64("rotate-size", 0, "rotate the file when it grows bigger than `rotate-size` bytes") rotateInterval = flag.Duration("rotate-interval", 0*time.Second, "rotate the file every duration") + httpConnectTimeout = flag.Duration("http-client-connect-timeout", 5*time.Second, "timeout for HTTP connect") + httpRequestTimeout = flag.Duration("http-client-request-timeout", 5*time.Second, "timeout for HTTP request") + nsqdTCPAddrs = app.StringArray{} lookupdHTTPAddrs = app.StringArray{} topics = app.StringArray{} @@ -404,8 +407,9 @@ func (t *TopicDiscoverer) allowTopicName(pattern string, name string) bool { return match } -func (t *TopicDiscoverer) syncTopics(addrs []string, pattern string) { - newTopics, err := clusterinfo.New(nil, http_api.NewClient(nil)).GetLookupdTopics(addrs) +func (t *TopicDiscoverer) syncTopics(addrs []string, pattern string, + connectTimeout time.Duration, requestTimeout time.Duration) { + newTopics, err := clusterinfo.New(nil, http_api.NewClient(nil, connectTimeout, requestTimeout)).GetLookupdTopics(addrs) if err != nil { log.Printf("ERROR: could not retrieve topic list: %s", err) } @@ -438,13 +442,14 @@ func (t *TopicDiscoverer) hup() { } } -func (t *TopicDiscoverer) watch(addrs []string, sync bool, pattern string) { +func (t *TopicDiscoverer) watch(addrs []string, sync bool, pattern string, + connectTimeout time.Duration, requestTimeout time.Duration) { ticker := time.Tick(*topicPollRate) for { select { case <-ticker: if sync { - t.syncTopics(addrs, pattern) + t.syncTopics(addrs, pattern, connectTimeout, requestTimeout) } case <-t.termChan: t.stop() @@ -474,6 +479,16 @@ func main() { log.Fatal("--channel is required") } + connectTimeout := *httpConnectTimeout + if int64(connectTimeout) <= 0 { + log.Fatal("--http-client-connect-timeout should be positive") + } + + requestTimeout := *httpRequestTimeout + if int64(requestTimeout) <= 0 { + log.Fatal("--http-client-request-timeout should be positive") + } + var topicsFromNSQLookupd bool if len(nsqdTCPAddrs) == 0 && len(lookupdHTTPAddrs) == 0 { @@ -516,7 +531,7 @@ func main() { } topicsFromNSQLookupd = true var err error - topics, err = clusterinfo.New(nil, http_api.NewClient(nil)).GetLookupdTopics(lookupdHTTPAddrs) + topics, err = clusterinfo.New(nil, http_api.NewClient(nil, connectTimeout, requestTimeout)).GetLookupdTopics(lookupdHTTPAddrs) if err != nil { log.Fatalf("ERROR: could not retrieve topic list: %s", err) } @@ -536,5 +551,5 @@ func main() { go discoverer.startTopicRouter(logger) } - discoverer.watch(lookupdHTTPAddrs, topicsFromNSQLookupd, *topicPattern) + discoverer.watch(lookupdHTTPAddrs, topicsFromNSQLookupd, *topicPattern, connectTimeout, requestTimeout) } diff --git a/apps/nsq_to_http/http.go b/apps/nsq_to_http/http.go index e07e6f481..e56924066 100644 --- a/apps/nsq_to_http/http.go +++ b/apps/nsq_to_http/http.go @@ -13,7 +13,7 @@ var httpclient *http.Client var userAgent string func init() { - httpclient = &http.Client{Transport: http_api.NewDeadlineTransport(*httpTimeout)} + httpclient = &http.Client{Transport: http_api.NewDeadlineTransport(*httpConnectTimeout, *httpRequestTimeout), Timeout: *httpRequestTimeout} userAgent = fmt.Sprintf("nsq_to_http v%s", version.Binary) } diff --git a/apps/nsq_to_http/nsq_to_http.go b/apps/nsq_to_http/nsq_to_http.go index 89e37df37..8eaeb0527 100644 --- a/apps/nsq_to_http/nsq_to_http.go +++ b/apps/nsq_to_http/nsq_to_http.go @@ -42,9 +42,12 @@ var ( numPublishers = flag.Int("n", 100, "number of concurrent publishers") mode = flag.String("mode", "hostpool", "the upstream request mode options: multicast, round-robin, hostpool (default), epsilon-greedy") sample = flag.Float64("sample", 1.0, "% of messages to publish (float b/w 0 -> 1)") - httpTimeout = flag.Duration("http-timeout", 20*time.Second, "timeout for HTTP connect/read/write (each)") - statusEvery = flag.Int("status-every", 250, "the # of requests between logging status (per handler), 0 disables") - contentType = flag.String("content-type", "application/octet-stream", "the Content-Type used for POST requests") + // TODO: remove; deprecated in favor of http-client-connect-timeout, http-client-request-timeout + httpTimeout = flag.Duration("http-timeout", 20*time.Second, "timeout for HTTP connect/read/write (each)") + httpConnectTimeout = flag.Duration("http-client-connect-timeout", 20*time.Second, "timeout for HTTP connect") + httpRequestTimeout = flag.Duration("http-client-request-timeout", 20*time.Second, "timeout for HTTP request") + statusEvery = flag.Int("status-every", 250, "the # of requests between logging status (per handler), 0 disables") + contentType = flag.String("content-type", "application/octet-stream", "the Content-Type used for POST requests") getAddrs = app.StringArray{} postAddrs = app.StringArray{} @@ -249,6 +252,13 @@ func main() { *httpTimeout = time.Duration(*httpTimeoutMs) * time.Millisecond } + // TODO: remove, deprecated + if hasArg("http-timeout") { + log.Printf("WARNING: --http-timeout is deprecated in favor of --http-client-connect-timeout=X and --http-client-request-timeout=Y") + *httpConnectTimeout = *httpTimeout + *httpRequestTimeout = *httpTimeout + } + termChan := make(chan os.Signal, 1) signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM) diff --git a/apps/nsqadmin/main.go b/apps/nsqadmin/main.go index 3061f7d2b..281129c29 100644 --- a/apps/nsqadmin/main.go +++ b/apps/nsqadmin/main.go @@ -36,6 +36,9 @@ var ( notificationHTTPEndpoint = flagSet.String("notification-http-endpoint", "", "HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sent") + httpConnectTimeout = flagSet.Duration("http-client-connect-timeout", 2*time.Second, "timeout for HTTP connect") + httpRequestTimeout = flagSet.Duration("http-client-request-timeout", 5*time.Second, "timeout for HTTP request") + httpClientTLSInsecureSkipVerify = flagSet.Bool("http-client-tls-insecure-skip-verify", false, "configure the HTTP client to skip verification of TLS certificates") httpClientTLSRootCAFile = flagSet.String("http-client-tls-root-ca-file", "", "path to CA file for the HTTP client") httpClientTLSCert = flagSet.String("http-client-tls-cert", "", "path to certificate file for the HTTP client") diff --git a/apps/nsqd/nsqd.go b/apps/nsqd/nsqd.go index 163e28950..6c30ea32f 100644 --- a/apps/nsqd/nsqd.go +++ b/apps/nsqd/nsqd.go @@ -90,6 +90,8 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet { flagSet.String("broadcast-address", opts.BroadcastAddress, "address that will be registered with lookupd (defaults to the OS hostname)") lookupdTCPAddrs := app.StringArray{} flagSet.Var(&lookupdTCPAddrs, "lookupd-tcp-address", "lookupd TCP address (may be given multiple times)") + flagSet.Duration("http-client-connect-timeout", opts.HTTPClientConnectTimeout, "timeout for HTTP connect") + flagSet.Duration("http-client-request-timeout", opts.HTTPClientRequestTimeout, "timeout for HTTP request") // diskqueue options flagSet.String("data-path", opts.DataPath, "path to store disk-backed messages") diff --git a/contrib/nsqd.cfg.example b/contrib/nsqd.cfg.example index 68a199c57..cfd13e1e6 100644 --- a/contrib/nsqd.cfg.example +++ b/contrib/nsqd.cfg.example @@ -21,6 +21,11 @@ nsqlookupd_tcp_addresses = [ "127.0.0.1:4160" ] +## duration to wait before HTTP client connection timeout +http_client_connect_timeout = "2s" + +## duration to wait before HTTP client request timeout +http_client_request_timeout = "5s" ## path to store disk-backed messages # data_path = "/var/lib/nsq" diff --git a/internal/auth/authorizations.go b/internal/auth/authorizations.go index 7bb011aa5..b4d46e63a 100644 --- a/internal/auth/authorizations.go +++ b/internal/auth/authorizations.go @@ -76,9 +76,10 @@ func (a *State) IsExpired() bool { return false } -func QueryAnyAuthd(authd []string, remoteIP, tlsEnabled, authSecret string) (*State, error) { +func QueryAnyAuthd(authd []string, remoteIP, tlsEnabled, authSecret string, + connectTimeout time.Duration, requestTimeout time.Duration) (*State, error) { for _, a := range authd { - authState, err := QueryAuthd(a, remoteIP, tlsEnabled, authSecret) + authState, err := QueryAuthd(a, remoteIP, tlsEnabled, authSecret, connectTimeout, requestTimeout) if err != nil { log.Printf("Error: failed auth against %s %s", a, err) continue @@ -88,7 +89,8 @@ func QueryAnyAuthd(authd []string, remoteIP, tlsEnabled, authSecret string) (*St return nil, errors.New("Unable to access auth server") } -func QueryAuthd(authd, remoteIP, tlsEnabled, authSecret string) (*State, error) { +func QueryAuthd(authd, remoteIP, tlsEnabled, authSecret string, + connectTimeout time.Duration, requestTimeout time.Duration) (*State, error) { v := url.Values{} v.Set("remote_ip", remoteIP) v.Set("tls", tlsEnabled) @@ -97,7 +99,7 @@ func QueryAuthd(authd, remoteIP, tlsEnabled, authSecret string) (*State, error) endpoint := fmt.Sprintf("http://%s/auth?%s", authd, v.Encode()) var authState State - client := http_api.NewClient(nil) + client := http_api.NewClient(nil, connectTimeout, requestTimeout) if err := client.GETV1(endpoint, &authState); err != nil { return nil, err } diff --git a/internal/http_api/api_request.go b/internal/http_api/api_request.go index 49569baab..fe192fb8c 100644 --- a/internal/http_api/api_request.go +++ b/internal/http_api/api_request.go @@ -19,25 +19,24 @@ type deadlinedConn struct { } func (c *deadlinedConn) Read(b []byte) (n int, err error) { - c.Conn.SetReadDeadline(time.Now().Add(c.Timeout)) return c.Conn.Read(b) } func (c *deadlinedConn) Write(b []byte) (n int, err error) { - c.Conn.SetWriteDeadline(time.Now().Add(c.Timeout)) return c.Conn.Write(b) } // A custom http.Transport with support for deadline timeouts -func NewDeadlineTransport(timeout time.Duration) *http.Transport { +func NewDeadlineTransport(connectTimeout time.Duration, requestTimeout time.Duration) *http.Transport { transport := &http.Transport{ Dial: func(netw, addr string) (net.Conn, error) { - c, err := net.DialTimeout(netw, addr, timeout) + c, err := net.DialTimeout(netw, addr, connectTimeout) if err != nil { return nil, err } - return &deadlinedConn{timeout, c}, nil + return &deadlinedConn{connectTimeout, c}, nil }, + ResponseHeaderTimeout: requestTimeout, } return transport } @@ -46,12 +45,13 @@ type Client struct { c *http.Client } -func NewClient(tlsConfig *tls.Config) *Client { - transport := NewDeadlineTransport(2 * time.Second) +func NewClient(tlsConfig *tls.Config, connectTimeout time.Duration, requestTimeout time.Duration) *Client { + transport := NewDeadlineTransport(connectTimeout, requestTimeout) transport.TLSClientConfig = tlsConfig return &Client{ c: &http.Client{ Transport: transport, + Timeout: requestTimeout, }, } } diff --git a/nsqadmin/bindata.go b/nsqadmin/bindata.go index cf3d5c2fc..08f0ffdce 100644 --- a/nsqadmin/bindata.go +++ b/nsqadmin/bindata.go @@ -392,19 +392,19 @@ func AssetNames() []string { // _bindata is a table, holding each asset generator, mapped to its name. var _bindata = map[string]func() (*asset, error){ - "base.css": baseCss, - "bootstrap.min.css": bootstrapMinCss, - "favicon.png": faviconPng, - "glyphicons-halflings-regular.eot": glyphiconsHalflingsRegularEot, - "glyphicons-halflings-regular.svg": glyphiconsHalflingsRegularSvg, - "glyphicons-halflings-regular.ttf": glyphiconsHalflingsRegularTtf, - "glyphicons-halflings-regular.woff": glyphiconsHalflingsRegularWoff, + "base.css": baseCss, + "bootstrap.min.css": bootstrapMinCss, + "favicon.png": faviconPng, + "glyphicons-halflings-regular.eot": glyphiconsHalflingsRegularEot, + "glyphicons-halflings-regular.svg": glyphiconsHalflingsRegularSvg, + "glyphicons-halflings-regular.ttf": glyphiconsHalflingsRegularTtf, + "glyphicons-halflings-regular.woff": glyphiconsHalflingsRegularWoff, "glyphicons-halflings-regular.woff2": glyphiconsHalflingsRegularWoff2, - "index.html": indexHtml, - "main.js": mainJs, - "main.js.map": mainJsMap, - "nsq_blue.png": nsq_bluePng, - "vendor.js": vendorJs, + "index.html": indexHtml, + "main.js": mainJs, + "main.js.map": mainJsMap, + "nsq_blue.png": nsq_bluePng, + "vendor.js": vendorJs, } // AssetDir returns the file names below a certain @@ -446,20 +446,21 @@ type bintree struct { Func func() (*asset, error) Children map[string]*bintree } + var _bintree = &bintree{nil, map[string]*bintree{ - "base.css": &bintree{baseCss, map[string]*bintree{}}, - "bootstrap.min.css": &bintree{bootstrapMinCss, map[string]*bintree{}}, - "favicon.png": &bintree{faviconPng, map[string]*bintree{}}, - "glyphicons-halflings-regular.eot": &bintree{glyphiconsHalflingsRegularEot, map[string]*bintree{}}, - "glyphicons-halflings-regular.svg": &bintree{glyphiconsHalflingsRegularSvg, map[string]*bintree{}}, - "glyphicons-halflings-regular.ttf": &bintree{glyphiconsHalflingsRegularTtf, map[string]*bintree{}}, - "glyphicons-halflings-regular.woff": &bintree{glyphiconsHalflingsRegularWoff, map[string]*bintree{}}, + "base.css": &bintree{baseCss, map[string]*bintree{}}, + "bootstrap.min.css": &bintree{bootstrapMinCss, map[string]*bintree{}}, + "favicon.png": &bintree{faviconPng, map[string]*bintree{}}, + "glyphicons-halflings-regular.eot": &bintree{glyphiconsHalflingsRegularEot, map[string]*bintree{}}, + "glyphicons-halflings-regular.svg": &bintree{glyphiconsHalflingsRegularSvg, map[string]*bintree{}}, + "glyphicons-halflings-regular.ttf": &bintree{glyphiconsHalflingsRegularTtf, map[string]*bintree{}}, + "glyphicons-halflings-regular.woff": &bintree{glyphiconsHalflingsRegularWoff, map[string]*bintree{}}, "glyphicons-halflings-regular.woff2": &bintree{glyphiconsHalflingsRegularWoff2, map[string]*bintree{}}, - "index.html": &bintree{indexHtml, map[string]*bintree{}}, - "main.js": &bintree{mainJs, map[string]*bintree{}}, - "main.js.map": &bintree{mainJsMap, map[string]*bintree{}}, - "nsq_blue.png": &bintree{nsq_bluePng, map[string]*bintree{}}, - "vendor.js": &bintree{vendorJs, map[string]*bintree{}}, + "index.html": &bintree{indexHtml, map[string]*bintree{}}, + "main.js": &bintree{mainJs, map[string]*bintree{}}, + "main.js.map": &bintree{mainJsMap, map[string]*bintree{}}, + "nsq_blue.png": &bintree{nsq_bluePng, map[string]*bintree{}}, + "vendor.js": &bintree{vendorJs, map[string]*bintree{}}, }} // RestoreAsset restores an asset under the given directory @@ -508,4 +509,3 @@ func _filePath(dir, name string) string { cannonicalName := strings.Replace(name, "\\", "/", -1) return filepath.Join(append([]string{dir}, strings.Split(cannonicalName, "/")...)...) } - diff --git a/nsqadmin/http.go b/nsqadmin/http.go index 22a4ad0d6..410ad28dc 100644 --- a/nsqadmin/http.go +++ b/nsqadmin/http.go @@ -30,7 +30,7 @@ func maybeWarnMsg(msgs []string) string { } // this is similar to httputil.NewSingleHostReverseProxy except it passes along basic auth -func NewSingleHostReverseProxy(target *url.URL, timeout time.Duration) *httputil.ReverseProxy { +func NewSingleHostReverseProxy(target *url.URL, connectTimeout time.Duration, requestTimeout time.Duration) *httputil.ReverseProxy { director := func(req *http.Request) { req.URL.Scheme = target.Scheme req.URL.Host = target.Host @@ -41,7 +41,7 @@ func NewSingleHostReverseProxy(target *url.URL, timeout time.Duration) *httputil } return &httputil.ReverseProxy{ Director: director, - Transport: http_api.NewDeadlineTransport(timeout), + Transport: http_api.NewDeadlineTransport(connectTimeout, requestTimeout), } } @@ -55,7 +55,8 @@ type httpServer struct { func NewHTTPServer(ctx *Context) *httpServer { log := http_api.Log(ctx.nsqadmin.getOpts().Logger) - client := http_api.NewClient(ctx.nsqadmin.httpClientTLSConfig) + client := http_api.NewClient(ctx.nsqadmin.httpClientTLSConfig, ctx.nsqadmin.getOpts().HTTPClientConnectTimeout, + ctx.nsqadmin.getOpts().HTTPClientRequestTimeout) router := httprouter.New() router.HandleMethodNotAllowed = true @@ -83,7 +84,7 @@ func NewHTTPServer(ctx *Context) *httpServer { router.Handle("GET", "/static/:asset", http_api.Decorate(s.staticAssetHandler, log, http_api.PlainText)) router.Handle("GET", "/fonts/:asset", http_api.Decorate(s.staticAssetHandler, log, http_api.PlainText)) if s.ctx.nsqadmin.getOpts().ProxyGraphite { - proxy := NewSingleHostReverseProxy(ctx.nsqadmin.graphiteURL, 20*time.Second) + proxy := NewSingleHostReverseProxy(ctx.nsqadmin.graphiteURL, 20*time.Second, 20*time.Second) router.Handler("GET", "/render", proxy) } @@ -697,7 +698,7 @@ func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httpr if req.Method == "PUT" { // add 1 so that it's greater than our max when we test for it // (LimitReader returns a "fake" EOF) - readMax := int64(1024 * 1024 + 1) + readMax := int64(1024*1024 + 1) body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax)) if err != nil { return nil, http_api.Err{500, "INTERNAL_ERROR"} diff --git a/nsqadmin/nsqadmin.go b/nsqadmin/nsqadmin.go index 377fdefea..b8ca36d89 100644 --- a/nsqadmin/nsqadmin.go +++ b/nsqadmin/nsqadmin.go @@ -143,7 +143,7 @@ func (n *NSQAdmin) handleAdminActions() { if err != nil { n.logf("ERROR: failed to serialize admin action - %s", err) } - httpclient := &http.Client{Transport: http_api.NewDeadlineTransport(10 * time.Second)} + httpclient := &http.Client{Transport: http_api.NewDeadlineTransport(10*time.Second, 10*time.Second)} n.logf("POSTing notification to %s", n.getOpts().NotificationHTTPEndpoint) resp, err := httpclient.Post(n.getOpts().NotificationHTTPEndpoint, "application/json", bytes.NewBuffer(content)) diff --git a/nsqadmin/options.go b/nsqadmin/options.go index 0bc4ed74f..1a0290c26 100644 --- a/nsqadmin/options.go +++ b/nsqadmin/options.go @@ -22,6 +22,9 @@ type Options struct { NSQLookupdHTTPAddresses []string `flag:"lookupd-http-address" cfg:"nsqlookupd_http_addresses"` NSQDHTTPAddresses []string `flag:"nsqd-http-address" cfg:"nsqd_http_addresses"` + HTTPClientConnectTimeout time.Duration `flag:"http-client-connect-timeout"` + HTTPClientRequestTimeout time.Duration `flag:"http-client-request-timeout"` + HTTPClientTLSInsecureSkipVerify bool `flag:"http-client-tls-insecure-skip-verify"` HTTPClientTLSRootCAFile string `flag:"http-client-tls-root-ca-file"` HTTPClientTLSCert string `flag:"http-client-tls-cert"` @@ -34,12 +37,14 @@ type Options struct { func NewOptions() *Options { return &Options{ - HTTPAddress: "0.0.0.0:4171", - UseStatsdPrefixes: true, - StatsdPrefix: "nsq.%s", - StatsdCounterFormat: "stats.counters.%s.count", - StatsdGaugeFormat: "stats.gauges.%s", - StatsdInterval: 60 * time.Second, - Logger: log.New(os.Stderr, "[nsqadmin] ", log.Ldate|log.Ltime|log.Lmicroseconds), + HTTPAddress: "0.0.0.0:4171", + UseStatsdPrefixes: true, + StatsdPrefix: "nsq.%s", + StatsdCounterFormat: "stats.counters.%s.count", + StatsdGaugeFormat: "stats.gauges.%s", + StatsdInterval: 60 * time.Second, + HTTPClientConnectTimeout: 2 * time.Second, + HTTPClientRequestTimeout: 5 * time.Second, + Logger: log.New(os.Stderr, "[nsqadmin] ", log.Ldate|log.Ltime|log.Lmicroseconds), } } diff --git a/nsqd/client_v2.go b/nsqd/client_v2.go index 3318bb218..6e1bd67e4 100644 --- a/nsqd/client_v2.go +++ b/nsqd/client_v2.go @@ -574,7 +574,8 @@ func (c *clientV2) QueryAuthd() error { } authState, err := auth.QueryAnyAuthd(c.ctx.nsqd.getOpts().AuthHTTPAddresses, - remoteIP, tlsEnabled, c.AuthSecret) + remoteIP, tlsEnabled, c.AuthSecret, c.ctx.nsqd.getOpts().HTTPClientConnectTimeout, + c.ctx.nsqd.getOpts().HTTPClientRequestTimeout) if err != nil { return err } diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index 0a161b88d..f2afc9916 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -84,7 +84,7 @@ func New(opts *Options) *NSQD { exitChan: make(chan int), notifyChan: make(chan interface{}), optsNotificationChan: make(chan struct{}, 1), - ci: clusterinfo.New(opts.Logger, http_api.NewClient(nil)), + ci: clusterinfo.New(opts.Logger, http_api.NewClient(nil, opts.HTTPClientConnectTimeout, opts.HTTPClientRequestTimeout)), dl: dirlock.New(dataPath), } n.swapOpts(opts) diff --git a/nsqd/nsqd_test.go b/nsqd/nsqd_test.go index f2b92cf48..785866b81 100644 --- a/nsqd/nsqd_test.go +++ b/nsqd/nsqd_test.go @@ -20,6 +20,11 @@ import ( "github.com/nsqio/nsq/nsqlookupd" ) +const ( + ConnectTimeout = 5 * time.Second + RequestTimeout = 5 * time.Second +) + func assert(t *testing.T, condition bool, msg string, v ...interface{}) { if !condition { _, file, line, _ := runtime.Caller(1) @@ -80,7 +85,7 @@ func getMetadata(n *NSQD) (*simplejson.Json, error) { func API(endpoint string) (data *simplejson.Json, err error) { d := make(map[string]interface{}) - err = http_api.NewClient(nil).NegotiateV1(endpoint, &d) + err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).NegotiateV1(endpoint, &d) data = simplejson.New() data.SetPath(nil, d) return @@ -373,11 +378,11 @@ func TestCluster(t *testing.T) { equal(t, err, nil) url := fmt.Sprintf("http://%s/topic/create?topic=%s", nsqd.RealHTTPAddr(), topicName) - err = http_api.NewClient(nil).POSTV1(url) + err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).POSTV1(url) equal(t, err, nil) url = fmt.Sprintf("http://%s/channel/create?topic=%s&channel=ch", nsqd.RealHTTPAddr(), topicName) - err = http_api.NewClient(nil).POSTV1(url) + err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).POSTV1(url) equal(t, err, nil) // allow some time for nsqd to push info to nsqlookupd @@ -425,7 +430,7 @@ func TestCluster(t *testing.T) { equal(t, channel, "ch") url = fmt.Sprintf("http://%s/topic/delete?topic=%s", nsqd.RealHTTPAddr(), topicName) - err = http_api.NewClient(nil).POSTV1(url) + err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).POSTV1(url) equal(t, err, nil) // allow some time for nsqd to push info to nsqlookupd diff --git a/nsqd/options.go b/nsqd/options.go index 72641d51c..1a38fbaaa 100644 --- a/nsqd/options.go +++ b/nsqd/options.go @@ -12,14 +12,16 @@ import ( type Options struct { // basic options - ID int64 `flag:"worker-id" cfg:"id"` - Verbose bool `flag:"verbose"` - TCPAddress string `flag:"tcp-address"` - HTTPAddress string `flag:"http-address"` - HTTPSAddress string `flag:"https-address"` - BroadcastAddress string `flag:"broadcast-address"` - NSQLookupdTCPAddresses []string `flag:"lookupd-tcp-address" cfg:"nsqlookupd_tcp_addresses"` - AuthHTTPAddresses []string `flag:"auth-http-address" cfg:"auth_http_addresses"` + ID int64 `flag:"worker-id" cfg:"id"` + Verbose bool `flag:"verbose"` + TCPAddress string `flag:"tcp-address"` + HTTPAddress string `flag:"http-address"` + HTTPSAddress string `flag:"https-address"` + BroadcastAddress string `flag:"broadcast-address"` + NSQLookupdTCPAddresses []string `flag:"lookupd-tcp-address" cfg:"nsqlookupd_tcp_addresses"` + AuthHTTPAddresses []string `flag:"auth-http-address" cfg:"auth_http_addresses"` + HTTPClientConnectTimeout time.Duration `flag:"http-client-connect-timeout" cfg:"http_client_connect_timeout"` + HTTPClientRequestTimeout time.Duration `flag:"http-client-request-timeout" cfg:"http_client_request_timeout"` // diskqueue options DataPath string `flag:"data-path"` @@ -95,6 +97,9 @@ func NewOptions() *Options { NSQLookupdTCPAddresses: make([]string, 0), AuthHTTPAddresses: make([]string, 0), + HTTPClientConnectTimeout: 2 * time.Second, + HTTPClientRequestTimeout: 5 * time.Second, + MemQueueSize: 10000, MaxBytesPerFile: 100 * 1024 * 1024, SyncEvery: 2500, diff --git a/nsqlookupd/nsqlookupd_test.go b/nsqlookupd/nsqlookupd_test.go index 33de0e560..2a8e44712 100644 --- a/nsqlookupd/nsqlookupd_test.go +++ b/nsqlookupd/nsqlookupd_test.go @@ -15,6 +15,11 @@ import ( "github.com/nsqio/nsq/internal/http_api" ) +const ( + ConnectTimeout = 5 * time.Second + RequestTimeout = 5 * time.Second +) + func equal(t *testing.T, act, exp interface{}) { if !reflect.DeepEqual(exp, act) { _, file, line, _ := runtime.Caller(1) @@ -83,7 +88,7 @@ func identify(t *testing.T, conn net.Conn, address string, tcpPort int, httpPort func API(endpoint string) (data *simplejson.Json, err error) { d := make(map[string]interface{}) - err = http_api.NewClient(nil).NegotiateV1(endpoint, &d) + err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).NegotiateV1(endpoint, &d) data = simplejson.New() data.SetPath(nil, d) return @@ -263,7 +268,7 @@ func TestTombstoneRecover(t *testing.T) { endpoint := fmt.Sprintf("http://%s/topic/tombstone?topic=%s&node=%s", httpAddr, topicName, "ip.address:5555") - err = http_api.NewClient(nil).POSTV1(endpoint) + err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).POSTV1(endpoint) equal(t, err, nil) endpoint = fmt.Sprintf("http://%s/lookup?topic=%s", httpAddr, topicName) @@ -307,7 +312,7 @@ func TestTombstoneUnregister(t *testing.T) { endpoint := fmt.Sprintf("http://%s/topic/tombstone?topic=%s&node=%s", httpAddr, topicName, "ip.address:5555") - err = http_api.NewClient(nil).POSTV1(endpoint) + err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).POSTV1(endpoint) equal(t, err, nil) endpoint = fmt.Sprintf("http://%s/lookup?topic=%s", httpAddr, topicName) @@ -349,7 +354,7 @@ func TestInactiveNodes(t *testing.T) { _, err := nsq.ReadResponse(conn) equal(t, err, nil) - ci := clusterinfo.New(nil, http_api.NewClient(nil)) + ci := clusterinfo.New(nil, http_api.NewClient(nil, ConnectTimeout, RequestTimeout)) producers, _ := ci.GetLookupdProducers(lookupdHTTPAddrs) equal(t, len(producers), 1) @@ -382,7 +387,7 @@ func TestTombstonedNodes(t *testing.T) { _, err := nsq.ReadResponse(conn) equal(t, err, nil) - ci := clusterinfo.New(nil, http_api.NewClient(nil)) + ci := clusterinfo.New(nil, http_api.NewClient(nil, ConnectTimeout, RequestTimeout)) producers, _ := ci.GetLookupdProducers(lookupdHTTPAddrs) equal(t, len(producers), 1) @@ -392,7 +397,7 @@ func TestTombstonedNodes(t *testing.T) { endpoint := fmt.Sprintf("http://%s/topic/tombstone?topic=%s&node=%s", httpAddr, topicName, "ip.address:5555") - err = http_api.NewClient(nil).POSTV1(endpoint) + err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).POSTV1(endpoint) equal(t, err, nil) producers, _ = ci.GetLookupdProducers(lookupdHTTPAddrs)