Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Namespaces #115

Merged
merged 3 commits into from
Sep 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 46 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,6 @@ down the privileges required for ESM the following [ACL policy rules][rules]
can be used:

```hcl
operator = "read"

agent_prefix "" {
policy = "read"
}
Expand Down Expand Up @@ -374,8 +372,6 @@ consul-esm is registered with
parameter

```hcl
operator = "read"

agent "<consul-agent-node-name>" {
policy = "read"
}
Expand Down Expand Up @@ -403,14 +399,59 @@ session "<consul-agent-node-name>" {

For context on usage of each ACL:

- `operator:read` - for feature to check consul and esm version compatibility
- `agent:read` - for features to check version compatibility and calculating network coordinates
- `key:write` - to store assigned checks
- `node:write` - to update the status of each node that esm monitors
- `node:read` - to retrieve nodes that need to be monitored
- `service:write` - to register esm service
- `session:write` - to acquire esm cluster leader lock

### Consul Namespaces (Enterprise Feature)

ESM supports [Consul Enterprise Namespaces
](https://www.consul.io/docs/enterprise/namespaces). When run with enterprise
Consul servers it will scan all accessible Namespaces for external nodes and
health checks to monitor. What is meant by "all accessible" is all Namespaces
accessible via [Namespace ACL rules](
https://www.consul.io/docs/security/acl/acl-rules) that provide `read` level
access to the Namespace. The simplest case of wanting to access all Namespaces
would add the below rule to the ESM ACL policy in the previous section...

```hcl
namespace_prefix "" {
acl = "read"
}
```

If an ESM instance needs to monitor only a subset of existing Namespaces, the
policy will need to grant access to each Namespace explicitly. For example lets
say we have 3 Namespaces, "foo", "bar" and "zed" and you want this ESM to only
monitor "foo" and "bar". Your policy would need to have these listed (or a
common prefix would work)...

```hcl
namespace "foo" {
acl = "read"
}
namespace "bar" {
acl = "read"
}
```

#### Namespaces + `consul_kv_path` config setting:

* If you have multiple ESMs for HA (secondary, backup ESMs) have the **same**
value set to `consul_kv_path`. (in practice these configs are identical)

* If you have multiple ESMs for separate Namespaces each must use a
**different** setting for `consul_kv_path`.

ESM uses the `consul_kv_path` to determine where to keep its meta data. This
meta data will be different for each ESM monitoring different Namespaces.

Note you can have both, those in HA clusters would have the same value and each
separate HA cluster would use different values.

## Contributing

**Note** if you run Linux and see `socket: permission denied` errors with UDP
Expand Down
20 changes: 15 additions & 5 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ func (a *Agent) watchHealthChecks(nodeListCh chan map[string]bool) {
}
firstRun = false

// All ESM health checks are node checks and in the 'default' namespace
checks, meta, err := a.client.Health().State(api.HealthAny, opts)
if err != nil {
a.logger.Warn("Error querying for health check info", "error", err)
Expand Down Expand Up @@ -527,7 +528,7 @@ VERIFYCONSULSERVER:
}

// Fetch server versions
resp, err := a.client.Operator().AutopilotServerHealth(nil)
svs, _, err := a.client.Catalog().Service("consul", "", nil)
if err != nil {
if strings.Contains(err.Error(), "429") {
// 429 is a warning that something is unhealthy. This may occur when ESM
Expand All @@ -542,13 +543,22 @@ VERIFYCONSULSERVER:

versions := []string{agentVersion}
uniqueVersions := map[string]bool{agentVersion: true}
for _, s := range resp.Servers {
if !uniqueVersions[s.Version] {
uniqueVersions[s.Version] = true
versions = append(versions, s.Version)
var foundServer bool
for _, s := range svs {
if v, ok := s.ServiceMeta["version"]; ok {
foundServer = true
if !uniqueVersions[v] {
uniqueVersions[v] = true
versions = append(versions, v)
}
}
}

if !foundServer {
a.logger.Warn("unable to determine Consul server version, check for " +
"compatibility; requires " + version.GetConsulVersionConstraint())
}

err = version.CheckConsulVersions(versions)
if err != nil {
a.logger.Error("Incompatible Consul versions")
Expand Down
82 changes: 65 additions & 17 deletions agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package main

import (
"fmt"
"net/http"
"net/http/httptest"
"reflect"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -34,7 +37,7 @@ func testAgent(t *testing.T, cb func(*Config)) *Agent {

go func() {
if err := agent.Run(); err != nil {
t.Fatal(err)
panic(err)
}
}()

Expand Down Expand Up @@ -239,24 +242,69 @@ func TestAgent_LastKnownStatusIsExpired(t *testing.T) {
}

func TestAgent_VerifyConsulCompatibility(t *testing.T) {
// Smoke test to test the compatibility with the current Consul version
// pinned in go dependency.
t.Parallel()
s, err := NewTestServer(t)
if err != nil {
t.Fatal(err)
type testCase struct {
name, agentJSON, serviceJSON string
shouldPass bool
}
testVersion := func(t *testing.T, tc testCase) {
ts := httptest.NewServer(http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
uri := r.RequestURI
switch { // ignore anything that doesn't require a return body
case strings.Contains(uri, "status/leader"):
fmt.Fprint(w, `"127.0.0.1"`)
case strings.Contains(uri, "agent/self"):
fmt.Fprint(w, tc.agentJSON)
case strings.Contains(uri, "catalog/service"):
fmt.Fprint(w, tc.serviceJSON)
}
}))
defer ts.Close()

agent := testAgent(t, func(c *Config) {
c.HTTPAddr = ts.URL
c.InstanceID = "test-agent"
})
time.Sleep(time.Millisecond) // race with Run's go routines
defer agent.Shutdown()

err := agent.VerifyConsulCompatibility()
switch {
case tc.shouldPass && err != nil:
t.Fatalf("unexpected error: %s", err)
case !tc.shouldPass && err == nil:
t.Fatalf("should be an error and wasn't: %#v", tc)
}
}
testCases := []testCase{
{
name: "good",
agentJSON: `{"Config": {"Version": "1.10.0"}}`,
serviceJSON: `[{"ServiceMeta" : {"version": "1.10.0"}}]`,
shouldPass: true,
},
{
name: "bad-agent",
agentJSON: `{"Config": {"Version": "1.0.0"}}`,
serviceJSON: `[{"ServiceMeta" : {"version": "1.10.0"}}]`,
},
{
name: "bad-server",
agentJSON: `{"Config": {"Version": "1.10.0"}}`,
serviceJSON: `[{"ServiceMeta" : {"version": "1.0.0"}}]`,
},
{
name: "no-server-version-meta",
agentJSON: `{"Config": {"Version": "1.10.0"}}`,
serviceJSON: `[{"ServiceMeta" : {}}]`,
shouldPass: true, // logs a warning
},
}
defer s.Stop()

agent := testAgent(t, func(c *Config) {
c.HTTPAddr = s.HTTPAddr
c.Tag = "test"
})
defer agent.Shutdown()

err = agent.VerifyConsulCompatibility()
if err != nil {
t.Fatalf("unexpected error: %s", err)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
testVersion(t, tc)
})
}
}

Expand Down
70 changes: 59 additions & 11 deletions leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"reflect"
"sort"
"strings"
"time"

"github.com/hashicorp/consul/api"
Expand Down Expand Up @@ -256,22 +257,69 @@ func (a *Agent) watchServiceInstances(instanceCh chan []*api.ServiceEntry, stopC
case <-stopCh:
return
case <-time.After(retryTime / 10):
// Sleep here to limit how much load we put on the Consul servers. We can
// wait a lot less than the normal retry time here because the ESM service instance
// list is relatively small and cheap to query.
// Sleep here to limit how much load we put on the Consul servers.
// We can wait a lot less than the normal retry time here because
// the ESM service instance list is relatively small and cheap to
// query.
}

healthyInstances, meta, err := a.client.Health().Service(a.config.Service, a.config.Tag, true, opts)
switch healthyInstances, err := a.getServiceInstances(opts); err {
case nil:
instanceCh <- healthyInstances
default:
a.logger.Warn("[WARN] Error querying for health check info",
"error", err)
continue // not needed, but nice to be explicit
}
}
}

// getServiceInstances retuns a list of services with a 'passing' (healthy) state.
// It loops over all available namespaces to get instances from each.
func (a *Agent) getServiceInstances(opts *api.QueryOptions) ([]*api.ServiceEntry, error) {
var healthyInstances []*api.ServiceEntry
var meta *api.QueryMeta

namespaces, err := namespacesList(a.client)
if err != nil {
return nil, err
}

for _, ns := range namespaces {
if ns.Name != "" {
a.logger.Info("checking namespaces for services", "name", ns.Name)
}
opts.Namespace = ns.Name
healthy, m, err := a.client.Health().Service(a.config.Service,
a.config.Tag, true, opts)
if err != nil {
a.logger.Warn("[WARN] Error querying for health check info", "error", err)
continue
return nil, err
}
sort.Slice(healthyInstances, func(a, b int) bool {
return healthyInstances[a].Service.ID < healthyInstances[b].Service.ID
})
meta = m // keep last good meta
for _, h := range healthy {
healthyInstances = append(healthyInstances, h)
}
}
opts.WaitIndex = meta.LastIndex

opts.WaitIndex = meta.LastIndex
sort.Slice(healthyInstances, func(a, b int) bool {
return healthyInstances[a].Service.ID < healthyInstances[b].Service.ID
})

return healthyInstances, nil
}

instanceCh <- healthyInstances
// namespacesList returns a list of all accessable namespaces.
// Returns namespace "" (none) if none found for consul OSS compatibility.
func namespacesList(client *api.Client) ([]*api.Namespace, error) {
ossErr := "Unexpected response code: 404" // error snippet OSS consul returns
namespaces, _, err := client.Namespaces().List(nil)
switch {
case err == nil:
case strings.Contains(err.Error(), ossErr):
namespaces = []*api.Namespace{{Name: ""}}
case err != nil: // default, but more explicit
return nil, err
}
return namespaces, nil
}
Loading