Skip to content

Commit

Permalink
api: enable selecting subset of services using rendezvous hashing
Browse files Browse the repository at this point in the history
This PR adds the 'choose' query parameter to the '/v1/service/<service>' endpoint.

The value of 'choose' is in the form '<number>|<key>', number is the number
of desired services and key is a value unique but consistent to the requester
(e.g. allocID).

Folks aren't really expected to use this API directly, but rather through consul-template
which will soon be getting a new helper function making use of this query parameter.

Example,

curl 'localhost:4646/v1/service/redis?choose=2|abc123'

Note: consul-templte v0.29.1 includes the necessary nomadServices functionality.
  • Loading branch information
shoenig committed Jun 25, 2022
1 parent f5b8506 commit bdead31
Show file tree
Hide file tree
Showing 11 changed files with 470 additions and 15 deletions.
3 changes: 3 additions & 0 deletions .changelog/12862.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
api: enable setting ?choose parameter when querying services
```
5 changes: 4 additions & 1 deletion command/agent/service_registration_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ func (s *HTTPServer) ServiceRegistrationRequest(resp http.ResponseWriter, req *h
func (s *HTTPServer) serviceGetRequest(
resp http.ResponseWriter, req *http.Request, serviceName string) (interface{}, error) {

args := structs.ServiceRegistrationByNameRequest{ServiceName: serviceName}
args := structs.ServiceRegistrationByNameRequest{
ServiceName: serviceName,
Choose: req.URL.Query().Get("choose"),
}
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}
Expand Down
105 changes: 98 additions & 7 deletions command/agent/service_registration_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -157,6 +158,7 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) {
name string
}{
{
name: "delete by ID",
testFn: func(s *TestAgent) {

// Grab the state, so we can manipulate it and test against it.
Expand Down Expand Up @@ -186,9 +188,9 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) {
require.Nil(t, out)
require.NoError(t, err)
},
name: "delete by ID",
},
{
name: "get service by name",
testFn: func(s *TestAgent) {

// Grab the state, so we can manipulate it and test against it.
Expand All @@ -214,9 +216,99 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) {
require.NotZero(t, respW.Header().Get("X-Nomad-Index"))
require.Equal(t, serviceReg, obj.([]*structs.ServiceRegistration)[0])
},
name: "get service by name",
},
{
name: "get service using choose",
testFn: func(s *TestAgent) {
// Grab the state so we can manipulate and test against it.
testState := s.Agent.server.State()

err := testState.UpsertServiceRegistrations(
structs.MsgTypeTestSetup, 10,
[]*structs.ServiceRegistration{{
ID: "978d519a-46ad-fb04-966b-000000000001",
ServiceName: "redis",
Namespace: "default",
NodeID: "node1",
Datacenter: "dc1",
JobID: "job1",
AllocID: "8b83191f-cb29-e23a-d955-220b65ef676d",
Tags: nil,
Address: "10.0.0.1",
Port: 8080,
CreateIndex: 10,
ModifyIndex: 10,
}, {
ID: "978d519a-46ad-fb04-966b-000000000002",
ServiceName: "redis",
Namespace: "default",
NodeID: "node2",
Datacenter: "dc1",
JobID: "job1",
AllocID: "df6de93c-9376-a774-bcdf-3bd817e18078",
Tags: nil,
Address: "10.0.0.2",
Port: 8080,
CreateIndex: 10,
ModifyIndex: 10,
}, {
ID: "978d519a-46ad-fb04-966b-000000000003",
ServiceName: "redis",
Namespace: "default",
NodeID: "node3",
Datacenter: "dc1",
JobID: "job1",
AllocID: "df6de93c-9376-a774-bcdf-3bd817e18078",
Tags: nil,
Address: "10.0.0.3",
Port: 8080,
CreateIndex: 10,
ModifyIndex: 10,
}},
)
must.NoError(t, err)

// Build the HTTP request for 1 instance of the service, using key=abc123
req, err := http.NewRequest(http.MethodGet, "/v1/service/redis?choose=1|abc123", nil)
must.NoError(t, err)
respW := httptest.NewRecorder()

// Send the HTTP request.
obj, err := s.Server.ServiceRegistrationRequest(respW, req)
must.NoError(t, err)

// Check we got the correct type back.
services, ok := (obj).([]*structs.ServiceRegistration)
must.True(t, ok)

// Check we got the expected number of services back.
must.Len(t, 1, services)

// Build the HTTP request for 2 instances of the service, still using key=abc123
req2, err := http.NewRequest(http.MethodGet, "/v1/service/redis?choose=2|abc123", nil)
must.NoError(t, err)
respW2 := httptest.NewRecorder()

// Send the 2nd HTTP request.
obj2, err := s.Server.ServiceRegistrationRequest(respW2, req2)
must.NoError(t, err)

// Check we got the correct type back.
services2, ok := (obj2).([]*structs.ServiceRegistration)
must.True(t, ok)

// Check we got the expected number of services back.
must.Len(t, 2, services2)

// Check the first service is the same as the previous service.
must.Eq(t, services[0], services2[0])

// Check the second service is not the same as the first service.
must.NotEq(t, services2[0], services2[1])
},
},
{
name: "incorrect URI format",
testFn: func(s *TestAgent) {

// Build the HTTP request.
Expand All @@ -230,9 +322,9 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) {
require.Contains(t, err.Error(), "invalid URI")
require.Nil(t, obj)
},
name: "incorrect URI format",
},
{
name: "get service empty name",
testFn: func(s *TestAgent) {

// Build the HTTP request.
Expand All @@ -246,9 +338,9 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) {
require.Contains(t, err.Error(), "missing service name")
require.Nil(t, obj)
},
name: "get service empty name",
},
{
name: "get service incorrect method",
testFn: func(s *TestAgent) {

// Build the HTTP request.
Expand All @@ -262,9 +354,9 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) {
require.Contains(t, err.Error(), "Invalid method")
require.Nil(t, obj)
},
name: "get service incorrect method",
},
{
name: "delete service empty id",
testFn: func(s *TestAgent) {

// Build the HTTP request.
Expand All @@ -278,9 +370,9 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) {
require.Contains(t, err.Error(), "missing service id")
require.Nil(t, obj)
},
name: "delete service empty id",
},
{
name: "delete service incorrect method",
testFn: func(s *TestAgent) {

// Build the HTTP request.
Expand All @@ -294,7 +386,6 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) {
require.Contains(t, err.Error(), "Invalid method")
require.Nil(t, obj)
},
name: "delete service incorrect method",
},
}

Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ require (
github.com/gosuri/uilive v0.0.4
github.com/grpc-ecosystem/go-grpc-middleware v1.2.1-0.20200228141219-3ce3d519df39
github.com/hashicorp/consul v1.7.8
github.com/hashicorp/consul-template v0.29.0
github.com/hashicorp/consul-template v0.29.1
github.com/hashicorp/consul/api v1.13.0
github.com/hashicorp/consul/sdk v0.8.0
github.com/hashicorp/cronexpr v1.1.1
Expand Down Expand Up @@ -117,7 +117,7 @@ require (
github.com/zclconf/go-cty-yaml v1.0.2
go.etcd.io/bbolt v1.3.5
go.uber.org/goleak v1.1.12
golang.org/x/crypto v0.0.0-20220517005047-85d78b3ac167
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d
golang.org/x/exp v0.0.0-20220609121020-a51bd0440498
golang.org/x/net v0.0.0-20220225172249-27dd8689420f
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
Expand Down Expand Up @@ -210,6 +210,7 @@ require (
github.com/hashicorp/go-secure-stdlib/reloadutil v0.1.1 // indirect
github.com/hashicorp/go-secure-stdlib/tlsutil v0.1.1 // indirect
github.com/hashicorp/mdns v1.0.4 // indirect
github.com/hashicorp/vault/api/auth/kubernetes v0.1.0 // indirect
github.com/hashicorp/vic v1.5.1-0.20190403131502-bbfe86ec9443 // indirect
github.com/huandu/xstrings v1.3.2 // indirect
github.com/imdario/mergo v0.3.12 // indirect
Expand Down
11 changes: 8 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -655,8 +655,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/consul v1.7.8 h1:hp308KxAf3zWoGuwp2e+0UUhrm6qHjeBQk3jCZ+bjcY=
github.com/hashicorp/consul v1.7.8/go.mod h1:urbfGaVZDmnXC6geg0LYPh/SRUk1E8nfmDHpz+Q0nLw=
github.com/hashicorp/consul-template v0.29.0 h1:rDmF3Wjqp5ztCq054MruzEpi9ArcyJ/Rp4eWrDhMldM=
github.com/hashicorp/consul-template v0.29.0/go.mod h1:p1A8Z6Mz7gbXu38SI1c9nt5ItBK7ACWZG4ZE1A5Tr2M=
github.com/hashicorp/consul-template v0.29.1 h1:icm/H7klHYlxpUoWqSmTIWaSLEfGqUJJBsZA/2JhTLU=
github.com/hashicorp/consul-template v0.29.1/go.mod h1:QIohwBuXlKXtsmGGQdWrISlUy4E6LFg5tLZyrw4MyoU=
github.com/hashicorp/consul/api v1.4.0/go.mod h1:xc8u05kyMa3Wjr9eEAsIAo3dg8+LywT5E/Cl7cNS5nU=
github.com/hashicorp/consul/api v1.13.0 h1:2hnLQ0GjQvw7f3O61jMO8gbasZviZTrt9R8WzgiirHc=
github.com/hashicorp/consul/api v1.13.0/go.mod h1:ZlVrynguJKcYr54zGaDbaL3fOvKC9m72FhPvA8T35KQ=
Expand Down Expand Up @@ -801,9 +801,13 @@ github.com/hashicorp/serf v0.9.6/go.mod h1:TXZNMjZQijwlDvp+r0b63xZ45H7JmCmgg4gpT
github.com/hashicorp/serf v0.9.7 h1:hkdgbqizGQHuU5IPqYM1JdSMV8nKfpuOnZYXssk9muY=
github.com/hashicorp/serf v0.9.7/go.mod h1:TXZNMjZQijwlDvp+r0b63xZ45H7JmCmgg4gpTwn9UV4=
github.com/hashicorp/vault/api v1.0.4/go.mod h1:gDcqh3WGcR1cpF5AJz/B1UFheUEneMoIospckxBxk6Q=
github.com/hashicorp/vault/api v1.3.0/go.mod h1:EabNQLI0VWbWoGlA+oBLC8PXmR9D60aUVgQGvangFWQ=
github.com/hashicorp/vault/api v1.4.1 h1:mWLfPT0RhxBitjKr6swieCEP2v5pp/M//t70S3kMLRo=
github.com/hashicorp/vault/api v1.4.1/go.mod h1:LkMdrZnWNrFaQyYYazWVn7KshilfDidgVBq6YiTq/bM=
github.com/hashicorp/vault/api/auth/kubernetes v0.1.0 h1:6BtyahbF4aQp8gg3ww0A/oIoqzbhpNP1spXU3nHE0n0=
github.com/hashicorp/vault/api/auth/kubernetes v0.1.0/go.mod h1:Pdgk78uIs0mgDOLvc3a+h/vYIT9rznw2sz+ucuH9024=
github.com/hashicorp/vault/sdk v0.1.13/go.mod h1:B+hVj7TpuQY1Y/GPbCpffmgd+tSEwvhkWnjtSYCaS2M=
github.com/hashicorp/vault/sdk v0.3.0/go.mod h1:aZ3fNuL5VNydQk8GcLJ2TV8YCRVvyaakYkhZRoVuhj0=
github.com/hashicorp/vault/sdk v0.4.1 h1:3SaHOJY687jY1fnB61PtL0cOkKItphrbLmux7T92HBo=
github.com/hashicorp/vault/sdk v0.4.1/go.mod h1:aZ3fNuL5VNydQk8GcLJ2TV8YCRVvyaakYkhZRoVuhj0=
github.com/hashicorp/vic v1.5.1-0.20190403131502-bbfe86ec9443 h1:O/pT5C1Q3mVXMyuqg7yuAWUg/jMZR1/0QTzTRdNR6Uw=
Expand Down Expand Up @@ -1329,8 +1333,9 @@ golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220517005047-85d78b3ac167 h1:O8uGbHCqlTp2P6QJSLmCojM4mN6UemYv8K+dCnmHmu0=
golang.org/x/crypto v0.0.0-20220517005047-85d78b3ac167/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
Expand Down
79 changes: 78 additions & 1 deletion nomad/service_registration_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package nomad

import (
"net/http"
"sort"
"strconv"
"strings"
"time"

"github.com/armon/go-metrics"
Expand Down Expand Up @@ -423,6 +426,16 @@ func (s *ServiceRegistration) GetService(
http.StatusBadRequest, "failed to read result page: %v", err)
}

// Select which subset and the order of services to return if using ?choose
if args.Choose != "" {
chosen, chooseErr := s.choose(services, args.Choose)
if chooseErr != nil {
return structs.NewErrRPCCodedf(
http.StatusBadRequest, "failed to choose services: %v", chooseErr)
}
services = chosen
}

// Populate the reply.
reply.Services = services
reply.NextToken = nextToken
Expand All @@ -434,6 +447,70 @@ func (s *ServiceRegistration) GetService(
})
}

// choose uses rendezvous hashing to make a stable selection of a subset of services
// to return.
//
// parameter must in the form "<number>|<key>", where number is the number of services
// to select, and key is incorporated in the hashing function with each service -
// creating a unique yet consistent priority distribution pertaining to the requester.
// In practice (i.e. via consul-template), the key is the AllocID generating a request
// for upstream services.
//
// https://en.wikipedia.org/wiki/Rendezvous_hashing
// w := priority (i.e. hash value)
// h := hash function
// O := object - (i.e. requesting service - using key (allocID) as a proxy)
// S := site (i.e. destination service)
func (*ServiceRegistration) choose(services []*structs.ServiceRegistration, parameter string) ([]*structs.ServiceRegistration, error) {
// extract the number of services
tokens := strings.SplitN(parameter, "|", 2)
if len(tokens) != 2 {
return nil, structs.ErrMalformedChooseParameter
}
n, err := strconv.Atoi(tokens[0])
if err != nil {
return nil, structs.ErrMalformedChooseParameter
}

// extract the hash key
key := tokens[1]
if key == "" {
return nil, structs.ErrMalformedChooseParameter
}

// if there are fewer services than requested, go with the number of services
if l := len(services); l < n {
n = l
}

type pair struct {
hash string
service *structs.ServiceRegistration
}

// associate hash for each service
priorities := make([]*pair, len(services))
for i, service := range services {
priorities[i] = &pair{
hash: service.HashWith(key),
service: service,
}
}

// sort by the hash; creating random distribution of priority
sort.SliceStable(priorities, func(i, j int) bool {
return priorities[i].hash < priorities[j].hash
})

// choose top n services
chosen := make([]*structs.ServiceRegistration, n)
for i := 0; i < n; i++ {
chosen[i] = priorities[i].service
}

return chosen, nil
}

// handleMixedAuthEndpoint is a helper to handle auth on RPC endpoints that can
// either be called by Nomad nodes, or by external clients.
func (s *ServiceRegistration) handleMixedAuthEndpoint(args structs.QueryOptions, cap string) error {
Expand All @@ -451,7 +528,7 @@ func (s *ServiceRegistration) handleMixedAuthEndpoint(args structs.QueryOptions,
}
}
default:
// In the event we got any error other than notfound, consider this
// In the event we got any error other than ErrTokenNotFound, consider this
// terminal.
if err != structs.ErrTokenNotFound {
return err
Expand Down
Loading

0 comments on commit bdead31

Please sign in to comment.