Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ephemeral Nodes for via Session behavior settings. #487

Merged
merged 3 commits into from
Nov 21, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion command/agent/session_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ func (s *HTTPServer) SessionCreate(resp http.ResponseWriter, req *http.Request)
return nil, nil
}

// Default the session to our node + serf check
// Default the session to our node + serf check + release session invalidate behavior
args := structs.SessionRequest{
Op: structs.SessionCreate,
Session: structs.Session{
Node: s.agent.config.NodeName,
Checks: []string{consul.SerfCheckID},
LockDelay: 15 * time.Second,
Behavior: structs.SessionKeysRelease,
},
}
s.parseDC(req, &args.Datacenter)
Expand Down
113 changes: 113 additions & 0 deletions command/agent/session_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,55 @@ func TestSessionCreate(t *testing.T) {
})
}

func TestSessionCreateDelete(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
// Create a health check
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: srv.agent.config.NodeName,
Address: "127.0.0.1",
Check: &structs.HealthCheck{
CheckID: "consul",
Node: srv.agent.config.NodeName,
Name: "consul",
ServiceID: "consul",
Status: structs.HealthPassing,
},
}
var out struct{}
if err := srv.agent.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}

// Associate session with node and 2 health checks, and make it delete on session destroy
body := bytes.NewBuffer(nil)
enc := json.NewEncoder(body)
raw := map[string]interface{}{
"Name": "my-cool-session",
"Node": srv.agent.config.NodeName,
"Checks": []string{consul.SerfCheckID, "consul"},
"LockDelay": "20s",
"Behavior": structs.SessionKeysDelete,
}
enc.Encode(raw)

req, err := http.NewRequest("PUT", "/v1/session/create", body)
if err != nil {
t.Fatalf("err: %v", err)
}

resp := httptest.NewRecorder()
obj, err := srv.SessionCreate(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}

if _, ok := obj.(sessionCreateResponse); !ok {
t.Fatalf("should work")
}
})
}

