diff --git a/.travis.yml b/.travis.yml index 6713d0e15..f4aaa7499 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,9 +1,9 @@ language: go sudo: false go: -- 1.8.1 +- 1.8.3 +- 1.7.6 - 1.6.4 -- 1.7.5 install: - go get -t ./... - go get github.com/nats-io/gnatsd diff --git a/context.go b/context.go index 6a4037c0c..2c2fbb6ea 100644 --- a/context.go +++ b/context.go @@ -16,7 +16,58 @@ func (nc *Conn) RequestWithContext(ctx context.Context, subj string, data []byte if ctx == nil { return nil, ErrInvalidContext } + if nc == nil { + return nil, ErrInvalidConnection + } + + // snapshot + var doSetup, useOldRequestStyle bool + nc.mu.Lock() + useOldRequestStyle = nc.Opts.UseOldRequestStyle + doSetup = (nc.respMux == nil) + nc.mu.Unlock() + + // If user wants the old style. + if useOldRequestStyle { + return nc.oldRequestWithContext(ctx, subj, data) + } + + // Make sure scoped subscription is setup at least once on first + // call to Request(). Will handle duplicates in createRespMux. + if doSetup { + if err := nc.createRespMux(); err != nil { + return nil, err + } + } + // Create literal Inbox and map to a chan msg. + mch := make(chan *Msg, RequestChanLen) + nc.mu.Lock() + respInbox := nc.newRespInbox() + nc.respMap[respToken(respInbox)] = mch + nc.mu.Unlock() + + err := nc.PublishRequest(subj, respInbox, data) + if err != nil { + return nil, err + } + + var ok bool + var msg *Msg + + select { + case msg, ok = <-mch: + if !ok { + return nil, ErrConnectionClosed + } + case <-ctx.Done(): + return nil, ctx.Err() + } + + return msg, nil +} +// oldRequestWithContext utilizes inbox and subscription per request. +func (nc *Conn) oldRequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error) { inbox := NewInbox() ch := make(chan *Msg, RequestChanLen) diff --git a/nats.go b/nats.go index 20682b918..95c01855d 100644 --- a/nats.go +++ b/nats.go @@ -28,7 +28,7 @@ import ( // Default Constants const ( - Version = "1.2.2" + Version = "1.3.0" DefaultURL = "nats://localhost:4222" DefaultPort = 4222 DefaultMaxReconnect = 60 @@ -119,76 +119,76 @@ type Option func(*Options) error type Options struct { // Url represents a single NATS server url to which the client - // will be connecting. If Servers is also set, it then becomes - // the first server in the array. + // will be connecting. If the Servers option is also set, it + // then becomes the first server in the Servers array. Url string - // Servers is the configured set of servers which are - // available when attempting to connect. + // Servers is a configured set of servers which this client + // will use when attempting to connect. Servers []string - // NoRandomize configures whether we will be randomizing - // the server pool of servers. + // NoRandomize configures whether we will randomize the + // server pool. NoRandomize bool - // Name is the optional name label which will be sent to the server + // Name is an optional name label which will be sent to the server // on CONNECT to identify the client. Name string - // Verbose enables the server whether it should reply back - // OK on commands successfully being processed. + // Verbose signals the server to send an OK ack for commands + // successfully processed by the server. Verbose bool - // Pedantic sets pedantic flag option sent on connect to signal server - // whether it should be doing further validation of subjects. + // Pedantic signals the server whether it should be doing further + // validation of subjects. Pedantic bool - // Secure enable TLS secure connections that skip server - // verification by default. + // Secure enables TLS secure connections that skip server + // verification by default. NOT RECOMMENDED. Secure bool - // TLSConfig is the custom TLS configuration to use for - // the secure transport. + // TLSConfig is a custom TLS configuration to use for secure + // transports. TLSConfig *tls.Config - // AllowReconnect enables reconnection logic for when server we were - // connected to fails. + // AllowReconnect enables reconnection logic to be used when we + // encounter a disconnect from the current server. AllowReconnect bool - // MaxReconnect sets the number of connect attempts that will be - // tried before giving up connecting further to a server in the pool. - // If negative, then it will never give up trying to connect. + // MaxReconnect sets the number of reconnect attempts that will be + // tried before giving up. If negative, then it will never give up + // trying to reconnect. MaxReconnect int - // ReconnectWait sets the time to backoff after attempting to reconnect + // ReconnectWait sets the time to backoff after attempting a reconnect // to a server that we were already connected to previously. ReconnectWait time.Duration - // Timeout sets the timeout for Dial on a connection. + // Timeout sets the timeout for a Dial operation on a connection. Timeout time.Duration // FlusherTimeout is the maximum time to wait for the flusher loop - // to be able to finish writing to the underlying socket. + // to be able to finish writing to the underlying connection. FlusherTimeout time.Duration - // PingInterval is the period at which the server will be sending ping + // PingInterval is the period at which the client will be sending ping // commands to the server, disabled if 0 or negative. PingInterval time.Duration - // MaxPingsOut is the maximum number of pending ping commands waiting - // for a response back before raising a ErrStaleConnection error. + // MaxPingsOut is the maximum number of pending ping commands that can + // be awaiting a response before raising an ErrStaleConnection error. MaxPingsOut int - // ClosedCB sets the closed handler called when client will + // ClosedCB sets the closed handler that is called when a client will // no longer be connected. ClosedCB ConnHandler - // DisconnectedCB sets the disconnected handler called whenever we - // are disconnected. + // DisconnectedCB sets the disconnected handler that is called + // whenever the connection is disconnected. DisconnectedCB ConnHandler // ReconnectedCB sets the reconnected handler called whenever - // successfully reconnected. + // the connection is successfully reconnected. ReconnectedCB ConnHandler // DiscoveredServersCB sets the callback that is invoked whenever a new @@ -198,8 +198,8 @@ type Options struct { // AsyncErrorCB sets the async error handler (e.g. slow consumer errors) AsyncErrorCB ErrHandler - // ReconnectBufSize of the backing bufio buffer during reconnect. Once this - // has been exhausted publish operations will error. + // ReconnectBufSize is the size of the backing bufio during reconnect. + // Once this has been exhausted publish operations will return an error. ReconnectBufSize int // SubChanLen is the size of the buffered channel used between the socket @@ -208,7 +208,7 @@ type Options struct { // dictated by PendingLimits() SubChanLen int - // User sets the user to be used when connecting to the server. + // User sets the username to be used when connecting to the server. User string // Password sets the password to be used when connecting to a server. @@ -217,8 +217,12 @@ type Options struct { // Token sets the token to be used when connecting to a server. Token string - // Dialer allows users setting a custom Dialer + // Dialer allows a custom Dialer when forming connections. Dialer *net.Dialer + + // UseOldRequestStyle forces the old method of Requests that utilize + // a new Inbox and a new Subscription for each request. + UseOldRequestStyle bool } const ( @@ -236,6 +240,9 @@ const ( // Channel size for the async callback handler. asyncCBChanSize = 32 + + // NUID size + nuidSize = 22 ) // A Conn represents a bare connection to a nats-server. @@ -270,6 +277,11 @@ type Conn struct { ps *parseState ptmr *time.Timer pout int + + // New style response handler + respSub string // The wildcard subject + respMux *Subscription // A single response subscription + respMap map[string]chan *Msg // Request map for the response msg channels } // A Subscription represents interest in a given subject. @@ -573,6 +585,14 @@ func Dialer(dialer *net.Dialer) Option { } } +// UseOldRequestyStyle is an Option to force usage of the old Request style. +func UseOldRequestStyle() Option { + return func(o *Options) error { + o.UseOldRequestStyle = true + return nil + } +} + // Handler processing // SetDisconnectHandler will set the disconnect event handler. @@ -1941,10 +1961,123 @@ func (nc *Conn) publish(subj, reply string, data []byte) error { return nil } -// Request will create an Inbox and perform a Request() call +// respHandler is the global response handler. It will look up +// the appropriate channel based on the last token and place +// the message on the channel if possible. +func (nc *Conn) respHandler(m *Msg) { + rt := respToken(m.Subject) + + nc.mu.Lock() + // Just return if closed. + if nc.isClosed() { + nc.mu.Unlock() + return + } + + // Grab mch + mch := nc.respMap[rt] + // Delete the key regardless, one response only. + // FIXME(dlc) - should we track responses past 1 + // just statistics wise? + delete(nc.respMap, rt) + nc.mu.Unlock() + + // Don't block, let Request timeout instead, mch is + // buffered and we should delete the key before a + // second response is processed. + select { + case mch <- m: + default: + return + } +} + +// Create the response subscription we will use for all +// new style responses. This will be on an _INBOX with an +// additional terminal token. The subscription will be on +// a wildcard. +func (nc *Conn) createRespMux() error { + // _INBOX wildcard + ginbox := fmt.Sprintf("%s.*", NewInbox()) + s, err := nc.Subscribe(ginbox, nc.respHandler) + if err != nil { + return err + } + // We could be racing here. So will we double check + // respMux here and discard the new one if set. + nc.mu.Lock() + if nc.respMux == nil { + nc.respSub = ginbox + nc.respMux = s + nc.respMap = make(map[string]chan *Msg) + } else { + // Discard duplicate, don't set others. + defer s.Unsubscribe() + } + nc.mu.Unlock() + return nil +} + +// Request will send a request payload and deliver the response message, +// or an error, including a timeout if no message was received properly. +func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, error) { + if nc == nil { + return nil, ErrInvalidConnection + } + + // snapshot + var doSetup, useOldRequestStyle bool + nc.mu.Lock() + useOldRequestStyle = nc.Opts.UseOldRequestStyle + doSetup = (nc.respMux == nil) + nc.mu.Unlock() + + // If user wants the old style. + if useOldRequestStyle { + return nc.oldRequest(subj, data, timeout) + } + + // Make sure scoped subscription is setup at least once on first + // call to Request(). Will handle duplicates in createRespMux. + if doSetup { + if err := nc.createRespMux(); err != nil { + return nil, err + } + } + // Create literal Inbox and map to a chan msg. + mch := make(chan *Msg, RequestChanLen) + nc.mu.Lock() + respInbox := nc.newRespInbox() + nc.respMap[respToken(respInbox)] = mch + nc.mu.Unlock() + + err := nc.PublishRequest(subj, respInbox, data) + if err != nil { + return nil, err + } + + t := time.NewTimer(timeout) + defer t.Stop() + + var ok bool + var msg *Msg + + select { + case msg, ok = <-mch: + if !ok { + return nil, ErrConnectionClosed + } + case <-t.C: + return nil, ErrTimeout + } + + return msg, nil +} + +// oldRequest will create an Inbox and perform a Request() call // with the Inbox reply and return the first reply received. // This is optimized for the case of multiple responses. -func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, error) { +func (nc *Conn) oldRequest(subj string, data []byte, timeout time.Duration) (*Msg, error) { inbox := NewInbox() ch := make(chan *Msg, RequestChanLen) @@ -1965,12 +2098,13 @@ func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, // InboxPrefix is the prefix for all inbox subjects. const InboxPrefix = "_INBOX." const inboxPrefixLen = len(InboxPrefix) +const respInboxPrefixLen = inboxPrefixLen + nuidSize + 1 // NewInbox will return an inbox string which can be used for directed replies from // subscribers. These are guaranteed to be unique, but can be shared and subscribed // to by others. func NewInbox() string { - var b [inboxPrefixLen + 22]byte + var b [inboxPrefixLen + nuidSize]byte pres := b[:inboxPrefixLen] copy(pres, InboxPrefix) ns := b[inboxPrefixLen:] @@ -1978,6 +2112,23 @@ func NewInbox() string { return string(b[:]) } +// Creates a new literal response subject that will trigger +// the global subscription handler. +func (nc *Conn) newRespInbox() string { + var b [inboxPrefixLen + (2 * nuidSize) + 1]byte + pres := b[:respInboxPrefixLen] + copy(pres, nc.respSub) + ns := b[respInboxPrefixLen:] + copy(ns, nuid.Next()) + return string(b[:]) +} + +// respToken will return the last token of a literal response inbox +// which we use for the message channel lookup. +func respToken(respInbox string) string { + return respInbox[respInboxPrefixLen:] +} + // Subscribe will express interest in the given subject. The subject // can have wildcards (partial:*, full:>). Messages will be delivered // to the associated MsgHandler. If no MsgHandler is given, the @@ -2592,6 +2743,20 @@ func (nc *Conn) clearPendingFlushCalls() { nc.pongs = nil } +// This will clear any pending Request calls. +// Lock is assumed to be held by the caller. +func (nc *Conn) clearPendingRequestCalls() { + if nc.respMap == nil { + return + } + for key, ch := range nc.respMap { + if ch != nil { + close(ch) + delete(nc.respMap, key) + } + } +} + // Low level close call that will do correct cleanup and set // desired status. Also controls whether user defined callbacks // will be triggered. The lock should not be held entering this @@ -2614,6 +2779,9 @@ func (nc *Conn) close(status Status, doCBs bool) { // Clear any queued pongs, e.g. pending flush calls. nc.clearPendingFlushCalls() + // Clear any queued and blocking Requests. + nc.clearPendingRequestCalls() + if nc.ptmr != nil { nc.ptmr.Stop() } diff --git a/test/basic_test.go b/test/basic_test.go index 5e90203ad..863d9f699 100644 --- a/test/basic_test.go +++ b/test/basic_test.go @@ -503,6 +503,29 @@ func TestRequestTimeout(t *testing.T) { } } +func TestOldRequest(t *testing.T) { + s := RunDefaultServer() + defer s.Shutdown() + + nc, err := nats.Connect(nats.DefaultURL, nats.UseOldRequestStyle()) + if err != nil { + t.Fatalf("Failed to connect: %v", err) + } + defer nc.Close() + + response := []byte("I will help you") + nc.Subscribe("foo", func(m *nats.Msg) { + nc.Publish(m.Reply, response) + }) + msg, err := nc.Request("foo", []byte("help"), 500*time.Millisecond) + if err != nil { + t.Fatalf("Received an error on Request test: %s", err) + } + if !bytes.Equal(msg.Data, response) { + t.Fatalf("Received invalid response") + } +} + func TestRequest(t *testing.T) { s := RunDefaultServer() defer s.Shutdown() @@ -541,6 +564,71 @@ func TestRequestNoBody(t *testing.T) { } } +func TestSimultaneousRequests(t *testing.T) { + s := RunDefaultServer() + defer s.Shutdown() + nc := NewDefaultConnection(t) + defer nc.Close() + + response := []byte("I will help you") + nc.Subscribe("foo", func(m *nats.Msg) { + nc.Publish(m.Reply, response) + }) + + var wg sync.WaitGroup + for i := 0; i < 50; i++ { + wg.Add(1) + go func() { + if _, err := nc.Request("foo", nil, 2*time.Second); err != nil { + t.Fatalf("Expected to receive a timeout error") + } else { + wg.Done() + } + }() + } + wg.Wait() +} + +func TestRequestClose(t *testing.T) { + s := RunDefaultServer() + defer s.Shutdown() + + nc := NewDefaultConnection(t) + defer nc.Close() + + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(100 * time.Millisecond) + nc.Close() + }() + if _, err := nc.Request("foo", []byte("help"), 2*time.Second); err != nats.ErrInvalidConnection && err != nats.ErrConnectionClosed { + t.Fatalf("Expected connection error: got %v", err) + } + wg.Wait() +} + +func TestRequestCloseTimeout(t *testing.T) { + // Make sure we return a timeout when we close + // the connection even if response is queued. + + s := RunDefaultServer() + defer s.Shutdown() + + nc := NewDefaultConnection(t) + defer nc.Close() + + response := []byte("I will help you") + nc.Subscribe("foo", func(m *nats.Msg) { + nc.Publish(m.Reply, response) + nc.Close() + }) + if _, err := nc.Request("foo", nil, 1*time.Second); err == nil { + t.Fatalf("Expected to receive a timeout error") + } +} + func TestFlushInCB(t *testing.T) { s := RunDefaultServer() defer s.Shutdown() diff --git a/test/bench_test.go b/test/bench_test.go index 5deb12775..233e8c0fa 100644 --- a/test/bench_test.go +++ b/test/bench_test.go @@ -127,3 +127,27 @@ func BenchmarkRequest(b *testing.B) { } } } + +func BenchmarkOldRequest(b *testing.B) { + b.StopTimer() + s := RunDefaultServer() + defer s.Shutdown() + nc, err := nats.Connect(nats.DefaultURL, nats.UseOldRequestStyle()) + if err != nil { + b.Fatalf("Failed to connect: %v", err) + } + defer nc.Close() + ok := []byte("ok") + nc.Subscribe("req", func(m *nats.Msg) { + nc.Publish(m.Reply, ok) + }) + b.StartTimer() + b.ReportAllocs() + q := []byte("q") + for i := 0; i < b.N; i++ { + _, err := nc.Request("req", q, 1*time.Second) + if err != nil { + b.Fatalf("Err %v\n", err) + } + } +} diff --git a/test/context_test.go b/test/context_test.go index c431d9353..003977002 100644 --- a/test/context_test.go +++ b/test/context_test.go @@ -13,13 +13,22 @@ import ( "github.com/nats-io/go-nats" ) -func TestContextRequestWithTimeout(t *testing.T) { - s := RunDefaultServer() - defer s.Shutdown() +func TestContextRequestWithNilConnection(t *testing.T) { + var nc *nats.Conn - nc := NewDefaultConnection(t) - defer nc.Close() + ctx, cancelCB := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancelCB() // should always be called, not discarded, to prevent context leak + + _, err := nc.RequestWithContext(ctx, "fast", []byte("")) + if err == nil { + t.Fatalf("Expected request with context and nil connection to fail\n") + } + if err != nats.ErrInvalidConnection { + t.Fatalf("Expected nats.ErrInvalidConnection, got %v\n", err) + } +} +func testContextRequestWithTimeout(t *testing.T, nc *nats.Conn) { nc.Subscribe("slow", func(m *nats.Msg) { // Simulates latency into the client so that timeout is hit. time.Sleep(200 * time.Millisecond) @@ -71,13 +80,30 @@ func TestContextRequestWithTimeout(t *testing.T) { } } -func TestContextRequestWithTimeoutCanceled(t *testing.T) { +func TestContextRequestWithTimeout(t *testing.T) { s := RunDefaultServer() defer s.Shutdown() nc := NewDefaultConnection(t) defer nc.Close() + testContextRequestWithTimeout(t, nc) +} + +func TestOldContextRequestWithTimeout(t *testing.T) { + s := RunDefaultServer() + defer s.Shutdown() + + nc, err := nats.Connect(nats.DefaultURL, nats.UseOldRequestStyle()) + if err != nil { + t.Fatalf("Failed to connect: %v", err) + } + defer nc.Close() + + testContextRequestWithTimeout(t, nc) +} + +func testContextRequestWithTimeoutCanceled(t *testing.T, nc *nats.Conn) { ctx, cancelCB := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancelCB() @@ -124,13 +150,30 @@ func TestContextRequestWithTimeoutCanceled(t *testing.T) { } } -func TestContextRequestWithCancel(t *testing.T) { +func TestContextRequestWithTimeoutCanceled(t *testing.T) { s := RunDefaultServer() defer s.Shutdown() nc := NewDefaultConnection(t) defer nc.Close() + testContextRequestWithTimeoutCanceled(t, nc) +} + +func TestOldContextRequestWithTimeoutCanceled(t *testing.T) { + s := RunDefaultServer() + defer s.Shutdown() + + nc, err := nats.Connect(nats.DefaultURL, nats.UseOldRequestStyle()) + if err != nil { + t.Fatalf("Failed to connect: %v", err) + } + defer nc.Close() + + testContextRequestWithTimeoutCanceled(t, nc) +} + +func testContextRequestWithCancel(t *testing.T, nc *nats.Conn) { ctx, cancelCB := context.WithCancel(context.Background()) defer cancelCB() // should always be called, not discarded, to prevent context leak @@ -201,13 +244,30 @@ func TestContextRequestWithCancel(t *testing.T) { } } -func TestContextRequestWithDeadline(t *testing.T) { +func TestContextRequestWithCancel(t *testing.T) { s := RunDefaultServer() defer s.Shutdown() nc := NewDefaultConnection(t) defer nc.Close() + testContextRequestWithCancel(t, nc) +} + +func TestOldContextRequestWithCancel(t *testing.T) { + s := RunDefaultServer() + defer s.Shutdown() + + nc, err := nats.Connect(nats.DefaultURL, nats.UseOldRequestStyle()) + if err != nil { + t.Fatalf("Failed to connect: %v", err) + } + defer nc.Close() + + testContextRequestWithCancel(t, nc) +} + +func testContextRequestWithDeadline(t *testing.T, nc *nats.Conn) { deadline := time.Now().Add(100 * time.Millisecond) ctx, cancelCB := context.WithDeadline(context.Background(), deadline) defer cancelCB() // should always be called, not discarded, to prevent context leak @@ -252,6 +312,29 @@ func TestContextRequestWithDeadline(t *testing.T) { } } +func TestContextRequestWithDeadline(t *testing.T) { + s := RunDefaultServer() + defer s.Shutdown() + + nc := NewDefaultConnection(t) + defer nc.Close() + + testContextRequestWithDeadline(t, nc) +} + +func TestOldContextRequestWithDeadline(t *testing.T) { + s := RunDefaultServer() + defer s.Shutdown() + + nc, err := nats.Connect(nats.DefaultURL, nats.UseOldRequestStyle()) + if err != nil { + t.Fatalf("Failed to connect: %v", err) + } + defer nc.Close() + + testContextRequestWithDeadline(t, nc) +} + func TestContextSubNextMsgWithTimeout(t *testing.T) { s := RunDefaultServer() defer s.Shutdown()