Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support "near" parameter in prepared query service block #2137

Merged
merged 15 commits into from
Jul 1, 2016
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions command/agent/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,15 @@ func (d *DNSServer) preparedQueryLookup(network, datacenter, query string, req,
Token: d.agent.config.ACLToken,
AllowStale: d.config.AllowStale,
},

// Always pass the local agent through. In the DNS interface, there
// is no provision for passing additional query parameters, so we
// send the local agent's data through to allow distance sorting
// relative to ourself on the server side.
Agent: structs.QuerySource{
Datacenter: d.agent.config.Datacenter,
Node: d.agent.config.NodeName,
},
}

// TODO (slackpad) - What's a safe limit we can set here? It seems like
Expand Down
8 changes: 1 addition & 7 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,13 +531,7 @@ func (s *HTTPServer) parseToken(req *http.Request, token *string) {
// DC in the request, if given, or else the agent's DC.
func (s *HTTPServer) parseSource(req *http.Request, source *structs.QuerySource) {
s.parseDC(req, &source.Datacenter)
if node := req.URL.Query().Get("near"); node != "" {
if node == "_agent" {
source.Node = s.agent.config.NodeName
} else {
source.Node = node
}
}
source.Node = req.URL.Query().Get("near")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this'll break other endpoints that depend on ?near=_agent getting resolved at this level. I think you can pass the existing Source member of the execute request into here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it break? This just pushes the check out to the consul side instead of the agent. I was trying to avoid checking the magic flag in the agent and on the consul side (since the Near parameter can live in either place now).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still think you'll need to revert this change - should not be needed any more and there are other RPC endpoints that do the sort which rely on _agent getting resolved on the agent side, not the server side.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see, catalog and health both allow distance sorting. Will fix!

}

// parse is a convenience method for endpoints that need
Expand Down
16 changes: 2 additions & 14 deletions command/agent/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,8 @@ func TestParseSource(t *testing.T) {
defer srv.Shutdown()
defer srv.agent.Shutdown()

// Default is agent's DC and no node (since the user didn't care, then
// just give them the cheapest possible query).
// Default is agent's DC and no node (since the user didn't care,
// then just give them the cheapest possible query).
req, err := http.NewRequest("GET",
"/v1/catalog/nodes", nil)
if err != nil {
Expand Down Expand Up @@ -382,18 +382,6 @@ func TestParseSource(t *testing.T) {
if source.Datacenter != "foo" || source.Node != "bob" {
t.Fatalf("bad: %v", source)
}

// The magic "_agent" node name will use the agent's local node name.
req, err = http.NewRequest("GET",
"/v1/catalog/nodes?near=_agent", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
source = structs.QuerySource{}
srv.parseSource(req, &source)
if source.Datacenter != "dc1" || source.Node != srv.agent.config.NodeName {
t.Fatalf("bad: %v", source)
}
}

func TestParseWait(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions command/agent/prepared_query_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ func parseLimit(req *http.Request, limit *int) error {
func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.PreparedQueryExecuteRequest{
QueryIDOrName: id,
Agent: structs.QuerySource{
Node: s.agent.config.NodeName,
Datacenter: s.agent.config.Datacenter,
},
}
s.parseSource(req, &args.Source)
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
Expand Down Expand Up @@ -131,6 +135,10 @@ func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, r
func (s *HTTPServer) preparedQueryExplain(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.PreparedQueryExecuteRequest{
QueryIDOrName: id,
Agent: structs.QuerySource{
Node: s.agent.config.NodeName,
Datacenter: s.agent.config.Datacenter,
},
}
s.parseSource(req, &args.Source)
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
Expand Down
40 changes: 40 additions & 0 deletions command/agent/prepared_query_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,10 @@ func TestPreparedQuery_Execute(t *testing.T) {
Datacenter: "dc1",
Node: "my-node",
},
Agent: structs.QuerySource{
Datacenter: srv.agent.config.Datacenter,
Node: srv.agent.config.NodeName,
},
QueryOptions: structs.QueryOptions{
Token: "my-token",
RequireConsistent: true,
Expand Down Expand Up @@ -323,6 +327,38 @@ func TestPreparedQuery_Execute(t *testing.T) {
}
})

// Ensure the proper params are set when no special args are passed
httpTest(t, func(srv *HTTPServer) {
m := MockPreparedQuery{}
if err := srv.agent.InjectEndpoint("PreparedQuery", &m); err != nil {
t.Fatalf("err: %v", err)
}

m.executeFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error {
if args.Source.Node != "" {
t.Fatalf("expect node to be empty, got %q", args.Source.Node)
}
expect := structs.QuerySource{
Datacenter: srv.agent.config.Datacenter,
Node: srv.agent.config.NodeName,
}
if !reflect.DeepEqual(args.Agent, expect) {
t.Fatalf("expect: %#v\nactual: %#v", expect, args.Agent)
}
return nil
}

req, err := http.NewRequest("GET", "/v1/query/my-id/execute", nil)
if err != nil {
t.Fatalf("err: %v", err)
}

resp := httptest.NewRecorder()
if _, err := srv.PreparedQuerySpecific(resp, req); err != nil {
t.Fatalf("err: %v", err)
}
})

httpTest(t, func(srv *HTTPServer) {
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("GET", "/v1/query/not-there/execute", body)
Expand Down Expand Up @@ -357,6 +393,10 @@ func TestPreparedQuery_Explain(t *testing.T) {
Datacenter: "dc1",
Node: "my-node",
},
Agent: structs.QuerySource{
Datacenter: srv.agent.config.Datacenter,
Node: srv.agent.config.NodeName,
},
QueryOptions: structs.QueryOptions{
Token: "my-token",
RequireConsistent: true,
Expand Down
2 changes: 2 additions & 0 deletions consul/prepared_query/walk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func TestWalk_ServiceQuery(t *testing.T) {
Failover: structs.QueryDatacenterOptions{
Datacenters: []string{"dc1", "dc2"},
},
Near: "_agent",
Tags: []string{"tag1", "tag2", "tag3"},
}
if err := walk(service, fn); err != nil {
Expand All @@ -30,6 +31,7 @@ func TestWalk_ServiceQuery(t *testing.T) {
".Service:the-service",
".Failover.Datacenters[0]:dc1",
".Failover.Datacenters[1]:dc2",
".Near:_agent",
".Tags[0]:tag1",
".Tags[1]:tag2",
".Tags[2]:tag3",
Expand Down
20 changes: 19 additions & 1 deletion consul/prepared_query_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,25 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
// Shuffle the results in case coordinates are not available if they
// requested an RTT sort.
reply.Nodes.Shuffle()
if err := p.srv.sortNodesByDistanceFrom(args.Source, reply.Nodes); err != nil {

// Build the query source. This can be provided by the client, or by
// the prepared query. Client-specified takes priority.
qs := args.Source
if qs.Datacenter == "" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DC should always be supplied by parseDC but this is fine and defensive.

qs.Datacenter = args.Agent.Datacenter
}
if query.Service.Near != "" && qs.Node == "" {
qs.Node = query.Service.Near
}

// Respect the magic "_agent" flag.
if qs.Node == "_agent" {
qs.Node = args.Agent.Node
}

// Perform the distance sort
Copy link
Contributor

@sean- sean- Jun 21, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to memoize this value for ~60s or whatever the interval is for the tomography calcs? I seem to recall someone saying this calc was "expensive," but if it's not, disregard. DNS can set a TTL, so maybe this isn't a huge issue, but that depends on the level of DNS caching at a given site.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not too bad - I'd probably leave as-is since this can be DNS TTL-ed and read-scaled with staleness across the servers.

err = p.srv.sortNodesByDistanceFrom(qs, reply.Nodes)
if err != nil {
return err
}

Expand Down
192 changes: 191 additions & 1 deletion consul/prepared_query_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1607,6 +1607,197 @@ func TestPreparedQuery_Execute(t *testing.T) {
t.Fatalf("unique shuffle ratio too low: %d/100", len(uniques))
}

// Set the query to return results nearest to node3. This is the only
// node with coordinates, and it carries the service we are asking for,
// so node3 should always show up first.
query.Op = structs.PreparedQueryUpdate
query.Query.Service.Near = "node3"
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
t.Fatalf("err: %v", err)
}

// Now run the query and make sure the sort looks right.
{
req := structs.PreparedQueryExecuteRequest{
Agent: structs.QuerySource{
Datacenter: "dc1",
Node: "node3",
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}

var reply structs.PreparedQueryExecuteResponse

for i := 0; i < 10; i++ {
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if n := len(reply.Nodes); n != 10 {
t.Fatalf("expect 10 nodes, got: %d", n)
}
if node := reply.Nodes[0].Node.Node; node != "node3" {
t.Fatalf("expect node3 first, got: %q", node)
}
}
}

// Query again, but this time set a client-supplied query source. This
// proves that we allow overriding the baked-in value with ?near.
{
// Set up the query with a non-existent node. This will cause the
// nodes to be shuffled if the passed node is respected, proving
// that we allow the override to happen.
req := structs.PreparedQueryExecuteRequest{
Source: structs.QuerySource{
Datacenter: "dc1",
Node: "foo",
},
Agent: structs.QuerySource{
Datacenter: "dc1",
Node: "node3",
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}

var reply structs.PreparedQueryExecuteResponse

shuffled := false
for i := 0; i < 10; i++ {
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if n := len(reply.Nodes); n != 10 {
t.Fatalf("expect 10 nodes, got: %d", n)
}
if node := reply.Nodes[0].Node.Node; node != "node3" {
shuffled = true
break
}
}

if !shuffled {
t.Fatalf("expect nodes to be shuffled")
}
}

// Bake the magic "_agent" flag into the query.
query.Query.Service.Near = "_agent"
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
t.Fatalf("err: %v", err)
}

// Check that we sort the local agent first when the magic flag is set.
{
req := structs.PreparedQueryExecuteRequest{
Agent: structs.QuerySource{
Datacenter: "dc1",
Node: "node3",
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}

var reply structs.PreparedQueryExecuteResponse

for i := 0; i < 10; i++ {
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if n := len(reply.Nodes); n != 10 {
t.Fatalf("expect 10 nodes, got: %d", n)
}
if node := reply.Nodes[0].Node.Node; node != "node3" {
t.Fatalf("expect node3 first, got: %q", node)
}
}
}

// Check that the query isn't just sorting "node3" first because we
// provided it in the Agent query source. Proves that we use the
// Agent source when the magic "_agent" flag is passed.
{
req := structs.PreparedQueryExecuteRequest{
Agent: structs.QuerySource{
Datacenter: "dc1",
Node: "foo",
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}

var reply structs.PreparedQueryExecuteResponse

// Expect the set to be shuffled since we have no coordinates
// on the "foo" node.
shuffled := false
for i := 0; i < 10; i++ {
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if n := len(reply.Nodes); n != 10 {
t.Fatalf("expect 10 nodes, got: %d", n)
}
if node := reply.Nodes[0].Node.Node; node != "node3" {
shuffled = true
break
}
}

if !shuffled {
t.Fatal("expect nodes to be shuffled")
}
}

// Shuffles if the response comes from a non-local DC. Proves that the
// agent query source does not interfere with the order.
{
req := structs.PreparedQueryExecuteRequest{
Source: structs.QuerySource{
Datacenter: "dc2",
Node: "node3",
},
Agent: structs.QuerySource{
Datacenter: "dc1",
Node: "node3",
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}

var reply structs.PreparedQueryExecuteResponse

shuffled := false
for i := 0; i < 10; i++ {
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if n := len(reply.Nodes); n != 10 {
t.Fatalf("expect 10 nodes, got: %d", n)
}
if reply.Nodes[0].Node.Node != "node3" {
shuffled = true
break
}
}

if !shuffled {
t.Fatal("expect node shuffle for remote results")
}
}

// Un-bake the near parameter.
query.Query.Service.Near = ""
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
t.Fatalf("err: %v", err)
}

// Update the health of a node to mark it critical.
setHealth := func(node string, health string) {
req := structs.RegisterRequest{
Expand Down Expand Up @@ -1683,7 +1874,6 @@ func TestPreparedQuery_Execute(t *testing.T) {
}

// Make the query more picky so it excludes warning nodes.
query.Op = structs.PreparedQueryUpdate
query.Query.Service.OnlyPassing = true
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
t.Fatalf("err: %v", err)
Expand Down
Loading