Skip to content

Commit

Permalink
Merge pull request #68 from hashicorp/f-consistency
Browse files Browse the repository at this point in the history
Adding support for "stale" and "consistent" read modes
  • Loading branch information
armon committed Apr 21, 2014
2 parents c1bd716 + 23dd1a0 commit 57a45ea
Show file tree
Hide file tree
Showing 16 changed files with 745 additions and 198 deletions.
48 changes: 26 additions & 22 deletions command/agent/catalog_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,39 +57,41 @@ func (s *HTTPServer) CatalogDatacenters(resp http.ResponseWriter, req *http.Requ
return out, nil
}

func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) {
func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Setup the request
args := structs.DCSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done {
return 0, nil, nil
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}

var out structs.IndexedNodes
defer setMeta(resp, &out.QueryMeta)
if err := s.agent.RPC("Catalog.ListNodes", &args, &out); err != nil {
return 0, nil, err
return nil, err
}
return out.Index, out.Nodes, nil
return out.Nodes, nil
}

func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) {
func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Set default DC
args := structs.DCSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done {
return 0, nil, nil
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}

var out structs.IndexedServices
defer setMeta(resp, &out.QueryMeta)
if err := s.agent.RPC("Catalog.ListServices", &args, &out); err != nil {
return 0, nil, err
return nil, err
}
return out.Index, out.Services, nil
return out.Services, nil
}

func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) {
func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Set default DC
args := structs.ServiceSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done {
return 0, nil, nil
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}

// Check for a tag
Expand All @@ -104,36 +106,38 @@ func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Req
if args.ServiceName == "" {
resp.WriteHeader(400)
resp.Write([]byte("Missing service name"))
return 0, nil, nil
return nil, nil
}

// Make the RPC request
var out structs.IndexedServiceNodes
defer setMeta(resp, &out.QueryMeta)
if err := s.agent.RPC("Catalog.ServiceNodes", &args, &out); err != nil {
return 0, nil, err
return nil, err
}
return out.Index, out.ServiceNodes, nil
return out.ServiceNodes, nil
}

func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) {
func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Set default Datacenter
args := structs.NodeSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done {
return 0, nil, nil
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}

// Pull out the node name
args.Node = strings.TrimPrefix(req.URL.Path, "/v1/catalog/node/")
if args.Node == "" {
resp.WriteHeader(400)
resp.Write([]byte("Missing node name"))
return 0, nil, nil
return nil, nil
}

// Make the RPC request
var out structs.IndexedNodeServices
defer setMeta(resp, &out.QueryMeta)
if err := s.agent.RPC("Catalog.NodeServices", &args, &out); err != nil {
return 0, nil, err
return nil, err
}
return out.Index, out.NodeServices, nil
return out.NodeServices, nil
}
36 changes: 17 additions & 19 deletions command/agent/catalog_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"github.com/hashicorp/consul/consul/structs"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"
Expand Down Expand Up @@ -115,14 +116,14 @@ func TestCatalogNodes(t *testing.T) {
t.Fatalf("err: %v", err)
}

idx, obj, err := srv.CatalogNodes(nil, req)
resp := httptest.NewRecorder()
obj, err := srv.CatalogNodes(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}

if idx == 0 {
t.Fatalf("bad: %v", idx)
}
// Verify an index is set
assertIndex(t, resp)

