Skip to content

Commit

Permalink
More protocol friendly request pattern. Fix for #294.
Browse files Browse the repository at this point in the history
This request pattern is sematically the same but utilizes a single
wildcard subscription and the last token context for the response
subject binding to each request. Since NATS floods interest (subscriptions),
the old requestor pattern would create a new inbox subscription for each request.
It would then auto-unsubscribe and delete the subscription after the
response was received, causing quite a bit of protocol traffic.

So although this does have a performance gain as can be seen in the benchmarks,
it's implementation is specifically designed to be more friendly for NATS
clusters.
  • Loading branch information
derekcollison committed May 25, 2017
1 parent d912fba commit 2231281
Show file tree
Hide file tree
Showing 4 changed files with 280 additions and 39 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
language: go
sudo: false
go:
- 1.8.1
- 1.8.3
- 1.7.6
- 1.6.4
- 1.7.5
install:
- go get -t ./...
- go get github.com/nats-io/gnatsd
Expand Down
224 changes: 187 additions & 37 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

// Default Constants
const (
Version = "1.2.2"
Version = "1.3.0"
DefaultURL = "nats://localhost:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60
Expand Down Expand Up @@ -119,72 +119,72 @@ type Option func(*Options) error
type Options struct {

// Url represents a single NATS server url to which the client
// will be connecting. If Servers is also set, it then becomes
// the first server in the array.
// will be connecting. If the Servers option is also set, it
// then becomes the first server in the Servers array.
Url string

// Servers is the configured set of servers which are
// available when attempting to connect.
// Servers is a configured set of servers which this client
// will use when attempting to connect.
Servers []string

// NoRandomize configures whether we will be randomizing
// the server pool of servers.
// NoRandomize configures whether we will randomize the
// server pool.
NoRandomize bool

// Name is the optional name label which will be sent to the server
// Name is an optional name label which will be sent to the server
// on CONNECT to identify the client.
Name string

// Verbose enables the server whether it should reply back
// OK on commands successfully being processed.
// Verbose signals the server to send an OK ack for commands
// successfully processed by the server.
Verbose bool

// Pedantic sets pedantic flag option sent on connect to signal server
// whether it should be doing further validation of subjects.
// Pedantic signals the server whether it should be doing further
// validation of subjects.
Pedantic bool

// Secure enable TLS secure connections that skip server
// verification by default.
// Secure enables TLS secure connections that skip server
// verification by default. NOT RECOMMENDED.
Secure bool

// TLSConfig is the custom TLS configuration to use for
// the secure transport.
// TLSConfig is a custom TLS configuration to use for secure
// transports.
TLSConfig *tls.Config

// AllowReconnect enables reconnection logic for when server we were
// connected to fails.
// AllowReconnect enables reconnection logic to be used when we
// encounter a disconnect from the current server.
AllowReconnect bool

// MaxReconnect sets the number of connect attempts that will be
// tried before giving up connecting further to a server in the pool.
// If negative, then it will never give up trying to connect.
// MaxReconnect sets the number of reconnect attempts that will be
// tried before giving up. If negative, then it will never give up
// trying to reconnect.
MaxReconnect int

// ReconnectWait sets the time to backoff after attempting to reconnect
// ReconnectWait sets the time to backoff after attempting a reconnect
// to a server that we were already connected to previously.
ReconnectWait time.Duration

// Timeout sets the timeout for Dial on a connection.
// Timeout sets the timeout for a Dial operation on a connection.
Timeout time.Duration

// FlusherTimeout is the maximum time to wait for the flusher loop
// to be able to finish writing to the underlying socket.
// to be able to finish writing to the underlying connection.
FlusherTimeout time.Duration

// PingInterval is the period at which the server will be sending ping
// PingInterval is the period at which the client will be sending ping
// commands to the server, disabled if 0 or negative.
PingInterval time.Duration

// MaxPingsOut is the maximum number of pending ping commands waiting
// for a response back before raising a ErrStaleConnection error.
// MaxPingsOut is the maximum number of pending ping commands that can
// be awaiting a response before raising an ErrStaleConnection error.
MaxPingsOut int

// ClosedCB sets the closed handler called when client will
// ClosedCB sets the closed handler that is called when a client will
// no longer be connected.
ClosedCB ConnHandler

// DisconnectedCB sets the disconnected handler called whenever we
// are disconnected.
// DisconnectedCB sets the disconnected handler that is called
// whenever we are disconnected.
DisconnectedCB ConnHandler

// ReconnectedCB sets the reconnected handler called whenever
Expand All @@ -198,8 +198,8 @@ type Options struct {
// AsyncErrorCB sets the async error handler (e.g. slow consumer errors)
AsyncErrorCB ErrHandler

// ReconnectBufSize of the backing bufio buffer during reconnect. Once this
// has been exhausted publish operations will error.
// ReconnectBufSize is the size of the backing bufio during reconnect.
// Once this has been exhausted publish operations will return an error.
ReconnectBufSize int

// SubChanLen is the size of the buffered channel used between the socket
Expand All @@ -208,7 +208,7 @@ type Options struct {
// dictated by PendingLimits()
SubChanLen int

// User sets the user to be used when connecting to the server.
// User sets the username to be used when connecting to the server.
User string

// Password sets the password to be used when connecting to a server.
Expand All @@ -217,8 +217,12 @@ type Options struct {
// Token sets the token to be used when connecting to a server.
Token string

// Dialer allows users setting a custom Dialer
// Dialer allows a custom Dialer when forming connections.
Dialer *net.Dialer

// UseOldRequestStyle force older method of Requests that utilize a new Inbox
// and Subscription for each request.
UseOldRequestStyle bool
}

const (
Expand All @@ -236,6 +240,9 @@ const (

// Channel size for the async callback handler.
asyncCBChanSize = 32

// NUID size
nuidSize = 22
)

// A Conn represents a bare connection to a nats-server.
Expand Down Expand Up @@ -270,6 +277,11 @@ type Conn struct {
ps *parseState
ptmr *time.Timer
pout int

// New style response handler
respSub string // The wildcard subject
respMux *Subscription // A single response subscription
respMap map[string]chan *Msg // Request map for the response msg channels
}

// A Subscription represents interest in a given subject.
Expand Down Expand Up @@ -573,6 +585,14 @@ func Dialer(dialer *net.Dialer) Option {
}
}

// UseOldRequestyStyle is an Option to force usage of the old Request style.
func UseOldRequestStyle() Option {
return func(o *Options) error {
o.UseOldRequestStyle = true
return nil
}
}

// Handler processing

// SetDisconnectHandler will set the disconnect event handler.
Expand Down Expand Up @@ -1941,10 +1961,122 @@ func (nc *Conn) publish(subj, reply string, data []byte) error {
return nil
}

// Request will create an Inbox and perform a Request() call
// respHandler is the global respnse handler. It will look up
// the apprioriate channel based on the last token and place
// the message on the channel if possible.
func (nc *Conn) respHandler(m *Msg) {
rt := respToken(m.Subject)

nc.mu.Lock()
// Just return if closed, let Request timeout.
if nc.isClosed() {
nc.mu.Unlock()
return
}

// Grab mch
mch := nc.respMap[rt]
// Delete the key regardless, one response only.
// FIXME(dlc) - should we track responses past 1
// just statistics wise?
delete(nc.respMap, rt)
nc.mu.Unlock()

// Don't block, let Request timeout instead, mch is
// buffered and we should delete the key before a
// second response is processed.
select {
case mch <- m:
default:
return
}
}

// Create the response subscription we will use for all
// new style responses. This will be on an _INBOX with an
// additional terminal token. The subscription will be on
// a wildcard.
func (nc *Conn) createRespMux() error {
// _INBOX wildcard
ginbox := fmt.Sprintf("%s.*", NewInbox())
s, err := nc.Subscribe(ginbox, nc.respHandler)
if err != nil {
return err
}
// We could be racing here. So will we double check
// respMux here and discard the new one if set.
nc.mu.Lock()
defer nc.mu.Unlock()
if nc.respMux == nil {
nc.respSub = ginbox
nc.respMux = s
nc.respMap = make(map[string]chan *Msg)
} else {
// Discard duplicate, don't set others.
defer s.Unsubscribe()
}
return nil
}

// New style request that will mux on a single Subscription.
func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, error) {
if nc == nil {
return nil, ErrInvalidConnection
}

// snapshot
var doSetup, useOldRequestStyle bool
nc.mu.Lock()
useOldRequestStyle = nc.Opts.UseOldRequestStyle
doSetup = (nc.respMux == nil)
nc.mu.Unlock()

// If user wants the old style.
if useOldRequestStyle {
return nc.oldRequest(subj, data, timeout)
}

// Make sure scoped subscription is setup at least once on first
// call to Request(). Will handle duplicates in createRespMux.
if doSetup {
if err := nc.createRespMux(); err != nil {
return nil, err
}
}
// Create literal Inbox and map to a chan msg.
mch := make(chan *Msg, RequestChanLen)
nc.mu.Lock()
respInbox := nc.newRespInbox()
nc.respMap[respToken(respInbox)] = mch
nc.mu.Unlock()

err := nc.PublishRequest(subj, respInbox, data)
if err != nil {
return nil, err
}

t := time.NewTimer(timeout)
defer t.Stop()

var ok bool
var msg *Msg

select {
case msg, ok = <-mch:
if !ok {
return nil, ErrConnectionClosed
}
case <-t.C:
return nil, ErrTimeout
}

return msg, nil
}

// oldRequest will create an Inbox and perform a Request() call
// with the Inbox reply and return the first reply received.
// This is optimized for the case of multiple responses.
func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, error) {
func (nc *Conn) oldRequest(subj string, data []byte, timeout time.Duration) (*Msg, error) {
inbox := NewInbox()
ch := make(chan *Msg, RequestChanLen)

Expand All @@ -1965,19 +2097,37 @@ func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg,
// InboxPrefix is the prefix for all inbox subjects.
const InboxPrefix = "_INBOX."
const inboxPrefixLen = len(InboxPrefix)
const respInboxPrefixLen = inboxPrefixLen + nuidSize + 1

// NewInbox will return an inbox string which can be used for directed replies from
// subscribers. These are guaranteed to be unique, but can be shared and subscribed
// to by others.
func NewInbox() string {
var b [inboxPrefixLen + 22]byte
var b [inboxPrefixLen + nuidSize]byte
pres := b[:inboxPrefixLen]
copy(pres, InboxPrefix)
ns := b[inboxPrefixLen:]
copy(ns, nuid.Next())
return string(b[:])
}

// Creates a new literal response subject that will trigger
// the global subscription handler.
func (nc *Conn) newRespInbox() string {
var b [inboxPrefixLen + (2 * nuidSize) + 1]byte
pres := b[:respInboxPrefixLen]
copy(pres, nc.respSub)
ns := b[respInboxPrefixLen:]
copy(ns, nuid.Next())
return string(b[:])
}

// respToken will return the last token of a literal response inbox
// which we use for the message channel lookup.
func respToken(respInbox string) string {
return respInbox[respInboxPrefixLen:]
}

// Subscribe will express interest in the given subject. The subject
// can have wildcards (partial:*, full:>). Messages will be delivered
// to the associated MsgHandler. If no MsgHandler is given, the
Expand Down
Loading

0 comments on commit 2231281

Please sign in to comment.