Skip to content

Commit

Permalink
nsqd: configurable HTTP client timeouts
Browse files Browse the repository at this point in the history
Adds configuration options HTTPClientConnectTimeout and
HTTPClientRequestTimeout to control the connection and request timeout
repectively of the HTTP client.

Also added to the following app binaries:

- nsqadmin
- nsq_stat
- nsq_to_file
- nsq_to_http

Closes nsqio#715
Closes nsqio#680
  • Loading branch information
kenjones-cisco committed Jul 27, 2016
1 parent 22c8c64 commit 2e07a9f
Show file tree
Hide file tree
Showing 18 changed files with 162 additions and 91 deletions.
36 changes: 24 additions & 12 deletions apps/nsq_stat/nsq_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@ import (
)

var (
showVersion = flag.Bool("version", false, "print version")
topic = flag.String("topic", "", "NSQ topic")
channel = flag.String("channel", "", "NSQ channel")
statusEvery = flag.Duration("status-every", -1, "(deprecated) duration of time between polling/printing output")
interval = flag.Duration("interval", 2*time.Second, "duration of time between polling/printing output")
countNum = numValue{}
nsqdHTTPAddrs = app.StringArray{}
lookupdHTTPAddrs = app.StringArray{}
showVersion = flag.Bool("version", false, "print version")
topic = flag.String("topic", "", "NSQ topic")
channel = flag.String("channel", "", "NSQ channel")
statusEvery = flag.Duration("status-every", -1, "(deprecated) duration of time between polling/printing output")
interval = flag.Duration("interval", 2*time.Second, "duration of time between polling/printing output")
httpConnectTimeout = flag.Duration("http-client-connect-timeout", 2*time.Second, "timeout for HTTP connect")
httpRequestTimeout = flag.Duration("http-client-request-timeout", 5*time.Second, "timeout for HTTP request")
countNum = numValue{}
nsqdHTTPAddrs = app.StringArray{}
lookupdHTTPAddrs = app.StringArray{}
)

