diff --git a/command/agent/catalog_endpoint.go b/command/agent/catalog_endpoint.go index bb7d96d0bd87..559593f8e9f4 100644 --- a/command/agent/catalog_endpoint.go +++ b/command/agent/catalog_endpoint.go @@ -57,39 +57,41 @@ func (s *HTTPServer) CatalogDatacenters(resp http.ResponseWriter, req *http.Requ return out, nil } -func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { +func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Setup the request args := structs.DCSpecificRequest{} - if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { - return 0, nil, nil + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { + return nil, nil } var out structs.IndexedNodes + defer setMeta(resp, &out.QueryMeta) if err := s.agent.RPC("Catalog.ListNodes", &args, &out); err != nil { - return 0, nil, err + return nil, err } - return out.Index, out.Nodes, nil + return out.Nodes, nil } -func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { +func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Set default DC args := structs.DCSpecificRequest{} - if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { - return 0, nil, nil + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { + return nil, nil } var out structs.IndexedServices + defer setMeta(resp, &out.QueryMeta) if err := s.agent.RPC("Catalog.ListServices", &args, &out); err != nil { - return 0, nil, err + return nil, err } - return out.Index, out.Services, nil + return out.Services, nil } -func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { +func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Set default DC args := structs.ServiceSpecificRequest{} - if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { - return 0, nil, nil + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { + return nil, nil } // Check for a tag @@ -104,22 +106,23 @@ func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Req if args.ServiceName == "" { resp.WriteHeader(400) resp.Write([]byte("Missing service name")) - return 0, nil, nil + return nil, nil } // Make the RPC request var out structs.IndexedServiceNodes + defer setMeta(resp, &out.QueryMeta) if err := s.agent.RPC("Catalog.ServiceNodes", &args, &out); err != nil { - return 0, nil, err + return nil, err } - return out.Index, out.ServiceNodes, nil + return out.ServiceNodes, nil } -func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { +func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Set default Datacenter args := structs.NodeSpecificRequest{} - if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { - return 0, nil, nil + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { + return nil, nil } // Pull out the node name @@ -127,13 +130,14 @@ func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Req if args.Node == "" { resp.WriteHeader(400) resp.Write([]byte("Missing node name")) - return 0, nil, nil + return nil, nil } // Make the RPC request var out structs.IndexedNodeServices + defer setMeta(resp, &out.QueryMeta) if err := s.agent.RPC("Catalog.NodeServices", &args, &out); err != nil { - return 0, nil, err + return nil, err } - return out.Index, out.NodeServices, nil + return out.NodeServices, nil } diff --git a/command/agent/catalog_endpoint_test.go b/command/agent/catalog_endpoint_test.go index 573ddf88638b..2d6dc39d4ea5 100644 --- a/command/agent/catalog_endpoint_test.go +++ b/command/agent/catalog_endpoint_test.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/hashicorp/consul/consul/structs" "net/http" + "net/http/httptest" "os" "testing" "time" @@ -115,14 +116,14 @@ func TestCatalogNodes(t *testing.T) { t.Fatalf("err: %v", err) } - idx, obj, err := srv.CatalogNodes(nil, req) + resp := httptest.NewRecorder() + obj, err := srv.CatalogNodes(resp, req) if err != nil { t.Fatalf("err: %v", err) } - if idx == 0 { - t.Fatalf("bad: %v", idx) - } + // Verify an index is set + assertIndex(t, resp) nodes := obj.(structs.Nodes) if len(nodes) != 2 { @@ -170,7 +171,8 @@ func TestCatalogNodes_Blocking(t *testing.T) { t.Fatalf("err: %v", err) } - idx, obj, err := srv.CatalogNodes(nil, req) + resp := httptest.NewRecorder() + obj, err := srv.CatalogNodes(resp, req) if err != nil { t.Fatalf("err: %v", err) } @@ -180,7 +182,7 @@ func TestCatalogNodes_Blocking(t *testing.T) { t.Fatalf("too fast") } - if idx <= out.Index { + if idx := getIndex(t, resp); idx <= out.Index { t.Fatalf("bad: %v", idx) } @@ -218,14 +220,13 @@ func TestCatalogServices(t *testing.T) { t.Fatalf("err: %v", err) } - idx, obj, err := srv.CatalogServices(nil, req) + resp := httptest.NewRecorder() + obj, err := srv.CatalogServices(resp, req) if err != nil { t.Fatalf("err: %v", err) } - if idx == 0 { - t.Fatalf("bad: %v", idx) - } + assertIndex(t, resp) services := obj.(structs.Services) if len(services) != 2 { @@ -262,14 +263,13 @@ func TestCatalogServiceNodes(t *testing.T) { t.Fatalf("err: %v", err) } - idx, obj, err := srv.CatalogServiceNodes(nil, req) + resp := httptest.NewRecorder() + obj, err := srv.CatalogServiceNodes(resp, req) if err != nil { t.Fatalf("err: %v", err) } - if idx == 0 { - t.Fatalf("bad: %v", idx) - } + assertIndex(t, resp) nodes := obj.(structs.ServiceNodes) if len(nodes) != 1 { @@ -306,14 +306,12 @@ func TestCatalogNodeServices(t *testing.T) { t.Fatalf("err: %v", err) } - idx, obj, err := srv.CatalogNodeServices(nil, req) + resp := httptest.NewRecorder() + obj, err := srv.CatalogNodeServices(resp, req) if err != nil { t.Fatalf("err: %v", err) } - - if idx == 0 { - t.Fatalf("bad: %v", idx) - } + assertIndex(t, resp) services := obj.(*structs.NodeServices) if len(services.Services) != 1 { diff --git a/command/agent/health_endpoint.go b/command/agent/health_endpoint.go index 3520cc2a061f..e77444e39561 100644 --- a/command/agent/health_endpoint.go +++ b/command/agent/health_endpoint.go @@ -6,11 +6,11 @@ import ( "strings" ) -func (s *HTTPServer) HealthChecksInState(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { +func (s *HTTPServer) HealthChecksInState(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Set default DC args := structs.ChecksInStateRequest{} - if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { - return 0, nil, nil + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { + return nil, nil } // Pull out the service name @@ -18,22 +18,23 @@ func (s *HTTPServer) HealthChecksInState(resp http.ResponseWriter, req *http.Req if args.State == "" { resp.WriteHeader(400) resp.Write([]byte("Missing check state")) - return 0, nil, nil + return nil, nil } // Make the RPC request var out structs.IndexedHealthChecks + defer setMeta(resp, &out.QueryMeta) if err := s.agent.RPC("Health.ChecksInState", &args, &out); err != nil { - return 0, nil, err + return nil, err } - return out.Index, out.HealthChecks, nil + return out.HealthChecks, nil } -func (s *HTTPServer) HealthNodeChecks(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { +func (s *HTTPServer) HealthNodeChecks(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Set default DC args := structs.NodeSpecificRequest{} - if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { - return 0, nil, nil + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { + return nil, nil } // Pull out the service name @@ -41,22 +42,23 @@ func (s *HTTPServer) HealthNodeChecks(resp http.ResponseWriter, req *http.Reques if args.Node == "" { resp.WriteHeader(400) resp.Write([]byte("Missing node name")) - return 0, nil, nil + return nil, nil } // Make the RPC request var out structs.IndexedHealthChecks + defer setMeta(resp, &out.QueryMeta) if err := s.agent.RPC("Health.NodeChecks", &args, &out); err != nil { - return 0, nil, err + return nil, err } - return out.Index, out.HealthChecks, nil + return out.HealthChecks, nil } -func (s *HTTPServer) HealthServiceChecks(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { +func (s *HTTPServer) HealthServiceChecks(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Set default DC args := structs.ServiceSpecificRequest{} - if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { - return 0, nil, nil + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { + return nil, nil } // Pull out the service name @@ -64,22 +66,23 @@ func (s *HTTPServer) HealthServiceChecks(resp http.ResponseWriter, req *http.Req if args.ServiceName == "" { resp.WriteHeader(400) resp.Write([]byte("Missing service name")) - return 0, nil, nil + return nil, nil } // Make the RPC request var out structs.IndexedHealthChecks + defer setMeta(resp, &out.QueryMeta) if err := s.agent.RPC("Health.ServiceChecks", &args, &out); err != nil { - return 0, nil, err + return nil, err } - return out.Index, out.HealthChecks, nil + return out.HealthChecks, nil } -func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { +func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Set default DC args := structs.ServiceSpecificRequest{} - if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { - return 0, nil, nil + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { + return nil, nil } // Check for a tag @@ -94,13 +97,14 @@ func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Requ if args.ServiceName == "" { resp.WriteHeader(400) resp.Write([]byte("Missing service name")) - return 0, nil, nil + return nil, nil } // Make the RPC request var out structs.IndexedCheckServiceNodes + defer setMeta(resp, &out.QueryMeta) if err := s.agent.RPC("Health.ServiceNodes", &args, &out); err != nil { - return 0, nil, err + return nil, err } - return out.Index, out.Nodes, nil + return out.Nodes, nil } diff --git a/command/agent/health_endpoint_test.go b/command/agent/health_endpoint_test.go index 1bc9e6a04bae..b8bf1bef5e79 100644 --- a/command/agent/health_endpoint_test.go +++ b/command/agent/health_endpoint_test.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/hashicorp/consul/consul/structs" "net/http" + "net/http/httptest" "os" "testing" "time" @@ -23,14 +24,12 @@ func TestHealthChecksInState(t *testing.T) { t.Fatalf("err: %v", err) } - idx, obj, err := srv.HealthChecksInState(nil, req) + resp := httptest.NewRecorder() + obj, err := srv.HealthChecksInState(resp, req) if err != nil { t.Fatalf("err: %v", err) } - - if idx == 0 { - t.Fatalf("bad: %v", idx) - } + assertIndex(t, resp) // Should be 1 health check for the server nodes := obj.(structs.HealthChecks) @@ -54,14 +53,12 @@ func TestHealthNodeChecks(t *testing.T) { t.Fatalf("err: %v", err) } - idx, obj, err := srv.HealthNodeChecks(nil, req) + resp := httptest.NewRecorder() + obj, err := srv.HealthNodeChecks(resp, req) if err != nil { t.Fatalf("err: %v", err) } - - if idx == 0 { - t.Fatalf("bad: %v", idx) - } + assertIndex(t, resp) // Should be 1 health check for the server nodes := obj.(structs.HealthChecks) @@ -100,14 +97,12 @@ func TestHealthServiceChecks(t *testing.T) { t.Fatalf("err: %v", err) } - idx, obj, err := srv.HealthServiceChecks(nil, req) + resp := httptest.NewRecorder() + obj, err := srv.HealthServiceChecks(resp, req) if err != nil { t.Fatalf("err: %v", err) } - - if idx == 0 { - t.Fatalf("bad: %v", idx) - } + assertIndex(t, resp) // Should be 1 health check for consul nodes := obj.(structs.HealthChecks) @@ -130,14 +125,12 @@ func TestHealthServiceNodes(t *testing.T) { t.Fatalf("err: %v", err) } - idx, obj, err := srv.HealthServiceNodes(nil, req) + resp := httptest.NewRecorder() + obj, err := srv.HealthServiceNodes(resp, req) if err != nil { t.Fatalf("err: %v", err) } - - if idx == 0 { - t.Fatalf("bad: %v", idx) - } + assertIndex(t, resp) // Should be 1 health check for consul nodes := obj.(structs.CheckServiceNodes) diff --git a/command/agent/http.go b/command/agent/http.go index 67afbe3b9ff2..4030f6fec3fa 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -64,15 +64,15 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/catalog/register", s.wrap(s.CatalogRegister)) s.mux.HandleFunc("/v1/catalog/deregister", s.wrap(s.CatalogDeregister)) s.mux.HandleFunc("/v1/catalog/datacenters", s.wrap(s.CatalogDatacenters)) - s.mux.HandleFunc("/v1/catalog/nodes", s.wrapQuery(s.CatalogNodes)) - s.mux.HandleFunc("/v1/catalog/services", s.wrapQuery(s.CatalogServices)) - s.mux.HandleFunc("/v1/catalog/service/", s.wrapQuery(s.CatalogServiceNodes)) - s.mux.HandleFunc("/v1/catalog/node/", s.wrapQuery(s.CatalogNodeServices)) + s.mux.HandleFunc("/v1/catalog/nodes", s.wrap(s.CatalogNodes)) + s.mux.HandleFunc("/v1/catalog/services", s.wrap(s.CatalogServices)) + s.mux.HandleFunc("/v1/catalog/service/", s.wrap(s.CatalogServiceNodes)) + s.mux.HandleFunc("/v1/catalog/node/", s.wrap(s.CatalogNodeServices)) - s.mux.HandleFunc("/v1/health/node/", s.wrapQuery(s.HealthNodeChecks)) - s.mux.HandleFunc("/v1/health/checks/", s.wrapQuery(s.HealthServiceChecks)) - s.mux.HandleFunc("/v1/health/state/", s.wrapQuery(s.HealthChecksInState)) - s.mux.HandleFunc("/v1/health/service/", s.wrapQuery(s.HealthServiceNodes)) + s.mux.HandleFunc("/v1/health/node/", s.wrap(s.HealthNodeChecks)) + s.mux.HandleFunc("/v1/health/checks/", s.wrap(s.HealthServiceChecks)) + s.mux.HandleFunc("/v1/health/state/", s.wrap(s.HealthChecksInState)) + s.mux.HandleFunc("/v1/health/service/", s.wrap(s.HealthServiceNodes)) s.mux.HandleFunc("/v1/agent/services", s.wrap(s.AgentServices)) s.mux.HandleFunc("/v1/agent/checks", s.wrap(s.AgentChecks)) @@ -132,16 +132,6 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque return f } -// wrapQuery is used to wrap query functions to make them more convenient -func (s *HTTPServer) wrapQuery(handler func(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error)) func(resp http.ResponseWriter, req *http.Request) { - f := func(resp http.ResponseWriter, req *http.Request) (interface{}, error) { - idx, obj, err := handler(resp, req) - setIndex(resp, idx) - return obj, err - } - return s.wrap(f) -} - // Renders a simple index page func (s *HTTPServer) Index(resp http.ResponseWriter, req *http.Request) { if req.URL.Path == "/" { @@ -173,9 +163,31 @@ func setIndex(resp http.ResponseWriter, index uint64) { resp.Header().Add("X-Consul-Index", strconv.FormatUint(index, 10)) } +// setKnownLeader is used to set the known leader header +func setKnownLeader(resp http.ResponseWriter, known bool) { + s := "true" + if !known { + s = "false" + } + resp.Header().Add("X-Consul-KnownLeader", s) +} + +// setLastContact is used to set the last contact header +func setLastContact(resp http.ResponseWriter, last time.Duration) { + lastMsec := uint64(last / time.Millisecond) + resp.Header().Add("X-Consul-LastContact", strconv.FormatUint(lastMsec, 10)) +} + +// setMeta is used to set the query response meta data +func setMeta(resp http.ResponseWriter, m *structs.QueryMeta) { + setIndex(resp, m.Index) + setLastContact(resp, m.LastContact) + setKnownLeader(resp, m.KnownLeader) +} + // parseWait is used to parse the ?wait and ?index query params // Returns true on error -func parseWait(resp http.ResponseWriter, req *http.Request, b *structs.BlockingQuery) bool { +func parseWait(resp http.ResponseWriter, req *http.Request, b *structs.QueryOptions) bool { query := req.URL.Query() if wait := query.Get("wait"); wait != "" { dur, err := time.ParseDuration(wait) @@ -198,6 +210,24 @@ func parseWait(resp http.ResponseWriter, req *http.Request, b *structs.BlockingQ return false } +// parseConsistency is used to parse the ?stale and ?consistent query params. +// Returns true on error +func parseConsistency(resp http.ResponseWriter, req *http.Request, b *structs.QueryOptions) bool { + query := req.URL.Query() + if _, ok := query["stale"]; ok { + b.AllowStale = true + } + if _, ok := query["consistent"]; ok { + b.RequireConsistent = true + } + if b.AllowStale && b.RequireConsistent { + resp.WriteHeader(400) + resp.Write([]byte("Cannot specify ?stale with ?consistent, conflicting semantics.")) + return true + } + return false +} + // parseDC is used to parse the ?dc query param func (s *HTTPServer) parseDC(req *http.Request, dc *string) { if other := req.URL.Query().Get("dc"); other != "" { @@ -209,7 +239,10 @@ func (s *HTTPServer) parseDC(req *http.Request, dc *string) { // parse is a convenience method for endpoints that need // to use both parseWait and parseDC. -func (s *HTTPServer) parse(resp http.ResponseWriter, req *http.Request, dc *string, b *structs.BlockingQuery) bool { +func (s *HTTPServer) parse(resp http.ResponseWriter, req *http.Request, dc *string, b *structs.QueryOptions) bool { s.parseDC(req, dc) + if parseConsistency(resp, req, b) { + return true + } return parseWait(resp, req, b) } diff --git a/command/agent/http_test.go b/command/agent/http_test.go index 4b97c597aec1..84d99b44fa52 100644 --- a/command/agent/http_test.go +++ b/command/agent/http_test.go @@ -9,6 +9,7 @@ import ( "net/http" "net/http/httptest" "os" + "strconv" "testing" "time" ) @@ -40,6 +41,52 @@ func TestSetIndex(t *testing.T) { } } +func TestSetKnownLeader(t *testing.T) { + resp := httptest.NewRecorder() + setKnownLeader(resp, true) + header := resp.Header().Get("X-Consul-KnownLeader") + if header != "true" { + t.Fatalf("Bad: %v", header) + } + resp = httptest.NewRecorder() + setKnownLeader(resp, false) + header = resp.Header().Get("X-Consul-KnownLeader") + if header != "false" { + t.Fatalf("Bad: %v", header) + } +} + +func TestSetLastContact(t *testing.T) { + resp := httptest.NewRecorder() + setLastContact(resp, 123456*time.Microsecond) + header := resp.Header().Get("X-Consul-LastContact") + if header != "123" { + t.Fatalf("Bad: %v", header) + } +} + +func TestSetMeta(t *testing.T) { + meta := structs.QueryMeta{ + Index: 1000, + KnownLeader: true, + LastContact: 123456 * time.Microsecond, + } + resp := httptest.NewRecorder() + setMeta(resp, &meta) + header := resp.Header().Get("X-Consul-Index") + if header != "1000" { + t.Fatalf("Bad: %v", header) + } + header = resp.Header().Get("X-Consul-KnownLeader") + if header != "true" { + t.Fatalf("Bad: %v", header) + } + header = resp.Header().Get("X-Consul-LastContact") + if header != "123" { + t.Fatalf("Bad: %v", header) + } +} + func TestContentTypeIsJSON(t *testing.T) { dir, srv := makeHTTPServer(t) @@ -69,7 +116,7 @@ func TestContentTypeIsJSON(t *testing.T) { func TestParseWait(t *testing.T) { resp := httptest.NewRecorder() - var b structs.BlockingQuery + var b structs.QueryOptions req, err := http.NewRequest("GET", "/v1/catalog/nodes?wait=60s&index=1000", nil) @@ -91,7 +138,7 @@ func TestParseWait(t *testing.T) { func TestParseWait_InvalidTime(t *testing.T) { resp := httptest.NewRecorder() - var b structs.BlockingQuery + var b structs.QueryOptions req, err := http.NewRequest("GET", "/v1/catalog/nodes?wait=60foo&index=1000", nil) @@ -110,7 +157,7 @@ func TestParseWait_InvalidTime(t *testing.T) { func TestParseWait_InvalidIndex(t *testing.T) { resp := httptest.NewRecorder() - var b structs.BlockingQuery + var b structs.QueryOptions req, err := http.NewRequest("GET", "/v1/catalog/nodes?wait=60s&index=foo", nil) @@ -126,3 +173,83 @@ func TestParseWait_InvalidIndex(t *testing.T) { t.Fatalf("bad code: %v", resp.Code) } } + +func TestParseConsistency(t *testing.T) { + resp := httptest.NewRecorder() + var b structs.QueryOptions + + req, err := http.NewRequest("GET", + "/v1/catalog/nodes?stale", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + if d := parseConsistency(resp, req, &b); d { + t.Fatalf("unexpected done") + } + + if !b.AllowStale { + t.Fatalf("Bad: %v", b) + } + if b.RequireConsistent { + t.Fatalf("Bad: %v", b) + } + + b = structs.QueryOptions{} + req, err = http.NewRequest("GET", + "/v1/catalog/nodes?consistent", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + if d := parseConsistency(resp, req, &b); d { + t.Fatalf("unexpected done") + } + + if b.AllowStale { + t.Fatalf("Bad: %v", b) + } + if !b.RequireConsistent { + t.Fatalf("Bad: %v", b) + } +} + +func TestParseConsistency_Invalid(t *testing.T) { + resp := httptest.NewRecorder() + var b structs.QueryOptions + + req, err := http.NewRequest("GET", + "/v1/catalog/nodes?stale&consistent", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + if d := parseConsistency(resp, req, &b); !d { + t.Fatalf("expected done") + } + + if resp.Code != 400 { + t.Fatalf("bad code: %v", resp.Code) + } +} + +// assertIndex tests that X-Consul-Index is set and non-zero +func assertIndex(t *testing.T, resp *httptest.ResponseRecorder) { + header := resp.Header().Get("X-Consul-Index") + if header == "" || header == "0" { + t.Fatalf("Bad: %v", header) + } +} + +// getIndex parses X-Consul-Index +func getIndex(t *testing.T, resp *httptest.ResponseRecorder) uint64 { + header := resp.Header().Get("X-Consul-Index") + if header == "" { + t.Fatalf("Bad: %v", header) + } + val, err := strconv.Atoi(header) + if err != nil { + t.Fatalf("Bad: %v", header) + } + return uint64(val) +} diff --git a/command/agent/kvs_endpoint.go b/command/agent/kvs_endpoint.go index af97aa5c164f..a72b7286c442 100644 --- a/command/agent/kvs_endpoint.go +++ b/command/agent/kvs_endpoint.go @@ -12,7 +12,7 @@ import ( func (s *HTTPServer) KVSEndpoint(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Set default DC args := structs.KeyRequest{} - if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return nil, nil } @@ -47,10 +47,10 @@ func (s *HTTPServer) KVSGet(resp http.ResponseWriter, req *http.Request, args *s // Make the RPC var out structs.IndexedDirEntries + defer setMeta(resp, &out.QueryMeta) if err := s.agent.RPC(method, &args, &out); err != nil { return nil, err } - setIndex(resp, out.Index) // Check if we get a not found if len(out.Entries) == 0 { diff --git a/command/agent/kvs_endpoint_test.go b/command/agent/kvs_endpoint_test.go index 6bb476e0a863..f28ef17cf24a 100644 --- a/command/agent/kvs_endpoint_test.go +++ b/command/agent/kvs_endpoint_test.go @@ -57,11 +57,7 @@ func TestKVSEndpoint_PUT_GET_DELETE(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - - header := resp.Header().Get("X-Consul-Index") - if header == "" { - t.Fatalf("Bad: %v", header) - } + assertIndex(t, resp) res, ok := obj.(structs.DirEntries) if !ok { @@ -138,11 +134,7 @@ func TestKVSEndpoint_Recurse(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - - header := resp.Header().Get("X-Consul-Index") - if header == "" { - t.Fatalf("Bad: %v", header) - } + assertIndex(t, resp) res, ok := obj.(structs.DirEntries) if !ok { diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index e288229937ce..51dc11bd01f0 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -14,7 +14,7 @@ type Catalog struct { // Register is used register that a node is providing a given service. func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error { - if done, err := c.srv.forward("Catalog.Register", args.Datacenter, args, reply); done { + if done, err := c.srv.forward("Catalog.Register", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"consul", "catalog", "register"}, time.Now()) @@ -55,7 +55,7 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error // Deregister is used to remove a service registration for a given node. func (c *Catalog) Deregister(args *structs.DeregisterRequest, reply *struct{}) error { - if done, err := c.srv.forward("Catalog.Deregister", args.Datacenter, args, reply); done { + if done, err := c.srv.forward("Catalog.Deregister", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"consul", "catalog", "deregister"}, time.Now()) @@ -91,39 +91,41 @@ func (c *Catalog) ListDatacenters(args *struct{}, reply *[]string) error { // ListNodes is used to query the nodes in a DC func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.IndexedNodes) error { - if done, err := c.srv.forward("Catalog.ListNodes", args.Datacenter, args, reply); done { + if done, err := c.srv.forward("Catalog.ListNodes", args, args, reply); done { return err } // Get the local state state := c.srv.fsm.State() - return c.srv.blockingRPC(&args.BlockingQuery, + return c.srv.blockingRPC(&args.QueryOptions, + &reply.QueryMeta, state.QueryTables("Nodes"), - func() (uint64, error) { + func() error { reply.Index, reply.Nodes = state.Nodes() - return reply.Index, nil + return nil }) } // ListServices is used to query the services in a DC func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.IndexedServices) error { - if done, err := c.srv.forward("Catalog.ListServices", args.Datacenter, args, reply); done { + if done, err := c.srv.forward("Catalog.ListServices", args, args, reply); done { return err } // Get the current nodes state := c.srv.fsm.State() - return c.srv.blockingRPC(&args.BlockingQuery, + return c.srv.blockingRPC(&args.QueryOptions, + &reply.QueryMeta, state.QueryTables("Services"), - func() (uint64, error) { + func() error { reply.Index, reply.Services = state.Services() - return reply.Index, nil + return nil }) } // ServiceNodes returns all the nodes registered as part of a service func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceNodes) error { - if done, err := c.srv.forward("Catalog.ServiceNodes", args.Datacenter, args, reply); done { + if done, err := c.srv.forward("Catalog.ServiceNodes", args, args, reply); done { return err } @@ -134,15 +136,16 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru // Get the nodes state := c.srv.fsm.State() - err := c.srv.blockingRPC(&args.BlockingQuery, + err := c.srv.blockingRPC(&args.QueryOptions, + &reply.QueryMeta, state.QueryTables("ServiceNodes"), - func() (uint64, error) { + func() error { if args.TagFilter { reply.Index, reply.ServiceNodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag) } else { reply.Index, reply.ServiceNodes = state.ServiceNodes(args.ServiceName) } - return reply.Index, nil + return nil }) // Provide some metrics @@ -160,7 +163,7 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru // NodeServices returns all the services registered as part of a node func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs.IndexedNodeServices) error { - if done, err := c.srv.forward("Catalog.NodeServices", args.Datacenter, args, reply); done { + if done, err := c.srv.forward("Catalog.NodeServices", args, args, reply); done { return err } @@ -171,10 +174,11 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs // Get the node services state := c.srv.fsm.State() - return c.srv.blockingRPC(&args.BlockingQuery, + return c.srv.blockingRPC(&args.QueryOptions, + &reply.QueryMeta, state.QueryTables("NodeServices"), - func() (uint64, error) { + func() error { reply.Index, reply.NodeServices = state.NodeServices(args.Node) - return reply.Index, nil + return nil }) } diff --git a/consul/catalog_endpoint_test.go b/consul/catalog_endpoint_test.go index 1184d6ddb576..3a174ce45637 100644 --- a/consul/catalog_endpoint_test.go +++ b/consul/catalog_endpoint_test.go @@ -6,6 +6,7 @@ import ( "net/rpc" "os" "sort" + "strings" "testing" "time" ) @@ -232,6 +233,168 @@ func TestCatalogListNodes(t *testing.T) { } } +func TestCatalogListNodes_StaleRaad(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client1 := rpcClient(t, s1) + defer client1.Close() + + dir2, s2 := testServerDCBootstrap(t, "dc1", false) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + client2 := rpcClient(t, s2) + defer client2.Close() + + // Try to join + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.BindPort) + if _, err := s2.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + + // Wait for a leader + time.Sleep(100 * time.Millisecond) + + // Use the follower as the client + var client *rpc.Client + if !s1.IsLeader() { + client = client1 + + // Inject fake data on the follower! + s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) + } else { + client = client2 + + // Inject fake data on the follower! + s2.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) + } + + args := structs.DCSpecificRequest{ + Datacenter: "dc1", + QueryOptions: structs.QueryOptions{AllowStale: true}, + } + var out structs.IndexedNodes + if err := client.Call("Catalog.ListNodes", &args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + found := false + for _, n := range out.Nodes { + if n.Node == "foo" { + found = true + } + } + if !found { + t.Fatalf("failed to find foo") + } + + if out.QueryMeta.LastContact == 0 { + t.Fatalf("should have a last contact time") + } + if !out.QueryMeta.KnownLeader { + t.Fatalf("should have known leader") + } +} + +func TestCatalogListNodes_ConsistentRead_Fail(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client1 := rpcClient(t, s1) + defer client1.Close() + + dir2, s2 := testServerDCBootstrap(t, "dc1", false) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + client2 := rpcClient(t, s2) + defer client2.Close() + + // Try to join + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.BindPort) + if _, err := s2.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + + // Wait for a leader + time.Sleep(100 * time.Millisecond) + + // Use the leader as the client, kill the follower + var client *rpc.Client + if s1.IsLeader() { + client = client1 + s2.Shutdown() + } else { + client = client2 + s1.Shutdown() + } + + args := structs.DCSpecificRequest{ + Datacenter: "dc1", + QueryOptions: structs.QueryOptions{RequireConsistent: true}, + } + var out structs.IndexedNodes + if err := client.Call("Catalog.ListNodes", &args, &out); !strings.HasPrefix(err.Error(), "leadership lost") { + t.Fatalf("err: %v", err) + } + + if out.QueryMeta.LastContact != 0 { + t.Fatalf("should not have a last contact time") + } + if out.QueryMeta.KnownLeader { + t.Fatalf("should have no known leader") + } +} + +func TestCatalogListNodes_ConsistentRead(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client1 := rpcClient(t, s1) + defer client1.Close() + + dir2, s2 := testServerDCBootstrap(t, "dc1", false) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + client2 := rpcClient(t, s2) + defer client2.Close() + + // Try to join + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.BindPort) + if _, err := s2.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + + // Wait for a leader + time.Sleep(100 * time.Millisecond) + + // Use the leader as the client, kill the follower + var client *rpc.Client + if s1.IsLeader() { + client = client1 + } else { + client = client2 + } + + args := structs.DCSpecificRequest{ + Datacenter: "dc1", + QueryOptions: structs.QueryOptions{RequireConsistent: true}, + } + var out structs.IndexedNodes + if err := client.Call("Catalog.ListNodes", &args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + if out.QueryMeta.LastContact != 0 { + t.Fatalf("should not have a last contact time") + } + if !out.QueryMeta.KnownLeader { + t.Fatalf("should have known leader") + } +} + func BenchmarkCatalogListNodes(t *testing.B) { dir1, s1 := testServer(nil) defer os.RemoveAll(dir1) @@ -394,6 +557,39 @@ func TestCatalogListServices_Timeout(t *testing.T) { } } +func TestCatalogListServices_Stale(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + args := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + args.AllowStale = true + var out structs.IndexedServices + + // Inject a fake service + s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) + s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, 5000}) + + // Run the query, do not wait for leader! + if err := client.Call("Catalog.ListServices", &args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Should find the service + if len(out.Services) != 1 { + t.Fatalf("bad: %v", out) + } + + // Should not have a leader! Stale read + if out.KnownLeader { + t.Fatalf("bad: %v", out) + } +} + func TestCatalogListServiceNodes(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) diff --git a/consul/health_endpoint.go b/consul/health_endpoint.go index f1428c43f702..e6db8c99adb2 100644 --- a/consul/health_endpoint.go +++ b/consul/health_endpoint.go @@ -14,34 +14,36 @@ type Health struct { // ChecksInState is used to get all the checks in a given state func (h *Health) ChecksInState(args *structs.ChecksInStateRequest, reply *structs.IndexedHealthChecks) error { - if done, err := h.srv.forward("Health.ChecksInState", args.Datacenter, args, reply); done { + if done, err := h.srv.forward("Health.ChecksInState", args, args, reply); done { return err } // Get the state specific checks state := h.srv.fsm.State() - return h.srv.blockingRPC(&args.BlockingQuery, + return h.srv.blockingRPC(&args.QueryOptions, + &reply.QueryMeta, state.QueryTables("ChecksInState"), - func() (uint64, error) { + func() error { reply.Index, reply.HealthChecks = state.ChecksInState(args.State) - return reply.Index, nil + return nil }) } // NodeChecks is used to get all the checks for a node func (h *Health) NodeChecks(args *structs.NodeSpecificRequest, reply *structs.IndexedHealthChecks) error { - if done, err := h.srv.forward("Health.NodeChecks", args.Datacenter, args, reply); done { + if done, err := h.srv.forward("Health.NodeChecks", args, args, reply); done { return err } // Get the node checks state := h.srv.fsm.State() - return h.srv.blockingRPC(&args.BlockingQuery, + return h.srv.blockingRPC(&args.QueryOptions, + &reply.QueryMeta, state.QueryTables("NodeChecks"), - func() (uint64, error) { + func() error { reply.Index, reply.HealthChecks = state.NodeChecks(args.Node) - return reply.Index, nil + return nil }) } @@ -54,23 +56,24 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest, } // Potentially forward - if done, err := h.srv.forward("Health.ServiceChecks", args.Datacenter, args, reply); done { + if done, err := h.srv.forward("Health.ServiceChecks", args, args, reply); done { return err } // Get the service checks state := h.srv.fsm.State() - return h.srv.blockingRPC(&args.BlockingQuery, + return h.srv.blockingRPC(&args.QueryOptions, + &reply.QueryMeta, state.QueryTables("ServiceChecks"), - func() (uint64, error) { + func() error { reply.Index, reply.HealthChecks = state.ServiceChecks(args.ServiceName) - return reply.Index, nil + return nil }) } // ServiceNodes returns all the nodes registered as part of a service including health info func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *structs.IndexedCheckServiceNodes) error { - if done, err := h.srv.forward("Health.ServiceNodes", args.Datacenter, args, reply); done { + if done, err := h.srv.forward("Health.ServiceNodes", args, args, reply); done { return err } @@ -81,15 +84,16 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc // Get the nodes state := h.srv.fsm.State() - err := h.srv.blockingRPC(&args.BlockingQuery, + err := h.srv.blockingRPC(&args.QueryOptions, + &reply.QueryMeta, state.QueryTables("CheckServiceNodes"), - func() (uint64, error) { + func() error { if args.TagFilter { reply.Index, reply.Nodes = state.CheckServiceTagNodes(args.ServiceName, args.ServiceTag) } else { reply.Index, reply.Nodes = state.CheckServiceNodes(args.ServiceName) } - return reply.Index, nil + return nil }) // Provide some metrics diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go index 0e884524f36b..b955f3428a1f 100644 --- a/consul/kvs_endpoint.go +++ b/consul/kvs_endpoint.go @@ -15,7 +15,7 @@ type KVS struct { // Apply is used to apply a KVS request to the data store. This should // only be used for operations that modify the data func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error { - if done, err := k.srv.forward("KVS.Apply", args.Datacenter, args, reply); done { + if done, err := k.srv.forward("KVS.Apply", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"consul", "kvs", "apply"}, time.Now()) @@ -44,18 +44,19 @@ func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error { // Get is used to lookup a single key func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error { - if done, err := k.srv.forward("KVS.Get", args.Datacenter, args, reply); done { + if done, err := k.srv.forward("KVS.Get", args, args, reply); done { return err } // Get the local state state := k.srv.fsm.State() - return k.srv.blockingRPC(&args.BlockingQuery, + return k.srv.blockingRPC(&args.QueryOptions, + &reply.QueryMeta, state.QueryTables("KVSGet"), - func() (uint64, error) { + func() error { index, ent, err := state.KVSGet(args.Key) if err != nil { - return 0, err + return err } if ent == nil { // Must provide non-zero index to prevent blocking @@ -70,24 +71,25 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er reply.Index = ent.ModifyIndex reply.Entries = structs.DirEntries{ent} } - return reply.Index, nil + return nil }) } // List is used to list all keys with a given prefix func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error { - if done, err := k.srv.forward("KVS.List", args.Datacenter, args, reply); done { + if done, err := k.srv.forward("KVS.List", args, args, reply); done { return err } // Get the local state state := k.srv.fsm.State() - return k.srv.blockingRPC(&args.BlockingQuery, + return k.srv.blockingRPC(&args.QueryOptions, + &reply.QueryMeta, state.QueryTables("KVSList"), - func() (uint64, error) { + func() error { index, ent, err := state.KVSList(args.Key) if err != nil { - return 0, err + return err } if len(ent) == 0 { // Must provide non-zero index to prevent blocking @@ -110,6 +112,6 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e reply.Index = maxIndex reply.Entries = ent } - return reply.Index, nil + return nil }) } diff --git a/consul/rpc.go b/consul/rpc.go index cb4185a3dc0a..fa80a582d89f 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -134,13 +134,19 @@ func (s *Server) handleConsulConn(conn net.Conn) { // forward is used to forward to a remote DC or to forward to the local leader // Returns a bool of if forwarding was performed, as well as any error -func (s *Server) forward(method, dc string, args interface{}, reply interface{}) (bool, error) { +func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) { // Handle DC forwarding + dc := info.RequestDatacenter() if dc != s.config.Datacenter { err := s.forwardDC(method, dc, args, reply) return true, err } + // Check if we can allow a stale read + if info.IsRead() && info.AllowStaleRead() { + return false, nil + } + // Handle leader forwarding if !s.IsLeader() { err := s.forwardLeader(method, args, reply) @@ -197,7 +203,8 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, // blockingRPC is used for queries that need to wait for a // minimum index. This is used to block and wait for changes. -func (s *Server) blockingRPC(b *structs.BlockingQuery, tables MDBTables, run func() (uint64, error)) error { +func (s *Server) blockingRPC(b *structs.QueryOptions, m *structs.QueryMeta, + tables MDBTables, run func() error) error { var timeout <-chan time.Time var notifyCh chan struct{} @@ -233,12 +240,22 @@ SETUP_NOTIFY: s.fsm.State().Watch(tables, notifyCh) } - // Run the query function RUN_QUERY: - idx, err := run() + // Update the query meta data + s.setQueryMeta(m) + + // Check if query must be consistent + if b.RequireConsistent { + if err := s.consistentRead(); err != nil { + return err + } + } + + // Run the query function + err := run() // Check for minimum query time - if err == nil && idx <= b.MinQueryIndex { + if err == nil && m.Index > 0 && m.Index <= b.MinQueryIndex { select { case <-notifyCh: goto SETUP_NOTIFY @@ -247,3 +264,22 @@ RUN_QUERY: } return err } + +// setQueryMeta is used to populate the QueryMeta data for an RPC call +func (s *Server) setQueryMeta(m *structs.QueryMeta) { + if s.IsLeader() { + m.LastContact = 0 + m.KnownLeader = true + } else { + m.LastContact = time.Now().Sub(s.raft.LastContact()) + m.KnownLeader = (s.raft.Leader() != nil) + } +} + +// consistentRead is used to ensure we do not perform a stale +// read. This is done by verifying leadership before the read. +func (s *Server) consistentRead() error { + defer metrics.MeasureSince([]string{"consul", "rpc", "consistentRead"}, time.Now()) + future := s.raft.VerifyLeader() + return future.Error() +} diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 5c11801a7ffb..2aac5ee982e6 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -28,14 +28,64 @@ const ( HealthCritical = "critical" ) -// BlockingQuery is used to block on a query and wait for a change. -// Either both fields, or neither must be provided. -type BlockingQuery struct { - // If set, wait until query exceeds given index +// RPCInfo is used to describe common information about query +type RPCInfo interface { + RequestDatacenter() string + IsRead() bool + AllowStaleRead() bool +} + +// QueryOptions is used to specify various flags for read queries +type QueryOptions struct { + // If set, wait until query exceeds given index. Must be provided + // with MaxQueryTime. MinQueryIndex uint64 - // Provided with MinQueryIndex to wait for change + // Provided with MinQueryIndex to wait for change. MaxQueryTime time.Duration + + // If set, any follower can service the request. Results + // may be arbitrarily stale. + AllowStale bool + + // If set, the leader must verify leadership prior to + // servicing the request. Prevents a stale read. + RequireConsistent bool +} + +// QueryOption only applies to reads, so always true +func (q QueryOptions) IsRead() bool { + return true +} + +func (q QueryOptions) AllowStaleRead() bool { + return q.AllowStale +} + +type WriteRequest struct{} + +// WriteRequest only applies to writes, always false +func (w WriteRequest) IsRead() bool { + return false +} + +func (w WriteRequest) AllowStaleRead() bool { + return false +} + +// QueryMeta allows a query response to include potentially +// useful metadata about a query +type QueryMeta struct { + // This is the index associated with the read + Index uint64 + + // If AllowStale is used, this is time elapsed since + // last contact between the follower and leader. This + // can be used to gauge staleness. + LastContact time.Duration + + // Used to indicate if there is a known leader node + KnownLeader bool } // RegisterRequest is used for the Catalog.Register endpoint @@ -47,6 +97,11 @@ type RegisterRequest struct { Address string Service *NodeService Check *HealthCheck + WriteRequest +} + +func (r *RegisterRequest) RequestDatacenter() string { + return r.Datacenter } // DeregisterRequest is used for the Catalog.Deregister endpoint @@ -57,12 +112,21 @@ type DeregisterRequest struct { Node string ServiceID string CheckID string + WriteRequest +} + +func (r *DeregisterRequest) RequestDatacenter() string { + return r.Datacenter } // DCSpecificRequest is used to query about a specific DC type DCSpecificRequest struct { Datacenter string - BlockingQuery + QueryOptions +} + +func (r *DCSpecificRequest) RequestDatacenter() string { + return r.Datacenter } // ServiceSpecificRequest is used to query about a specific node @@ -71,21 +135,33 @@ type ServiceSpecificRequest struct { ServiceName string ServiceTag string TagFilter bool // Controls tag filtering - BlockingQuery + QueryOptions +} + +func (r *ServiceSpecificRequest) RequestDatacenter() string { + return r.Datacenter } // NodeSpecificRequest is used to request the information about a single node type NodeSpecificRequest struct { Datacenter string Node string - BlockingQuery + QueryOptions +} + +func (r *NodeSpecificRequest) RequestDatacenter() string { + return r.Datacenter } // ChecksInStateRequest is used to query for nodes in a state type ChecksInStateRequest struct { Datacenter string State string - BlockingQuery + QueryOptions +} + +func (r *ChecksInStateRequest) RequestDatacenter() string { + return r.Datacenter } // Used to return information about a node @@ -144,33 +220,33 @@ type CheckServiceNode struct { type CheckServiceNodes []CheckServiceNode type IndexedNodes struct { - Index uint64 Nodes Nodes + QueryMeta } type IndexedServices struct { - Index uint64 Services Services + QueryMeta } type IndexedServiceNodes struct { - Index uint64 ServiceNodes ServiceNodes + QueryMeta } type IndexedNodeServices struct { - Index uint64 NodeServices *NodeServices + QueryMeta } type IndexedHealthChecks struct { - Index uint64 HealthChecks HealthChecks + QueryMeta } type IndexedCheckServiceNodes struct { - Index uint64 Nodes CheckServiceNodes + QueryMeta } // DirEntry is used to represent a directory entry. This is @@ -198,18 +274,27 @@ type KVSRequest struct { Datacenter string Op KVSOp // Which operation are we performing DirEnt DirEntry // Which directory entry + WriteRequest +} + +func (r *KVSRequest) RequestDatacenter() string { + return r.Datacenter } // KeyRequest is used to request a key, or key prefix type KeyRequest struct { Datacenter string Key string - BlockingQuery + QueryOptions +} + +func (r *KeyRequest) RequestDatacenter() string { + return r.Datacenter } type IndexedDirEntries struct { - Index uint64 Entries DirEntries + QueryMeta } // Decode is used to decode a MsgPack encoded object diff --git a/website/source/docs/agent/http.html.markdown b/website/source/docs/agent/http.html.markdown index ec616ea7a1e0..8d4079422af6 100644 --- a/website/source/docs/agent/http.html.markdown +++ b/website/source/docs/agent/http.html.markdown @@ -42,6 +42,40 @@ note is that when the query returns there is **no guarantee** of a change. It is possible that the timeout was reached, or that there was an idempotent write that does not affect the result. +## Consistency Modes + +Most of the read query endpoints support multiple levels of consistency. +These are to provide a tuning knob that clients can be used to find a happy +medium that best matches their needs. + +The three read modes are: + +* default - If not specified, this mode is used. It is strongly consistent + in almost all cases. However, there is a small window in which an new + leader may be elected, and the old leader may service stale values. The + trade off is fast reads, but potentially stale values. This condition is + hard to trigger, and most clients should not need to worry about the stale read. + This only applies to reads, and a split-brain is not possible on writes. + +* consistent - This mode is strongly consistent without caveats. It requires + that a leader verify with a quorum of peers that it is still leader. This + introduces an additional round-trip to all server nodes. The trade off is + always consistent reads, but increased latency due to an extra round trip. + Most clients should not use this unless they cannot tolerate a stale read. + +* stale - This mode allows any server to service the read, regardless of if + it is the leader. This means reads can be arbitrarily stale, but are generally + within 50 milliseconds of the leader. The trade off is very fast and scalable + reads but values will be stale. This mode allows reads without a leader, meaning + a cluster that is unavailable will still be able to respond. + +To switch these modes, either the "?stale" or "?consistent" query parameters +are provided. It is an error to provide both. + +To support bounding how stale data is, there is an "X-Consul-LastContact" +which is the last time a server was contacted by the leader node in +milliseconds. The "X-Consul-KnownLeader" also indicates if there is a known +leader. These can be used to gauage if a stale read should be used. ## KV @@ -81,7 +115,8 @@ that modified this key. This index corresponds to the `X-Consul-Index` header value that is returned. A blocking query can be used to wait for a value to change. If "?recurse" is used, the `X-Consul-Index` corresponds to the latest `ModifyIndex` and so a blocking query waits until any of the -listed keys are updated. +listed keys are updated. The multiple consistency modes can be used for +`GET` requests as well. The `Key` is simply the full path of the entry. `Flags` are an opaque unsigned integer that can be attached to each entry. The use of this is @@ -347,7 +382,8 @@ The following endpoints are supported: * /v1/catalog/service/\ : Lists the nodes in a given service * /v1/catalog/node/\ : Lists the services provided by a node -The last 4 endpoints of the catalog support blocking queries. +The last 4 endpoints of the catalog support blocking queries and +consistency modes. ### /v1/catalog/register @@ -473,7 +509,7 @@ It returns a JSON body like this: } ] -This endpoint supports blocking queries. +This endpoint supports blocking queries and all consistency modes. ### /v1/catalog/services @@ -492,7 +528,7 @@ It returns a JSON body like this: The main object keys are the service names, while the array provides all the known tags for a given service. -This endpoint supports blocking queries. +This endpoint supports blocking queries and all consistency modes. ### /v1/catalog/service/\ @@ -517,7 +553,7 @@ It returns a JSON body like this: } ] -This endpoint supports blocking queries. +This endpoint supports blocking queries and all consistency modes. ### /v1/catalog/node/\ @@ -549,7 +585,7 @@ It returns a JSON body like this: } } -This endpoint supports blocking queries. +This endpoint supports blocking queries and all consistency modes. ## Health @@ -564,7 +600,7 @@ The following endpoints are supported: * /v1/health/service/\: Returns the nodes and health info of a service * /v1/health/state/\: Returns the checks in a given state -All of the health endpoints supports blocking queries. +All of the health endpoints supports blocking queries and all consistency modes. ### /v1/health/node/\ @@ -603,7 +639,7 @@ joins the Consul cluster, it is part of a distributed failure detection provided by Serf. If a node fails, it is detected and the status is automatically changed to "critical". -This endpoint supports blocking queries. +This endpoint supports blocking queries and all consistency modes. ### /v1/health/checks/\ @@ -627,7 +663,7 @@ It returns a JSON body like this: } ] -This endpoint supports blocking queries. +This endpoint supports blocking queries and all consistency modes. ### /v1/health/service/\ @@ -684,7 +720,7 @@ It returns a JSON body like this: } ] -This endpoint supports blocking queries. +This endpoint supports blocking queries and all consistency modes. ### /v1/health/state/\ @@ -718,7 +754,7 @@ It returns a JSON body like this: } ] -This endpoint supports blocking queries. +This endpoint supports blocking queries and all consistency modes. ## Status diff --git a/website/source/docs/internals/consensus.html.markdown b/website/source/docs/internals/consensus.html.markdown index 98e52f778ec7..55ccad315872 100644 --- a/website/source/docs/internals/consensus.html.markdown +++ b/website/source/docs/internals/consensus.html.markdown @@ -131,6 +131,39 @@ only for data in their datacenter. When a request is received for a remote datac the request is forwarded to the correct leader. This design allows for lower latency transactions and higher availability without sacrificing consistency. +## Consistency Modes + +Although all writes to the replicated log go through Raft, reads are more +flexible. To support various tradeoffs that developers may want, Consul +supports 3 different consistency modes for reads. + +The three read modes are: + +* default - Raft makes use of leader leasing, providing a time window + in which the leader assumes it's role is stable. However, if a leader + is partitioned from the remaining peers, a new leader may be elected + while the old leader is holding the lease. This means there are 2 leader + nodes. There is no risk of a split-brain since the old leader will be + unable to commit new logs. However, if the old leader services any reads + the values are potentially stale. The default consistency mode relies only + on leader leasing, exposing clients to potentially stale values. We make + this trade off because reads are fast, usually strongly consistent, and + only stale in a hard to trigger situation. The time window of stale reads + is also bounded, since the leader will step down due to the partition. + +* consistent - This mode is strongly consistent without caveats. It requires + that a leader verify with a quorum of peers that it is still leader. This + introduces an additional round-trip to all server nodes. The trade off is + always consistent reads, but increased latency due to an extra round trip. + +* stale - This mode allows any server to service the read, regardless of if + it is the leader. This means reads can be arbitrarily stale, but are generally + within 50 milliseconds of the leader. The trade off is very fast and scalable + reads but values will be stale. This mode allows reads without a leader, meaning + a cluster that is unavailable will still be able to respond. + +For more documentation about using these various modes, see the [HTTP API](/docs/agent/http.html). + ## Deployment Table Below is a table that shows for the number of servers how large the