Skip to content

Commit

Permalink
nsqd: configurable HTTP client transport timeout
Browse files Browse the repository at this point in the history
Adds configuration option for HTTPClientTimeout to control the timeout
of the HTTP client transport.

Also added to the following app binaries:

- nsqadmin
- nsq_stat
- nsq_to_file

Closes nsqio#715
  • Loading branch information
kenjones-cisco committed Jul 24, 2016
1 parent 3f10c34 commit de57ee8
Show file tree
Hide file tree
Showing 13 changed files with 50 additions and 28 deletions.
12 changes: 9 additions & 3 deletions apps/nsq_stat/nsq_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var (
channel = flag.String("channel", "", "NSQ channel")
statusEvery = flag.Duration("status-every", -1, "(deprecated) duration of time between polling/printing output")
interval = flag.Duration("interval", 2*time.Second, "duration of time between polling/printing output")
httpClientTimeout = flag.Duration("http-client-timeout", 2*time.Second, "duration of time HTTP client timeout")
countNum = numValue{}
nsqdHTTPAddrs = app.StringArray{}
lookupdHTTPAddrs = app.StringArray{}
Expand Down Expand Up @@ -55,9 +56,9 @@ func init() {
flag.Var(&countNum, "count", "number of reports")
}

func statLoop(interval time.Duration, topic string, channel string,
func statLoop(interval time.Duration, clientTimeout time.Duration, topic string, channel string,
nsqdTCPAddrs []string, lookupdHTTPAddrs []string) {
ci := clusterinfo.New(nil, http_api.NewClient(nil))
ci := clusterinfo.New(nil, http_api.NewClient(nil, clientTimeout))
var o *clusterinfo.ChannelStats
for i := 0; !countNum.isSet || countNum.value >= i; i++ {
var producers clusterinfo.Producers
Expand Down Expand Up @@ -149,6 +150,11 @@ func main() {
log.Fatal("--interval should be positive")
}

clientTimeout := *httpClientTimeout
if int64(clientTimeout) <= 0 {
log.Fatal("--http-client-timeout should be positive")
}

if countNum.isSet && countNum.value <= 0 {
log.Fatal("--count should be positive")
}
Expand All @@ -171,7 +177,7 @@ func main() {
termChan := make(chan os.Signal, 1)
signal.Notify(termChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)

go statLoop(intvl, *topic, *channel, nsqdHTTPAddrs, lookupdHTTPAddrs)
go statLoop(intvl, clientTimeout, *topic, *channel, nsqdHTTPAddrs, lookupdHTTPAddrs)

<-termChan
}
19 changes: 13 additions & 6 deletions apps/nsq_to_file/nsq_to_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ var (
rotateSize = flag.Int64("rotate-size", 0, "rotate the file when it grows bigger than `rotate-size` bytes")
rotateInterval = flag.Duration("rotate-interval", 0*time.Second, "rotate the file every duration")

httpClientTimeout = flag.Duration("http-client-timeout", 2*time.Second, "duration of time HTTP client timeout")

nsqdTCPAddrs = app.StringArray{}
lookupdHTTPAddrs = app.StringArray{}
topics = app.StringArray{}
Expand Down Expand Up @@ -404,8 +406,8 @@ func (t *TopicDiscoverer) allowTopicName(pattern string, name string) bool {
return match
}

func (t *TopicDiscoverer) syncTopics(addrs []string, pattern string) {
newTopics, err := clusterinfo.New(nil, http_api.NewClient(nil)).GetLookupdTopics(addrs)
func (t *TopicDiscoverer) syncTopics(addrs []string, pattern string, clientTimeout time.Duration) {
newTopics, err := clusterinfo.New(nil, http_api.NewClient(nil, clientTimeout)).GetLookupdTopics(addrs)
if err != nil {
log.Printf("ERROR: could not retrieve topic list: %s", err)
}
Expand Down Expand Up @@ -438,13 +440,13 @@ func (t *TopicDiscoverer) hup() {
}
}

func (t *TopicDiscoverer) watch(addrs []string, sync bool, pattern string) {
func (t *TopicDiscoverer) watch(addrs []string, sync bool, pattern string, clientTimeout time.Duration) {
ticker := time.Tick(*topicPollRate)
for {
select {
case <-ticker:
if sync {
t.syncTopics(addrs, pattern)
t.syncTopics(addrs, pattern, clientTimeout)
}
case <-t.termChan:
t.stop()
Expand Down Expand Up @@ -474,6 +476,11 @@ func main() {
log.Fatal("--channel is required")
}

clientTimeout := *httpClientTimeout
if int64(clientTimeout) <= 0 {
log.Fatal("--http-client-timeout should be positive")
}

var topicsFromNSQLookupd bool

if len(nsqdTCPAddrs) == 0 && len(lookupdHTTPAddrs) == 0 {
Expand Down Expand Up @@ -516,7 +523,7 @@ func main() {
}
topicsFromNSQLookupd = true
var err error
topics, err = clusterinfo.New(nil, http_api.NewClient(nil)).GetLookupdTopics(lookupdHTTPAddrs)
topics, err = clusterinfo.New(nil, http_api.NewClient(nil, clientTimeout)).GetLookupdTopics(lookupdHTTPAddrs)
if err != nil {
log.Fatalf("ERROR: could not retrieve topic list: %s", err)
}
Expand All @@ -536,5 +543,5 @@ func main() {
go discoverer.startTopicRouter(logger)
}

discoverer.watch(lookupdHTTPAddrs, topicsFromNSQLookupd, *topicPattern)
discoverer.watch(lookupdHTTPAddrs, topicsFromNSQLookupd, *topicPattern, clientTimeout)
}
2 changes: 2 additions & 0 deletions apps/nsqadmin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ var (

notificationHTTPEndpoint = flagSet.String("notification-http-endpoint", "", "HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sent")

httpClientTimeout = flagSet.Duration("http-client-timeout", 2*time.Second, "duration of time HTTP client timeout")

httpClientTLSInsecureSkipVerify = flagSet.Bool("http-client-tls-insecure-skip-verify", false, "configure the HTTP client to skip verification of TLS certificates")
httpClientTLSRootCAFile = flagSet.String("http-client-tls-root-ca-file", "", "path to CA file for the HTTP client")
httpClientTLSCert = flagSet.String("http-client-tls-cert", "", "path to certificate file for the HTTP client")
Expand Down
1 change: 1 addition & 0 deletions apps/nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet {
flagSet.String("broadcast-address", opts.BroadcastAddress, "address that will be registered with lookupd (defaults to the OS hostname)")
lookupdTCPAddrs := app.StringArray{}
flagSet.Var(&lookupdTCPAddrs, "lookupd-tcp-address", "lookupd TCP address (may be given multiple times)")
flagSet.Duration("http-client-timeout", opts.HTTPClientTimeout, "duration of time HTTP client timeout")

// diskqueue options
flagSet.String("data-path", opts.DataPath, "path to store disk-backed messages")
Expand Down
8 changes: 4 additions & 4 deletions internal/auth/authorizations.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ func (a *State) IsExpired() bool {
return false
}

func QueryAnyAuthd(authd []string, remoteIP, tlsEnabled, authSecret string) (*State, error) {
func QueryAnyAuthd(authd []string, remoteIP, tlsEnabled, authSecret string, clientTimeout time.Duration) (*State, error) {
for _, a := range authd {
authState, err := QueryAuthd(a, remoteIP, tlsEnabled, authSecret)
authState, err := QueryAuthd(a, remoteIP, tlsEnabled, authSecret, clientTimeout)
if err != nil {
log.Printf("Error: failed auth against %s %s", a, err)
continue
Expand All @@ -88,7 +88,7 @@ func QueryAnyAuthd(authd []string, remoteIP, tlsEnabled, authSecret string) (*St
return nil, errors.New("Unable to access auth server")
}

func QueryAuthd(authd, remoteIP, tlsEnabled, authSecret string) (*State, error) {
func QueryAuthd(authd, remoteIP, tlsEnabled, authSecret string, clientTimeout time.Duration) (*State, error) {
v := url.Values{}
v.Set("remote_ip", remoteIP)
v.Set("tls", tlsEnabled)
Expand All @@ -97,7 +97,7 @@ func QueryAuthd(authd, remoteIP, tlsEnabled, authSecret string) (*State, error)
endpoint := fmt.Sprintf("http://%s/auth?%s", authd, v.Encode())

var authState State
client := http_api.NewClient(nil)
client := http_api.NewClient(nil, clientTimeout)
if err := client.GETV1(endpoint, &authState); err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions internal/http_api/api_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ type Client struct {
c *http.Client
}

func NewClient(tlsConfig *tls.Config) *Client {
transport := NewDeadlineTransport(2 * time.Second)
func NewClient(tlsConfig *tls.Config, clientTimeout time.Duration) *Client {
transport := NewDeadlineTransport(clientTimeout)
transport.TLSClientConfig = tlsConfig
return &Client{
c: &http.Client{
Expand Down
2 changes: 1 addition & 1 deletion nsqadmin/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type httpServer struct {
func NewHTTPServer(ctx *Context) *httpServer {
log := http_api.Log(ctx.nsqadmin.getOpts().Logger)

client := http_api.NewClient(ctx.nsqadmin.httpClientTLSConfig)
client := http_api.NewClient(ctx.nsqadmin.httpClientTLSConfig, ctx.nsqadmin.getOpts().HTTPClientTimeout)

router := httprouter.New()
router.HandleMethodNotAllowed = true
Expand Down
3 changes: 3 additions & 0 deletions nsqadmin/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type Options struct {
NSQLookupdHTTPAddresses []string `flag:"lookupd-http-address" cfg:"nsqlookupd_http_addresses"`
NSQDHTTPAddresses []string `flag:"nsqd-http-address" cfg:"nsqd_http_addresses"`

HTTPClientTimeout time.Duration `flag:"http-client-timeout"`

HTTPClientTLSInsecureSkipVerify bool `flag:"http-client-tls-insecure-skip-verify"`
HTTPClientTLSRootCAFile string `flag:"http-client-tls-root-ca-file"`
HTTPClientTLSCert string `flag:"http-client-tls-cert"`
Expand All @@ -40,6 +42,7 @@ func NewOptions() *Options {
StatsdCounterFormat: "stats.counters.%s.count",
StatsdGaugeFormat: "stats.gauges.%s",
StatsdInterval: 60 * time.Second,
HTTPClientTimeout: 2 * time.Second,
Logger: log.New(os.Stderr, "[nsqadmin] ", log.Ldate|log.Ltime|log.Lmicroseconds),
}
}
2 changes: 1 addition & 1 deletion nsqd/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ func (c *clientV2) QueryAuthd() error {
}

authState, err := auth.QueryAnyAuthd(c.ctx.nsqd.getOpts().AuthHTTPAddresses,
remoteIP, tlsEnabled, c.AuthSecret)
remoteIP, tlsEnabled, c.AuthSecret, c.ctx.nsqd.getOpts().HTTPClientTimeout)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func New(opts *Options) *NSQD {
exitChan: make(chan int),
notifyChan: make(chan interface{}),
optsNotificationChan: make(chan struct{}, 1),
ci: clusterinfo.New(opts.Logger, http_api.NewClient(nil)),
ci: clusterinfo.New(opts.Logger, http_api.NewClient(nil, opts.HTTPClientTimeout)),
dl: dirlock.New(dataPath),
}
n.swapOpts(opts)
Expand Down
8 changes: 4 additions & 4 deletions nsqd/nsqd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func getMetadata(n *NSQD) (*simplejson.Json, error) {

func API(endpoint string) (data *simplejson.Json, err error) {
d := make(map[string]interface{})
err = http_api.NewClient(nil).NegotiateV1(endpoint, &d)
err = http_api.NewClient(nil, 2*time.Second).NegotiateV1(endpoint, &d)
data = simplejson.New()
data.SetPath(nil, d)
return
Expand Down Expand Up @@ -373,11 +373,11 @@ func TestCluster(t *testing.T) {
equal(t, err, nil)

url := fmt.Sprintf("http://%s/topic/create?topic=%s", nsqd.RealHTTPAddr(), topicName)
err = http_api.NewClient(nil).POSTV1(url)
err = http_api.NewClient(nil, 2*time.Second).POSTV1(url)
equal(t, err, nil)

url = fmt.Sprintf("http://%s/channel/create?topic=%s&channel=ch", nsqd.RealHTTPAddr(), topicName)
err = http_api.NewClient(nil).POSTV1(url)
err = http_api.NewClient(nil, 2*time.Second).POSTV1(url)
equal(t, err, nil)

// allow some time for nsqd to push info to nsqlookupd
Expand Down Expand Up @@ -425,7 +425,7 @@ func TestCluster(t *testing.T) {
equal(t, channel, "ch")

url = fmt.Sprintf("http://%s/topic/delete?topic=%s", nsqd.RealHTTPAddr(), topicName)
err = http_api.NewClient(nil).POSTV1(url)
err = http_api.NewClient(nil, 2*time.Second).POSTV1(url)
equal(t, err, nil)

// allow some time for nsqd to push info to nsqlookupd
Expand Down
3 changes: 3 additions & 0 deletions nsqd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Options struct {
BroadcastAddress string `flag:"broadcast-address"`
NSQLookupdTCPAddresses []string `flag:"lookupd-tcp-address" cfg:"nsqlookupd_tcp_addresses"`
AuthHTTPAddresses []string `flag:"auth-http-address" cfg:"auth_http_addresses"`
HTTPClientTimeout time.Duration `flag:"http-client-timeout"`

// diskqueue options
DataPath string `flag:"data-path"`
Expand Down Expand Up @@ -95,6 +96,8 @@ func NewOptions() *Options {
NSQLookupdTCPAddresses: make([]string, 0),
AuthHTTPAddresses: make([]string, 0),

HTTPClientTimeout: 2 * time.Second,

MemQueueSize: 10000,
MaxBytesPerFile: 100 * 1024 * 1024,
SyncEvery: 2500,
Expand Down
12 changes: 6 additions & 6 deletions nsqlookupd/nsqlookupd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func identify(t *testing.T, conn net.Conn, address string, tcpPort int, httpPort

func API(endpoint string) (data *simplejson.Json, err error) {
d := make(map[string]interface{})
err = http_api.NewClient(nil).NegotiateV1(endpoint, &d)
err = http_api.NewClient(nil, 2*time.Second).NegotiateV1(endpoint, &d)
data = simplejson.New()
data.SetPath(nil, d)
return
Expand Down Expand Up @@ -263,7 +263,7 @@ func TestTombstoneRecover(t *testing.T) {

endpoint := fmt.Sprintf("http://%s/topic/tombstone?topic=%s&node=%s",
httpAddr, topicName, "ip.address:5555")
err = http_api.NewClient(nil).POSTV1(endpoint)
err = http_api.NewClient(nil, 2*time.Second).POSTV1(endpoint)
equal(t, err, nil)

endpoint = fmt.Sprintf("http://%s/lookup?topic=%s", httpAddr, topicName)
Expand Down Expand Up @@ -307,7 +307,7 @@ func TestTombstoneUnregister(t *testing.T) {

endpoint := fmt.Sprintf("http://%s/topic/tombstone?topic=%s&node=%s",
httpAddr, topicName, "ip.address:5555")
err = http_api.NewClient(nil).POSTV1(endpoint)
err = http_api.NewClient(nil, 2*time.Second).POSTV1(endpoint)
equal(t, err, nil)

endpoint = fmt.Sprintf("http://%s/lookup?topic=%s", httpAddr, topicName)
Expand Down Expand Up @@ -349,7 +349,7 @@ func TestInactiveNodes(t *testing.T) {
_, err := nsq.ReadResponse(conn)
equal(t, err, nil)

ci := clusterinfo.New(nil, http_api.NewClient(nil))
ci := clusterinfo.New(nil, http_api.NewClient(nil, 2*time.Second))

producers, _ := ci.GetLookupdProducers(lookupdHTTPAddrs)
equal(t, len(producers), 1)
Expand Down Expand Up @@ -382,7 +382,7 @@ func TestTombstonedNodes(t *testing.T) {
_, err := nsq.ReadResponse(conn)
equal(t, err, nil)

ci := clusterinfo.New(nil, http_api.NewClient(nil))
ci := clusterinfo.New(nil, http_api.NewClient(nil, 2*time.Second))

producers, _ := ci.GetLookupdProducers(lookupdHTTPAddrs)
equal(t, len(producers), 1)
Expand All @@ -392,7 +392,7 @@ func TestTombstonedNodes(t *testing.T) {

endpoint := fmt.Sprintf("http://%s/topic/tombstone?topic=%s&node=%s",
httpAddr, topicName, "ip.address:5555")
err = http_api.NewClient(nil).POSTV1(endpoint)
err = http_api.NewClient(nil, 2*time.Second).POSTV1(endpoint)
equal(t, err, nil)

producers, _ = ci.GetLookupdProducers(lookupdHTTPAddrs)
Expand Down

0 comments on commit de57ee8

Please sign in to comment.