type numValue struct {
Expand All @@ -55,9 +57,9 @@ func init() {
flag.Var(&countNum, "count", "number of reports")
}

func statLoop(interval time.Duration, topic string, channel string,
nsqdTCPAddrs []string, lookupdHTTPAddrs []string) {
ci := clusterinfo.New(nil, http_api.NewClient(nil))
func statLoop(interval time.Duration, connectTimeout time.Duration, requestTimeout time.Duration,
topic string, channel string, nsqdTCPAddrs []string, lookupdHTTPAddrs []string) {
ci := clusterinfo.New(nil, http_api.NewClient(nil, connectTimeout, requestTimeout))
var o *clusterinfo.ChannelStats
for i := 0; !countNum.isSet || countNum.value >= i; i++ {
var producers clusterinfo.Producers
Expand Down Expand Up @@ -149,6 +151,16 @@ func main() {
log.Fatal("--interval should be positive")
}

connectTimeout := *httpConnectTimeout
if int64(connectTimeout) <= 0 {
log.Fatal("--http-client-connect-timeout should be positive")
}

requestTimeout := *httpRequestTimeout
if int64(requestTimeout) <= 0 {
log.Fatal("--http-client-request-timeout should be positive")
}

if countNum.isSet && countNum.value <= 0 {
log.Fatal("--count should be positive")
}
Expand All @@ -171,7 +183,7 @@ func main() {
termChan := make(chan os.Signal, 1)
signal.Notify(termChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)

go statLoop(intvl, *topic, *channel, nsqdHTTPAddrs, lookupdHTTPAddrs)
go statLoop(intvl, connectTimeout, requestTimeout, *topic, *channel, nsqdHTTPAddrs, lookupdHTTPAddrs)

<-termChan
}
27 changes: 21 additions & 6 deletions apps/nsq_to_file/nsq_to_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ var (
rotateSize = flag.Int64("rotate-size", 0, "rotate the file when it grows bigger than `rotate-size` bytes")
rotateInterval = flag.Duration("rotate-interval", 0*time.Second, "rotate the file every duration")

httpConnectTimeout = flag.Duration("http-client-connect-timeout", 5*time.Second, "timeout for HTTP connect")
httpRequestTimeout = flag.Duration("http-client-request-timeout", 5*time.Second, "timeout for HTTP request")

nsqdTCPAddrs = app.StringArray{}
lookupdHTTPAddrs = app.StringArray{}
topics = app.StringArray{}
Expand Down Expand Up @@ -404,8 +407,9 @@ func (t *TopicDiscoverer) allowTopicName(pattern string, name string) bool {
return match
}

func (t *TopicDiscoverer) syncTopics(addrs []string, pattern string) {
newTopics, err := clusterinfo.New(nil, http_api.NewClient(nil)).GetLookupdTopics(addrs)
func (t *TopicDiscoverer) syncTopics(addrs []string, pattern string,
connectTimeout time.Duration, requestTimeout time.Duration) {
newTopics, err := clusterinfo.New(nil, http_api.NewClient(nil, connectTimeout, requestTimeout)).GetLookupdTopics(addrs)
if err != nil {
log.Printf("ERROR: could not retrieve topic list: %s", err)
}
Expand Down Expand Up @@ -438,13 +442,14 @@ func (t *TopicDiscoverer) hup() {
}
}

func (t *TopicDiscoverer) watch(addrs []string, sync bool, pattern string) {
func (t *TopicDiscoverer) watch(addrs []string, sync bool, pattern string,
connectTimeout time.Duration, requestTimeout time.Duration) {
ticker := time.Tick(*topicPollRate)
for {
select {
case <-ticker:
if sync {
t.syncTopics(addrs, pattern)
t.syncTopics(addrs, pattern, connectTimeout, requestTimeout)
}
case <-t.termChan:
t.stop()
Expand Down Expand Up @@ -474,6 +479,16 @@ func main() {
log.Fatal("--channel is required")
}

connectTimeout := *httpConnectTimeout
if int64(connectTimeout) <= 0 {
log.Fatal("--http-client-connect-timeout should be positive")
}

requestTimeout := *httpRequestTimeout
if int64(requestTimeout) <= 0 {
log.Fatal("--http-client-request-timeout should be positive")
}

var topicsFromNSQLookupd bool

if len(nsqdTCPAddrs) == 0 && len(lookupdHTTPAddrs) == 0 {
Expand Down Expand Up @@ -516,7 +531,7 @@ func main() {
}
topicsFromNSQLookupd = true
var err error
topics, err = clusterinfo.New(nil, http_api.NewClient(nil)).GetLookupdTopics(lookupdHTTPAddrs)
topics, err = clusterinfo.New(nil, http_api.NewClient(nil, connectTimeout, requestTimeout)).GetLookupdTopics(lookupdHTTPAddrs)
if err != nil {
log.Fatalf("ERROR: could not retrieve topic list: %s", err)
}
Expand All @@ -536,5 +551,5 @@ func main() {
go discoverer.startTopicRouter(logger)
}

discoverer.watch(lookupdHTTPAddrs, topicsFromNSQLookupd, *topicPattern)
discoverer.watch(lookupdHTTPAddrs, topicsFromNSQLookupd, *topicPattern, connectTimeout, requestTimeout)
}
2 changes: 1 addition & 1 deletion apps/nsq_to_http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var httpclient *http.Client
var userAgent string

func init() {
httpclient = &http.Client{Transport: http_api.NewDeadlineTransport(*httpTimeout)}
httpclient = &http.Client{Transport: http_api.NewDeadlineTransport(*httpConnectTimeout, *httpRequestTimeout), Timeout: *httpRequestTimeout}
userAgent = fmt.Sprintf("nsq_to_http v%s", version.Binary)
}

Expand Down
16 changes: 13 additions & 3 deletions apps/nsq_to_http/nsq_to_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@ var (
numPublishers = flag.Int("n", 100, "number of concurrent publishers")
mode = flag.String("mode", "hostpool", "the upstream request mode options: multicast, round-robin, hostpool (default), epsilon-greedy")
sample = flag.Float64("sample", 1.0, "% of messages to publish (float b/w 0 -> 1)")
httpTimeout = flag.Duration("http-timeout", 20*time.Second, "timeout for HTTP connect/read/write (each)")
statusEvery = flag.Int("status-every", 250, "the # of requests between logging status (per handler), 0 disables")
contentType = flag.String("content-type", "application/octet-stream", "the Content-Type used for POST requests")
// TODO: remove; deprecated in favor of http-client-connect-timeout, http-client-request-timeout
httpTimeout = flag.Duration("http-timeout", 20*time.Second, "timeout for HTTP connect/read/write (each)")
httpConnectTimeout = flag.Duration("http-client-connect-timeout", 20*time.Second, "timeout for HTTP connect")
httpRequestTimeout = flag.Duration("http-client-request-timeout", 20*time.Second, "timeout for HTTP request")
statusEvery = flag.Int("status-every", 250, "the # of requests between logging status (per handler), 0 disables")
contentType = flag.String("content-type", "application/octet-stream", "the Content-Type used for POST requests")

getAddrs = app.StringArray{}
postAddrs = app.StringArray{}
Expand Down Expand Up @@ -249,6 +252,13 @@ func main() {
*httpTimeout = time.Duration(*httpTimeoutMs) * time.Millisecond
}

// TODO: remove, deprecated
if hasArg("http-timeout") {
log.Printf("WARNING: --http-timeout is deprecated in favor of --http-client-connect-timeout=X and --http-client-request-timeout=Y")
*httpConnectTimeout = *httpTimeout
*httpRequestTimeout = *httpTimeout
}

termChan := make(chan os.Signal, 1)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

Expand Down
3 changes: 3 additions & 0 deletions apps/nsqadmin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ var (

notificationHTTPEndpoint = flagSet.String("notification-http-endpoint", "", "HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sent")

httpConnectTimeout = flagSet.Duration("http-client-connect-timeout", 2*time.Second, "timeout for HTTP connect")
httpRequestTimeout = flagSet.Duration("http-client-request-timeout", 5*time.Second, "timeout for HTTP request")

httpClientTLSInsecureSkipVerify = flagSet.Bool("http-client-tls-insecure-skip-verify", false, "configure the HTTP client to skip verification of TLS certificates")
httpClientTLSRootCAFile = flagSet.String("http-client-tls-root-ca-file", "", "path to CA file for the HTTP client")
httpClientTLSCert = flagSet.String("http-client-tls-cert", "", "path to certificate file for the HTTP client")
Expand Down
2 changes: 2 additions & 0 deletions apps/nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet {
flagSet.String("broadcast-address", opts.BroadcastAddress, "address that will be registered with lookupd (defaults to the OS hostname)")
lookupdTCPAddrs := app.StringArray{}
flagSet.Var(&lookupdTCPAddrs, "lookupd-tcp-address", "lookupd TCP address (may be given multiple times)")
flagSet.Duration("http-client-connect-timeout", opts.HTTPClientConnectTimeout, "timeout for HTTP connect")
flagSet.Duration("http-client-request-timeout", opts.HTTPClientRequestTimeout, "timeout for HTTP request")

// diskqueue options
flagSet.String("data-path", opts.DataPath, "path to store disk-backed messages")
Expand Down
5 changes: 5 additions & 0 deletions contrib/nsqd.cfg.example
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ nsqlookupd_tcp_addresses = [
"127.0.0.1:4160"
]

## duration to wait before HTTP client connection timeout
http_client_connect_timeout = "2s"

## duration to wait before HTTP client request timeout
http_client_request_timeout = "5s"

## path to store disk-backed messages
# data_path = "/var/lib/nsq"
Expand Down
10 changes: 6 additions & 4 deletions internal/auth/authorizations.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,10 @@ func (a *State) IsExpired() bool {
return false
}

func QueryAnyAuthd(authd []string, remoteIP, tlsEnabled, authSecret string) (*State, error) {
func QueryAnyAuthd(authd []string, remoteIP, tlsEnabled, authSecret string,
connectTimeout time.Duration, requestTimeout time.Duration) (*State, error) {
for _, a := range authd {
authState, err := QueryAuthd(a, remoteIP, tlsEnabled, authSecret)
authState, err := QueryAuthd(a, remoteIP, tlsEnabled, authSecret, connectTimeout, requestTimeout)
if err != nil {
log.Printf("Error: failed auth against %s %s", a, err)
continue
Expand All @@ -88,7 +89,8 @@ func QueryAnyAuthd(authd []string, remoteIP, tlsEnabled, authSecret string) (*St
return nil, errors.New("Unable to access auth server")
}

func QueryAuthd(authd, remoteIP, tlsEnabled, authSecret string) (*State, error) {
func QueryAuthd(authd, remoteIP, tlsEnabled, authSecret string,
connectTimeout time.Duration, requestTimeout time.Duration) (*State, error) {
v := url.Values{}
v.Set("remote_ip", remoteIP)
v.Set("tls", tlsEnabled)
Expand All @@ -97,7 +99,7 @@ func QueryAuthd(authd, remoteIP, tlsEnabled, authSecret string) (*State, error)
endpoint := fmt.Sprintf("http://%s/auth?%s", authd, v.Encode())

var authState State
client := http_api.NewClient(nil)
client := http_api.NewClient(nil, connectTimeout, requestTimeout)
if err := client.GETV1(endpoint, &authState); err != nil {
return nil, err
}
Expand Down
14 changes: 7 additions & 7 deletions internal/http_api/api_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,24 @@ type deadlinedConn struct {
}

func (c *deadlinedConn) Read(b []byte) (n int, err error) {
c.Conn.SetReadDeadline(time.Now().Add(c.Timeout))
return c.Conn.Read(b)
}

func (c *deadlinedConn) Write(b []byte) (n int, err error) {
c.Conn.SetWriteDeadline(time.Now().Add(c.Timeout))
return c.Conn.Write(b)
}

// A custom http.Transport with support for deadline timeouts
func NewDeadlineTransport(timeout time.Duration) *http.Transport {
func NewDeadlineTransport(connectTimeout time.Duration, requestTimeout time.Duration) *http.Transport {
transport := &http.Transport{
Dial: func(netw, addr string) (net.Conn, error) {
c, err := net.DialTimeout(netw, addr, timeout)
c, err := net.DialTimeout(netw, addr, connectTimeout)
if err != nil {
return nil, err
}
return &deadlinedConn{timeout, c}, nil
return &deadlinedConn{connectTimeout, c}, nil
},
ResponseHeaderTimeout: requestTimeout,
}
return transport
}
Expand All @@ -46,12 +45,13 @@ type Client struct {
c *http.Client
}

func NewClient(tlsConfig *tls.Config) *Client {
transport := NewDeadlineTransport(2 * time.Second)
func NewClient(tlsConfig *tls.Config, connectTimeout time.Duration, requestTimeout time.Duration) *Client {
transport := NewDeadlineTransport(connectTimeout, requestTimeout)
transport.TLSClientConfig = tlsConfig
return &Client{
c: &http.Client{
Transport: transport,
Timeout: requestTimeout,
},
}
}
Expand Down
50 changes: 25 additions & 25 deletions nsqadmin/bindata.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 2e07a9f

Please sign in to comment.