Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LeafNode: delay connect even when loop detected by accepting side #1338

Merged
merged 1 commit into from
Apr 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1288,6 +1288,7 @@ func (c *client) processErr(errStr string) {
c.Errorf("Gateway Error %s", errStr)
case LEAF:
c.Errorf("Leafnode Error %s", errStr)
c.leafProcessErr(errStr)
close = false
}
if close {
Expand Down
78 changes: 46 additions & 32 deletions server/leafnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,10 @@ func (cfg *leafNodeCfg) getConnectDelay() time.Duration {
return delay
}

// Reset the connect delay.
func (cfg *leafNodeCfg) resetConnectDelay() {
// Sets the connect delay.
func (cfg *leafNodeCfg) setConnectDelay(delay time.Duration) {
cfg.Lock()
cfg.connDelay = 0
cfg.connDelay = delay
cfg.Unlock()
}

Expand Down Expand Up @@ -258,7 +258,7 @@ func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool)
case <-s.quitCh:
return
}
remote.resetConnectDelay()
remote.setConnectDelay(0)
}

var conn net.Conn
Expand Down Expand Up @@ -1288,7 +1288,7 @@ func (c *client) processLeafSub(argo []byte) (err error) {
ldsPrefix := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))
if ldsPrefix && string(sub.subject) == acc.getLDSubject() {
c.mu.Unlock()
srv.reportLeafNodeLoop(c)
c.handleLeafNodeLoop(true)
return nil
}

Expand Down Expand Up @@ -1364,23 +1364,19 @@ func (c *client) processLeafSub(argo []byte) (err error) {
return nil
}

