Skip to content

Commit

Permalink
fixing up raft reload tests
Browse files Browse the repository at this point in the history
close second goroutine in raft-net
  • Loading branch information
chelseakomlo committed Jan 17, 2018
1 parent d97c91c commit 1dab7b5
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 22 deletions.
13 changes: 5 additions & 8 deletions nomad/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,25 +430,22 @@ func (s *Server) reloadTLSConnections(newTLSConfig *config.TLSConfig) error {
// reinitialize our rpc listener
s.rpcListener.Close()
<-s.listenerCh

// Close existing Raft connections
s.raftTransport.Pause()
s.raftLayer.Close()
s.startRPCListener()

listener, err := s.createRPCListener()
if err != nil {
listener.Close()
return err
}

// Close existing streams
// Close and reload existing Raft connections
s.raftTransport.Pause()
s.raftLayer.Close()
wrapper := tlsutil.RegionSpecificWrapper(s.config.Region, tlsWrap)
s.raftLayer = NewRaftLayer(s.rpcAdvertise, wrapper)

// Reload raft connections
s.raftTransport.Reload(s.raftLayer)

s.startRPCListener()
time.Sleep(3 * time.Second)

s.logger.Printf("[DEBUG] nomad: finished reloading server connections")
return nil
Expand Down
23 changes: 13 additions & 10 deletions nomad/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ func TestServer_Reload_TLSConnections_PlaintextToTLS(t *testing.T) {
)
dir := tmpDir(t)
defer os.RemoveAll(dir)

s1 := testServer(t, func(c *Config) {
c.DataDir = path.Join(dir, "nodeA")
})
Expand All @@ -312,10 +313,8 @@ func TestServer_Reload_TLSConnections_PlaintextToTLS(t *testing.T) {

err := s1.reloadTLSConnections(newTLSConfig)
assert.Nil(err)

assert.True(s1.config.TLSConfig.Equals(newTLSConfig))

time.Sleep(10 * time.Second)
codec := rpcClient(t, s1)

node := mock.Node()
Expand All @@ -327,6 +326,7 @@ func TestServer_Reload_TLSConnections_PlaintextToTLS(t *testing.T) {
var resp structs.GenericResponse
err = msgpackrpc.CallWithCodec(codec, "Node.Register", req, &resp)
assert.NotNil(err)
assert.Contains("rpc error: EOF", err.Error())
}

// Tests that the server will successfully reload its network connections,
Expand All @@ -343,6 +343,7 @@ func TestServer_Reload_TLSConnections_TLSToPlaintext_RPC(t *testing.T) {

dir := tmpDir(t)
defer os.RemoveAll(dir)

s1 := testServer(t, func(c *Config) {
c.DataDir = path.Join(dir, "nodeB")
c.TLSConfig = &config.TLSConfig{
Expand All @@ -362,8 +363,6 @@ func TestServer_Reload_TLSConnections_TLSToPlaintext_RPC(t *testing.T) {
assert.Nil(err)
assert.True(s1.config.TLSConfig.Equals(newTLSConfig))

time.Sleep(10 * time.Second)

codec := rpcClient(t, s1)

node := mock.Node()
Expand Down Expand Up @@ -391,6 +390,7 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) {
)
dir := tmpDir(t)
defer os.RemoveAll(dir)

s1 := testServer(t, func(c *Config) {
c.BootstrapExpect = 2
c.DevMode = false
Expand Down Expand Up @@ -420,7 +420,6 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) {
t.Fatalf("should have 2 peers")
})

// the server should be connected to the rest of the cluster
testutil.WaitForLeader(t, s2.RPC)

{
Expand All @@ -439,6 +438,7 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) {
var resp structs.JobRegisterResponse
err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)
assert.Nil(err)
assert.NotEqual(0, resp.Index)

// Check for the job in the FSM of each server in the cluster
{
Expand All @@ -454,7 +454,7 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) {
ws := memdb.NewWatchSet()
out, err := state.JobByID(ws, job.Namespace, job.ID)
assert.Nil(err)
assert.NotNil(out) // TODO Occasionally is flaky
assert.NotNil(out)
assert.Equal(out.CreateIndex, resp.JobModifyIndex)
}
}
Expand All @@ -478,17 +478,19 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) {
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Region: "regionFoo",
Namespace: job.Namespace,
},
}

// TODO(CK) This occasionally is flaky
var resp structs.JobRegisterResponse
err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)
assert.NotNil(err)
assert.Contains("rpc error: EOF", err.Error())

// Check that the job was not persisted
state := s2.fsm.State()
state := s1.fsm.State()
ws := memdb.NewWatchSet()
out, _ := state.JobByID(ws, job.Namespace, job.ID)
assert.Nil(out)
Expand All @@ -507,12 +509,12 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) {
err = s2.reloadTLSConnections(secondNewTLSConfig)
assert.Nil(err)

// the server should be connected to the rest of the cluster
testutil.WaitForLeader(t, s2.RPC)

{
// assert that a job register request will succeed
codec := rpcClient(t, s2)

job := mock.Job()
req := &structs.JobRegisterRequest{
Job: job,
Expand All @@ -526,14 +528,15 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) {
var resp structs.JobRegisterResponse
err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)
assert.Nil(err)
assert.NotEqual(0, resp.Index)

// Check for the job in the FSM of each server in the cluster
{
state := s2.fsm.State()
ws := memdb.NewWatchSet()
out, err := state.JobByID(ws, job.Namespace, job.ID)
assert.Nil(err)
assert.NotNil(out)
assert.NotNil(out) // TODO(CK) This occasionally is flaky
assert.Equal(out.CreateIndex, resp.JobModifyIndex)
}
{
Expand Down
21 changes: 17 additions & 4 deletions vendor/github.com/hashicorp/raft/net_transport.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 1dab7b5

Please sign in to comment.