Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support "near" parameter in prepared query service block #2137

Merged
merged 15 commits into from
Jul 1, 2016
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions command/agent/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,10 @@ RPC:
func (d *DNSServer) preparedQueryLookup(network, datacenter, query string, req, resp *dns.Msg) {
// Execute the prepared query.
args := structs.PreparedQueryExecuteRequest{
Origin: structs.QuerySource{
Datacenter: d.agent.config.Datacenter,
Node: d.agent.config.NodeName,
},
Datacenter: datacenter,
QueryIDOrName: query,
QueryOptions: structs.QueryOptions{
Expand Down
8 changes: 1 addition & 7 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,13 +531,7 @@ func (s *HTTPServer) parseToken(req *http.Request, token *string) {
// DC in the request, if given, or else the agent's DC.
func (s *HTTPServer) parseSource(req *http.Request, source *structs.QuerySource) {
s.parseDC(req, &source.Datacenter)
if node := req.URL.Query().Get("near"); node != "" {
if node == "_agent" {
source.Node = s.agent.config.NodeName
} else {
source.Node = node
}
}
source.Node = req.URL.Query().Get("near")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this'll break other endpoints that depend on ?near=_agent getting resolved at this level. I think you can pass the existing Source member of the execute request into here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it break? This just pushes the check out to the consul side instead of the agent. I was trying to avoid checking the magic flag in the agent and on the consul side (since the Near parameter can live in either place now).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still think you'll need to revert this change - should not be needed any more and there are other RPC endpoints that do the sort which rely on _agent getting resolved on the agent side, not the server side.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see, catalog and health both allow distance sorting. Will fix!

}

// parse is a convenience method for endpoints that need
Expand Down
12 changes: 0 additions & 12 deletions command/agent/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,18 +382,6 @@ func TestParseSource(t *testing.T) {
if source.Datacenter != "foo" || source.Node != "bob" {
t.Fatalf("bad: %v", source)
}

// The magic "_agent" node name will use the agent's local node name.
req, err = http.NewRequest("GET",
"/v1/catalog/nodes?near=_agent", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
source = structs.QuerySource{}
srv.parseSource(req, &source)
if source.Datacenter != "dc1" || source.Node != srv.agent.config.NodeName {
t.Fatalf("bad: %v", source)
}
}

func TestParseWait(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions command/agent/prepared_query_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ func parseLimit(req *http.Request, limit *int) error {
// preparedQueryExecute executes a prepared query.
func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.PreparedQueryExecuteRequest{
Origin: structs.QuerySource{
Datacenter: s.agent.config.Datacenter,
Node: s.agent.config.NodeName,
},
QueryIDOrName: id,
}
s.parseSource(req, &args.Source)
Expand Down
4 changes: 4 additions & 0 deletions command/agent/prepared_query_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,10 @@ func TestPreparedQuery_Execute(t *testing.T) {

m.executeFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error {
expected := &structs.PreparedQueryExecuteRequest{
Origin: structs.QuerySource{
Datacenter: srv.agent.config.Datacenter,
Node: srv.agent.config.NodeName,
},
Datacenter: "dc1",
QueryIDOrName: "my-id",
Limit: 5,
Expand Down
2 changes: 2 additions & 0 deletions consul/prepared_query/walk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func TestWalk_ServiceQuery(t *testing.T) {
Failover: structs.QueryDatacenterOptions{
Datacenters: []string{"dc1", "dc2"},
},
Near: "_agent",
Tags: []string{"tag1", "tag2", "tag3"},
}
if err := walk(service, fn); err != nil {
Expand All @@ -30,6 +31,7 @@ func TestWalk_ServiceQuery(t *testing.T) {
".Service:the-service",
".Failover.Datacenters[0]:dc1",
".Failover.Datacenters[1]:dc2",
".Near:_agent",
".Tags[0]:tag1",
".Tags[1]:tag2",
".Tags[2]:tag3",
Expand Down
36 changes: 35 additions & 1 deletion consul/prepared_query_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,10 +368,44 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
// Shuffle the results in case coordinates are not available if they
// requested an RTT sort.
reply.Nodes.Shuffle()
if err := p.srv.sortNodesByDistanceFrom(args.Source, reply.Nodes); err != nil {

// Get the source to sort by. This can be passed in by the requestor, or
// pre-defined using the Near parameter in the prepared query. If the
// near parameter was defined, that will be preferred.
sortFrom := args.Source
if query.Service.Near != "" {
sortFrom = structs.QuerySource{
Datacenter: args.Datacenter,
Node: query.Service.Near,
}
}

// Respect the magic "_agent" flag.
preferLocal := false
if sortFrom.Node == "_agent" {
preferLocal = true
sortFrom = args.Origin
}

// Perform the distance sort
Copy link
Contributor

@sean- sean- Jun 21, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to memoize this value for ~60s or whatever the interval is for the tomography calcs? I seem to recall someone saying this calc was "expensive," but if it's not, disregard. DNS can set a TTL, so maybe this isn't a huge issue, but that depends on the level of DNS caching at a given site.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not too bad - I'd probably leave as-is since this can be DNS TTL-ed and read-scaled with staleness across the servers.

if err := p.srv.sortNodesByDistanceFrom(sortFrom, reply.Nodes); err != nil {
return err
}

// Nodes cannot be any "closer" than localhost, so this special case ensures
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem worth the special case since 0.6 has been out for a while and nobody should be disabling coordinates :-) It'll do some stuff in the common case where the thing is already in the front position as well, and that would add extra complexity to avoid.

// the local node is returned first if it is present in the result. This
// allows the local agent to be preferred even when network coordinates are
// not enabled. Only works if the results come from the request origin DC.
if preferLocal && reply.Datacenter == args.Origin.Datacenter {
for i, node := range reply.Nodes {
if node.Node.Node == args.Origin.Node {
remote := append(reply.Nodes[:i], reply.Nodes[i+1:]...)
reply.Nodes = append([]structs.CheckServiceNode{node}, remote...)
break
}
}
}

// Apply the limit if given.
if args.Limit > 0 && len(reply.Nodes) > args.Limit {
reply.Nodes = reply.Nodes[:args.Limit]
Expand Down
132 changes: 131 additions & 1 deletion consul/prepared_query_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1607,6 +1607,137 @@ func TestPreparedQuery_Execute(t *testing.T) {
t.Fatalf("unique shuffle ratio too low: %d/100", len(uniques))
}

// Set the query to prefer a colocated service using the magic _agent token
query.Op = structs.PreparedQueryUpdate
query.Query.Service.Near = "_agent"
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
t.Fatalf("err: %v", err)
}

// Now try querying and make sure the local node is preferred.
{
req := structs.PreparedQueryExecuteRequest{
Origin: structs.QuerySource{
Datacenter: "dc1",
Node: "node1",
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}

var reply structs.PreparedQueryExecuteResponse

// Repeat this a few times to make sure we don't just get lucky.
for i := 0; i < 10; i++ {
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if n := len(reply.Nodes); n != 10 {
t.Fatalf("expect 10 nodes, got: %d", n)
}
if node := reply.Nodes[0].Node.Node; node != "node1" {
t.Fatalf("expect node1 first, got: %q", node)
}
}
}

// Falls back to remote nodes if service is not available locally
{
req := structs.PreparedQueryExecuteRequest{
Origin: structs.QuerySource{
Datacenter: "dc1",
Node: "node1",
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}

var reply structs.PreparedQueryExecuteResponse

if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if n := len(reply.Nodes); n != 10 {
t.Fatalf("expect 10 nodes, got: %d", n)
}
}

// Shuffles if the response comes from a non-local DC. We may
// need to try multiple times if at first we get a match by chance.
{
req := structs.PreparedQueryExecuteRequest{
Origin: structs.QuerySource{
Datacenter: "dc2",
Node: "node1",
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}

var reply structs.PreparedQueryExecuteResponse

shuffled := false
for i := 0; i < 10; i++ {
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if n := len(reply.Nodes); n != 10 {
t.Fatalf("expect 10 nodes, got: %d", n)
}
if reply.Nodes[0].Node.Node != "node1" {
shuffled = true
break
}
}

if !shuffled {
t.Fatal("expect node shuffle for remote results")
}
}

// Bake a non-local node name into Near parameter of the query. This
// node was seeded with a coordinate above so distance sort works.
query.Query.Service.Near = "node3"
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
t.Fatalf("err: %v", err)
}

// Try the distance sort again to ensure the nearest node is returned
{
req := structs.PreparedQueryExecuteRequest{
Origin: structs.QuerySource{
Datacenter: "dc1",
Node: "node1",
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}

var reply structs.PreparedQueryExecuteResponse

for i := 0; i < 10; i++ {
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if n := len(reply.Nodes); n != 10 {
t.Fatalf("expect 10 nodes, got: %d", n)
}
if node := reply.Nodes[0].Node.Node; node != "node3" {
t.Fatalf("expect node3, got: %q", node)
}
}
}

// Un-bake the Near parameter.
query.Query.Service.Near = ""
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
t.Fatalf("err:% v", err)
}

// Update the health of a node to mark it critical.
setHealth := func(node string, health string) {
req := structs.RegisterRequest{
Expand Down Expand Up @@ -1683,7 +1814,6 @@ func TestPreparedQuery_Execute(t *testing.T) {
}

// Make the query more picky so it excludes warning nodes.
query.Op = structs.PreparedQueryUpdate
query.Query.Service.OnlyPassing = true
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
t.Fatalf("err: %v", err)
Expand Down
11 changes: 11 additions & 0 deletions consul/structs/prepared_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ type ServiceQuery struct {
// discarded)
OnlyPassing bool

// Near allows the query to always prefer the node nearest the given
// node. If the node does not exist, results are returned in their
// normal randomly-shuffled order.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be good to mention the magic _agent value here in the comment.

Near string

// Tags are a set of required and/or disallowed tags. If a tag is in
// this list it must be present. If the tag is preceded with "!" then
// it is disallowed.
Expand Down Expand Up @@ -173,6 +178,12 @@ type PreparedQueryExecuteRequest struct {
// Limit will trim the resulting list down to the given limit.
Limit int

// Origin is used to carry around a reference to the node which
// is executing the query on behalf of the client. It is taken
// as a QuerySource so that it can be used directly for queries
// relating to the agent servicing the request.
Origin QuerySource
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you take a look at the existing Source parameter right after this one? I think it was originally intended to do what Origin does.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty close yeah. I originally tried to use that, but I switched to this because in the HTTP/DNS api code at the agent, you can't be sure what the prepared query actually does, so the agent can't be sure if it should pass its own name + datacenter, or the user's provided ?near and/or ?dc value. This just makes it so we can send both and decide outside of the agent.


// Source is used to sort the results relative to a given node using
// network coordinates.
Source QuerySource
Expand Down