diff --git a/bench/bench.json b/bench/bench.json index 492cca5fdb15..5dfb67bd9369 100644 --- a/bench/bench.json +++ b/bench/bench.json @@ -9,7 +9,7 @@ "api_key": "{{ user `do_api_key` }}", "client_id": "{{ user `do_client_id` }}", "region_id": "1", - "size_id": "61", + "size_id": "66", "image_id": "3101045", "snapshot_name": "bench-bootstrap-{{ isotime }}", "name": "bootstrap" @@ -19,7 +19,7 @@ "api_key": "{{ user `do_api_key` }}", "client_id": "{{ user `do_client_id` }}", "region_id": "1", - "size_id": "61", + "size_id": "66", "image_id": "3101045", "snapshot_name": "bench-server-{{ isotime }}", "name": "server" @@ -29,7 +29,7 @@ "api_key": "{{ user `do_api_key` }}", "client_id": "{{ user `do_client_id` }}", "region_id": "1", - "size_id": "61", + "size_id": "66", "image_id": "3101045", "snapshot_name": "bench-worker-{{ isotime }}", "name": "worker" @@ -73,8 +73,8 @@ { "type": "shell", "inline": [ - "curl https://s3.amazonaws.com/hc-ops/boom_linux_amd64 -o /usr/bin/boom", - "chmod +x /usr/bin/boom" + "curl https://s3.amazonaws.com/hc-ops/boom_linux_amd64 -o /usr/local/bin/boom", + "chmod +x /usr/local/bin/boom" ] }, { diff --git a/bench/conf/common.json b/bench/conf/common.json index 2a3fa5d6a852..78ce3bed7041 100644 --- a/bench/conf/common.json +++ b/bench/conf/common.json @@ -1,4 +1,5 @@ { "data_dir": "/var/lib/consul", + "enable_debug": true, "log_level": "info" } diff --git a/command/agent/agent_endpoint_test.go b/command/agent/agent_endpoint_test.go index fee1b28a624f..c10649672289 100644 --- a/command/agent/agent_endpoint_test.go +++ b/command/agent/agent_endpoint_test.go @@ -1,15 +1,15 @@ package agent import ( + "errors" "fmt" - "github.com/hashicorp/consul/testutil" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" "github.com/hashicorp/serf/serf" "net/http" "os" "testing" "time" - "errors" ) func TestHTTPAgentServices(t *testing.T) { diff --git a/command/agent/catalog_endpoint_test.go b/command/agent/catalog_endpoint_test.go index f8b2b6a2c701..bbbeaea8d026 100644 --- a/command/agent/catalog_endpoint_test.go +++ b/command/agent/catalog_endpoint_test.go @@ -2,8 +2,8 @@ package agent import ( "fmt" - "github.com/hashicorp/consul/testutil" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" "net/http" "net/http/httptest" "os" @@ -174,7 +174,7 @@ func TestCatalogNodes_Blocking(t *testing.T) { } // Should block for a while - if time.Now().Sub(start) < 50 * time.Millisecond { + if time.Now().Sub(start) < 50*time.Millisecond { // TODO: Failing t.Fatalf("too fast") } diff --git a/command/agent/dns_test.go b/command/agent/dns_test.go index eeab068d9d5e..d4add63548d8 100644 --- a/command/agent/dns_test.go +++ b/command/agent/dns_test.go @@ -2,8 +2,8 @@ package agent import ( "fmt" - "github.com/hashicorp/consul/testutil" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" "github.com/miekg/dns" "os" "strings" diff --git a/command/agent/health_endpoint_test.go b/command/agent/health_endpoint_test.go index a4b4b9817e09..be8a993687e8 100644 --- a/command/agent/health_endpoint_test.go +++ b/command/agent/health_endpoint_test.go @@ -2,8 +2,8 @@ package agent import ( "fmt" - "github.com/hashicorp/consul/testutil" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" "net/http" "net/http/httptest" "os" diff --git a/command/agent/http.go b/command/agent/http.go index 515514ad2eca..c44c0498040b 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ 
-93,6 +93,12 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/kv/", s.wrap(s.KVSEndpoint)) + s.mux.HandleFunc("/v1/session/create", s.wrap(s.SessionCreate)) + s.mux.HandleFunc("/v1/session/destroy/", s.wrap(s.SessionDestroy)) + s.mux.HandleFunc("/v1/session/info/", s.wrap(s.SessionGet)) + s.mux.HandleFunc("/v1/session/node/", s.wrap(s.SessionsForNode)) + s.mux.HandleFunc("/v1/session/list", s.wrap(s.SessionList)) + if enableDebug { s.mux.HandleFunc("/debug/pprof/", pprof.Index) s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) diff --git a/command/agent/http_test.go b/command/agent/http_test.go index 60d28b6f466f..6b57693f4189 100644 --- a/command/agent/http_test.go +++ b/command/agent/http_test.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" "io" "io/ioutil" "net/http" @@ -255,3 +256,12 @@ func getIndex(t *testing.T, resp *httptest.ResponseRecorder) uint64 { } return uint64(val) } + +func httpTest(t *testing.T, f func(srv *HTTPServer)) { + dir, srv := makeHTTPServer(t) + defer os.RemoveAll(dir) + defer srv.Shutdown() + defer srv.agent.Shutdown() + testutil.WaitForLeader(t, srv.agent.RPC, "dc1") + f(srv) +} diff --git a/command/agent/kvs_endpoint.go b/command/agent/kvs_endpoint.go index 7791d1e9f3c8..2adaae3528a2 100644 --- a/command/agent/kvs_endpoint.go +++ b/command/agent/kvs_endpoint.go @@ -156,6 +156,18 @@ func (s *HTTPServer) KVSPut(resp http.ResponseWriter, req *http.Request, args *s applyReq.Op = structs.KVSCAS } + // Check for lock acquisition + if _, ok := params["acquire"]; ok { + applyReq.DirEnt.Session = params.Get("acquire") + applyReq.Op = structs.KVSLock + } + + // Check for lock release + if _, ok := params["release"]; ok { + applyReq.DirEnt.Session = params.Get("release") + applyReq.Op = structs.KVSUnlock + } + // Check the content-length if req.ContentLength > maxKVSize { resp.WriteHeader(413) diff --git a/command/agent/kvs_endpoint_test.go b/command/agent/kvs_endpoint_test.go index 61216faafca2..867b2a01f475 100644 --- a/command/agent/kvs_endpoint_test.go +++ b/command/agent/kvs_endpoint_test.go @@ -3,8 +3,8 @@ package agent import ( "bytes" "fmt" - "github.com/hashicorp/consul/testutil" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" "net/http" "net/http/httptest" "os" @@ -339,3 +339,73 @@ func TestKVSEndpoint_ListKeys(t *testing.T) { } } } + +func TestKVSEndpoint_AcquireRelease(t *testing.T) { + httpTest(t, func(srv *HTTPServer) { + // Acquire the lock + id := makeTestSession(t, srv) + req, err := http.NewRequest("PUT", + "/v1/kv/test?acquire="+id, bytes.NewReader(nil)) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + if res := obj.(bool); !res { + t.Fatalf("should work") + } + + // Verify we have the lock + req, err = http.NewRequest("GET", "/v1/kv/test", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + resp = httptest.NewRecorder() + obj, err = srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + d := obj.(structs.DirEntries)[0] + + // Check the flags + if d.Session != id { + t.Fatalf("bad: %v", d) + } + + // Release the lock + req, err = http.NewRequest("PUT", + "/v1/kv/test?release="+id, bytes.NewReader(nil)) + if err != nil { + t.Fatalf("err: %v", err) + } + resp = httptest.NewRecorder() + obj, err = srv.KVSEndpoint(resp, req) + if err != nil 
{ + t.Fatalf("err: %v", err) + } + if res := obj.(bool); !res { + t.Fatalf("should work") + } + + // Verify we do not have the lock + req, err = http.NewRequest("GET", "/v1/kv/test", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + resp = httptest.NewRecorder() + obj, err = srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + d = obj.(structs.DirEntries)[0] + + // Check the flags + if d.Session != "" { + t.Fatalf("bad: %v", d) + } + }) +} diff --git a/command/agent/local_test.go b/command/agent/local_test.go index 9ce16fed31f6..655b5b2031fc 100644 --- a/command/agent/local_test.go +++ b/command/agent/local_test.go @@ -1,8 +1,8 @@ package agent import ( - "github.com/hashicorp/consul/testutil" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" "os" "reflect" "testing" diff --git a/command/agent/rpc_client_test.go b/command/agent/rpc_client_test.go index 77f34632ae4c..66d48065cec3 100644 --- a/command/agent/rpc_client_test.go +++ b/command/agent/rpc_client_test.go @@ -1,15 +1,15 @@ package agent import ( + "errors" "fmt" - "github.com/hashicorp/serf/serf" "github.com/hashicorp/consul/testutil" + "github.com/hashicorp/serf/serf" "io" "net" "os" "strings" "testing" - "errors" "time" ) diff --git a/command/agent/session_endpoint.go b/command/agent/session_endpoint.go new file mode 100644 index 000000000000..a5c02b4df55a --- /dev/null +++ b/command/agent/session_endpoint.go @@ -0,0 +1,185 @@ +package agent + +import ( + "fmt" + "github.com/hashicorp/consul/consul" + "github.com/hashicorp/consul/consul/structs" + "net/http" + "strings" + "time" +) + +const ( + // lockDelayMinThreshold is used to convert a numeric lock + // delay value from nanoseconds to seconds if it is below this + // threshold. Users often send a value like 5, which they assume + // is seconds, but because Go uses nanosecond granularity, ends + // up being very small. If we see a value below this threshold, + // we multiply by time.Second + lockDelayMinThreshold = 1000 +) + +// sessionCreateResponse is used to wrap the session ID +type sessionCreateResponse struct { + ID string +} + +// SessionCreate is used to create a new session +func (s *HTTPServer) SessionCreate(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + // Mandate a PUT request + if req.Method != "PUT" { + resp.WriteHeader(405) + return nil, nil + } + + // Default the session to our node + serf check + args := structs.SessionRequest{ + Op: structs.SessionCreate, + Session: structs.Session{ + Node: s.agent.config.NodeName, + Checks: []string{consul.SerfCheckID}, + LockDelay: 15 * time.Second, + }, + } + s.parseDC(req, &args.Datacenter) + + // Handle optional request body + if req.ContentLength > 0 { + if err := decodeBody(req, &args.Session, FixupLockDelay); err != nil { + resp.WriteHeader(400) + resp.Write([]byte(fmt.Sprintf("Request decode failed: %v", err))) + return nil, nil + } + } + + // Create the session, get the ID + var out string + if err := s.agent.RPC("Session.Apply", &args, &out); err != nil { + return nil, err + } + + // Format the response as a JSON object + return sessionCreateResponse{out}, nil +} + +// FixupLockDelay is used to handle parsing the JSON body to session/create +// and properly parsing out the lock delay duration value. 
+func FixupLockDelay(raw interface{}) error { + rawMap, ok := raw.(map[string]interface{}) + if !ok { + return nil + } + var key string + for k, _ := range rawMap { + if strings.ToLower(k) == "lockdelay" { + key = k + break + } + } + if key != "" { + val := rawMap[key] + // Convert a string value into a duration + if vStr, ok := val.(string); ok { + dur, err := time.ParseDuration(vStr) + if err != nil { + return err + } + if dur < lockDelayMinThreshold { + dur = dur * time.Second + } + rawMap[key] = dur + } + // Convert low value integers into seconds + if vNum, ok := val.(float64); ok { + dur := time.Duration(vNum) + if dur < lockDelayMinThreshold { + dur = dur * time.Second + } + rawMap[key] = dur + } + } + return nil +} + +// SessionDestroy is used to destroy an existing session +func (s *HTTPServer) SessionDestroy(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + args := structs.SessionRequest{ + Op: structs.SessionDestroy, + } + s.parseDC(req, &args.Datacenter) + + // Pull out the session id + args.Session.ID = strings.TrimPrefix(req.URL.Path, "/v1/session/destroy/") + if args.Session.ID == "" { + resp.WriteHeader(400) + resp.Write([]byte("Missing session")) + return nil, nil + } + + var out string + if err := s.agent.RPC("Session.Apply", &args, &out); err != nil { + return nil, err + } + return true, nil +} + +// SessionGet is used to get info for a particular session +func (s *HTTPServer) SessionGet(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + args := structs.SessionSpecificRequest{} + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { + return nil, nil + } + + // Pull out the session id + args.Session = strings.TrimPrefix(req.URL.Path, "/v1/session/info/") + if args.Session == "" { + resp.WriteHeader(400) + resp.Write([]byte("Missing session")) + return nil, nil + } + + var out structs.IndexedSessions + defer setMeta(resp, &out.QueryMeta) + if err := s.agent.RPC("Session.Get", &args, &out); err != nil { + return nil, err + } + return out.Sessions, nil +} + +// SessionList is used to list all the sessions +func (s *HTTPServer) SessionList(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + args := structs.DCSpecificRequest{} + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { + return nil, nil + } + + var out structs.IndexedSessions + defer setMeta(resp, &out.QueryMeta) + if err := s.agent.RPC("Session.List", &args, &out); err != nil { + return nil, err + } + return out.Sessions, nil +} + +// SessionsForNode returns all the sessions belonging to a node +func (s *HTTPServer) SessionsForNode(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + args := structs.NodeSpecificRequest{} + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { + return nil, nil + } + + // Pull out the node name + args.Node = strings.TrimPrefix(req.URL.Path, "/v1/session/node/") + if args.Node == "" { + resp.WriteHeader(400) + resp.Write([]byte("Missing node name")) + return nil, nil + } + + var out structs.IndexedSessions + defer setMeta(resp, &out.QueryMeta) + if err := s.agent.RPC("Session.NodeSessions", &args, &out); err != nil { + return nil, err + } + return out.Sessions, nil +} diff --git a/command/agent/session_endpoint_test.go b/command/agent/session_endpoint_test.go new file mode 100644 index 000000000000..ad3d58a8225e --- /dev/null +++ b/command/agent/session_endpoint_test.go @@ -0,0 +1,188 @@ +package agent + +import ( + "bytes" + "encoding/json" + 
"github.com/hashicorp/consul/consul" + "github.com/hashicorp/consul/consul/structs" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +func TestSessionCreate(t *testing.T) { + httpTest(t, func(srv *HTTPServer) { + // Create a health check + args := &structs.RegisterRequest{ + Datacenter: "dc1", + Node: srv.agent.config.NodeName, + Address: "127.0.0.1", + Check: &structs.HealthCheck{ + CheckID: "consul", + Node: srv.agent.config.NodeName, + Name: "consul", + ServiceID: "consul", + }, + } + var out struct{} + if err := srv.agent.RPC("Catalog.Register", args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Associate session with node and 2 health checks + body := bytes.NewBuffer(nil) + enc := json.NewEncoder(body) + raw := map[string]interface{}{ + "Node": srv.agent.config.NodeName, + "Checks": []string{consul.SerfCheckID, "consul"}, + "LockDelay": "20s", + } + enc.Encode(raw) + + req, err := http.NewRequest("PUT", "/v1/session/create", body) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.SessionCreate(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + if _, ok := obj.(sessionCreateResponse); !ok { + t.Fatalf("should work") + } + }) +} + +func TestFixupLockDelay(t *testing.T) { + inp := map[string]interface{}{ + "lockdelay": float64(15), + } + if err := FixupLockDelay(inp); err != nil { + t.Fatalf("err: %v", err) + } + if inp["lockdelay"] != 15*time.Second { + t.Fatalf("bad: %v", inp) + } + + inp = map[string]interface{}{ + "lockDelay": float64(15 * time.Second), + } + if err := FixupLockDelay(inp); err != nil { + t.Fatalf("err: %v", err) + } + if inp["lockDelay"] != 15*time.Second { + t.Fatalf("bad: %v", inp) + } + + inp = map[string]interface{}{ + "LockDelay": "15s", + } + if err := FixupLockDelay(inp); err != nil { + t.Fatalf("err: %v", err) + } + if inp["LockDelay"] != 15*time.Second { + t.Fatalf("bad: %v", inp) + } +} + +func makeTestSession(t *testing.T, srv *HTTPServer) string { + req, err := http.NewRequest("PUT", "/v1/session/create", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := httptest.NewRecorder() + obj, err := srv.SessionCreate(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + sessResp := obj.(sessionCreateResponse) + return sessResp.ID +} + +func TestSessionDestroy(t *testing.T) { + httpTest(t, func(srv *HTTPServer) { + id := makeTestSession(t, srv) + + req, err := http.NewRequest("PUT", "/v1/session/destroy/"+id, nil) + resp := httptest.NewRecorder() + obj, err := srv.SessionDestroy(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + if resp := obj.(bool); !resp { + t.Fatalf("should work") + } + }) +} + +func TestSessionGet(t *testing.T) { + httpTest(t, func(srv *HTTPServer) { + id := makeTestSession(t, srv) + + req, err := http.NewRequest("GET", + "/v1/session/info/"+id, nil) + resp := httptest.NewRecorder() + obj, err := srv.SessionGet(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + respObj, ok := obj.(structs.Sessions) + if !ok { + t.Fatalf("should work") + } + if len(respObj) != 1 { + t.Fatalf("bad: %v", respObj) + } + }) +} + +func TestSessionList(t *testing.T) { + httpTest(t, func(srv *HTTPServer) { + var ids []string + for i := 0; i < 10; i++ { + ids = append(ids, makeTestSession(t, srv)) + } + + req, err := http.NewRequest("GET", "/v1/session/list", nil) + resp := httptest.NewRecorder() + obj, err := srv.SessionList(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + respObj, ok := obj.(structs.Sessions) + if !ok { + 
t.Fatalf("should work") + } + if len(respObj) != 10 { + t.Fatalf("bad: %v", respObj) + } + }) +} + +func TestSessionsForNode(t *testing.T) { + httpTest(t, func(srv *HTTPServer) { + var ids []string + for i := 0; i < 10; i++ { + ids = append(ids, makeTestSession(t, srv)) + } + + req, err := http.NewRequest("GET", + "/v1/session/node/"+srv.agent.config.NodeName, nil) + resp := httptest.NewRecorder() + obj, err := srv.SessionsForNode(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + respObj, ok := obj.(structs.Sessions) + if !ok { + t.Fatalf("should work") + } + if len(respObj) != 10 { + t.Fatalf("bad: %v", respObj) + } + }) +} diff --git a/command/agent/status_endpoint_test.go b/command/agent/status_endpoint_test.go index 4dd1f4e204a9..0e22eaa30f75 100644 --- a/command/agent/status_endpoint_test.go +++ b/command/agent/status_endpoint_test.go @@ -1,9 +1,9 @@ package agent import ( + "github.com/hashicorp/consul/testutil" "os" "testing" - "github.com/hashicorp/consul/testutil" ) func TestStatusLeader(t *testing.T) { diff --git a/command/agent/ui_endpoint_test.go b/command/agent/ui_endpoint_test.go index 510ff973a326..da4f7e590fc1 100644 --- a/command/agent/ui_endpoint_test.go +++ b/command/agent/ui_endpoint_test.go @@ -3,8 +3,8 @@ package agent import ( "bytes" "fmt" - "github.com/hashicorp/consul/testutil" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" "io" "io/ioutil" "net/http" diff --git a/command/force_leave_test.go b/command/force_leave_test.go index 8189d41d2aec..d297380a8601 100644 --- a/command/force_leave_test.go +++ b/command/force_leave_test.go @@ -1,13 +1,13 @@ package command import ( + "errors" "fmt" "github.com/hashicorp/consul/testutil" "github.com/hashicorp/serf/serf" "github.com/mitchellh/cli" "strings" "testing" - "errors" ) func TestForceLeaveCommand_implements(t *testing.T) { diff --git a/consul/catalog_endpoint_test.go b/consul/catalog_endpoint_test.go index 9af0b0bd2bef..233c39fd28c2 100644 --- a/consul/catalog_endpoint_test.go +++ b/consul/catalog_endpoint_test.go @@ -498,7 +498,7 @@ func TestCatalogListServices_Blocking(t *testing.T) { } // Should block at least 100ms - if time.Now().Sub(start) < 100 * time.Millisecond { + if time.Now().Sub(start) < 100*time.Millisecond { t.Fatalf("too fast") } @@ -544,7 +544,7 @@ func TestCatalogListServices_Timeout(t *testing.T) { } // Should block at least 100ms - if time.Now().Sub(start) < 100 * time.Millisecond { + if time.Now().Sub(start) < 100*time.Millisecond { // TODO: Failing t.Fatalf("too fast") } diff --git a/consul/client_test.go b/consul/client_test.go index b0b748eef102..10ab0fde7f31 100644 --- a/consul/client_test.go +++ b/consul/client_test.go @@ -2,8 +2,8 @@ package consul import ( "fmt" - "github.com/hashicorp/consul/testutil" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" "net" "os" "testing" diff --git a/consul/fsm.go b/consul/fsm.go index 22854729fd3c..41a911a56f1a 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -67,6 +67,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} { return c.applyDeregister(buf[1:], log.Index) case structs.KVSRequestType: return c.applyKVSOperation(buf[1:], log.Index) + case structs.SessionRequestType: + return c.applySessionOperation(buf[1:], log.Index) default: panic(fmt.Errorf("failed to apply request: %#v", buf)) } @@ -152,6 +154,20 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} { } else { return act } + case structs.KVSLock: + act, err := 
c.state.KVSLock(index, &req.DirEnt) + if err != nil { + return err + } else { + return act + } + case structs.KVSUnlock: + act, err := c.state.KVSUnlock(index, &req.DirEnt) + if err != nil { + return err + } else { + return act + } default: c.logger.Printf("[WARN] consul.fsm: Invalid KVS operation '%s'", req.Op) return fmt.Errorf("Invalid KVS operation '%s'", req.Op) @@ -159,6 +175,27 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} { return nil } +func (c *consulFSM) applySessionOperation(buf []byte, index uint64) interface{} { + var req structs.SessionRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + switch req.Op { + case structs.SessionCreate: + if err := c.state.SessionCreate(index, &req.Session); err != nil { + return err + } else { + return req.Session.ID + } + case structs.SessionDestroy: + return c.state.SessionDestroy(index, req.Session.ID) + default: + c.logger.Printf("[WARN] consul.fsm: Invalid Session operation '%s'", req.Op) + return fmt.Errorf("Invalid Session operation '%s'", req.Op) + } + return nil +} + func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) { defer func(start time.Time) { c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start)) @@ -222,6 +259,15 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { return err } + case structs.SessionRequestType: + var req structs.Session + if err := dec.Decode(&req); err != nil { + return err + } + if err := c.state.SessionRestore(&req); err != nil { + return err + } + default: return fmt.Errorf("Unrecognized msg type: %v", msgType) } @@ -244,6 +290,25 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { return err } + if err := s.persistNodes(sink, encoder); err != nil { + sink.Cancel() + return err + } + + if err := s.persistSessions(sink, encoder); err != nil { + sink.Cancel() + return err + } + + if err := s.persistKV(sink, encoder); err != nil { + sink.Cancel() + return err + } + return nil +} + +func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink, + encoder *codec.Encoder) error { // Get all the nodes nodes := s.state.Nodes() @@ -258,7 +323,6 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { // Register the node itself sink.Write([]byte{byte(structs.RegisterRequestType)}) if err := encoder.Encode(&req); err != nil { - sink.Cancel() return err } @@ -268,7 +332,6 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { req.Service = srv sink.Write([]byte{byte(structs.RegisterRequestType)}) if err := encoder.Encode(&req); err != nil { - sink.Cancel() return err } } @@ -280,16 +343,31 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { req.Check = check sink.Write([]byte{byte(structs.RegisterRequestType)}) if err := encoder.Encode(&req); err != nil { - sink.Cancel() return err } } } + return nil +} - // Enable GC of the ndoes - nodes = nil +func (s *consulSnapshot) persistSessions(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + sessions, err := s.state.SessionList() + if err != nil { + return err + } - // Dump the KVS entries + for _, s := range sessions { + sink.Write([]byte{byte(structs.SessionRequestType)}) + if err := encoder.Encode(s); err != nil { + return err + } + } + return nil +} + +func (s *consulSnapshot) persistKV(sink raft.SnapshotSink, + encoder *codec.Encoder) error { streamCh := make(chan interface{}, 256) errorCh := make(chan error) go func() { @@ -298,25 +376,21 @@ func (s 
*consulSnapshot) Persist(sink raft.SnapshotSink) error { } }() -OUTER: for { select { case raw := <-streamCh: if raw == nil { - break OUTER + return nil } sink.Write([]byte{byte(structs.KVSRequestType)}) if err := encoder.Encode(raw); err != nil { - sink.Cancel() return err } case err := <-errorCh: - sink.Cancel() return err } } - return nil } diff --git a/consul/fsm_test.go b/consul/fsm_test.go index ac8453bec2da..5e5d086d8478 100644 --- a/consul/fsm_test.go +++ b/consul/fsm_test.go @@ -326,6 +326,8 @@ func TestFSM_SnapshotRestore(t *testing.T) { Key: "/test", Value: []byte("foo"), }) + session := &structs.Session{Node: "foo"} + fsm.state.SessionCreate(9, session) // Snapshot snap, err := fsm.Snapshot() @@ -383,6 +385,15 @@ func TestFSM_SnapshotRestore(t *testing.T) { if string(d.Value) != "foo" { t.Fatalf("bad: %v", d) } + + // Verify session is restored + _, s, err := fsm.state.SessionGet(session.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if s.Node != "foo" { + t.Fatalf("bad: %v", d) + } } func TestFSM_KVSSet(t *testing.T) { @@ -569,3 +580,190 @@ func TestFSM_KVSCheckAndSet(t *testing.T) { t.Fatalf("bad: %v", d) } } + +func TestFSM_SessionCreate_Destroy(t *testing.T) { + fsm, err := NewFSM(os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + defer fsm.Close() + + fsm.state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) + fsm.state.EnsureCheck(2, &structs.HealthCheck{ + Node: "foo", + CheckID: "web", + Status: structs.HealthPassing, + }) + + // Create a new session + req := structs.SessionRequest{ + Datacenter: "dc1", + Op: structs.SessionCreate, + Session: structs.Session{ + Node: "foo", + Checks: []string{"web"}, + }, + } + buf, err := structs.Encode(structs.SessionRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if err, ok := resp.(error); ok { + t.Fatalf("resp: %v", err) + } + + // Get the session + id := resp.(string) + _, session, err := fsm.state.SessionGet(id) + if err != nil { + t.Fatalf("err: %v", err) + } + if session == nil { + t.Fatalf("missing") + } + + // Verify the session + if session.ID != id { + t.Fatalf("bad: %v", *session) + } + if session.Node != "foo" { + t.Fatalf("bad: %v", *session) + } + if session.Checks[0] != "web" { + t.Fatalf("bad: %v", *session) + } + + // Try to destroy + destroy := structs.SessionRequest{ + Datacenter: "dc1", + Op: structs.SessionDestroy, + Session: structs.Session{ + ID: id, + }, + } + buf, err = structs.Encode(structs.SessionRequestType, destroy) + if err != nil { + t.Fatalf("err: %v", err) + } + resp = fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + _, session, err = fsm.state.SessionGet(id) + if err != nil { + t.Fatalf("err: %v", err) + } + if session != nil { + t.Fatalf("should be destroyed") + } +} + +func TestFSM_KVSLock(t *testing.T) { + fsm, err := NewFSM(os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + defer fsm.Close() + + fsm.state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) + session := &structs.Session{Node: "foo"} + fsm.state.SessionCreate(2, session) + + req := structs.KVSRequest{ + Datacenter: "dc1", + Op: structs.KVSLock, + DirEnt: structs.DirEntry{ + Key: "/test/path", + Value: []byte("test"), + Session: session.ID, + }, + } + buf, err := structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if resp != true { + t.Fatalf("resp: %v", resp) + } + + // Verify key is locked + _, d, err := fsm.state.KVSGet("/test/path") + if err 
!= nil { + t.Fatalf("err: %v", err) + } + if d == nil { + t.Fatalf("missing") + } + if d.LockIndex != 1 { + t.Fatalf("bad: %v", *d) + } + if d.Session != session.ID { + t.Fatalf("bad: %v", *d) + } +} + +func TestFSM_KVSUnlock(t *testing.T) { + fsm, err := NewFSM(os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + defer fsm.Close() + + fsm.state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) + session := &structs.Session{Node: "foo"} + fsm.state.SessionCreate(2, session) + + req := structs.KVSRequest{ + Datacenter: "dc1", + Op: structs.KVSLock, + DirEnt: structs.DirEntry{ + Key: "/test/path", + Value: []byte("test"), + Session: session.ID, + }, + } + buf, err := structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if resp != true { + t.Fatalf("resp: %v", resp) + } + + req = structs.KVSRequest{ + Datacenter: "dc1", + Op: structs.KVSUnlock, + DirEnt: structs.DirEntry{ + Key: "/test/path", + Value: []byte("test"), + Session: session.ID, + }, + } + buf, err = structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp = fsm.Apply(makeLog(buf)) + if resp != true { + t.Fatalf("resp: %v", resp) + } + + // Verify key is unlocked + _, d, err := fsm.state.KVSGet("/test/path") + if err != nil { + t.Fatalf("err: %v", err) + } + if d == nil { + t.Fatalf("missing") + } + if d.LockIndex != 1 { + t.Fatalf("bad: %v", *d) + } + if d.Session != "" { + t.Fatalf("bad: %v", *d) + } +} diff --git a/consul/health_endpoint_test.go b/consul/health_endpoint_test.go index b817f9db0bdd..8f3725ec2ff1 100644 --- a/consul/health_endpoint_test.go +++ b/consul/health_endpoint_test.go @@ -1,8 +1,8 @@ package consul import ( - "github.com/hashicorp/consul/testutil" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" "os" "testing" ) diff --git a/consul/internal_endpoint_test.go b/consul/internal_endpoint_test.go index 9b08fe74da8f..e3c33fe92565 100644 --- a/consul/internal_endpoint_test.go +++ b/consul/internal_endpoint_test.go @@ -1,8 +1,8 @@ package consul import ( - "github.com/hashicorp/consul/testutil" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" "os" "testing" ) diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go index fde952432f50..91d8f3bdeaed 100644 --- a/consul/kvs_endpoint.go +++ b/consul/kvs_endpoint.go @@ -25,6 +25,23 @@ func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error { return fmt.Errorf("Must provide key") } + // If this is a lock, we must check for a lock-delay. Since lock-delay + // is based on wall-time, each peer expires the lock-delay at a slightly + // different time. This means the enforcement of lock-delay cannot be done + // after the raft log is committed as it would lead to inconsistent FSMs. + // Instead, the lock-delay must be enforced before commit. This means that + // only the wall-time of the leader node is used, preventing any inconsistencies. 
+ if args.Op == structs.KVSLock { + state := k.srv.fsm.State() + expires := state.KVSLockDelay(args.DirEnt.Key) + if expires.After(time.Now()) { + k.srv.logger.Printf("[WARN] consul.kvs: Rejecting lock of %s due to lock-delay until %v", + args.DirEnt.Key, expires) + *reply = false + return nil + } + } + // Apply the update resp, err := k.srv.raftApply(structs.KVSRequestType, args) if err != nil { diff --git a/consul/kvs_endpoint_test.go b/consul/kvs_endpoint_test.go index 32ab830ea998..c4131fdcbdd0 100644 --- a/consul/kvs_endpoint_test.go +++ b/consul/kvs_endpoint_test.go @@ -1,10 +1,11 @@ package consul import ( - "github.com/hashicorp/consul/testutil" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" "os" "testing" + "time" ) func TestKVS_Apply(t *testing.T) { @@ -224,5 +225,72 @@ func TestKVSEndpoint_ListKeys(t *testing.T) { if dirent.Keys[2] != "/test/sub/" { t.Fatalf("Bad: %v", dirent.Keys) } +} + +func TestKVS_Apply_LockDelay(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + testutil.WaitForLeader(t, client.Call, "dc1") + + // Create and invalidate a session with a lock + state := s1.fsm.State() + if err := state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"}); err != nil { + t.Fatalf("err: %v", err) + } + session := &structs.Session{ + Node: "foo", + LockDelay: 50 * time.Millisecond, + } + if err := state.SessionCreate(2, session); err != nil { + t.Fatalf("err: %v", err) + } + id := session.ID + d := &structs.DirEntry{ + Key: "test", + Session: id, + } + if ok, err := state.KVSLock(3, d); err != nil || !ok { + t.Fatalf("err: %v", err) + } + if err := state.SessionDestroy(4, id); err != nil { + t.Fatalf("err: %v", err) + } + + // Make a new session that is valid + if err := state.SessionCreate(5, session); err != nil { + t.Fatalf("err: %v", err) + } + validId := session.ID + // Make a lock request + arg := structs.KVSRequest{ + Datacenter: "dc1", + Op: structs.KVSLock, + DirEnt: structs.DirEntry{ + Key: "test", + Session: validId, + }, + } + var out bool + if err := client.Call("KVS.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + if out != false { + t.Fatalf("should not acquire") + } + + // Wait for lock-delay + time.Sleep(50 * time.Millisecond) + + // Should acquire + if err := client.Call("KVS.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + if out != true { + t.Fatalf("should acquire") + } } diff --git a/consul/leader_test.go b/consul/leader_test.go index 8e4c4a63b101..d6aec6bd1089 100644 --- a/consul/leader_test.go +++ b/consul/leader_test.go @@ -1,13 +1,13 @@ package consul import ( + "errors" "fmt" - "github.com/hashicorp/consul/testutil" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" "github.com/hashicorp/serf/serf" "os" "testing" - "errors" "time" ) diff --git a/consul/mdb_table.go b/consul/mdb_table.go index 96c1fad883f5..c4c84b0dc9ce 100644 --- a/consul/mdb_table.go +++ b/consul/mdb_table.go @@ -63,6 +63,7 @@ type MDBTxn struct { readonly bool tx *mdb.Txn dbis map[string]mdb.DBI + after []func() } // Abort is used to close the transaction @@ -74,7 +75,19 @@ func (t *MDBTxn) Abort() { // Commit is used to commit a transaction func (t *MDBTxn) Commit() error { - return t.tx.Commit() + if err := t.tx.Commit(); err != nil { + return err + } + for _, f := range t.after { + f() + } + t.after = nil + return nil +} + +// Defer is used to defer a function call until a successful 
commit +func (t *MDBTxn) Defer(f func()) { + t.after = append(t.after, f) } type IndexFunc func(*MDBIndex, []string) string @@ -734,6 +747,19 @@ func (t *MDBTable) SetLastIndexTxn(tx *MDBTxn, index uint64) error { return tx.tx.Put(tx.dbis[t.Name], encRowId, encIndex, 0) } +// SetMaxLastIndexTxn is used to set the last index within a transaction +// if it exceeds the current maximum +func (t *MDBTable) SetMaxLastIndexTxn(tx *MDBTxn, index uint64) error { + current, err := t.LastIndexTxn(tx) + if err != nil { + return err + } + if index > current { + return t.SetLastIndexTxn(tx, index) + } + return nil +} + // StartTxn is used to create a transaction that spans a list of tables func (t MDBTables) StartTxn(readonly bool) (*MDBTxn, error) { var tx *MDBTxn diff --git a/consul/server.go b/consul/server.go index d873cd2b9b38..598e6bde3e11 100644 --- a/consul/server.go +++ b/consul/server.go @@ -108,6 +108,7 @@ type endpoints struct { Raft *Raft Status *Status KVS *KVS + Session *Session Internal *Internal } @@ -316,6 +317,7 @@ func (s *Server) setupRPC(tlsConfig *tls.Config) error { s.endpoints.Catalog = &Catalog{s} s.endpoints.Health = &Health{s} s.endpoints.KVS = &KVS{s} + s.endpoints.Session = &Session{s} s.endpoints.Internal = &Internal{s} // Register the handlers @@ -324,6 +326,7 @@ func (s *Server) setupRPC(tlsConfig *tls.Config) error { s.rpcServer.Register(s.endpoints.Catalog) s.rpcServer.Register(s.endpoints.Health) s.rpcServer.Register(s.endpoints.KVS) + s.rpcServer.Register(s.endpoints.Session) s.rpcServer.Register(s.endpoints.Internal) list, err := net.ListenTCP("tcp", s.config.RPCAddr) diff --git a/consul/server_test.go b/consul/server_test.go index a0687bbfdf51..dc984ba35ad2 100644 --- a/consul/server_test.go +++ b/consul/server_test.go @@ -1,6 +1,7 @@ package consul import ( + "errors" "fmt" "github.com/hashicorp/consul/testutil" "io/ioutil" @@ -8,7 +9,6 @@ import ( "os" "testing" "time" - "errors" ) var nextPort = 15000 @@ -293,7 +293,7 @@ func TestServer_JoinLAN_TLS(t *testing.T) { // Verify Raft has established a peer testutil.WaitForResult(func() (bool, error) { - return s1.Stats()["raft"]["num_peers"] == "1", nil + return s1.Stats()["raft"]["num_peers"] == "1", nil }, func(err error) { t.Fatalf("no peer established") }) diff --git a/consul/session_endpoint.go b/consul/session_endpoint.go new file mode 100644 index 000000000000..069f374d1e2d --- /dev/null +++ b/consul/session_endpoint.go @@ -0,0 +1,106 @@ +package consul + +import ( + "fmt" + "github.com/armon/go-metrics" + "github.com/hashicorp/consul/consul/structs" + "time" +) + +// Session endpoint is used to manipulate sessions for KV +type Session struct { + srv *Server +} + +// Apply is used to apply a modifying request to the data store. 
This should +// only be used for operations that modify the data +func (s *Session) Apply(args *structs.SessionRequest, reply *string) error { + if done, err := s.srv.forward("Session.Apply", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"consul", "session", "apply"}, time.Now()) + + // Verify the args + if args.Session.ID == "" && args.Op == structs.SessionDestroy { + return fmt.Errorf("Must provide ID") + } + if args.Session.Node == "" && args.Op == structs.SessionCreate { + return fmt.Errorf("Must provide Node") + } + + // Apply the update + resp, err := s.srv.raftApply(structs.SessionRequestType, args) + if err != nil { + s.srv.logger.Printf("[ERR] consul.session: Apply failed: %v", err) + return err + } + if respErr, ok := resp.(error); ok { + return respErr + } + + // Check if the return type is a string + if respString, ok := resp.(string); ok { + *reply = respString + } + return nil +} + +// Get is used to retrieve a single session +func (s *Session) Get(args *structs.SessionSpecificRequest, + reply *structs.IndexedSessions) error { + if done, err := s.srv.forward("Session.Get", args, args, reply); done { + return err + } + + // Get the local state + state := s.srv.fsm.State() + return s.srv.blockingRPC(&args.QueryOptions, + &reply.QueryMeta, + state.QueryTables("SessionGet"), + func() error { + index, session, err := state.SessionGet(args.Session) + reply.Index = index + if session != nil { + reply.Sessions = structs.Sessions{session} + } + return err + }) +} + +// List is used to list all the active sessions +func (s *Session) List(args *structs.DCSpecificRequest, + reply *structs.IndexedSessions) error { + if done, err := s.srv.forward("Session.List", args, args, reply); done { + return err + } + + // Get the local state + state := s.srv.fsm.State() + return s.srv.blockingRPC(&args.QueryOptions, + &reply.QueryMeta, + state.QueryTables("SessionList"), + func() error { + var err error + reply.Index, reply.Sessions, err = state.SessionList() + return err + }) +} + +// NodeSessions is used to get all the sessions for a particular node +func (s *Session) NodeSessions(args *structs.NodeSpecificRequest, + reply *structs.IndexedSessions) error { + if done, err := s.srv.forward("Session.NodeSessions", args, args, reply); done { + return err + } + + // Get the local state + state := s.srv.fsm.State() + return s.srv.blockingRPC(&args.QueryOptions, + &reply.QueryMeta, + state.QueryTables("NodeSessions"), + func() error { + var err error + reply.Index, reply.Sessions, err = state.NodeSessions(args.Node) + return err + }) +} diff --git a/consul/session_endpoint_test.go b/consul/session_endpoint_test.go new file mode 100644 index 000000000000..8b59d7a28c00 --- /dev/null +++ b/consul/session_endpoint_test.go @@ -0,0 +1,212 @@ +package consul + +import ( + "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" + "os" + "testing" +) + +func TestSessionEndpoint_Apply(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + testutil.WaitForLeader(t, client.Call, "dc1") + + // Just add a node + s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) + + arg := structs.SessionRequest{ + Datacenter: "dc1", + Op: structs.SessionCreate, + Session: structs.Session{ + Node: "foo", + }, + } + var out string + if err := client.Call("Session.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + id := out + + // Verify + state := 
s1.fsm.State() + _, s, err := state.SessionGet(out) + if err != nil { + t.Fatalf("err: %v", err) + } + if s == nil { + t.Fatalf("should not be nil") + } + + // Do a delete + arg.Op = structs.SessionDestroy + arg.Session.ID = out + if err := client.Call("Session.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Verify + _, s, err = state.SessionGet(id) + if err != nil { + t.Fatalf("err: %v", err) + } + if s != nil { + t.Fatalf("bad: %v", s) + } +} + +func TestSessionEndpoint_Get(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + testutil.WaitForLeader(t, client.Call, "dc1") + + s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) + arg := structs.SessionRequest{ + Datacenter: "dc1", + Op: structs.SessionCreate, + Session: structs.Session{ + Node: "foo", + }, + } + var out string + if err := client.Call("Session.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + + getR := structs.SessionSpecificRequest{ + Datacenter: "dc1", + Session: out, + } + var sessions structs.IndexedSessions + if err := client.Call("Session.Get", &getR, &sessions); err != nil { + t.Fatalf("err: %v", err) + } + + if sessions.Index == 0 { + t.Fatalf("Bad: %v", sessions) + } + if len(sessions.Sessions) != 1 { + t.Fatalf("Bad: %v", sessions) + } + s := sessions.Sessions[0] + if s.ID != out { + t.Fatalf("bad: %v", s) + } +} + +func TestSessionEndpoint_List(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + testutil.WaitForLeader(t, client.Call, "dc1") + + s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) + ids := []string{} + for i := 0; i < 5; i++ { + arg := structs.SessionRequest{ + Datacenter: "dc1", + Op: structs.SessionCreate, + Session: structs.Session{ + Node: "foo", + }, + } + var out string + if err := client.Call("Session.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + ids = append(ids, out) + } + + getR := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + var sessions structs.IndexedSessions + if err := client.Call("Session.List", &getR, &sessions); err != nil { + t.Fatalf("err: %v", err) + } + + if sessions.Index == 0 { + t.Fatalf("Bad: %v", sessions) + } + if len(sessions.Sessions) != 5 { + t.Fatalf("Bad: %v", sessions.Sessions) + } + for i := 0; i < len(sessions.Sessions); i++ { + s := sessions.Sessions[i] + if !strContains(ids, s.ID) { + t.Fatalf("bad: %v", s) + } + if s.Node != "foo" { + t.Fatalf("bad: %v", s) + } + } +} + +func TestSessionEndpoint_NodeSessions(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + testutil.WaitForLeader(t, client.Call, "dc1") + + s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) + s1.fsm.State().EnsureNode(1, structs.Node{"bar", "127.0.0.1"}) + ids := []string{} + for i := 0; i < 10; i++ { + arg := structs.SessionRequest{ + Datacenter: "dc1", + Op: structs.SessionCreate, + Session: structs.Session{ + Node: "bar", + }, + } + if i < 5 { + arg.Session.Node = "foo" + } + var out string + if err := client.Call("Session.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + if i < 5 { + ids = append(ids, out) + } + } + + getR := structs.NodeSpecificRequest{ + Datacenter: "dc1", + Node: "foo", + } + var sessions structs.IndexedSessions + if err := client.Call("Session.NodeSessions", 
&getR, &sessions); err != nil { + t.Fatalf("err: %v", err) + } + + if sessions.Index == 0 { + t.Fatalf("Bad: %v", sessions) + } + if len(sessions.Sessions) != 5 { + t.Fatalf("Bad: %v", sessions.Sessions) + } + for i := 0; i < len(sessions.Sessions); i++ { + s := sessions.Sessions[i] + if !strContains(ids, s.ID) { + t.Fatalf("bad: %v", s) + } + if s.Node != "foo" { + t.Fatalf("bad: %v", s) + } + } +} diff --git a/consul/state_store.go b/consul/state_store.go index a5e99162eab7..1b137f33ad27 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -10,6 +10,8 @@ import ( "os" "runtime" "strings" + "sync" + "time" ) const ( @@ -17,10 +19,23 @@ const ( dbServices = "services" dbChecks = "checks" dbKVS = "kvs" + dbSessions = "sessions" + dbSessionChecks = "sessionChecks" dbMaxMapSize32bit uint64 = 512 * 1024 * 1024 // 512MB maximum size dbMaxMapSize64bit uint64 = 32 * 1024 * 1024 * 1024 // 32GB maximum size ) +// kvMode is used internally to control which type of set +// operation we are performing +type kvMode int + +const ( + kvSet kvMode = iota + kvCAS + kvLock + kvUnlock +) + // The StateStore is responsible for maintaining all the Consul // state. It is manipulated by the FSM which maintains consistency // through the use of Raft. The goals of the StateStore are to provide @@ -29,16 +44,34 @@ const ( // implementation uses the Lightning Memory-Mapped Database (MDB). // This gives us Multi-Version Concurrency Control for "free" type StateStore struct { - logger *log.Logger - path string - env *mdb.Env - nodeTable *MDBTable - serviceTable *MDBTable - checkTable *MDBTable - kvsTable *MDBTable - tables MDBTables - watch map[*MDBTable]*NotifyGroup - queryTables map[string]MDBTables + logger *log.Logger + path string + env *mdb.Env + nodeTable *MDBTable + serviceTable *MDBTable + checkTable *MDBTable + kvsTable *MDBTable + sessionTable *MDBTable + sessionCheckTable *MDBTable + tables MDBTables + watch map[*MDBTable]*NotifyGroup + queryTables map[string]MDBTables + + // lockDelay is used to mark certain locks as unacquirable. + // When a lock is forcefully released (failing health + // check, destroyed session, etc), it is subject to the LockDelay + // imposed by the session. This prevents another session from + // acquiring the lock for some period of time as a protection against + // split-brains. This is inspired by the lock-delay in Chubby. + // Because this relies on wall-time, we cannot assume all peers + // perceive time as flowing uniformly. This means KVSLock MUST ignore + // lockDelay, since the lockDelay may have expired on the leader, + // but not on the follower. Rejecting the lock could result in + // inconsistencies in the FSMs due to the rate at which time progresses. Instead, + // only the opinion of the leader is respected, and the Raft log + // is never questioned. + lockDelay map[string]time.Time + lockDelayLock sync.RWMutex } // StateSnapshot is used to provide a point-in-time snapshot @@ -49,6 +82,15 @@ type StateSnapshot struct { lastIndex uint64 } +// sessionCheck is used to create a many-to-one table such +// that each check registered by a session can be mapped back +// to the session row. 
+type sessionCheck struct { + Node string + CheckID string + Session string +} + // Close is used to abort the transaction and allow for cleanup func (s *StateSnapshot) Close() error { s.tx.Abort() @@ -70,10 +112,11 @@ func NewStateStore(logOutput io.Writer) (*StateStore, error) { } s := &StateStore{ - logger: log.New(logOutput, "", log.LstdFlags), - path: path, - env: env, - watch: make(map[*MDBTable]*NotifyGroup), + logger: log.New(logOutput, "", log.LstdFlags), + path: path, + env: env, + watch: make(map[*MDBTable]*NotifyGroup), + lockDelay: make(map[string]time.Time), } // Ensure we can initialize @@ -209,6 +252,10 @@ func (s *StateStore) initialize() error { Fields: []string{"Key"}, IdxFunc: DefaultIndexPrefixFunc, }, + "session": &MDBIndex{ + AllowBlank: true, + Fields: []string{"Session"}, + }, }, Decoder: func(buf []byte) interface{} { out := new(structs.DirEntry) @@ -219,8 +266,47 @@ func (s *StateStore) initialize() error { }, } + s.sessionTable = &MDBTable{ + Name: dbSessions, + Indexes: map[string]*MDBIndex{ + "id": &MDBIndex{ + Unique: true, + Fields: []string{"ID"}, + }, + "node": &MDBIndex{ + AllowBlank: true, + Fields: []string{"Node"}, + }, + }, + Decoder: func(buf []byte) interface{} { + out := new(structs.Session) + if err := structs.Decode(buf, out); err != nil { + panic(err) + } + return out + }, + } + + s.sessionCheckTable = &MDBTable{ + Name: dbSessionChecks, + Indexes: map[string]*MDBIndex{ + "id": &MDBIndex{ + Unique: true, + Fields: []string{"Node", "CheckID", "Session"}, + }, + }, + Decoder: func(buf []byte) interface{} { + out := new(sessionCheck) + if err := structs.Decode(buf, out); err != nil { + panic(err) + } + return out + }, + } + // Store the set of tables - s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable, s.kvsTable} + s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable, + s.kvsTable, s.sessionTable, s.sessionCheckTable} for _, table := range s.tables { table.Env = s.env table.Encoder = encoder @@ -247,6 +333,9 @@ func (s *StateStore) initialize() error { "KVSGet": MDBTables{s.kvsTable}, "KVSList": MDBTables{s.kvsTable}, "KVSListKeys": MDBTables{s.kvsTable}, + "SessionGet": MDBTables{s.sessionTable}, + "SessionList": MDBTables{s.sessionTable}, + "NodeSessions": MDBTables{s.sessionTable}, } return nil } @@ -278,7 +367,7 @@ func (s *StateStore) EnsureNode(index uint64, node structs.Node) error { if err := s.nodeTable.SetLastIndexTxn(tx, index); err != nil { return err } - defer s.watch[s.nodeTable].Notify() + tx.Defer(func() { s.watch[s.nodeTable].Notify() }) return tx.Commit() } @@ -311,8 +400,7 @@ func (s *StateStore) Nodes() (uint64, structs.Nodes) { // EnsureService is used to ensure a given node exposes a service func (s *StateStore) EnsureService(index uint64, node string, ns *structs.NodeService) error { - tables := MDBTables{s.nodeTable, s.serviceTable} - tx, err := tables.StartTxn(false) + tx, err := s.tables.StartTxn(false) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) } @@ -343,7 +431,7 @@ func (s *StateStore) EnsureService(index uint64, node string, ns *structs.NodeSe if err := s.serviceTable.SetLastIndexTxn(tx, index); err != nil { return err } - defer s.watch[s.serviceTable].Notify() + tx.Defer(func() { s.watch[s.serviceTable].Notify() }) return tx.Commit() } @@ -406,8 +494,7 @@ func (s *StateStore) parseNodeServices(tables MDBTables, tx *MDBTxn, name string // DeleteNodeService is used to delete a node service func (s *StateStore) DeleteNodeService(index uint64, node, id string) error { - 
tables := MDBTables{s.serviceTable, s.checkTable} - tx, err := tables.StartTxn(false) + tx, err := s.tables.StartTxn(false) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) } @@ -419,35 +506,52 @@ func (s *StateStore) DeleteNodeService(index uint64, node, id string) error { if err := s.serviceTable.SetLastIndexTxn(tx, index); err != nil { return err } - defer s.watch[s.serviceTable].Notify() + tx.Defer(func() { s.watch[s.serviceTable].Notify() }) + } + + // Invalidate any sessions using these checks + checks, err := s.checkTable.GetTxn(tx, "node", node, id) + if err != nil { + return err + } + for _, c := range checks { + check := c.(*structs.HealthCheck) + if err := s.invalidateCheck(index, tx, node, check.CheckID); err != nil { + return err + } } + if n, err := s.checkTable.DeleteTxn(tx, "node", node, id); err != nil { return err } else if n > 0 { if err := s.checkTable.SetLastIndexTxn(tx, index); err != nil { return err } - defer s.watch[s.checkTable].Notify() + tx.Defer(func() { s.watch[s.checkTable].Notify() }) } return tx.Commit() } // DeleteNode is used to delete a node and all it's services func (s *StateStore) DeleteNode(index uint64, node string) error { - tables := MDBTables{s.nodeTable, s.serviceTable, s.checkTable} - tx, err := tables.StartTxn(false) + tx, err := s.tables.StartTxn(false) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) } defer tx.Abort() + // Invalidate any sessions held by the node + if err := s.invalidateNode(index, tx, node); err != nil { + return err + } + if n, err := s.serviceTable.DeleteTxn(tx, "id", node); err != nil { return err } else if n > 0 { if err := s.serviceTable.SetLastIndexTxn(tx, index); err != nil { return err } - defer s.watch[s.serviceTable].Notify() + tx.Defer(func() { s.watch[s.serviceTable].Notify() }) } if n, err := s.checkTable.DeleteTxn(tx, "id", node); err != nil { return err @@ -455,7 +559,7 @@ func (s *StateStore) DeleteNode(index uint64, node string) error { if err := s.checkTable.SetLastIndexTxn(tx, index); err != nil { return err } - defer s.watch[s.checkTable].Notify() + tx.Defer(func() { s.watch[s.checkTable].Notify() }) } if n, err := s.nodeTable.DeleteTxn(tx, "id", node); err != nil { return err @@ -463,7 +567,7 @@ func (s *StateStore) DeleteNode(index uint64, node string) error { if err := s.nodeTable.SetLastIndexTxn(tx, index); err != nil { return err } - defer s.watch[s.nodeTable].Notify() + tx.Defer(func() { s.watch[s.nodeTable].Notify() }) } return tx.Commit() } @@ -578,8 +682,7 @@ func (s *StateStore) EnsureCheck(index uint64, check *structs.HealthCheck) error } // Start the txn - tables := MDBTables{s.nodeTable, s.serviceTable, s.checkTable} - tx, err := tables.StartTxn(false) + tx, err := s.tables.StartTxn(false) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) } @@ -608,6 +711,14 @@ func (s *StateStore) EnsureCheck(index uint64, check *structs.HealthCheck) error check.ServiceName = srv.ServiceName } + // Invalidate any sessions if status is critical + if check.Status == structs.HealthCritical { + err := s.invalidateCheck(index, tx, check.Node, check.CheckID) + if err != nil { + return err + } + } + // Ensure the check is set if err := s.checkTable.InsertTxn(tx, check); err != nil { return err @@ -615,25 +726,30 @@ func (s *StateStore) EnsureCheck(index uint64, check *structs.HealthCheck) error if err := s.checkTable.SetLastIndexTxn(tx, index); err != nil { return err } - defer s.watch[s.checkTable].Notify() + tx.Defer(func() { s.watch[s.checkTable].Notify() }) 
return tx.Commit() } // DeleteNodeCheck is used to delete a node health check func (s *StateStore) DeleteNodeCheck(index uint64, node, id string) error { - tx, err := s.checkTable.StartTxn(false, nil) + tx, err := s.tables.StartTxn(false) if err != nil { return err } defer tx.Abort() + // Invalidate any sessions held by this check + if err := s.invalidateCheck(index, tx, node, id); err != nil { + return err + } + if n, err := s.checkTable.DeleteTxn(tx, "id", node, id); err != nil { return err } else if n > 0 { if err := s.checkTable.SetLastIndexTxn(tx, index); err != nil { return err } - defer s.watch[s.checkTable].Notify() + tx.Defer(func() { s.watch[s.checkTable].Notify() }) } return tx.Commit() } @@ -837,35 +953,8 @@ func (s *StateStore) parseNodeInfo(tx *MDBTxn, res []interface{}, err error) str // KVSSet is used to create or update a KV entry func (s *StateStore) KVSSet(index uint64, d *structs.DirEntry) error { - // Start a new txn - tx, err := s.kvsTable.StartTxn(false, nil) - if err != nil { - return err - } - defer tx.Abort() - - // Get the existing node - res, err := s.kvsTable.GetTxn(tx, "id", d.Key) - if err != nil { - return err - } - - // Set the create and modify times - if len(res) == 0 { - d.CreateIndex = index - } else { - d.CreateIndex = res[0].(*structs.DirEntry).CreateIndex - } - d.ModifyIndex = index - - if err := s.kvsTable.InsertTxn(tx, d); err != nil { - return err - } - if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { - return err - } - defer s.watch[s.kvsTable].Notify() - return tx.Commit() + _, err := s.kvsSet(index, d, kvSet) + return err } // KVSRestore is used to restore a DirEntry. It should only be used when @@ -986,15 +1075,44 @@ func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex string, parts . if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { return err } - defer s.watch[s.kvsTable].Notify() + tx.Defer(func() { s.watch[s.kvsTable].Notify() }) } return tx.Commit() } // KVSCheckAndSet is used to perform an atomic check-and-set func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, error) { + return s.kvsSet(index, d, kvCAS) +} + +// KVSLock works like KVSSet but only writes if the lock can be acquired +func (s *StateStore) KVSLock(index uint64, d *structs.DirEntry) (bool, error) { + return s.kvsSet(index, d, kvLock) +} + +// KVSUnlock works like KVSSet but only writes if the lock can be unlocked +func (s *StateStore) KVSUnlock(index uint64, d *structs.DirEntry) (bool, error) { + return s.kvsSet(index, d, kvUnlock) +} + +// KVSLockDelay returns the expiration time of a key lock delay. A key may +// have a lock delay if it was unlocked due to a session invalidation instead +// of a graceful unlock. This must be checked on the leader node, and not in +// KVSLock due to the variability of clocks. +func (s *StateStore) KVSLockDelay(key string) time.Time { + s.lockDelayLock.RLock() + expires := s.lockDelay[key] + s.lockDelayLock.RUnlock() + return expires +} + +// kvsSet is the internal setter +func (s *StateStore) kvsSet( + index uint64, + d *structs.DirEntry, + mode kvMode) (bool, error) { // Start a new txn - tx, err := s.kvsTable.StartTxn(false, nil) + tx, err := s.tables.StartTxn(false) if err != nil { return false, err } @@ -1015,10 +1133,51 @@ func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, er // Use the ModifyIndex as the constraint. A modify of time of 0 // means we are doing a set-if-not-exists, while any other value // means we expect that modify time. 
- if d.ModifyIndex == 0 && exist != nil { - return false, nil - } else if d.ModifyIndex > 0 && (exist == nil || exist.ModifyIndex != d.ModifyIndex) { - return false, nil + if mode == kvCAS { + if d.ModifyIndex == 0 && exist != nil { + return false, nil + } else if d.ModifyIndex > 0 && (exist == nil || exist.ModifyIndex != d.ModifyIndex) { + return false, nil + } + } + + // If attempting to lock, check this is possible + if mode == kvLock { + // Verify we have a session + if d.Session == "" { + return false, fmt.Errorf("Missing session") + } + + // Bail if it is already locked + if exist != nil && exist.Session != "" { + return false, nil + } + + // Verify the session exists + res, err := s.sessionTable.GetTxn(tx, "id", d.Session) + if err != nil { + return false, err + } + if len(res) == 0 { + return false, fmt.Errorf("Invalid session") + } + + // Update the lock index + if exist != nil { + exist.LockIndex++ + exist.Session = d.Session + } else { + d.LockIndex = 1 + } + } + + // If attempting to unlock, verify the key exists and is held + if mode == kvUnlock { + if exist == nil || exist.Session != d.Session { + return false, nil + } + // Clear the session to unlock + exist.Session = "" } // Set the create and modify times @@ -1026,6 +1185,9 @@ func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, er d.CreateIndex = index } else { d.CreateIndex = exist.CreateIndex + d.LockIndex = exist.LockIndex + d.Session = exist.Session + } d.ModifyIndex = index @@ -1035,10 +1197,281 @@ func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, er if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { return false, err } - defer s.watch[s.kvsTable].Notify() + tx.Defer(func() { s.watch[s.kvsTable].Notify() }) return true, tx.Commit() } +// SessionCreate is used to create a new session. 
The +// ID will be populated on a successful return +func (s *StateStore) SessionCreate(index uint64, session *structs.Session) error { + // Assign the create index + session.CreateIndex = index + + // Start the transaction + tx, err := s.tables.StartTxn(false) + if err != nil { + panic(fmt.Errorf("Failed to start txn: %v", err)) + } + defer tx.Abort() + + // Verify that the node exists + res, err := s.nodeTable.GetTxn(tx, "id", session.Node) + if err != nil { + return err + } + if len(res) == 0 { + return fmt.Errorf("Missing node registration") + } + + // Verify that the checks exist and are not critical + for _, checkId := range session.Checks { + res, err := s.checkTable.GetTxn(tx, "id", session.Node, checkId) + if err != nil { + return err + } + if len(res) == 0 { + return fmt.Errorf("Missing check '%s' registration", checkId) + } + chk := res[0].(*structs.HealthCheck) + if chk.Status == structs.HealthCritical { + return fmt.Errorf("Check '%s' is in %s state", checkId, chk.Status) + } + } + + // Generate a new session ID, verify uniqueness + session.ID = generateUUID() + for { + res, err = s.sessionTable.GetTxn(tx, "id", session.ID) + if err != nil { + return err + } + // Quit if this ID is unique + if len(res) == 0 { + break + } + } + + // Insert the session + if err := s.sessionTable.InsertTxn(tx, session); err != nil { + return err + } + + // Insert the check mappings + sCheck := sessionCheck{Node: session.Node, Session: session.ID} + for _, checkID := range session.Checks { + sCheck.CheckID = checkID + if err := s.sessionCheckTable.InsertTxn(tx, &sCheck); err != nil { + return err + } + } + + // Trigger the update notifications + if err := s.sessionTable.SetLastIndexTxn(tx, index); err != nil { + return err + } + tx.Defer(func() { s.watch[s.sessionTable].Notify() }) + return tx.Commit() +} + +// SessionRestore is used to restore a session. It should only be used when +// doing a restore, otherwise SessionCreate should be used. 
+func (s *StateStore) SessionRestore(session *structs.Session) error { + // Start the transaction + tx, err := s.tables.StartTxn(false) + if err != nil { + panic(fmt.Errorf("Failed to start txn: %v", err)) + } + defer tx.Abort() + + // Insert the session + if err := s.sessionTable.InsertTxn(tx, session); err != nil { + return err + } + + // Insert the check mappings + sCheck := sessionCheck{Node: session.Node, Session: session.ID} + for _, checkID := range session.Checks { + sCheck.CheckID = checkID + if err := s.sessionCheckTable.InsertTxn(tx, &sCheck); err != nil { + return err + } + } + + // Trigger the update notifications + index := session.CreateIndex + if err := s.sessionTable.SetMaxLastIndexTxn(tx, index); err != nil { + return err + } + tx.Defer(func() { s.watch[s.sessionTable].Notify() }) + return tx.Commit() +} + +// SessionGet is used to get a session entry +func (s *StateStore) SessionGet(id string) (uint64, *structs.Session, error) { + idx, res, err := s.sessionTable.Get("id", id) + var d *structs.Session + if len(res) > 0 { + d = res[0].(*structs.Session) + } + return idx, d, err +} + +// SessionList is used to list all the open sessions +func (s *StateStore) SessionList() (uint64, []*structs.Session, error) { + idx, res, err := s.sessionTable.Get("id") + out := make([]*structs.Session, len(res)) + for i, raw := range res { + out[i] = raw.(*structs.Session) + } + return idx, out, err +} + +// NodeSessions is used to list all the open sessions for a node +func (s *StateStore) NodeSessions(node string) (uint64, []*structs.Session, error) { + idx, res, err := s.sessionTable.Get("node", node) + out := make([]*structs.Session, len(res)) + for i, raw := range res { + out[i] = raw.(*structs.Session) + } + return idx, out, err +} + +// SessionDelete is used to destroy a session. +func (s *StateStore) SessionDestroy(index uint64, id string) error { + tx, err := s.tables.StartTxn(false) + if err != nil { + panic(fmt.Errorf("Failed to start txn: %v", err)) + } + defer tx.Abort() + + if err := s.invalidateSession(index, tx, id); err != nil { + return err + } + return tx.Commit() +} + +// invalideNode is used to invalide all sessions belonging to a node +// All tables should be locked in the tx. +func (s *StateStore) invalidateNode(index uint64, tx *MDBTxn, node string) error { + sessions, err := s.sessionTable.GetTxn(tx, "node", node) + if err != nil { + return err + } + for _, sess := range sessions { + session := sess.(*structs.Session).ID + if err := s.invalidateSession(index, tx, session); err != nil { + return err + } + } + return nil +} + +// invalidateCheck is used to invalide all sessions belonging to a check +// All tables should be locked in the tx. +func (s *StateStore) invalidateCheck(index uint64, tx *MDBTxn, node, check string) error { + sessionChecks, err := s.sessionCheckTable.GetTxn(tx, "id", node, check) + if err != nil { + return err + } + for _, sc := range sessionChecks { + session := sc.(*sessionCheck).Session + if err := s.invalidateSession(index, tx, session); err != nil { + return err + } + } + return nil +} + +// invalidateSession is used to invalide a session within a given txn +// All tables should be locked in the tx. 
+func (s *StateStore) invalidateSession(index uint64, tx *MDBTxn, id string) error { + // Get the session + res, err := s.sessionTable.GetTxn(tx, "id", id) + if err != nil { + return err + } + + // Quit if this session does not exist + if len(res) == 0 { + return nil + } + session := res[0].(*structs.Session) + + // Enforce the MaxLockDelay + delay := session.LockDelay + if delay > structs.MaxLockDelay { + delay = structs.MaxLockDelay + } + + // Invalidate any held locks + if err := s.invalidateLocks(index, tx, delay, id); err != nil { + return err + } + + // Nuke the session + if _, err := s.sessionTable.DeleteTxn(tx, "id", id); err != nil { + return err + } + + // Delete the check mappings + for _, checkID := range session.Checks { + if _, err := s.sessionCheckTable.DeleteTxn(tx, "id", + session.Node, checkID, id); err != nil { + return err + } + } + + // Trigger the update notifications + if err := s.sessionTable.SetLastIndexTxn(tx, index); err != nil { + return err + } + tx.Defer(func() { s.watch[s.sessionTable].Notify() }) + return nil +} + +// invalidateLocks is used to invalidate all the locks held by a session +// within a given txn. All tables should be locked in the tx. +func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn, + lockDelay time.Duration, id string) error { + pairs, err := s.kvsTable.GetTxn(tx, "session", id) + if err != nil { + return err + } + + var expires time.Time + if lockDelay > 0 { + s.lockDelayLock.Lock() + defer s.lockDelayLock.Unlock() + expires = time.Now().Add(lockDelay) + } + + for _, pair := range pairs { + kv := pair.(*structs.DirEntry) + kv.Session = "" // Clear the lock + kv.ModifyIndex = index // Update the modified time + if err := s.kvsTable.InsertTxn(tx, kv); err != nil { + return err + } + // If there is a lock delay, prevent acquisition + // for at least lockDelay period + if lockDelay > 0 { + s.lockDelay[kv.Key] = expires + time.AfterFunc(lockDelay, func() { + s.lockDelayLock.Lock() + delete(s.lockDelay, kv.Key) + s.lockDelayLock.Unlock() + }) + } + } + if len(pairs) > 0 { + if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { + return err + } + tx.Defer(func() { s.watch[s.kvsTable].Notify() }) + } + return nil +} + // Snapshot is used to create a point in time snapshot func (s *StateStore) Snapshot() (*StateSnapshot, error) { // Begin a new txn on all tables @@ -1102,3 +1535,13 @@ func (s *StateSnapshot) NodeChecks(node string) structs.HealthChecks { func (s *StateSnapshot) KVSDump(stream chan<- interface{}) error { return s.store.kvsTable.StreamTxn(stream, s.tx, "id") } + +// SessionList is used to list all the open sessions +func (s *StateSnapshot) SessionList() ([]*structs.Session, error) { + res, err := s.store.sessionTable.GetTxn(s.tx, "id") + out := make([]*structs.Session, len(res)) + for i, raw := range res { + out[i] = raw.(*structs.Session) + } + return out, err +} diff --git a/consul/state_store_test.go b/consul/state_store_test.go index cdf259164e3c..308335826c92 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -6,6 +6,7 @@ import ( "reflect" "sort" "testing" + "time" ) func testStateStore() (*StateStore, error) { @@ -636,6 +637,21 @@ func TestStoreSnapshot(t *testing.T) { t.Fatalf("err: %v", err) } + // Add some sessions + session := &structs.Session{Node: "foo"} + if err := store.SessionCreate(16, session); err != nil { + t.Fatalf("err: %v", err) + } + + session = &structs.Session{Node: "bar"} + if err := store.SessionCreate(17, session); err != nil { + t.Fatalf("err: %v", err) + } + 
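+	// Acquire a lock on the existing KV entry using the second session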
d.Session = session.ID + if ok, err := store.KVSLock(18, d); err != nil || !ok { + t.Fatalf("err: %v", err) + } + // Take a snapshot snap, err := store.Snapshot() if err != nil { @@ -644,7 +660,7 @@ func TestStoreSnapshot(t *testing.T) { defer snap.Close() // Check the last nodes - if idx := snap.LastIndex(); idx != 15 { + if idx := snap.LastIndex(); idx != 18 { t.Fatalf("bad: %v", idx) } @@ -699,14 +715,23 @@ func TestStoreSnapshot(t *testing.T) { t.Fatalf("missing KVS entries!") } + // Check there are 2 sessions + sessions, err := snap.SessionList() + if err != nil { + t.Fatalf("err: %v", err) + } + if len(sessions) != 2 { + t.Fatalf("missing sessions") + } + // Make some changes! - if err := store.EnsureService(14, "foo", &structs.NodeService{"db", "db", []string{"slave"}, 8000}); err != nil { + if err := store.EnsureService(19, "foo", &structs.NodeService{"db", "db", []string{"slave"}, 8000}); err != nil { t.Fatalf("err: %v", err) } - if err := store.EnsureService(15, "bar", &structs.NodeService{"db", "db", []string{"master"}, 8000}); err != nil { + if err := store.EnsureService(20, "bar", &structs.NodeService{"db", "db", []string{"master"}, 8000}); err != nil { t.Fatalf("err: %v", err) } - if err := store.EnsureNode(16, structs.Node{"baz", "127.0.0.3"}); err != nil { + if err := store.EnsureNode(21, structs.Node{"baz", "127.0.0.3"}); err != nil { t.Fatalf("err: %v", err) } checkAfter := &structs.HealthCheck{ @@ -716,12 +741,12 @@ func TestStoreSnapshot(t *testing.T) { Status: structs.HealthCritical, ServiceID: "db", } - if err := store.EnsureCheck(17, checkAfter); err != nil { - t.Fatalf("err: %v") + if err := store.EnsureCheck(22, checkAfter); err != nil { + t.Fatalf("err: %v", err) } - if err := store.KVSDelete(18, "/web/a"); err != nil { - t.Fatalf("err: %v") + if err := store.KVSDelete(23, "/web/b"); err != nil { + t.Fatalf("err: %v", err) } // Check snapshot has old values @@ -773,6 +798,15 @@ func TestStoreSnapshot(t *testing.T) { if len(ents) != 2 { t.Fatalf("missing KVS entries!") } + + // Check there are 2 sessions + sessions, err = snap.SessionList() + if err != nil { + t.Fatalf("err: %v", err) + } + if len(sessions) != 2 { + t.Fatalf("missing sessions") + } } func TestEnsureCheck(t *testing.T) { @@ -1561,3 +1595,511 @@ func TestKVSDeleteTree(t *testing.T) { t.Fatalf("bad: %v", ents) } } + +func TestSessionCreate(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil { + t.Fatalf("err: %v") + } + check := &structs.HealthCheck{ + Node: "foo", + CheckID: "bar", + Status: structs.HealthPassing, + } + if err := store.EnsureCheck(13, check); err != nil { + t.Fatalf("err: %v") + } + + session := &structs.Session{ + Node: "foo", + Checks: []string{"bar"}, + } + + if err := store.SessionCreate(1000, session); err != nil { + t.Fatalf("err: %v", err) + } + + if session.ID == "" { + t.Fatalf("bad: %v", session) + } + + if session.CreateIndex != 1000 { + t.Fatalf("bad: %v", session) + } +} + +func TestSessionCreate_Invalid(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + // No node registered + session := &structs.Session{ + Node: "foo", + Checks: []string{"bar"}, + } + if err := store.SessionCreate(1000, session); err.Error() != "Missing node registration" { + t.Fatalf("err: %v", err) + } + + // Check not registered + if err := store.EnsureNode(3, structs.Node{"foo", 
"127.0.0.1"}); err != nil { + t.Fatalf("err: %v") + } + if err := store.SessionCreate(1000, session); err.Error() != "Missing check 'bar' registration" { + t.Fatalf("err: %v", err) + } + + // Unhealthy check + check := &structs.HealthCheck{ + Node: "foo", + CheckID: "bar", + Status: structs.HealthCritical, + } + if err := store.EnsureCheck(13, check); err != nil { + t.Fatalf("err: %v", err) + } + if err := store.SessionCreate(1000, session); err.Error() != "Check 'bar' is in critical state" { + t.Fatalf("err: %v", err) + } +} + +func TestSession_Lookups(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + // Create a session + if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil { + t.Fatalf("err: %v") + } + session := &structs.Session{ + Node: "foo", + } + if err := store.SessionCreate(1000, session); err != nil { + t.Fatalf("err: %v", err) + } + + // Lookup by ID + idx, s2, err := store.SessionGet(session.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1000 { + t.Fatalf("bad: %v", idx) + } + if !reflect.DeepEqual(s2, session) { + t.Fatalf("bad: %v", s2) + } + + // Create many sessions + ids := []string{session.ID} + for i := 0; i < 10; i++ { + session := &structs.Session{ + Node: "foo", + } + if err := store.SessionCreate(uint64(1000+i), session); err != nil { + t.Fatalf("err: %v", err) + } + ids = append(ids, session.ID) + } + + // List all + idx, all, err := store.SessionList() + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1009 { + t.Fatalf("bad: %v", idx) + } + + // Retrieve the ids + var out []string + for _, s := range all { + out = append(out, s.ID) + } + + sort.Strings(ids) + sort.Strings(out) + if !reflect.DeepEqual(ids, out) { + t.Fatalf("bad: %v %v", ids, out) + } + + // List by node + idx, nodes, err := store.NodeSessions("foo") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1009 { + t.Fatalf("bad: %v", idx) + } + + // Check again for the node list + out = nil + for _, s := range nodes { + out = append(out, s.ID) + } + sort.Strings(out) + if !reflect.DeepEqual(ids, out) { + t.Fatalf("bad: %v %v", ids, out) + } +} + +func TestSessionInvalidate_CriticalHealthCheck(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil { + t.Fatalf("err: %v") + } + check := &structs.HealthCheck{ + Node: "foo", + CheckID: "bar", + Status: structs.HealthPassing, + } + if err := store.EnsureCheck(13, check); err != nil { + t.Fatalf("err: %v") + } + + session := &structs.Session{ + Node: "foo", + Checks: []string{"bar"}, + } + if err := store.SessionCreate(14, session); err != nil { + t.Fatalf("err: %v", err) + } + + // Invalidate the check + check.Status = structs.HealthCritical + if err := store.EnsureCheck(15, check); err != nil { + t.Fatalf("err: %v", err) + } + + // Lookup by ID, should be nil + _, s2, err := store.SessionGet(session.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if s2 != nil { + t.Fatalf("session should be invalidated") + } +} + +func TestSessionInvalidate_DeleteHealthCheck(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil { + t.Fatalf("err: %v") + } + check := &structs.HealthCheck{ + Node: "foo", + CheckID: "bar", + Status: 
structs.HealthPassing, + } + if err := store.EnsureCheck(13, check); err != nil { + t.Fatalf("err: %v") + } + + session := &structs.Session{ + Node: "foo", + Checks: []string{"bar"}, + } + if err := store.SessionCreate(14, session); err != nil { + t.Fatalf("err: %v", err) + } + + // Delete the check + if err := store.DeleteNodeCheck(15, "foo", "bar"); err != nil { + t.Fatalf("err: %v", err) + } + + // Lookup by ID, should be nil + _, s2, err := store.SessionGet(session.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if s2 != nil { + t.Fatalf("session should be invalidated") + } +} + +func TestSessionInvalidate_DeleteNode(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil { + t.Fatalf("err: %v") + } + + session := &structs.Session{ + Node: "foo", + } + if err := store.SessionCreate(14, session); err != nil { + t.Fatalf("err: %v", err) + } + + // Delete the node + if err := store.DeleteNode(15, "foo"); err != nil { + t.Fatalf("err: %v") + } + + // Lookup by ID, should be nil + _, s2, err := store.SessionGet(session.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if s2 != nil { + t.Fatalf("session should be invalidated") + } +} + +func TestSessionInvalidate_DeleteNodeService(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + if err := store.EnsureNode(11, structs.Node{"foo", "127.0.0.1"}); err != nil { + t.Fatalf("err: %v", err) + } + if err := store.EnsureService(12, "foo", &structs.NodeService{"api", "api", nil, 5000}); err != nil { + t.Fatalf("err: %v", err) + } + check := &structs.HealthCheck{ + Node: "foo", + CheckID: "api", + Name: "Can connect", + Status: structs.HealthPassing, + ServiceID: "api", + } + if err := store.EnsureCheck(13, check); err != nil { + t.Fatalf("err: %v") + } + + session := &structs.Session{ + Node: "foo", + Checks: []string{"api"}, + } + if err := store.SessionCreate(14, session); err != nil { + t.Fatalf("err: %v", err) + } + + // Should invalidate the session + if err := store.DeleteNodeService(15, "foo", "api"); err != nil { + t.Fatalf("err: %v", err) + } + + // Lookup by ID, should be nil + _, s2, err := store.SessionGet(session.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if s2 != nil { + t.Fatalf("session should be invalidated") + } +} + +func TestKVSLock(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil { + t.Fatalf("err: %v") + } + session := &structs.Session{Node: "foo"} + if err := store.SessionCreate(4, session); err != nil { + t.Fatalf("err: %v", err) + } + + // Lock with a non-existing keys should work + d := &structs.DirEntry{ + Key: "/foo", + Flags: 42, + Value: []byte("test"), + Session: session.ID, + } + ok, err := store.KVSLock(5, d) + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("unexpected fail") + } + if d.LockIndex != 1 { + t.Fatalf("bad: %v", d) + } + + // Re-locking should fail + ok, err = store.KVSLock(6, d) + if err != nil { + t.Fatalf("err: %v", err) + } + if ok { + t.Fatalf("expected fail") + } + + // Set a normal key + k1 := &structs.DirEntry{ + Key: "/bar", + Flags: 0, + Value: []byte("asdf"), + } + if err := store.KVSSet(7, k1); err != nil { + t.Fatalf("err: %v", err) + } + + // Should acquire the lock + k1.Session = 
session.ID + ok, err = store.KVSLock(8, k1) + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("unexpected fail") + } + + // Re-acquire should fail + ok, err = store.KVSLock(9, k1) + if err != nil { + t.Fatalf("err: %v", err) + } + if ok { + t.Fatalf("expected fail") + } + +} + +func TestKVSUnlock(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil { + t.Fatalf("err: %v") + } + session := &structs.Session{Node: "foo"} + if err := store.SessionCreate(4, session); err != nil { + t.Fatalf("err: %v", err) + } + + // Unlock with a non-existing keys should fail + d := &structs.DirEntry{ + Key: "/foo", + Flags: 42, + Value: []byte("test"), + Session: session.ID, + } + ok, err := store.KVSUnlock(5, d) + if err != nil { + t.Fatalf("err: %v", err) + } + if ok { + t.Fatalf("expected fail") + } + + // Lock should work + d.Session = session.ID + if ok, _ := store.KVSLock(6, d); !ok { + t.Fatalf("expected lock") + } + + // Unlock should work + d.Session = session.ID + ok, err = store.KVSUnlock(7, d) + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("unexpected fail") + } + + // Re-lock should work + d.Session = session.ID + if ok, err := store.KVSLock(8, d); err != nil { + t.Fatalf("err: %v", err) + } else if !ok { + t.Fatalf("expected lock") + } + if d.LockIndex != 2 { + t.Fatalf("bad: %v", d) + } +} + +func TestSessionInvalidate_KeyUnlock(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil { + t.Fatalf("err: %v") + } + session := &structs.Session{Node: "foo", LockDelay: 50 * time.Millisecond} + if err := store.SessionCreate(4, session); err != nil { + t.Fatalf("err: %v", err) + } + + // Lock a key with the session + d := &structs.DirEntry{ + Key: "/foo", + Flags: 42, + Value: []byte("test"), + Session: session.ID, + } + ok, err := store.KVSLock(5, d) + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("unexpected fail") + } + + // Delete the node + if err := store.DeleteNode(6, "foo"); err != nil { + t.Fatalf("err: %v") + } + + // Key should be unlocked + idx, d2, err := store.KVSGet("/foo") + if idx != 6 { + t.Fatalf("bad: %v", idx) + } + if d2.LockIndex != 1 { + t.Fatalf("bad: %v", *d2) + } + if d2.Session != "" { + t.Fatalf("bad: %v", *d2) + } + + // Key should have a lock delay + expires := store.KVSLockDelay("/foo") + if expires.Before(time.Now().Add(30 * time.Millisecond)) { + t.Fatalf("Bad: %v", expires) + } +} diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 2a672565f73c..b08a1c8e46e9 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -19,6 +19,7 @@ const ( RegisterRequestType MessageType = iota DeregisterRequestType KVSRequestType + SessionRequestType ) const ( @@ -28,6 +29,12 @@ const ( HealthCritical = "critical" ) +const ( + // MaxLockDelay provides a maximum LockDelay value for + // a session. Any value above this will not be respected. 
+ MaxLockDelay = 60 * time.Second +) + // RPCInfo is used to describe common information about query type RPCInfo interface { RequestDatacenter() string @@ -275,9 +282,11 @@ type IndexedNodeDump struct { type DirEntry struct { CreateIndex uint64 ModifyIndex uint64 + LockIndex uint64 Key string Flags uint64 Value []byte + Session string `json:",omitempty"` } type DirEntries []*DirEntry @@ -287,7 +296,9 @@ const ( KVSSet KVSOp = "set" KVSDelete = "delete" KVSDeleteTree = "delete-tree" - KVSCAS = "cas" // Check-and-set + KVSCAS = "cas" // Check-and-set + KVSLock = "lock" // Lock a key + KVSUnlock = "unlock" // Unlock a key ) // KVSRequest is used to operate on the Key-Value store @@ -335,6 +346,52 @@ type IndexedKeyList struct { QueryMeta } +// Session is used to represent an open session in the KV store. +// This issued to associate node checks with acquired locks. +type Session struct { + CreateIndex uint64 + ID string + Node string + Checks []string + LockDelay time.Duration +} +type Sessions []*Session + +type SessionOp string + +const ( + SessionCreate SessionOp = "create" + SessionDestroy = "destroy" +) + +// SessionRequest is used to operate on sessions +type SessionRequest struct { + Datacenter string + Op SessionOp // Which operation are we performing + Session Session // Which session + WriteRequest +} + +func (r *SessionRequest) RequestDatacenter() string { + return r.Datacenter +} + +// SessionSpecificRequest is used to request a session by ID +type SessionSpecificRequest struct { + Datacenter string + Session string + QueryOptions +} + +func (r *SessionSpecificRequest) RequestDatacenter() string { + return r.Datacenter +} + +type IndexedSessions struct { + Sessions Sessions + QueryMeta +} + // Decode is used to decode a MsgPack encoded object func Decode(buf []byte, out interface{}) error { var handle codec.MsgpackHandle diff --git a/consul/util.go b/consul/util.go index 977d38a04f5c..f830086c04a1 100644 --- a/consul/util.go +++ b/consul/util.go @@ -1,6 +1,7 @@ package consul import ( + crand "crypto/rand" "encoding/binary" "fmt" "github.com/hashicorp/serf/serf" @@ -160,3 +161,18 @@ func runtimeStats() map[string]string { "cpu_count": strconv.FormatInt(int64(runtime.NumCPU()), 10), } } + +// generateUUID is used to generate a random UUID +func generateUUID() string { + buf := make([]byte, 16) + if _, err := crand.Read(buf); err != nil { + panic(fmt.Errorf("failed to read random bytes: %v", err)) + } + + return fmt.Sprintf("%08x-%04x-%04x-%04x-%12x", + buf[0:4], + buf[4:6], + buf[6:8], + buf[8:10], + buf[10:16]) +} diff --git a/consul/util_test.go b/consul/util_test.go index edd73fdf69ad..589404a341e9 100644 --- a/consul/util_test.go +++ b/consul/util_test.go @@ -2,6 +2,7 @@ package consul import ( "github.com/hashicorp/serf/serf" + "regexp" "testing" ) @@ -75,3 +76,19 @@ func TestByteConversion(t *testing.T) { t.Fatalf("no match") } } + +func TestGenerateUUID(t *testing.T) { + prev := generateUUID() + for i := 0; i < 100; i++ { + id := generateUUID() + if prev == id { + t.Fatalf("Should get a new ID!") + } + + matched, err := regexp.MatchString( + "[\\da-f]{8}-[\\da-f]{4}-[\\da-f]{4}-[\\da-f]{4}-[\\da-f]{12}", id) + if !matched || err != nil { + t.Fatalf("expected match %s %v %s", id, matched, err) + } + } +} diff --git a/testutil/wait.go b/testutil/wait.go index e3e0e9149b1a..0bf40937ad41 100644 --- a/testutil/wait.go +++ b/testutil/wait.go @@ -1,9 +1,9 @@ package testutil import ( - "time" - "testing" "github.com/hashicorp/consul/consul/structs" + "testing" + "time" ) type 
testFn func() (bool, error) @@ -27,7 +27,7 @@ func WaitForResult(test testFn, error errorFn) { } } -type rpcFn func(string, interface {}, interface {}) error +type rpcFn func(string, interface{}, interface{}) error func WaitForLeader(t *testing.T, rpc rpcFn, dc string) structs.IndexedNodes { var out structs.IndexedNodes diff --git a/website/source/docs/agent/http.html.markdown b/website/source/docs/agent/http.html.markdown index abfc9e04f594..c220d60bdd1b 100644 --- a/website/source/docs/agent/http.html.markdown +++ b/website/source/docs/agent/http.html.markdown @@ -10,12 +10,13 @@ The main interface to Consul is a RESTful HTTP API. The API can be used for CRUD for nodes, services, checks, and configuration. The endpoints are versioned to enable changes without breaking backwards compatibility. -All endpoints fall into one of 6 categories: +All endpoints fall into one of several categories: * kv - Key/Value store * agent - Agent control * catalog - Manages nodes and services * health - Manages health checks +* session - Session manipulation * status - Consul system status * internal - Internal APIs. Purposely undocumented, subject to change. @@ -94,6 +95,8 @@ By default the datacenter of the agent is queried, however the dc can be provided using the "?dc=" query parameter. If a client wants to write to all Datacenters, one request per datacenter must be made. +### GET Method + When using the `GET` method, Consul will return the specified key, or if the "?recurse" query parameter is provided, it will return all keys with the given prefix. @@ -104,9 +107,11 @@ Each object will look like: { "CreateIndex": 100, "ModifyIndex": 200, + "LockIndex": 200, "Key": "zip", "Flags": 0, - "Value": "dGVzdA==" + "Value": "dGVzdA==", + "Session": "adf4238a-882b-9ddc-4a9d-5b6758e4159e" } ] @@ -116,15 +121,36 @@ that modified this key. This index corresponds to the `X-Consul-Index` header value that is returned. A blocking query can be used to wait for a value to change. If "?recurse" is used, the `X-Consul-Index` corresponds to the latest `ModifyIndex` and so a blocking query waits until any of the -listed keys are updated. The multiple consistency modes can be used for -`GET` requests as well. +listed keys are updated. The `LockIndex` is the last index of a successful +lock acquisition. If the lock is held, the `Session` key provides the +session that owns the lock. The `Key` is simply the full path of the entry. `Flags` are an opaque unsigned integer that can be attached to each entry. The use of this is left totally to the user. Lastly, the `Value` is a base64 key value. +It is possible to also only list keys without any values by using the +"?keys" query parameter along with a `GET` request. This will return +a list of the keys under the given prefix. The optional "?separator=" +can be used to list only up to a given separator. + +For example, listing "/web/" with a "/" seperator may return: + + [ + "/web/bar", + "/web/foo", + "/web/subdir/" + ] + +Using the key listing method may be suitable when you do not need +the values or flags, or want to implement a key-space explorer. + If no entries are found, a 404 code is returned. +This endpoint supports blocking queries and all consistency modes. + +### PUT method + When using the `PUT` method, Consul expects the request body to be the value corresponding to the key. There are a number of parameters that can be used with a PUT request: @@ -133,36 +159,33 @@ be used with a PUT request: 0 and 2^64-1. 
It is opaque to the user, but a client application may use it. -* ?cas=\ : This flag is used to turn the `PUT` into a **Check-And-Set** +* ?cas=\ : This flag is used to turn the `PUT` into a Check-And-Set operation. This is very useful as it allows clients to build more complex syncronization primitives on top. If the index is 0, then Consul will only put the key if it does not already exist. If the index is non-zero, then the key is only set if the index matches the `ModifyIndex` of that key. -The return value is simply either `true` or `false`. If the CAS check fails, -then `false` will be returned. +* ?acquire=\ : This flag is used to turn the `PUT` into a lock acquisition + operation. This is useful as it allows leader election to be built on top + of Consul. If the lock is not held and the session is valid, this increments + the `LockIndex` and sets the `Session` value of the key in addition to updating + the key contents. A key does not need to exist to be acquired. + +* ?release=\ : This flag is used to turn the `PUT` into a lock release + operation. This is useful when paired with "?acquire=" as it allows clients to + yield a lock. This will leave the `LockIndex` unmodified but will clear the associated + `Session` of the key. The key must be held by this session to be unlocked. + +The return value is simply either `true` or `false`. If `false` is returned, +then the update has not taken place. + +### DELETE method Lastly, the `DELETE` method can be used to delete a single key or all keys sharing a prefix. If the "?recurse" query parameter is provided, then all keys with the prefix are deleted, otherwise only the specified key. -It is possible to also only list keys without any values by using the -"?keys" query parameter along with a `GET` request. This will return -a list of the keys under the given prefix. The optional "?separator=" -can be used to list only up to a given separator. - -For example, listing "/web/" with a "/" seperator may return: - - [ - "/web/bar", - "/web/foo", - "/web/subdir/" - ] - -Using the key listing method may be suitable when you do not need -the values or flags, or want to implement a key-space explorer. - ## Agent The Agent endpoints are used to interact with a local Consul agent. Usually, @@ -805,6 +828,137 @@ It returns a JSON body like this: This endpoint supports blocking queries and all consistency modes. +## Session + +The Session endpoints are used to create, destroy and query sessions. +The following endpoints are supported: + +* /v1/session/create: Creates a new session +* /v1/session/destroy/\: Destroys a given session +* /v1/session/info/\: Queries a given session +* /v1/session/node/\: Lists sessions belonging to a node +* /v1/session/list: Lists all the active sessions + +All of the read session endpoints supports blocking queries and all consistency modes. + +### /v1/session/create + +The create endpoint is used to initialize a new session. +There is more documentation on sessions [here](/docs/internals/sessions.html). +Sessions must be associated with a node, and optionally any number of checks. +By default, the agent uses it's own node name, and provides the "serfHealth" +check, along with a 15 second lock delay. + +By default, the agent's local datacenter is used, but another datacenter +can be specified using the "?dc=" query parameter. It is not recommended +to use cross-region sessions. + +The create endpoint expects a JSON request body to be PUT. 
The request +body must look like: + + { + "LockDelay": "15s", + "Node": "foobar", + "Checks": ["a", "b", "c"] + } + +None of the fields are mandatory, and in fact no body needs to be PUT +if the defaults are to be used. The `LockDelay` field can be specified +as a duration string using a "s" suffix for seconds. It can also be a numeric +value. Small values are treated as seconds, and otherwise it is provided with +nanosecond granularity. + +The `Node` field must refer to a node that is already registered. By default, +the agent will use it's own name. Lastly, the `Checks` field is used to provide +a list of associated health checks. By default the "serfHealth" check is provided. +It is highly recommended that if you override this list, you include that check. + +The return code is 200 on success, along with a body like: + + {"ID":"adf4238a-882b-9ddc-4a9d-5b6758e4159e"} + +This is used to provide the ID of the newly created session. + +### /v1/session/destroy/\ + +The destroy endpoint is hit with a PUT and destroys the given session. +By default the local datacenter is used, but the "?dc=" query parameter +can be used to specify the datacenter. The session being destroyed must +be provided after the slash. + +The return code is 200 on success. + +### /v1/session/info/\ + +This endpoint is hit with a GET and returns the session information +by ID within a given datacenter. By default the datacenter of the agent is queried, +however the dc can be provided using the "?dc=" query parameter. +The session being queried must be provided after the slash. + +It returns a JSON body like this: + + [ + { + "LockDelay": 1.5e+10, + "Checks": [ + "serfHealth" + ], + "Node": "foobar", + "ID": "adf4238a-882b-9ddc-4a9d-5b6758e4159e", + "CreateIndex": 1086449 + } + ] + +If the session is not found, null is returned instead of a JSON list. +This endpoint supports blocking queries and all consistency modes. + +### /v1/session/node/\ + +This endpoint is hit with a GET and returns the active sessions +for a given node and datacenter. By default the datacenter of the agent is queried, +however the dc can be provided using the "?dc=" query parameter. +The node being queried must be provided after the slash. + +It returns a JSON body like this: + + [ + { + "LockDelay": 1.5e+10, + "Checks": [ + "serfHealth" + ], + "Node": "foobar", + "ID": "adf4238a-882b-9ddc-4a9d-5b6758e4159e", + "CreateIndex": 1086449 + }, + ... + ] + +This endpoint supports blocking queries and all consistency modes. + +### /v1/session/list + +This endpoint is hit with a GET and returns the active sessions +for a given datacenter. By default the datacenter of the agent is queried, +however the dc can be provided using the "?dc=" query parameter. + +It returns a JSON body like this: + + [ + { + "LockDelay": 1.5e+10, + "Checks": [ + "serfHealth" + ], + "Node": "foobar", + "ID": "adf4238a-882b-9ddc-4a9d-5b6758e4159e", + "CreateIndex": 1086449 + }, + ... + ] + +This endpoint supports blocking queries and all consistency modes. 
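+
+As a rough end-to-end illustration of how these endpoints compose with the
+KV "?acquire" and "?release" parameters, a shell session might look like the
+following. The agent address, the key, and the returned session ID are purely
+illustrative:
+
+    # Create a session with the default node, checks and lock-delay
+    curl -X PUT http://localhost:8500/v1/session/create
+    # => {"ID":"adf4238a-882b-9ddc-4a9d-5b6758e4159e"}
+
+    # Acquire a lock on a key using that session
+    curl -X PUT -d body http://localhost:8500/v1/kv/key?acquire=adf4238a-882b-9ddc-4a9d-5b6758e4159e
+
+    # Release the lock, then destroy the session once it is no longer needed
+    curl -X PUT http://localhost:8500/v1/kv/key?release=adf4238a-882b-9ddc-4a9d-5b6758e4159e
+    curl -X PUT http://localhost:8500/v1/session/destroy/adf4238a-882b-9ddc-4a9d-5b6758e4159e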
+ ## Status The Status endpoints are used to get information about the status diff --git a/website/source/docs/guides/leader-election.html.markdown b/website/source/docs/guides/leader-election.html.markdown new file mode 100644 index 000000000000..6a2a1fc7d5c7 --- /dev/null +++ b/website/source/docs/guides/leader-election.html.markdown @@ -0,0 +1,73 @@ +--- +layout: "docs" +page_title: "Leader Election" +sidebar_current: "docs-guides-leader" +--- + +# Leader Election + +The goal of this guide is to cover how to build client-side leader election using Consul. +If you are interested in the leader election used internally to Consul, you want to +read about the [consensus protocol](/docs/internals/consensus.html) instead. + +There are a number of ways that leader election can be built, so our goal is not to +cover all the possible methods. Instead, we will focus on using Consul's support for +[sessions](/docs/internals/sessions.html), which allow us to build a system that can +gracefully handle failures. + +## Contending Nodes + +The first flow we cover is for nodes who are attempting to acquire leadership +for a given service. All nodes that are participating should agree on a given +key being used to coordinate. A good choice is simply: + + service//leader + +We will refer to this as just `key` for simplicy. + +The first step is to create a session. This is done using the /v1/session/create endpoint. +The session by default makes use of only the gossip failure detector. Additional checks +can be specified if desired. The session ID returned will be refered to as `session`. + +Create `body` to represent the local node. This can be a simple JSON object +that contains the node's name, port or any application specific information +that may be needed. + +Attempt to `acquire` the `key` by doing a `PUT`. This is something like: + + curl -X PUT -d body http://localhost:8500/v1/kv/key?acquire=session + +This will either return `true` or `false`. If `true` is returned, the lock +has been acquired and the local node is now the leader. If `false` is returned, +some other node has acquired the lock. + +All nodes now remain in an idle waiting state. In this state, we watch for changes +on `key`. This is because the lock may be released, the node may fail, etc. +The leader must also watch for changes since it's lock may be released by an operator, +or automatically released due to a false positive in the failure detector. + +Watching for changes is done by doing a blocking query against `key`. If we ever +notice that the `Session` of the `key` is blank, then there is no leader, and we should +retry acquiring the lock. Each attempt to acquire the key should be seperated by a timed +wait. This is because Consul may be enforcing a [`lock-delay`](/docs/internals/sessions.html). + +If the leader ever wishes to step down voluntarily, this should be done by simply +releasing the lock: + + curl -X PUT http://localhost:8500/v1/kv/key?release=session + +## Discovering a Leader + +The second flow is for nodes who are attempting to discover the leader +for a given servie. All nodes that are participating should agree on the key +being used to coordinate, including the contendors. This key will be referred +to as just `key`. + +Clients have a very simple role, they simply read `key` to discover who the current +leader is. If the key has no associated `Session`, then there is no leader. Otherwise, +the value of the key will provide all the application-dependent information required. 
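+
+For example, discovery is a single read of the agreed upon key (the agent
+address and key name here are illustrative):
+
+    curl http://localhost:8500/v1/kv/key
+
+The `Session` field of the returned entry identifies the current leader's
+session, and the `Value` is base64 encoded, so it must be decoded to recover
+the leader's information.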
+
+Clients should also watch the key using a blocking query for any changes. If the leader
+steps down or fails, then the `Session` associated with the key will be cleared. When
+a new leader is elected, the key value will also be updated.
+
diff --git a/website/source/docs/internals/sessions.html.markdown b/website/source/docs/internals/sessions.html.markdown
new file mode 100644
index 000000000000..f573a568c533
--- /dev/null
+++ b/website/source/docs/internals/sessions.html.markdown
@@ -0,0 +1,114 @@
+---
+layout: "docs"
+page_title: "Sessions"
+sidebar_current: "docs-internals-sessions"
+---
+
+# Sessions
+
+Consul provides a session mechanism which can be used to build distributed locks.
+Sessions act as a binding layer between nodes, health checks, and key/value data.
+They are designed to provide granular locking, and are heavily inspired
+by [The Chubby Lock Service for Loosely-Coupled Distributed Systems](http://research.google.com/archive/chubby.html).
+
+**Advanced Topic!** This page covers technical details of
+the internals of Consul. You don't need to know these details to effectively
+operate and use Consul. These details are documented here for those who wish
+to learn about them without having to go spelunking through the source code.
+ +## Session Design + +A session in Consul represents a contract that has very specific semantics. +When a session is constructed a node name, a list of health checks, and a +`lock-delay` are provided. The newly constructed session is provided with +a named ID which can be used to refer to it. This ID can be used with the KV +store to acquire locks, which are advisory mechanisms for mutual exclusion. +Below is a diagram showing the relationship between these components: + +![Session Architecture](/images/consul-sessions.png) + +The contract that Consul provides is that under any of the folllowing +situations the session will be *invalidated*: + +* Node is deregistered +* Any of the health checks are deregistered +* Any of the health checks go to the critical state +* Session is explicitly destroyed + +When a session is invalidated, any of the locks held in association +with the session are released, and the `ModifyIndex` of the key is +incremented. The session is also destroyed during an invalidation +and can no longer be used to acquire further locks. + +While this is a simple design, it enables a multitude of usage +patterns. By default, the [gossip based failure detector](/docs/internals/gossip.html) +is used as the associated health check. This failure detector allows +Consul to detect when a node that is holding a lock has failed, and +to automatically release the lock. This ability provides **liveness** to +Consul locks, meaning under failure the system can continue to make +progress. However, because there is no perfect failure detector, it's possible +to have a false positive (failure detected) which causes the lock to +be released even though the lock owner is still alive. This means +we are sacrificing some **safety**. + +Conversely, it is possible to create a session with no associated +health checks. This removes the possibility of a false positive, +and trades liveness for safety. You can be absolutely certain Consul +will not release the lock even if the existing owner has failed. +Since Consul APIs allow a session to be force destroyed, this allows +systems to be built that require an operator to intervene in the +case of a failure, but preclude the possibility of a split-brain. + +The final nuance is that sessions may provide a `lock-delay`. This +is a time duration, between 0 and 60 second. When a session invalidation +takes place, Consul prevents any of the previously held locks from +being re-acquired for the `lock-delay` interval; this is a safe guard +inspired by Google's Chubby. The purpose of this delay is to allow +the potentially still live leader to detect the invalidation and stop +processing requests that may lead to inconsistent state. While not a +bulletproof method, it does avoid the need to introduce sleep states +into application logic, and can help mitigate many issues. While the +default is to use a 15 second delay, clients are able to disable this +mechanism by providing a zero delay value. + +## KV Integration + +Integration between the Key/Value store and sessions are the primary +place where sessions are used. A session must be created prior to use, +and is then refered to by it's ID. + +The Key/Value API is extended to support an `acquire` and `release` operation. +The `acquire` operation acts like a Check-And-Set operation, except it +can only succeed if there is no existing lock holder. 
On success, there +is a normal key update, but there is also an increment to the `LockIndex`, +and the `Session` value is updated to reflect the session holding the lock. + +Once held, the lock can be released using a corresponding `release` operation, +providing the same session. Again, this acts like a Check-And-Set operations, +since the request will fail if given an invalid session. A critical note is +that the lock can be released without being the creator of the session. +This is by design, as it allows operators to intervene and force terminate +a session if necessary. As mentioned above, a session invalidation will also +cause all held locks to be released. When a lock is released, the `LockIndex`, +does not change, however the `Session` is cleared and the `ModifyIndex` increments. + +These semantics (heavily borrowed from Chubby), allow the tuple of (Key, LockIndex, Session) +to act as a unique "sequencer". This `sequencer` can be passed around and used +to verify if the request belongs to the current lock holder. Because the `LockIndex` +is incremented on each `acquire`, even if the same session re-acquires a lock, +the `sequencer` will be able to detect a stale request. Similarly, if a session is +invalided, the Session corresponding to the given `LockIndex` will be blank. + +To make clear, this locking system is purely *advisory*. There is no enforcement +that clients must acquire a lock to perform any operation. Any client can +read, write, and delete a key without owning the corresponding lock. It is not +the goal of Consul to protect against misbehaving clients. + +## Leader Election + +The primitives provided by sessions and the locking mechanisms of the KV +store can be used to build client-side leader election algorithms. +These are covered in more detail in the [Leader Election guide](/docs/guides/leader-election.html). + diff --git a/website/source/images/consul-sessions.png b/website/source/images/consul-sessions.png new file mode 100644 index 000000000000..babd143f75c1 Binary files /dev/null and b/website/source/images/consul-sessions.png differ diff --git a/website/source/layouts/docs.erb b/website/source/layouts/docs.erb index 17fa286a0309..3d6c84808312 100644 --- a/website/source/layouts/docs.erb +++ b/website/source/layouts/docs.erb @@ -34,6 +34,10 @@ Gossip Protocol + > + Sessions + + > Security Model @@ -140,6 +144,10 @@ External Services + > + Leader Election + + > Multiple Datacenters