func TestFixupLockDelay(t *testing.T) {
inp := map[string]interface{}{
"lockdelay": float64(15),
Expand Down Expand Up @@ -105,6 +154,28 @@ func makeTestSession(t *testing.T, srv *HTTPServer) string {
return sessResp.ID
}

func makeTestSessionDelete(t *testing.T, srv *HTTPServer) string {
// Create Session with delete behavior
body := bytes.NewBuffer(nil)
enc := json.NewEncoder(body)
raw := map[string]interface{}{
"Behavior": "delete",
}
enc.Encode(raw)

req, err := http.NewRequest("PUT", "/v1/session/create", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.SessionCreate(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
sessResp := obj.(sessionCreateResponse)
return sessResp.ID
}

func TestSessionDestroy(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
id := makeTestSession(t, srv)
Expand Down Expand Up @@ -188,3 +259,45 @@ func TestSessionsForNode(t *testing.T) {
}
})
}

func TestSessionDeleteDestroy(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
id := makeTestSessionDelete(t, srv)

// now create a new key for the session and acquire it
buf := bytes.NewBuffer([]byte("test"))
req, err := http.NewRequest("PUT", "/v1/kv/ephemeral?acquire="+id, buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.KVSEndpoint(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}

if res := obj.(bool); !res {
t.Fatalf("should work")
}

// now destroy the session, this should delete the key created above
req, err = http.NewRequest("PUT", "/v1/session/destroy/"+id, nil)
resp = httptest.NewRecorder()
obj, err = srv.SessionDestroy(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp := obj.(bool); !resp {
t.Fatalf("should work")
}

// Verify that the key is gone
req, _ = http.NewRequest("GET", "/v1/kv/ephemeral", nil)
resp = httptest.NewRecorder()
obj, _ = srv.KVSEndpoint(resp, req)
res, found := obj.(structs.DirEntries)
if found || len(res) != 0 {
t.Fatalf("bad: %v found, should be nothing", res)
}
})
}
8 changes: 8 additions & 0 deletions consul/session_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error {
if args.Session.Node == "" && args.Op == structs.SessionCreate {
return fmt.Errorf("Must provide Node")
}
switch args.Session.Behavior {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there should be a case for the empty string which does default, and return an error in the default case. e.g. behavior = foobar should return an error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, agreed. Will fix this and the other file as well.

On Nov 20, 2014 5:58 PM, Armon Dadgar notifications@github.com wrote:

In consul/session_endpoint.go:

@@ -28,6 +28,13 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error {
if args.Session.Node == "" && args.Op == structs.SessionCreate {
return fmt.Errorf("Must provide Node")
}

  • switch args.Session.Behavior {
    

I think there should be a case for the empty string which does default, and return an error in the default case. e.g. behavior = foobar should return an error.


Reply to this email directly or view it on GitHubhttps://github.com//pull/487/files#r20684732.

case structs.SessionKeysRelease, structs.SessionKeysDelete:
// we like it, use it
case "":
args.Session.Behavior = structs.SessionKeysRelease // force default behavior
default:
return fmt.Errorf("Invalid Behavior setting '%s'", args.Session.Behavior)
}

// If this is a create, we must generate the Session ID. This must
// be done prior to appending to the raft log, because the ID is not
Expand Down
63 changes: 63 additions & 0 deletions consul/session_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,69 @@ func TestSessionEndpoint_Apply(t *testing.T) {
}
}

func TestSessionEndpoint_DeleteApply(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()

testutil.WaitForLeader(t, client.Call, "dc1")

// Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})

arg := structs.SessionRequest{
Datacenter: "dc1",
Op: structs.SessionCreate,
Session: structs.Session{
Node: "foo",
Name: "my-session",
Behavior: structs.SessionKeysDelete,
},
}
var out string
if err := client.Call("Session.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
id := out

// Verify
state := s1.fsm.State()
_, s, err := state.SessionGet(out)
if err != nil {
t.Fatalf("err: %v", err)
}
if s == nil {
t.Fatalf("should not be nil")
}
if s.Node != "foo" {
t.Fatalf("bad: %v", s)
}
if s.Name != "my-session" {
t.Fatalf("bad: %v", s)
}
if s.Behavior != structs.SessionKeysDelete {
t.Fatalf("bad: %v", s)
}

// Do a delete
arg.Op = structs.SessionDestroy
arg.Session.ID = out
if err := client.Call("Session.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}

// Verify
_, s, err = state.SessionGet(id)
if err != nil {
t.Fatalf("err: %v", err)
}
if s != nil {
t.Fatalf("bad: %v", s)
}
}

func TestSessionEndpoint_Get(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
Expand Down
56 changes: 45 additions & 11 deletions consul/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1327,6 +1327,15 @@ func (s *StateStore) SessionCreate(index uint64, session *structs.Session) error
return fmt.Errorf("Missing Session ID")
}

switch session.Behavior {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, handle the empty string and default causes separately

case structs.SessionKeysRelease, structs.SessionKeysDelete:
// we like
case "":
session.Behavior = structs.SessionKeysRelease // force default behavior
default:
return fmt.Errorf("Invalid Session Behavior setting '%s'", session.Behavior)
}

// Assign the create index
session.CreateIndex = index

Expand Down Expand Up @@ -1454,7 +1463,7 @@ func (s *StateStore) SessionDestroy(index uint64, id string) error {
}
defer tx.Abort()

log.Printf("[DEBUG] consul.state: Invalidating session %s due to session destroy",
s.logger.Printf("[DEBUG] consul.state: Invalidating session %s due to session destroy",
id)
if err := s.invalidateSession(index, tx, id); err != nil {
return err
Expand All @@ -1471,7 +1480,7 @@ func (s *StateStore) invalidateNode(index uint64, tx *MDBTxn, node string) error
}
for _, sess := range sessions {
session := sess.(*structs.Session).ID
log.Printf("[DEBUG] consul.state: Invalidating session %s due to node '%s' invalidation",
s.logger.Printf("[DEBUG] consul.state: Invalidating session %s due to node '%s' invalidation",
session, node)
if err := s.invalidateSession(index, tx, session); err != nil {
return err
Expand All @@ -1489,7 +1498,7 @@ func (s *StateStore) invalidateCheck(index uint64, tx *MDBTxn, node, check strin
}
for _, sc := range sessionChecks {
session := sc.(*sessionCheck).Session
log.Printf("[DEBUG] consul.state: Invalidating session %s due to check '%s' invalidation",
s.logger.Printf("[DEBUG] consul.state: Invalidating session %s due to check '%s' invalidation",
session, check)
if err := s.invalidateSession(index, tx, session); err != nil {
return err
Expand All @@ -1513,15 +1522,23 @@ func (s *StateStore) invalidateSession(index uint64, tx *MDBTxn, id string) erro
}
session := res[0].(*structs.Session)

// Enforce the MaxLockDelay
delay := session.LockDelay
if delay > structs.MaxLockDelay {
delay = structs.MaxLockDelay
}
if session.Behavior == structs.SessionKeysDelete {
// delete the keys held by the session
if err := s.deleteKeys(index, tx, id); err != nil {
return err
}

// Invalidate any held locks
if err := s.invalidateLocks(index, tx, delay, id); err != nil {
return err
} else { // default to release
// Enforce the MaxLockDelay
delay := session.LockDelay
if delay > structs.MaxLockDelay {
delay = structs.MaxLockDelay
}

// Invalidate any held locks
if err := s.invalidateLocks(index, tx, delay, id); err != nil {
return err
}
}

// Nuke the session
Expand Down Expand Up @@ -1588,6 +1605,23 @@ func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn,
return nil
}

// deleteKeys is used to delete all the keys created by a session
// within a given txn. All tables should be locked in the tx.
func (s *StateStore) deleteKeys(index uint64, tx *MDBTxn, id string) error {
num, err := s.kvsTable.DeleteTxn(tx, "session", id)
if err != nil {
return err
}

if num > 0 {
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
tx.Defer(func() { s.watch[s.kvsTable].Notify() })
}
return nil
}

// ACLSet is used to create or update an ACL entry
func (s *StateStore) ACLSet(index uint64, acl *structs.ACL) error {
// Check for an ID
Expand Down
47 changes: 47 additions & 0 deletions consul/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2279,6 +2279,53 @@ func TestSessionInvalidate_KeyUnlock(t *testing.T) {
}
}

func TestSessionInvalidate_KeyDelete(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()

if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
session := &structs.Session{
ID: generateUUID(),
Node: "foo",
LockDelay: 50 * time.Millisecond,
Behavior: structs.SessionKeysDelete,
}
if err := store.SessionCreate(4, session); err != nil {
t.Fatalf("err: %v", err)
}

// Lock a key with the session
d := &structs.DirEntry{
Key: "/bar",
Flags: 42,
Value: []byte("test"),
Session: session.ID,
}
ok, err := store.KVSLock(5, d)
if err != nil {
t.Fatalf("err: %v", err)
}
if !ok {
t.Fatalf("unexpected fail")
}

// Delete the node
if err := store.DeleteNode(6, "foo"); err != nil {
t.Fatalf("err: %v", err)
}

// Key should be deleted
_, d2, err := store.KVSGet("/bar")
if d2 != nil {
t.Fatalf("unexpected undeleted key")
}
}

func TestACLSet_Get(t *testing.T) {
store, err := testStateStore()
if err != nil {
Expand Down
Loading