Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node metadata #2643

Merged
merged 9 commits into from
Jan 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ type QueryOptions struct {
// that node. Setting this to "_agent" will use the agent's node
// for the sort.
Near string

// NodeMeta is used to filter results by nodes with the given
// metadata key/value pairs. Currently, only one key/value pair can
// be provided for filtering.
NodeMeta map[string]string
}

// WriteOptions are used to parameterize a write
Expand Down Expand Up @@ -386,6 +391,11 @@ func (r *request) setQueryOptions(q *QueryOptions) {
if q.Near != "" {
r.params.Set("near", q.Near)
}
if len(q.NodeMeta) > 0 {
for key, value := range q.NodeMeta {
r.params.Add("node-meta", key+":"+value)
}
}
}

// durToMsec converts a duration to a millisecond specified string. If the
Expand Down
3 changes: 3 additions & 0 deletions api/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ type Node struct {
Node string
Address string
TaggedAddresses map[string]string
Meta map[string]string
}

type CatalogService struct {
Node string
Address string
TaggedAddresses map[string]string
NodeMeta map[string]string
ServiceID string
ServiceName string
ServiceAddress string
Expand All @@ -29,6 +31,7 @@ type CatalogRegistration struct {
Node string
Address string
TaggedAddresses map[string]string
NodeMeta map[string]string
Datacenter string
Service *AgentService
Check *AgentCheck
Expand Down
113 changes: 113 additions & 0 deletions api/catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,64 @@ func TestCatalog_Nodes(t *testing.T) {
})
}

func TestCatalog_Nodes_MetaFilter(t *testing.T) {
meta := map[string]string{"somekey": "somevalue"}
c, s := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) {
conf.NodeMeta = meta
})
defer s.Stop()

catalog := c.Catalog()

// Make sure we get the node back when filtering by its metadata
testutil.WaitForResult(func() (bool, error) {
nodes, meta, err := catalog.Nodes(&QueryOptions{NodeMeta: meta})
if err != nil {
return false, err
}

if meta.LastIndex == 0 {
return false, fmt.Errorf("Bad: %v", meta)
}

if len(nodes) == 0 {
return false, fmt.Errorf("Bad: %v", nodes)
}

if _, ok := nodes[0].TaggedAddresses["wan"]; !ok {
return false, fmt.Errorf("Bad: %v", nodes[0])
}

if v, ok := nodes[0].Meta["somekey"]; !ok || v != "somevalue" {
return false, fmt.Errorf("Bad: %v", nodes[0].Meta)
}

return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})

// Get nothing back when we use an invalid filter
testutil.WaitForResult(func() (bool, error) {
nodes, meta, err := catalog.Nodes(&QueryOptions{NodeMeta: map[string]string{"nope":"nope"}})
if err != nil {
return false, err
}

if meta.LastIndex == 0 {
return false, fmt.Errorf("Bad: %v", meta)
}

if len(nodes) != 0 {
return false, fmt.Errorf("Bad: %v", nodes)
}

return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}

func TestCatalog_Services(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
Expand Down Expand Up @@ -87,6 +145,56 @@ func TestCatalog_Services(t *testing.T) {
})
}

func TestCatalog_Services_NodeMetaFilter(t *testing.T) {
meta := map[string]string{"somekey": "somevalue"}
c, s := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) {
conf.NodeMeta = meta
})
defer s.Stop()

catalog := c.Catalog()

// Make sure we get the service back when filtering by the node's metadata
testutil.WaitForResult(func() (bool, error) {
services, meta, err := catalog.Services(&QueryOptions{NodeMeta: meta})
if err != nil {
return false, err
}

if meta.LastIndex == 0 {
return false, fmt.Errorf("Bad: %v", meta)
}

if len(services) == 0 {
return false, fmt.Errorf("Bad: %v", services)
}

return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})