nodes := obj.(structs.Nodes)
if len(nodes) != 2 {
Expand Down Expand Up @@ -170,7 +171,8 @@ func TestCatalogNodes_Blocking(t *testing.T) {
t.Fatalf("err: %v", err)
}

idx, obj, err := srv.CatalogNodes(nil, req)
resp := httptest.NewRecorder()
obj, err := srv.CatalogNodes(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand All @@ -180,7 +182,7 @@ func TestCatalogNodes_Blocking(t *testing.T) {
t.Fatalf("too fast")
}

if idx <= out.Index {
if idx := getIndex(t, resp); idx <= out.Index {
t.Fatalf("bad: %v", idx)
}

Expand Down Expand Up @@ -218,14 +220,13 @@ func TestCatalogServices(t *testing.T) {
t.Fatalf("err: %v", err)
}

idx, obj, err := srv.CatalogServices(nil, req)
resp := httptest.NewRecorder()
obj, err := srv.CatalogServices(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}

if idx == 0 {
t.Fatalf("bad: %v", idx)
}
assertIndex(t, resp)

services := obj.(structs.Services)
if len(services) != 2 {
Expand Down Expand Up @@ -262,14 +263,13 @@ func TestCatalogServiceNodes(t *testing.T) {
t.Fatalf("err: %v", err)
}

idx, obj, err := srv.CatalogServiceNodes(nil, req)
resp := httptest.NewRecorder()
obj, err := srv.CatalogServiceNodes(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}

if idx == 0 {
t.Fatalf("bad: %v", idx)
}
assertIndex(t, resp)

nodes := obj.(structs.ServiceNodes)
if len(nodes) != 1 {
Expand Down Expand Up @@ -306,14 +306,12 @@ func TestCatalogNodeServices(t *testing.T) {
t.Fatalf("err: %v", err)
}

idx, obj, err := srv.CatalogNodeServices(nil, req)
resp := httptest.NewRecorder()
obj, err := srv.CatalogNodeServices(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}

if idx == 0 {
t.Fatalf("bad: %v", idx)
}
assertIndex(t, resp)

services := obj.(*structs.NodeServices)
if len(services.Services) != 1 {
Expand Down
52 changes: 28 additions & 24 deletions command/agent/health_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,80 +6,83 @@ import (
"strings"
)

func (s *HTTPServer) HealthChecksInState(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) {
func (s *HTTPServer) HealthChecksInState(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Set default DC
args := structs.ChecksInStateRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done {
return 0, nil, nil
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}

// Pull out the service name
args.State = strings.TrimPrefix(req.URL.Path, "/v1/health/state/")
if args.State == "" {
resp.WriteHeader(400)
resp.Write([]byte("Missing check state"))
return 0, nil, nil
return nil, nil
}

// Make the RPC request
var out structs.IndexedHealthChecks
defer setMeta(resp, &out.QueryMeta)
if err := s.agent.RPC("Health.ChecksInState", &args, &out); err != nil {
return 0, nil, err
return nil, err
}
return out.Index, out.HealthChecks, nil
return out.HealthChecks, nil
}

func (s *HTTPServer) HealthNodeChecks(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) {
func (s *HTTPServer) HealthNodeChecks(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Set default DC
args := structs.NodeSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done {
return 0, nil, nil
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}

// Pull out the service name
args.Node = strings.TrimPrefix(req.URL.Path, "/v1/health/node/")
if args.Node == "" {
resp.WriteHeader(400)
resp.Write([]byte("Missing node name"))
return 0, nil, nil
return nil, nil
}

// Make the RPC request
var out structs.IndexedHealthChecks
defer setMeta(resp, &out.QueryMeta)
if err := s.agent.RPC("Health.NodeChecks", &args, &out); err != nil {
return 0, nil, err
return nil, err
}
return out.Index, out.HealthChecks, nil
return out.HealthChecks, nil
}

func (s *HTTPServer) HealthServiceChecks(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) {
func (s *HTTPServer) HealthServiceChecks(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Set default DC
args := structs.ServiceSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done {
return 0, nil, nil
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}

// Pull out the service name
args.ServiceName = strings.TrimPrefix(req.URL.Path, "/v1/health/checks/")
if args.ServiceName == "" {
resp.WriteHeader(400)
resp.Write([]byte("Missing service name"))
return 0, nil, nil
return nil, nil
}

// Make the RPC request
var out structs.IndexedHealthChecks
defer setMeta(resp, &out.QueryMeta)
if err := s.agent.RPC("Health.ServiceChecks", &args, &out); err != nil {
return 0, nil, err
return nil, err
}
return out.Index, out.HealthChecks, nil
return out.HealthChecks, nil
}

func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) {
func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Set default DC
args := structs.ServiceSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done {
return 0, nil, nil
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}

// Check for a tag
Expand All @@ -94,13 +97,14 @@ func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Requ
if args.ServiceName == "" {
resp.WriteHeader(400)
resp.Write([]byte("Missing service name"))
return 0, nil, nil
return nil, nil
}

// Make the RPC request
var out structs.IndexedCheckServiceNodes
defer setMeta(resp, &out.QueryMeta)
if err := s.agent.RPC("Health.ServiceNodes", &args, &out); err != nil {
return 0, nil, err
return nil, err
}
return out.Index, out.Nodes, nil
return out.Nodes, nil
}
Loading

0 comments on commit 57a45ea

Please sign in to comment.