func (s *Server) reportLeafNodeLoop(c *client) {
delay := leafNodeReconnectDelayAfterLoopDetected
opts := s.getOpts()
if opts.LeafNode.connDelay != 0 {
delay = opts.LeafNode.connDelay
}
c.mu.Lock()
if c.leaf.remote != nil {
c.leaf.remote.Lock()
c.leaf.remote.connDelay = delay
c.leaf.remote.Unlock()
// If the leafnode connection is solicited, sets the connect delay based on the
// default or the private option (for tests). Optionally sends the error to the
// other side, then logs it and closes the connection.
func (c *client) handleLeafNodeLoop(sendErr bool) {
accName, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterLoopDetected)
errTxt := fmt.Sprintf("Loop detected for leafnode account=%q. Delaying attempt to reconnect for %v", accName, delay)
if sendErr {
c.sendErr(errTxt)
}
accName := c.acc.Name
c.mu.Unlock()
c.sendErrAndErr(fmt.Sprintf("Loop detected for leafnode account=%q. Delaying attempt to reconnect for %v",
accName, delay))
// Leafnode do not close the connection on processErr(), so close it here.
c.Errorf(errTxt)
// If we are here with "sendErr" false, it means that this is the server
// that received the error. The other side will have closed the connection,
// but does not hurt to close here too.
c.closeConnection(ProtocolViolation)
}

Expand Down Expand Up @@ -1586,17 +1582,7 @@ func (c *client) leafSubPermViolation(subj []byte) {
// Sends the permission violation error to the remote, logs it and closes the connection.
// If this is from a server soliciting, the reconnection will be delayed.
func (c *client) leafPermViolation(pub bool, subj []byte) {
c.mu.Lock()
if c.leaf.remote != nil {
delay := leafNodeReconnectAfterPermViolation
if s := c.srv; s != nil {
if srvdelay := s.getOpts().LeafNode.connDelay; srvdelay != 0 {
delay = srvdelay
}
}
c.leaf.remote.connDelay = delay
}
c.mu.Unlock()
c.setLeafConnectDelayIfSoliciting(leafNodeReconnectAfterPermViolation)
var action string
if pub {
c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", subj))
Expand All @@ -1609,3 +1595,31 @@ func (c *client) leafPermViolation(pub bool, subj []byte) {
// TODO: add a new close reason that is more appropriate?
c.closeConnection(ProtocolViolation)
}

// leafProcessErr is the LEAF-specific hook called by the generic processErr()
// with the error string received from the other side of the connection.
//
// Only errors containing "Loop detected" are acted upon: for those, the loop
// handling is run with sendErr=false, since the error originated from the
// remote and does not need to be sent back. Any other error is ignored here.
func (c *client) leafProcessErr(errStr string) {
	if strings.Contains(errStr, "Loop detected") {
		c.handleLeafNodeLoop(false)
	}
}

// setLeafConnectDelayIfSoliciting records a reconnect delay on the remote
// configuration when this leaf connection is a solicited one.
//
// The delay applied is the given value, unless the server options carry a
// non-zero LeafNode.connDelay (used by tests), in which case that value wins.
// When the connection is not a solicited leafnode, nothing is stored and the
// given delay is returned unchanged.
//
// Returns the name of the connection's account and the resulting delay.
// The client lock is held for the duration of the call.
func (c *client) setLeafConnectDelayIfSoliciting(delay time.Duration) (string, time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.isSolicitedLeafNode() {
		// A server-level connDelay, when set, overrides the caller's value.
		if srv := c.srv; srv != nil {
			if override := srv.getOpts().LeafNode.connDelay; override != 0 {
				delay = override
			}
		}
		c.leaf.remote.setConnectDelay(delay)
	}
	// The account name is read before the deferred unlock runs.
	return c.acc.Name, delay
}
67 changes: 67 additions & 0 deletions server/leafnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1290,3 +1290,70 @@ func TestLeafNodeExportPermissionsNotForSpecialSubs(t *testing.T) {
return nil
})
}

// Make sure that if the node that detects the loop (and sends the error and
// closes the connection) is the accept side, the remote node (the one that solicits)
// properly uses the reconnect delay.
func TestLeafNodeLoopDetectedOnAcceptSide(t *testing.T) {
bo := DefaultOptions()
bo.LeafNode.Host = "127.0.0.1"
bo.LeafNode.Port = -1
b := RunServer(bo)
defer b.Shutdown()

l := &captureErrorLogger{errCh: make(chan string, 10)}
b.SetLogger(l, false, false)

u, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", bo.LeafNode.Port))

ao := testDefaultOptionsForGateway("A")
ao.Accounts = []*Account{NewAccount("SYS")}
kozlovic marked this conversation as resolved.
Show resolved Hide resolved
ao.SystemAccount = "SYS"
ao.LeafNode.ReconnectInterval = 5 * time.Millisecond
ao.LeafNode.Remotes = []*RemoteLeafOpts{
{
URLs: []*url.URL{u},
Hub: true,
},
}
a := RunServer(ao)
defer a.Shutdown()

co := testGatewayOptionsFromToWithServers(t, "C", "A", a)
co.Accounts = []*Account{NewAccount("SYS")}
co.SystemAccount = "SYS"
co.LeafNode.ReconnectInterval = 5 * time.Millisecond
co.LeafNode.Remotes = []*RemoteLeafOpts{
{
URLs: []*url.URL{u},
Hub: true,
},
}
c := RunServer(co)
defer c.Shutdown()

for i := 0; i < 2; i++ {
select {
case e := <-l.errCh:
if !strings.Contains(e, "Loop detected") {
t.Fatalf("Unexpected error: %q", e)
}
case <-time.After(200 * time.Millisecond):
// We are likely to detect from each A and C servers,
// but consider a failure if we did not receive any.
if i == 0 {
t.Fatalf("Should have detected loop")
}
}
}

// The reconnect attempt is set to 5ms, but the default loop delay
// is 30 seconds, so we should not get any new error for that long.
// Check if we are getting more errors..
select {
case e := <-l.errCh:
t.Fatalf("Should not have gotten another error, got %q", e)
case <-time.After(50 * time.Millisecond):
// OK!
}
}