Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support "near" parameter in prepared query service block #2137

Merged
merged 15 commits into from
Jul 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions command/agent/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,15 @@ func (d *DNSServer) preparedQueryLookup(network, datacenter, query string, req,
Token: d.agent.config.ACLToken,
AllowStale: d.config.AllowStale,
},

// Always pass the local agent through. In the DNS interface, there
// is no provision for passing additional query parameters, so we
// send the local agent's data through to allow distance sorting
// relative to ourself on the server side.
Agent: structs.QuerySource{
Datacenter: d.agent.config.Datacenter,
Node: d.agent.config.NodeName,
},
}

// TODO (slackpad) - What's a safe limit we can set here? It seems like
Expand Down
34 changes: 34 additions & 0 deletions command/agent/dns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3166,3 +3166,37 @@ func TestDNS_InvalidQueries(t *testing.T) {
}
}
}

func TestDNS_PreparedQuery_AgentSource(t *testing.T) {
dir, srv := makeDNSServer(t)
defer os.RemoveAll(dir)
defer srv.agent.Shutdown()

testutil.WaitForLeader(t, srv.agent.RPC, "dc1")

m := MockPreparedQuery{}
if err := srv.agent.InjectEndpoint("PreparedQuery", &m); err != nil {
t.Fatalf("err: %v", err)
}

m.executeFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error {
// Check that the agent inserted its self-name and datacenter to
// the RPC request body.
if args.Agent.Datacenter != srv.agent.config.Datacenter ||
args.Agent.Node != srv.agent.config.NodeName {
t.Fatalf("bad: %#v", args.Agent)
}
return nil
}

{
m := new(dns.Msg)
m.SetQuestion("foo.query.consul.", dns.TypeSRV)

c := new(dns.Client)
addr, _ := srv.agent.config.ClientListener("", srv.agent.config.Ports.DNS)
if _, _, err := c.Exchange(m, addr.String()); err != nil {
t.Fatalf("err: %v", err)
}
}
}
8 changes: 8 additions & 0 deletions command/agent/prepared_query_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ func parseLimit(req *http.Request, limit *int) error {
func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.PreparedQueryExecuteRequest{
QueryIDOrName: id,
Agent: structs.QuerySource{
Node: s.agent.config.NodeName,
Datacenter: s.agent.config.Datacenter,
},
}
s.parseSource(req, &args.Source)
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
Expand Down Expand Up @@ -131,6 +135,10 @@ func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, r
func (s *HTTPServer) preparedQueryExplain(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.PreparedQueryExecuteRequest{
QueryIDOrName: id,
Agent: structs.QuerySource{
Node: s.agent.config.NodeName,
Datacenter: s.agent.config.Datacenter,
},
}
s.parseSource(req, &args.Source)
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
Expand Down
40 changes: 40 additions & 0 deletions command/agent/prepared_query_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,10 @@ func TestPreparedQuery_Execute(t *testing.T) {
Datacenter: "dc1",
Node: "my-node",
},
Agent: structs.QuerySource{
Datacenter: srv.agent.config.Datacenter,
Node: srv.agent.config.NodeName,
},
QueryOptions: structs.QueryOptions{
Token: "my-token",
RequireConsistent: true,
Expand Down Expand Up @@ -323,6 +327,38 @@ func TestPreparedQuery_Execute(t *testing.T) {
}
})

// Ensure the proper params are set when no special args are passed
httpTest(t, func(srv *HTTPServer) {
m := MockPreparedQuery{}
if err := srv.agent.InjectEndpoint("PreparedQuery", &m); err != nil {
t.Fatalf("err: %v", err)
}

m.executeFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error {
if args.Source.Node != "" {
t.Fatalf("expect node to be empty, got %q", args.Source.Node)
}
expect := structs.QuerySource{
Datacenter: srv.agent.config.Datacenter,
Node: srv.agent.config.NodeName,
}
if !reflect.DeepEqual(args.Agent, expect) {
t.Fatalf("expect: %#v\nactual: %#v", expect, args.Agent)
}
return nil
}

req, err := http.NewRequest("GET", "/v1/query/my-id/execute", nil)
if err != nil {
t.Fatalf("err: %v", err)
}

resp := httptest.NewRecorder()
if _, err := srv.PreparedQuerySpecific(resp, req); err != nil {
t.Fatalf("err: %v", err)
}
})

httpTest(t, func(srv *HTTPServer) {
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("GET", "/v1/query/not-there/execute", body)
Expand Down Expand Up @@ -357,6 +393,10 @@ func TestPreparedQuery_Explain(t *testing.T) {
Datacenter: "dc1",
Node: "my-node",
},
Agent: structs.QuerySource{
Datacenter: srv.agent.config.Datacenter,
Node: srv.agent.config.NodeName,
},
QueryOptions: structs.QueryOptions{
Token: "my-token",
RequireConsistent: true,
Expand Down
2 changes: 2 additions & 0 deletions consul/prepared_query/walk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func TestWalk_ServiceQuery(t *testing.T) {
Failover: structs.QueryDatacenterOptions{
Datacenters: []string{"dc1", "dc2"},
},
Near: "_agent",
Tags: []string{"tag1", "tag2", "tag3"},
}
if err := walk(service, fn); err != nil {
Expand All @@ -30,6 +31,7 @@ func TestWalk_ServiceQuery(t *testing.T) {
".Service:the-service",
".Failover.Datacenters[0]:dc1",
".Failover.Datacenters[1]:dc2",
".Near:_agent",
".Tags[0]:tag1",
".Tags[1]:tag2",
".Tags[2]:tag3",
Expand Down
20 changes: 19 additions & 1 deletion consul/prepared_query_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,25 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
// Shuffle the results in case coordinates are not available if they
// requested an RTT sort.
reply.Nodes.Shuffle()
if err := p.srv.sortNodesByDistanceFrom(args.Source, reply.Nodes); err != nil {

// Build the query source. This can be provided by the client, or by
// the prepared query. Client-specified takes priority.
qs := args.Source
if qs.Datacenter == "" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DC should always be supplied by parseDC but this is fine and defensive.

qs.Datacenter = args.Agent.Datacenter
}
if query.Service.Near != "" && qs.Node == "" {
qs.Node = query.Service.Near
}

// Respect the magic "_agent" flag.
if qs.Node == "_agent" {
qs.Node = args.Agent.Node
}

// Perform the distance sort
Copy link
Contributor

@sean- sean- Jun 21, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to memoize this value for ~60s or whatever the interval is for the tomography calcs? I seem to recall someone saying this calc was "expensive," but if it's not, disregard. DNS can set a TTL, so maybe this isn't a huge issue, but that depends on the level of DNS caching at a given site.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not too bad - I'd probably leave as-is since this can be DNS TTL-ed and read-scaled with staleness across the servers.

err = p.srv.sortNodesByDistanceFrom(qs, reply.Nodes)
if err != nil {
return err
}

Expand Down
192 changes: 191 additions & 1 deletion consul/prepared_query_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1607,6 +1607,197 @@ func TestPreparedQuery_Execute(t *testing.T) {
t.Fatalf("unique shuffle ratio too low: %d/100", len(uniques))
}

// Set the query to return results nearest to node3. This is the only
// node with coordinates, and it carries the service we are asking for,
// so node3 should always show up first.
query.Op = structs.PreparedQueryUpdate
query.Query.Service.Near = "node3"
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
t.Fatalf("err: %v", err)
}

// Now run the query and make sure the sort looks right.
{
req := structs.PreparedQueryExecuteRequest{
Agent: structs.QuerySource{
Datacenter: "dc1",
Node: "node3",
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}

var reply structs.PreparedQueryExecuteResponse

for i := 0; i < 10; i++ {
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if n := len(reply.Nodes); n != 10 {
t.Fatalf("expect 10 nodes, got: %d", n)
}
if node := reply.Nodes[0].Node.Node; node != "node3" {
t.Fatalf("expect node3 first, got: %q", node)
}
}
}

// Query again, but this time set a client-supplied query source. This
// proves that we allow overriding the baked-in value with ?near.
{
// Set up the query with a non-existent node. This will cause the
// nodes to be shuffled if the passed node is respected, proving
// that we allow the override to happen.
req := structs.PreparedQueryExecuteRequest{
Source: structs.QuerySource{
Datacenter: "dc1",
Node: "foo",
},
Agent: structs.QuerySource{
Datacenter: "dc1",
Node: "node3",
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}

var reply structs.PreparedQueryExecuteResponse

shuffled := false
for i := 0; i < 10; i++ {
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if n := len(reply.Nodes); n != 10 {
t.Fatalf("expect 10 nodes, got: %d", n)
}
if node := reply.Nodes[0].Node.Node; node != "node3" {
shuffled = true
break
}
}

if !shuffled {
t.Fatalf("expect nodes to be shuffled")
}
}

// Bake the magic "_agent" flag into the query.
query.Query.Service.Near = "_agent"
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
t.Fatalf("err: %v", err)
}

// Check that we sort the local agent first when the magic flag is set.
{
req := structs.PreparedQueryExecuteRequest{
Agent: structs.QuerySource{
Datacenter: "dc1",
Node: "node3",
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}

var reply structs.PreparedQueryExecuteResponse

for i := 0; i < 10; i++ {
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if n := len(reply.Nodes); n != 10 {
t.Fatalf("expect 10 nodes, got: %d", n)
}
if node := reply.Nodes[0].Node.Node; node != "node3" {
t.Fatalf("expect node3 first, got: %q", node)
}
}
}

// Check that the query isn't just sorting "node3" first because we
// provided it in the Agent query source. Proves that we use the
// Agent source when the magic "_agent" flag is passed.
{
req := structs.PreparedQueryExecuteRequest{
Agent: structs.QuerySource{
Datacenter: "dc1",
Node: "foo",
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}

var reply structs.PreparedQueryExecuteResponse

// Expect the set to be shuffled since we have no coordinates
// on the "foo" node.
shuffled := false
for i := 0; i < 10; i++ {
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if n := len(reply.Nodes); n != 10 {
t.Fatalf("expect 10 nodes, got: %d", n)
}
if node := reply.Nodes[0].Node.Node; node != "node3" {
shuffled = true
break
}
}

if !shuffled {
t.Fatal("expect nodes to be shuffled")
}
}

// Shuffles if the response comes from a non-local DC. Proves that the
// agent query source does not interfere with the order.
{
req := structs.PreparedQueryExecuteRequest{
Source: structs.QuerySource{
Datacenter: "dc2",
Node: "node3",
},
Agent: structs.QuerySource{
Datacenter: "dc1",
Node: "node3",
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}

var reply structs.PreparedQueryExecuteResponse

shuffled := false
for i := 0; i < 10; i++ {
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if n := len(reply.Nodes); n != 10 {
t.Fatalf("expect 10 nodes, got: %d", n)
}
if reply.Nodes[0].Node.Node != "node3" {
shuffled = true
break
}
}

if !shuffled {
t.Fatal("expect node shuffle for remote results")
}
}

// Un-bake the near parameter.
query.Query.Service.Near = ""
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
t.Fatalf("err: %v", err)
}

// Update the health of a node to mark it critical.
setHealth := func(node string, health string) {
req := structs.RegisterRequest{
Expand Down Expand Up @@ -1683,7 +1874,6 @@ func TestPreparedQuery_Execute(t *testing.T) {
}

// Make the query more picky so it excludes warning nodes.
query.Op = structs.PreparedQueryUpdate
query.Query.Service.OnlyPassing = true
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
t.Fatalf("err: %v", err)
Expand Down
Loading