Skip to content

Commit

Permalink
Merge pull request #2137 from hashicorp/f-pq-near
Browse files Browse the repository at this point in the history
Support "near" parameter in prepared query service block
  • Loading branch information
ryanuber authored Jul 1, 2016
2 parents 56eb3bb + 2b24644 commit ab16547
Show file tree
Hide file tree
Showing 10 changed files with 342 additions and 4 deletions.
9 changes: 9 additions & 0 deletions command/agent/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,15 @@ func (d *DNSServer) preparedQueryLookup(network, datacenter, query string, req,
Token: d.agent.config.ACLToken,
AllowStale: d.config.AllowStale,
},

// Always pass the local agent through. In the DNS interface, there
// is no provision for passing additional query parameters, so we
// send the local agent's data through to allow distance sorting
// relative to ourself on the server side.
Agent: structs.QuerySource{
Datacenter: d.agent.config.Datacenter,
Node: d.agent.config.NodeName,
},
}

// TODO (slackpad) - What's a safe limit we can set here? It seems like
Expand Down
34 changes: 34 additions & 0 deletions command/agent/dns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3166,3 +3166,37 @@ func TestDNS_InvalidQueries(t *testing.T) {
}
}
}

func TestDNS_PreparedQuery_AgentSource(t *testing.T) {
dir, srv := makeDNSServer(t)
defer os.RemoveAll(dir)
defer srv.agent.Shutdown()

testutil.WaitForLeader(t, srv.agent.RPC, "dc1")

m := MockPreparedQuery{}
if err := srv.agent.InjectEndpoint("PreparedQuery", &m); err != nil {
t.Fatalf("err: %v", err)
}

m.executeFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error {
// Check that the agent inserted its self-name and datacenter to
// the RPC request body.
if args.Agent.Datacenter != srv.agent.config.Datacenter ||
args.Agent.Node != srv.agent.config.NodeName {
t.Fatalf("bad: %#v", args.Agent)
}
return nil
}

{
m := new(dns.Msg)
m.SetQuestion("foo.query.consul.", dns.TypeSRV)

c := new(dns.Client)
addr, _ := srv.agent.config.ClientListener("", srv.agent.config.Ports.DNS)
if _, _, err := c.Exchange(m, addr.String()); err != nil {
t.Fatalf("err: %v", err)
}
}
}
8 changes: 8 additions & 0 deletions command/agent/prepared_query_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ func parseLimit(req *http.Request, limit *int) error {
func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.PreparedQueryExecuteRequest{
QueryIDOrName: id,
Agent: structs.QuerySource{
Node: s.agent.config.NodeName,
Datacenter: s.agent.config.Datacenter,
},
}
s.parseSource(req, &args.Source)
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
Expand Down Expand Up @@ -131,6 +135,10 @@ func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, r
func (s *HTTPServer) preparedQueryExplain(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.PreparedQueryExecuteRequest{
QueryIDOrName: id,
Agent: structs.QuerySource{
Node: s.agent.config.NodeName,
Datacenter: s.agent.config.Datacenter,
},
}
s.parseSource(req, &args.Source)
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
Expand Down
40 changes: 40 additions & 0 deletions command/agent/prepared_query_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,10 @@ func TestPreparedQuery_Execute(t *testing.T) {
Datacenter: "dc1",
Node: "my-node",
},
Agent: structs.QuerySource{
Datacenter: srv.agent.config.Datacenter,
Node: srv.agent.config.NodeName,
},
QueryOptions: structs.QueryOptions{
Token: "my-token",
RequireConsistent: true,
Expand Down Expand Up @@ -323,6 +327,38 @@ func TestPreparedQuery_Execute(t *testing.T) {
}
})

// Ensure the proper params are set when no special args are passed
httpTest(t, func(srv *HTTPServer) {
m := MockPreparedQuery{}
if err := srv.agent.InjectEndpoint("PreparedQuery", &m); err != nil {
t.Fatalf("err: %v", err)
}

m.executeFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error {
if args.Source.Node != "" {
t.Fatalf("expect node to be empty, got %q", args.Source.Node)
}
expect := structs.QuerySource{
Datacenter: srv.agent.config.Datacenter,
Node: srv.agent.config.NodeName,
}
if !reflect.DeepEqual(args.Agent, expect) {
t.Fatalf("expect: %#v\nactual: %#v", expect, args.Agent)
}
return nil
}

req, err := http.NewRequest("GET", "/v1/query/my-id/execute", nil)
if err != nil {
t.Fatalf("err: %v", err)
}

resp := httptest.NewRecorder()
if _, err := srv.PreparedQuerySpecific(resp, req); err != nil {
t.Fatalf("err: %v", err)
}
})

httpTest(t, func(srv *HTTPServer) {
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("GET", "/v1/query/not-there/execute", body)
Expand Down Expand Up @@ -357,6 +393,10 @@ func TestPreparedQuery_Explain(t *testing.T) {
Datacenter: "dc1",
Node: "my-node",
},
Agent: structs.QuerySource{
Datacenter: srv.agent.config.Datacenter,
Node: srv.agent.config.NodeName,
},
QueryOptions: structs.QueryOptions{
Token: "my-token",
RequireConsistent: true,
Expand Down
2 changes: 2 additions & 0 deletions consul/prepared_query/walk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func TestWalk_ServiceQuery(t *testing.T) {
Failover: structs.QueryDatacenterOptions{
Datacenters: []string{"dc1", "dc2"},
},
Near: "_agent",
Tags: []string{"tag1", "tag2", "tag3"},
}
if err := walk(service, fn); err != nil {
Expand All @@ -30,6 +31,7 @@ func TestWalk_ServiceQuery(t *testing.T) {
".Service:the-service",
".Failover.Datacenters[0]:dc1",
".Failover.Datacenters[1]:dc2",
".Near:_agent",
".Tags[0]:tag1",
".Tags[1]:tag2",
".Tags[2]:tag3",
Expand Down
20 changes: 19 additions & 1 deletion consul/prepared_query_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,25 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
// Shuffle the results in case coordinates are not available if they
// requested an RTT sort.
reply.Nodes.Shuffle()
if err := p.srv.sortNodesByDistanceFrom(args.Source, reply.Nodes); err != nil {

// Build the query source. This can be provided by the client, or by
// the prepared query. Client-specified takes priority.
qs := args.Source
if qs.Datacenter == "" {
qs.Datacenter = args.Agent.Datacenter
}
if query.Service.Near != "" && qs.Node == "" {
qs.Node = query.Service.Near
}

// Respect the magic "_agent" flag.
if qs.Node == "_agent" {
qs.Node = args.Agent.Node
}

// Perform the distance sort
err = p.srv.sortNodesByDistanceFrom(qs, reply.Nodes)
if err != nil {
return err
}

Expand Down
192 changes: 191 additions & 1 deletion consul/prepared_query_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1607,6 +1607,197 @@ func TestPreparedQuery_Execute(t *testing.T) {
t.Fatalf("unique shuffle ratio too low: %d/100", len(uniques))
}

// Set the query to return results nearest to node3. This is the only
// node with coordinates, and it carries the service we are asking for,
// so node3 should always show up first.
query.Op = structs.PreparedQueryUpdate
query.Query.Service.Near = "node3"
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
t.Fatalf("err: %v", err)
}

// Now run the query and make sure the sort looks right.
{
req := structs.PreparedQueryExecuteRequest{
Agent: structs.QuerySource{
Datacenter: "dc1",
Node: "node3",
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}

var reply structs.PreparedQueryExecuteResponse

for i := 0; i < 10; i++ {
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if n := len(reply.Nodes); n != 10 {
t.Fatalf("expect 10 nodes, got: %d", n)
}
if node := reply.Nodes[0].Node.Node; node != "node3" {
t.Fatalf("expect node3 first, got: %q", node)
}
}
}

// Query again, but this time set a client-supplied query source. This
// proves that we allow overriding the baked-in value with ?near.
{
// Set up the query with a non-existent node. This will cause the
// nodes to be shuffled if the passed node is respected, proving
// that we allow the override to happen.
req := structs.PreparedQueryExecuteRequest{
Source: structs.QuerySource{
Datacenter: "dc1",
Node: "foo",
},
Agent: structs.QuerySource{
Datacenter: "dc1",
Node: "node3",
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}

var reply structs.PreparedQueryExecuteResponse

shuffled := false
for i := 0; i < 10; i++ {
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if n := len(reply.Nodes); n != 10 {
t.Fatalf("expect 10 nodes, got: %d", n)
}
if node := reply.Nodes[0].Node.Node; node != "node3" {
shuffled = true
break
}
}

if !shuffled {
t.Fatalf("expect nodes to be shuffled")
}
}

// Bake the magic "_agent" flag into the query.
query.Query.Service.Near = "_agent"
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
t.Fatalf("err: %v", err)
}

// Check that we sort the local agent first when the magic flag is set.
{
req := structs.PreparedQueryExecuteRequest{
Agent: structs.QuerySource{
Datacenter: "dc1",
Node: "node3",
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}

var reply structs.PreparedQueryExecuteResponse

for i := 0; i < 10; i++ {
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if n := len(reply.Nodes); n != 10 {
t.Fatalf("expect 10 nodes, got: %d", n)
}
if node := reply.Nodes[0].Node.Node; node != "node3" {
t.Fatalf("expect node3 first, got: %q", node)
}
}
}

// Check that the query isn't just sorting "node3" first because we
// provided it in the Agent query source. Proves that we use the
// Agent source when the magic "_agent" flag is passed.
{
req := structs.PreparedQueryExecuteRequest{
Agent: structs.QuerySource{
Datacenter: "dc1",
Node: "foo",
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}

var reply structs.PreparedQueryExecuteResponse

// Expect the set to be shuffled since we have no coordinates
// on the "foo" node.
shuffled := false
for i := 0; i < 10; i++ {
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if n := len(reply.Nodes); n != 10 {
t.Fatalf("expect 10 nodes, got: %d", n)
}
if node := reply.Nodes[0].Node.Node; node != "node3" {
shuffled = true
break
}
}

if !shuffled {
t.Fatal("expect nodes to be shuffled")
}
}

// Shuffles if the response comes from a non-local DC. Proves that the
// agent query source does not interfere with the order.
{
req := structs.PreparedQueryExecuteRequest{
Source: structs.QuerySource{
Datacenter: "dc2",
Node: "node3",
},
Agent: structs.QuerySource{
Datacenter: "dc1",
Node: "node3",
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}

var reply structs.PreparedQueryExecuteResponse

shuffled := false
for i := 0; i < 10; i++ {
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if n := len(reply.Nodes); n != 10 {
t.Fatalf("expect 10 nodes, got: %d", n)
}
if reply.Nodes[0].Node.Node != "node3" {
shuffled = true
break
}
}

if !shuffled {
t.Fatal("expect node shuffle for remote results")
}
}

// Un-bake the near parameter.
query.Query.Service.Near = ""
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
t.Fatalf("err: %v", err)
}

// Update the health of a node to mark it critical.
setHealth := func(node string, health string) {
req := structs.RegisterRequest{
Expand Down Expand Up @@ -1683,7 +1874,6 @@ func TestPreparedQuery_Execute(t *testing.T) {
}

// Make the query more picky so it excludes warning nodes.
query.Op = structs.PreparedQueryUpdate
query.Query.Service.OnlyPassing = true
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
t.Fatalf("err: %v", err)
Expand Down
Loading

0 comments on commit ab16547

Please sign in to comment.