// Get nothing back when using an invalid filter
testutil.WaitForResult(func() (bool, error) {
services, meta, err := catalog.Services(&QueryOptions{NodeMeta: map[string]string{"nope":"nope"}})
if err != nil {
return false, err
}

if meta.LastIndex == 0 {
return false, fmt.Errorf("Bad: %v", meta)
}

if len(services) != 0 {
return false, fmt.Errorf("Bad: %v", services)
}

return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}

func TestCatalog_Service(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
Expand Down Expand Up @@ -173,6 +281,7 @@ func TestCatalog_Registration(t *testing.T) {
Datacenter: "dc1",
Node: "foobar",
Address: "192.168.10.10",
NodeMeta: map[string]string{"somekey": "somevalue"},
Service: service,
Check: check,
}
Expand Down Expand Up @@ -200,6 +309,10 @@ func TestCatalog_Registration(t *testing.T) {
return false, fmt.Errorf("missing checkid service:redis1")
}

if v, ok := node.Node.Meta["somekey"]; !ok || v != "somevalue" {
return false, fmt.Errorf("missing node meta pair somekey:somevalue")
}

return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
Expand Down
88 changes: 87 additions & 1 deletion command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,26 @@ const (
"but no reason was provided. This is a default message."
defaultServiceMaintReason = "Maintenance mode is enabled for this " +
"service, but no reason was provided. This is a default message."

// The meta key prefix reserved for Consul's internal use
metaKeyReservedPrefix = "consul-"

// The maximum number of metadata key pairs allowed to be registered
metaMaxKeyPairs = 64

// The maximum allowed length of a metadata key
metaKeyMaxLength = 128

// The maximum allowed length of a metadata value
metaValueMaxLength = 512
)

var (
// dnsNameRe checks if a name or tag is dns-compatible.
dnsNameRe = regexp.MustCompile(`^[a-zA-Z0-9\-]+$`)

// metaKeyFormat checks if a metadata key string is valid
metaKeyFormat = regexp.MustCompile(`^[a-zA-Z0-9_-]+$`).MatchString
)

/*
Expand Down Expand Up @@ -246,13 +261,16 @@ func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter,
return nil, err
}

// Load checks/services.
// Load checks/services/metadata.
if err := agent.loadServices(config); err != nil {
return nil, err
}
if err := agent.loadChecks(config); err != nil {
return nil, err
}
if err := agent.loadMetadata(config); err != nil {
return nil, err
}

// Start watching for critical services to deregister, based on their
// checks.
Expand Down Expand Up @@ -1677,6 +1695,74 @@ func (a *Agent) restoreCheckState(snap map[types.CheckID]*structs.HealthCheck) {
}
}

// loadMetadata loads node metadata fields from the agent config and
// updates them on the local agent.
func (a *Agent) loadMetadata(conf *Config) error {
a.state.Lock()
defer a.state.Unlock()

for key, value := range conf.Meta {
a.state.metadata[key] = value
}

a.state.changeMade()

return nil
}

// parseMetaPair parses a key/value pair of the form key:value
func parseMetaPair(raw string) (string, string) {
pair := strings.SplitN(raw, ":", 2)
if len(pair) == 2 {
return pair[0], pair[1]
} else {
return pair[0], ""
}
}

// validateMeta validates a set of key/value pairs from the agent config
func validateMetadata(meta map[string]string) error {
if len(meta) > metaMaxKeyPairs {
return fmt.Errorf("Node metadata cannot contain more than %d key/value pairs", metaMaxKeyPairs)
}

for key, value := range meta {
if err := validateMetaPair(key, value); err != nil {
return fmt.Errorf("Couldn't load metadata pair ('%s', '%s'): %s", key, value, err)
}
}

return nil
}

// validateMetaPair checks that the given key/value pair is in a valid format
func validateMetaPair(key, value string) error {
if key == "" {
return fmt.Errorf("Key cannot be blank")
}
if !metaKeyFormat(key) {
return fmt.Errorf("Key contains invalid characters")
}
if len(key) > metaKeyMaxLength {
return fmt.Errorf("Key is too long (limit: %d characters)", metaKeyMaxLength)
}
if strings.HasPrefix(key, metaKeyReservedPrefix) {
return fmt.Errorf("Key prefix '%s' is reserved for internal use", metaKeyReservedPrefix)
}
if len(value) > metaValueMaxLength {
return fmt.Errorf("Value is too long (limit: %d characters)", metaValueMaxLength)
}
return nil
}

// unloadMetadata resets the local metadata state
func (a *Agent) unloadMetadata() {
a.state.Lock()
defer a.state.Unlock()

a.state.metadata = make(map[string]string)
}

// serviceMaintCheckID returns the ID of a given service's maintenance check
func serviceMaintCheckID(serviceID string) types.CheckID {
return types.CheckID(structs.ServiceMaintPrefix + serviceID)
Expand Down
2 changes: 2 additions & 0 deletions command/agent/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type AgentSelf struct {
Coord *coordinate.Coordinate
Member serf.Member
Stats map[string]map[string]string
Meta map[string]string
}

func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
Expand Down Expand Up @@ -47,6 +48,7 @@ func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (int
Coord: c,
Member: s.agent.LocalMember(),
Stats: s.agent.Stats(),
Meta: s.agent.state.Metadata(),
}, nil
}

Expand Down
10 changes: 9 additions & 1 deletion command/agent/agent_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,12 @@ func TestAgent_Checks_ACLFilter(t *testing.T) {
}

func TestAgent_Self(t *testing.T) {
dir, srv := makeHTTPServer(t)
meta := map[string]string{
"somekey": "somevalue",
}
dir, srv := makeHTTPServerWithConfig(t, func(conf *Config) {
conf.Meta = meta
})
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
Expand Down Expand Up @@ -232,6 +237,9 @@ func TestAgent_Self(t *testing.T) {
if !reflect.DeepEqual(c, val.Coord) {
t.Fatalf("coordinates are not equal: %v != %v", c, val.Coord)
}
if !reflect.DeepEqual(meta, val.Meta) {
t.Fatalf("meta fields are not equal: %v != %v", meta, val.Meta)
}

srv.agent.config.DisableCoordinates = true
obj, err = srv.AgentSelf(nil, req)
Expand Down
Loading