From 45a01e2d9635ec375d7fbc97b0fcdb21292fcd8d Mon Sep 17 00:00:00 2001 From: Chris Piraino and Serguei Filimonov Date: Mon, 15 Dec 2014 14:23:14 -0800 Subject: [PATCH] Bump apcera/nats to v1.0.6 --- Godeps/Godeps.json | 4 +- .../github.com/apcera/nats/cluster_test.go | 2 + .../src/github.com/apcera/nats/enc.go | 8 +- .../src/github.com/apcera/nats/enc_test.go | 4 +- .../apcera/nats/examples/nats-pub.go | 3 +- .../apcera/nats/examples/nats-sub.go | 2 +- .../src/github.com/apcera/nats/nats.go | 170 +++++++++++++----- .../github.com/apcera/nats/reconnect_test.go | 15 ++ 8 files changed, 161 insertions(+), 47 deletions(-) diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index e9b3c21c3..48dd55441 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -11,8 +11,8 @@ }, { "ImportPath": "github.com/apcera/nats", - "Comment": "v1.0.1-16-g4ca550d", - "Rev": "4ca550d13b9b6d03b2dded15287a1eae2ae27fa8" + "Comment": "v1.0.6", + "Rev": "0ad40909a5568932a6ba494f25762971a089499d" }, { "ImportPath": "github.com/cloudfoundry-incubator/candiedyaml", diff --git a/Godeps/_workspace/src/github.com/apcera/nats/cluster_test.go b/Godeps/_workspace/src/github.com/apcera/nats/cluster_test.go index a0e87c859..38d7406df 100644 --- a/Godeps/_workspace/src/github.com/apcera/nats/cluster_test.go +++ b/Godeps/_workspace/src/github.com/apcera/nats/cluster_test.go @@ -228,6 +228,8 @@ func TestBasicClusterReconnect(t *testing.T) { } // Make sure we did not wait on reconnect for default time. + // Reconnect should be fast since it will be a switch to the + // second server and not be dependent on server restart time. reconnectTime := time.Since(reconnectTimeStart) if reconnectTime > (100 * time.Millisecond) { t.Fatalf("Took longer than expected to reconnect: %v\n", reconnectTime) diff --git a/Godeps/_workspace/src/github.com/apcera/nats/enc.go b/Godeps/_workspace/src/github.com/apcera/nats/enc.go index 450e5edf7..7e758f18c 100644 --- a/Godeps/_workspace/src/github.com/apcera/nats/enc.go +++ b/Godeps/_workspace/src/github.com/apcera/nats/enc.go @@ -1,4 +1,4 @@ -// Copyright 2012-2013 Apcera Inc. All rights reserved. +// Copyright 2012-2014 Apcera Inc. All rights reserved. package nats @@ -19,6 +19,12 @@ type Encoder interface { var encMap map[string]Encoder var encLock sync.Mutex +const ( + JSON_ENCODER = "json" + GOB_ENCODER = "gob" + DEFAULT_ENCODER = "default" +) + func init() { encMap = make(map[string]Encoder) // Register json, gob and default encoder diff --git a/Godeps/_workspace/src/github.com/apcera/nats/enc_test.go b/Godeps/_workspace/src/github.com/apcera/nats/enc_test.go index 671c5e3e2..3352d8f2a 100644 --- a/Godeps/_workspace/src/github.com/apcera/nats/enc_test.go +++ b/Godeps/_workspace/src/github.com/apcera/nats/enc_test.go @@ -258,7 +258,7 @@ func TestEncRequest(t *testing.T) { var resp string - err := ec.Request("help", "help me", &resp, 100*time.Millisecond) + err := ec.Request("help", "help me", &resp, 500*time.Millisecond) if err != nil { t.Fatalf("Failed at receiving proper response: %v\n", err) } @@ -274,7 +274,7 @@ func TestEncRequestReceivesMsg(t *testing.T) { var resp Msg - err := ec.Request("help", "help me", &resp, 100*time.Millisecond) + err := ec.Request("help", "help me", &resp, 500*time.Millisecond) if err != nil { t.Fatalf("Failed at receiving proper response: %v\n", err) } diff --git a/Godeps/_workspace/src/github.com/apcera/nats/examples/nats-pub.go b/Godeps/_workspace/src/github.com/apcera/nats/examples/nats-pub.go index 7041d75c8..f5667cbbc 100644 --- a/Godeps/_workspace/src/github.com/apcera/nats/examples/nats-pub.go +++ b/Godeps/_workspace/src/github.com/apcera/nats/examples/nats-pub.go @@ -7,6 +7,7 @@ import ( "flag" "log" "strings" + "github.com/apcera/nats" ) @@ -30,7 +31,7 @@ func main() { opts := nats.DefaultOptions opts.Servers = strings.Split(*urls, ",") for i, s := range opts.Servers { - opts.Servers[i] = strings.Trim(s, " ") + opts.Servers[i] = strings.Trim(s, " ") } opts.Secure = *ssl diff --git a/Godeps/_workspace/src/github.com/apcera/nats/examples/nats-sub.go b/Godeps/_workspace/src/github.com/apcera/nats/examples/nats-sub.go index 47561364a..f0b24c89a 100644 --- a/Godeps/_workspace/src/github.com/apcera/nats/examples/nats-sub.go +++ b/Godeps/_workspace/src/github.com/apcera/nats/examples/nats-sub.go @@ -1,4 +1,4 @@ -// Copyright 2012 Apcera Inc. All rights reserved. +// Copyright 2012-2014 Apcera Inc. All rights reserved. // +build ignore package main diff --git a/Godeps/_workspace/src/github.com/apcera/nats/nats.go b/Godeps/_workspace/src/github.com/apcera/nats/nats.go index 15d4f549b..0ef5eeebb 100644 --- a/Godeps/_workspace/src/github.com/apcera/nats/nats.go +++ b/Godeps/_workspace/src/github.com/apcera/nats/nats.go @@ -27,14 +27,19 @@ import ( ) const ( - Version = "1.0.2" + Version = "1.0.6" DefaultURL = "nats://localhost:4222" DefaultPort = 4222 - DefaultMaxReconnect = 10 + DefaultMaxReconnect = 60 DefaultReconnectWait = 2 * time.Second DefaultTimeout = 2 * time.Second + DefaultPingInterval = 2 * time.Minute + DefaultMaxPingOut = 2 ) +// For detection and proper handling of a Stale Connection +const STALE_CONNECTION = "Stale Connection" + var ( ErrConnectionClosed = errors.New("nats: Connection Closed") ErrSecureConnRequired = errors.New("nats: Secure connection required") @@ -47,6 +52,7 @@ var ( ErrNoServers = errors.New("nats: No servers available for connection") ErrJsonParse = errors.New("nats: Connect message, json parse err") ErrChanArg = errors.New("nats: Argument needs to be a channel type") + ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION) ) var DefaultOptions = Options{ @@ -54,6 +60,9 @@ var DefaultOptions = Options{ MaxReconnect: DefaultMaxReconnect, ReconnectWait: DefaultReconnectWait, Timeout: DefaultTimeout, + + PingInterval: DefaultPingInterval, + MaxPingsOut: DefaultMaxPingOut, } type Status int @@ -65,9 +74,6 @@ const ( RECONNECTING ) -// For detection and proper handling of a Stale Connection -const STALE_CONNECTION = "Stale Connection" - // ConnHandlers are used for asynchronous events such as // disconnected and closed connections. type ConnHandler func(*Conn) @@ -93,6 +99,9 @@ type Options struct { DisconnectedCB ConnHandler ReconnectedCB ConnHandler AsyncErrorCB ErrHandler + + PingInterval time.Duration // disabled if 0 or negative + MaxPingsOut int } const ( @@ -138,6 +147,8 @@ type Conn struct { status Status err error ps *parseState + ptmr *time.Timer + pout int } // A Subscription represents interest in a given subject. @@ -233,6 +244,9 @@ func SecureConnect(url string) (*Conn, error) { // Connect will attempt to connect to a NATS server with multiple options. func (o Options) Connect() (*Conn, error) { nc := &Conn{Opts: o} + if nc.Opts.MaxPingsOut == 0 { + nc.Opts.MaxPingsOut = DefaultMaxPingOut + } if err := nc.setupServerPool(); err != nil { return nil, err } @@ -414,21 +428,42 @@ func (nc *Conn) makeTLSConn() { nc.bw = bufio.NewWriterSize(nc.conn, defaultBufSize) } +// waitForExits will wait for all socket watcher Go routines to +// be shutdown before proceeding. +func (nc *Conn) waitForExits() { + // Kick old flusher forcefully. + nc.fch <- true + // Wait for any previous go routines. + nc.wg.Wait() +} + // spinUpSocketWatchers will launch the Go routines responsible for // reading and writing to the socket. This will be launched via a // go routine itself to release any locks that may be held. // We also use a WaitGroup to make sure we only start them on a -// reconnect when the precious ones have exited. +// reconnect when the previous ones have exited. func (nc *Conn) spinUpSocketWatchers() { - // Kick old flusher forcefully. - nc.fch <- true - // Wait for any previous ones. - nc.wg.Wait() - // We will wait on both. + // Make sure everything has exited. + nc.waitForExits() + + // We will wait on both going forward. nc.wg.Add(2) + // Spin up the readLoop and the socket flusher. go nc.readLoop() go nc.flusher() + + nc.mu.Lock() + nc.pout = 0 + + if nc.Opts.PingInterval > 0 { + if nc.ptmr == nil { + nc.ptmr = time.AfterFunc(nc.Opts.PingInterval, nc.processPingTimer) + } else { + nc.ptmr.Reset(nc.Opts.PingInterval) + } + } + nc.mu.Unlock() } // Report the connected server's Url @@ -441,6 +476,16 @@ func (nc *Conn) ConnectedUrl() string { return nc.url.String() } +// Report the connected server's Id +func (nc *Conn) ConnectedServerId() string { + nc.mu.Lock() + defer nc.mu.Unlock() + if nc.status != CONNECTED { + return _EMPTY_ + } + return nc.info.Id +} + // Low level setup for structs, etc func (nc *Conn) setup() { nc.subs = make(map[int64]*Subscription) @@ -673,7 +718,6 @@ func (nc *Conn) processDisconnect() { // This will process a disconnect when reconnect is allowed. // The lock should not be held on entering this function. func (nc *Conn) processReconnect() { - nc.mu.Lock() defer nc.mu.Unlock() @@ -683,29 +727,16 @@ func (nc *Conn) processReconnect() { return } nc.status = RECONNECTING + if nc.ptmr != nil { + nc.ptmr.Stop() + } if nc.conn != nil { nc.bw.Flush() nc.conn.Close() + nc.conn = nil } - nc.conn = nil - nc.kickFlusher() - - // FIXME(dlc) - We have an issue here if we have - // outstanding flush points (pongs) and they were not - // sent out, but are still in the pipe. - - // Create a pending buffer to underpin the bufio Writer while - // we are reconnecting. - nc.pending = &bytes.Buffer{} - nc.bw = bufio.NewWriterSize(nc.pending, defaultPendingSize) - nc.err = nil go nc.doReconnect() } - // Perform appropriate callback if needed for a disconnect. - dcb := nc.Opts.DisconnectedCB - if dcb != nil { - go dcb(nc) - } } // flushReconnectPending will push the pending items that were @@ -723,10 +754,34 @@ func (nc *Conn) flushReconnectPendingItems() { // Try to reconnect using the option parameters. // This function assumes we are allowed to reconnect. func (nc *Conn) doReconnect() { + // We want to make sure we have the other watchers shutdown properly + // here before we proceed past this point. + nc.waitForExits() + + // FIXME(dlc) - We have an issue here if we have + // outstanding flush points (pongs) and they were not + // sent out, but are still in the pipe. + // Hold the lock manually and release where needed below, // can't do defer here. nc.mu.Lock() + // Create a new pending buffer to underpin the bufio Writer while + // we are reconnecting. + nc.pending = &bytes.Buffer{} + nc.bw = bufio.NewWriterSize(nc.pending, defaultPendingSize) + + // Clear any errors. + nc.err = nil + + // Perform appropriate callback if needed for a disconnect. + dcb := nc.Opts.DisconnectedCB + if dcb != nil { + nc.mu.Unlock() + dcb(nc) + nc.mu.Lock() + } + for len(nc.srvPool) > 0 { cur, err := nc.selectNextServer() if err != nil { @@ -765,6 +820,10 @@ func (nc *Conn) doReconnect() { // We are reconnected nc.Reconnects += 1 + // Clear out server stats for the server we connected to.. + cur.didConnect = true + cur.reconnects = 0 + // Process Connect logic if nc.err = nc.processExpectedInfo(); nc.err == nil { // Send our connect info as normal @@ -840,9 +899,7 @@ func (nc *Conn) processOpErr(err error) { // protocol from the server. It will dispatch appropriately based // on the op type. func (nc *Conn) readLoop() { - b := make([]byte, defaultBufSize) - - // Release the wait group + // Release the wait group on exit defer nc.wg.Done() // Create a parseState if needed. @@ -852,6 +909,9 @@ func (nc *Conn) readLoop() { } nc.mu.Unlock() + // Stack based buffer. + b := make([]byte, defaultBufSize) + for { // FIXME(dlc): RWLock here? nc.mu.Lock() @@ -1022,6 +1082,7 @@ func (nc *Conn) processPong() { ch = nc.pongs[0] nc.pongs = nc.pongs[1:] } + nc.pout = 0 nc.mu.Unlock() if ch != nil { ch <- true @@ -1051,11 +1112,12 @@ func (nc *Conn) LastError() error { // sets the connection's lastError. func (nc *Conn) processErr(e string) { // FIXME(dlc) - process Slow Consumer signals special. - err := errors.New("nats: " + e) if e == STALE_CONNECTION { - nc.processOpErr(err) + nc.processOpErr(ErrStaleConnection) } else { - nc.err = err + nc.mu.Lock() + nc.err = errors.New("nats: " + e) + nc.mu.Unlock() nc.Close() } } @@ -1388,6 +1450,34 @@ func (nc *Conn) removeFlushEntry(ch chan bool) bool { return false } +// The lock must be held entering this function. +func (nc *Conn) sendPing(ch chan bool) { + nc.pongs = append(nc.pongs, ch) + nc.bw.WriteString(pingProto) + nc.kickFlusher() +} + +func (nc *Conn) processPingTimer() { + nc.mu.Lock() + + if nc.status != CONNECTED { + nc.mu.Unlock() + return + } + + // Check for violation + nc.pout += 1 + if nc.pout > nc.Opts.MaxPingsOut { + nc.mu.Unlock() + nc.processOpErr(ErrStaleConnection) + return + } + + nc.sendPing(nil) + nc.ptmr.Reset(nc.Opts.PingInterval) + nc.mu.Unlock() +} + // FlushTimeout allows a Flush operation to have an associated timeout. func (nc *Conn) FlushTimeout(timeout time.Duration) (err error) { if timeout <= 0 { @@ -1403,11 +1493,7 @@ func (nc *Conn) FlushTimeout(timeout time.Duration) (err error) { defer t.Stop() ch := make(chan bool) // FIXME: Inefficient? - // defer close(ch) - - nc.pongs = append(nc.pongs, ch) - nc.bw.WriteString(pingProto) - nc.bw.Flush() + nc.sendPing(ch) nc.mu.Unlock() select { @@ -1488,6 +1574,10 @@ func (nc *Conn) close(status Status, doCBs bool) { nc.mu.Lock() + if nc.ptmr != nil { + nc.ptmr.Stop() + } + // Close sync subscriber channels and release any // pending NextMsg() calls. for _, s := range nc.subs { diff --git a/Godeps/_workspace/src/github.com/apcera/nats/reconnect_test.go b/Godeps/_workspace/src/github.com/apcera/nats/reconnect_test.go index 868da3ffa..2d7c33d90 100644 --- a/Godeps/_workspace/src/github.com/apcera/nats/reconnect_test.go +++ b/Godeps/_workspace/src/github.com/apcera/nats/reconnect_test.go @@ -11,6 +11,15 @@ func startReconnectServer(t *testing.T) *server { return startServer(t, 22222, "") } +func TestReconnectTotalTime(t *testing.T) { + opts := DefaultOptions + totalReconnectTime := time.Duration(opts.MaxReconnect) * opts.ReconnectWait + if totalReconnectTime < (2 * time.Minute) { + t.Fatalf("Total reconnect time should be at least 2 mins: Currently %v\n", + totalReconnectTime) + } +} + func TestReconnectDisallowedFlags(t *testing.T) { ts := startReconnectServer(t) ch := make(chan bool) @@ -122,6 +131,12 @@ func TestBasicReconnectFunctionality(t *testing.T) { ec.Conn.Reconnects, expectedReconnectCount) } + // Make sure the server who is reconnected has the reconnects stats reset. + _, cur := nc.currentServer() + if cur.reconnects != 0 { + t.Fatalf("Current Server's reconnects should be 0 vs %d\n", cur.reconnects) + } + nc.Close() }