Skip to content

Commit

Permalink
First stab at enabling ACLs for a nomad cluster.
Browse files Browse the repository at this point in the history
  • Loading branch information
apollo13 committed Aug 28, 2023
1 parent 254d9af commit 17b832c
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 69 deletions.
175 changes: 115 additions & 60 deletions pkg/clients/nomad/nomad.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
//go:generate mockery --name Nomad --filename nomad.go
type Nomad interface {
// SetConfig for the client, path is a valid Nomad JSON config file
SetConfig(address string, port, nodes int) error
SetConfig(address string, port, nodes int, acl_token string) error
// Create jobs in the provided files
Create(files []string) error
// Stop jobs in the provided files
Expand All @@ -30,9 +30,11 @@ type Nomad interface {
// HealthCheckAPI uses the Nomad API to check that all servers and nodes
// are ready. The function will block until either all nodes are healthy or the
// timeout period elapses.
HealthCheckAPI(time.Duration) error
HealthCheckAPI(time.Duration, bool) error
// Endpoints returns a list of endpoints for a cluster
Endpoints(job, group, task string) ([]map[string]string, error)
// Bootstrap ACLs
BootstrapACLs() error
}

// NomadImpl is an implementation of the Nomad interface
Expand All @@ -43,6 +45,7 @@ type NomadImpl struct {
address string
port int
clientNodes int
aclToken string
}

// NewNomad creates a new Nomad client
Expand All @@ -59,17 +62,52 @@ type createRequest struct {
Job string
}

func (n *NomadImpl) setAuthHeaders(rq *http.Request) {
if n.aclToken != "" {
rq.Header.Set("X-Nomad-Token", n.aclToken)
}
}

// SetConfig loads the Nomad config from a file
func (n *NomadImpl) SetConfig(address string, port, nodes int) error {
func (n *NomadImpl) SetConfig(address string, port, nodes int, acl_token string) error {
n.address = address
n.port = port
n.clientNodes = nodes
n.aclToken = acl_token

return nil
}

// HealthCheckAPI executes a HTTP heath check for a Nomad cluster
func (n *NomadImpl) HealthCheckAPI(timeout time.Duration) error {
func (n *NomadImpl) BootstrapACLs() error {
if n.aclToken != "" {
n.l.Debug("Bootstrapping ACLs", "address", n.address)

jsonBody := []byte(fmt.Sprintf(`{"BootstrapSecret":"%s"}`, n.aclToken))
bodyReader := bytes.NewReader(jsonBody)

rq, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s:%d/v1/acl/bootstrap", n.address, n.port), bodyReader)
if err != nil {
return err
}

resp, err := n.httpClient.Do(rq)
if err != nil {
return xerrors.Errorf("Unable to bootstrap ACLs: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
// try to read the body for the error
d, _ := ioutil.ReadAll(resp.Body)
return xerrors.Errorf("Error bootstrapping ACLs, got status code %d, error: %s", resp.StatusCode, string(d))
}

}
return nil
}

// HealthCheckAPI executes a HTTP health check for a Nomad cluster
func (n *NomadImpl) HealthCheckAPI(timeout time.Duration, simple bool) error {
n.l.Debug("Performing Nomad health check", "address", n.address)
st := time.Now()
for {
Expand All @@ -79,75 +117,87 @@ func (n *NomadImpl) HealthCheckAPI(timeout time.Duration) error {
return fmt.Errorf("Timeout waiting for Nomad healthcheck %s", n.address)
}

rq, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s:%d/v1/nodes", n.address, n.port), nil)
if err != nil {
return err
}

resp, err := n.httpClient.Do(rq)
if err == nil && resp.StatusCode == 200 {
nodes := []map[string]interface{}{}
// check number of nodes
json.NewDecoder(resp.Body).Decode(&nodes)

// loop nodes and check ready
readyCount := 0
for _, node := range nodes {
// get the node status
nodeStatus := node["Status"].(string)
nodeName := node["Name"].(string)
nodeEligable := node["SchedulingEligibility"].(string)

n.l.Debug("Node status", "node", nodeName, "status", nodeStatus, "eligible", nodeEligable)
// get the driver status
drivers, ok := node["Drivers"].(map[string]interface{})
if !ok {
continue
}

var driversHealthy = true
var dockerDetected = false
for k, v := range drivers {
driver, ok := v.(map[string]interface{})
if !ok {
continue
}
if simple {
rq, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s:%d/v1/status/leader", n.address, n.port), nil)
if err != nil {
return err
}
resp, err := n.httpClient.Do(rq)
if err == nil && resp.StatusCode == http.StatusOK {
return nil
}
} else {
rq, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s:%d/v1/nodes", n.address, n.port), nil)
n.setAuthHeaders(rq)
if err != nil {
return err
}

healthy, ok := driver["Healthy"].(bool)
resp, err := n.httpClient.Do(rq)
if err == nil && resp.StatusCode == http.StatusOK {
nodes := []map[string]interface{}{}
// check number of nodes
json.NewDecoder(resp.Body).Decode(&nodes)

// loop nodes and check ready
readyCount := 0
for _, node := range nodes {
// get the node status
nodeStatus := node["Status"].(string)
nodeName := node["Name"].(string)
nodeEligable := node["SchedulingEligibility"].(string)

n.l.Debug("Node status", "node", nodeName, "status", nodeStatus, "eligible", nodeEligable)
// get the driver status
drivers, ok := node["Drivers"].(map[string]interface{})
if !ok {
continue
}

detected, ok := driver["Detected"].(bool)
if !ok || !detected {
continue
}
var driversHealthy = true
var dockerDetected = false
for k, v := range drivers {
driver, ok := v.(map[string]interface{})
if !ok {
continue
}

healthy, ok := driver["Healthy"].(bool)
if !ok {
continue
}

detected, ok := driver["Detected"].(bool)
if !ok || !detected {
continue
}

// we need to make a special case to check the docker driver is
// present as if the nomad server starts before docker then the
// presence of docker will not be detected
if k == "docker" {
dockerDetected = true
}

n.l.Debug("Driver status", "node", nodeName, "driver", k, "healthy", healthy)
if !healthy {
driversHealthy = false
}

// we need to make a special case to check the docker driver is
// present as if the nomad server starts before docker then the
// presence of docker will not be detected
if k == "docker" {
dockerDetected = true
}

n.l.Debug("Driver status", "node", nodeName, "driver", k, "healthy", healthy)
if !healthy {
driversHealthy = false
if nodeStatus == "ready" && nodeEligable == "eligible" && driversHealthy && dockerDetected {
readyCount++
}

}

if nodeStatus == "ready" && nodeEligable == "eligible" && driversHealthy && dockerDetected {
readyCount++
if readyCount == n.clientNodes {
n.l.Debug("Nomad check complete", "address", n.address)
return nil
}
}

if readyCount == n.clientNodes {
n.l.Debug("Nomad check complete", "address", n.address)
return nil
n.l.Debug("Nodes not ready", "ready", readyCount, "nodes", n.clientNodes)
}

n.l.Debug("Nodes not ready", "ready", readyCount, "nodes", n.clientNodes)
}

// backoff
Expand All @@ -171,6 +221,7 @@ func (n *NomadImpl) Create(files []string) error {
cr := fmt.Sprintf(`{"Job": %s}`, string(jsonJob))

r, err := http.NewRequest(http.MethodPost, addr, bytes.NewReader([]byte(cr)))
n.setAuthHeaders(r)
if err != nil {
return xerrors.Errorf("Unable to create http request: %w", err)
}
Expand Down Expand Up @@ -201,6 +252,7 @@ func (n *NomadImpl) Stop(files []string) error {

// stop the job
r, err := http.NewRequest(http.MethodDelete, fmt.Sprintf("%s:%d/v1/job/%s", n.address, n.port, id), nil)
n.setAuthHeaders(r)
if err != nil {
return xerrors.Errorf("Unable to create http request: %w", err)
}
Expand Down Expand Up @@ -235,6 +287,7 @@ func (n *NomadImpl) ParseJob(file string) ([]byte, error) {

// validate the config with the Nomad API
r, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s:%d/v1/jobs/parse", n.address, n.port), bytes.NewReader(jobData))
n.setAuthHeaders(r)
if err != nil {
return nil, xerrors.Errorf("Unable to create http request: %w", err)
}
Expand Down Expand Up @@ -313,6 +366,7 @@ func (n *NomadImpl) Endpoints(job, group, task string) ([]map[string]string, err
}

r, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s:%d/v1/allocation/%s", n.address, n.port, j["ID"]), nil)
n.setAuthHeaders(r)
if err != nil {
return nil, xerrors.Errorf("Unable to create http request: %w", err)
}
Expand Down Expand Up @@ -393,6 +447,7 @@ func (n *NomadImpl) Endpoints(job, group, task string) ([]map[string]string, err
func (n *NomadImpl) getJobAllocations(job string) ([]map[string]interface{}, error) {
// get the allocations for the job
r, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s:%d/v1/job/%s/allocations", n.address, n.port, job), nil)
n.setAuthHeaders(r)
if err != nil {
return nil, xerrors.Errorf("Unable to create http request: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/clients/nomad/nomad_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func setupNomadTests(t *testing.T) (Nomad, string, *mocks.HTTP) {
)

c := NewNomad(mh, 1*time.Millisecond, logger.NewTestLogger(t))
c.SetConfig("local", 4646, 1)
c.SetConfig("local", 4646, 1, "")

return c, tmpDir, mh
}
Expand Down
20 changes: 14 additions & 6 deletions pkg/config/resources/nomad/provider_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ func (p *ClusterProvider) Refresh() error {

wg.Wait()

p.nomadClient.SetConfig(fmt.Sprintf("http://%s", p.config.ExternalIP), p.config.APIPort, p.config.ClientNodes+1)
err := p.nomadClient.HealthCheckAPI(startTimeout)
p.nomadClient.SetConfig(fmt.Sprintf("http://%s", p.config.ExternalIP), p.config.APIPort, p.config.ClientNodes+1, p.config.ACLToken)
err := p.nomadClient.HealthCheckAPI(startTimeout, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -174,8 +174,8 @@ func (p *ClusterProvider) Refresh() error {
p.log.Debug("Successfully created client node", "ref", p.config.ID, "client", fqdn)
}

p.nomadClient.SetConfig(fmt.Sprintf("http://%s", p.config.ExternalIP), p.config.APIPort, p.config.ClientNodes+1)
err := p.nomadClient.HealthCheckAPI(startTimeout)
p.nomadClient.SetConfig(fmt.Sprintf("http://%s", p.config.ExternalIP), p.config.APIPort, p.config.ClientNodes+1, p.config.ACLToken)
err := p.nomadClient.HealthCheckAPI(startTimeout, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -391,8 +391,16 @@ func (p *ClusterProvider) createNomad() error {
}

// ensure all client nodes are up
p.nomadClient.SetConfig(fmt.Sprintf("http://%s", p.config.ExternalIP), p.config.APIPort, clientNodes)
err = p.nomadClient.HealthCheckAPI(startTimeout)
p.nomadClient.SetConfig(fmt.Sprintf("http://%s", p.config.ExternalIP), p.config.APIPort, clientNodes, p.config.ACLToken)
err = p.nomadClient.HealthCheckAPI(startTimeout, true)
if err != nil {
return err
}
err = p.nomadClient.BootstrapACLs()
if err != nil {
return err
}
err = p.nomadClient.HealthCheckAPI(startTimeout, false)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/config/resources/nomad/provider_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (p *JobProvider) Create() error {
nomadCluster := p.config.Cluster

// load the config
p.client.SetConfig(fmt.Sprintf("http://%s", nomadCluster.ExternalIP), nomadCluster.APIPort, nomadCluster.ClientNodes)
p.client.SetConfig(fmt.Sprintf("http://%s", nomadCluster.ExternalIP), nomadCluster.APIPort, nomadCluster.ClientNodes, nomadCluster.ACLToken)

err := p.client.Create(p.config.Paths)
if err != nil {
Expand Down Expand Up @@ -99,7 +99,7 @@ func (p *JobProvider) Destroy() error {
nomadCluster := p.config.Cluster

// load the config
p.client.SetConfig(fmt.Sprintf("http://%s", nomadCluster.ExternalIP), nomadCluster.APIPort, nomadCluster.ClientNodes)
p.client.SetConfig(fmt.Sprintf("http://%s", nomadCluster.ExternalIP), nomadCluster.APIPort, nomadCluster.ClientNodes, nomadCluster.ACLToken)

err := p.client.Stop(p.config.Paths)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/resources/nomad/resource_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type NomadCluster struct {
Ports ctypes.Ports `hcl:"port,block" json:"ports,omitempty"` // ports to expose
PortRanges ctypes.PortRanges `hcl:"port_range,block" json:"port_ranges,omitempty"` // range of ports to expose

ACLToken string `hcl:"acl_token,optional" json:"acl_token,omitempty"`

// Output Parameters

// The APIPort the server is running on
Expand Down

0 comments on commit 17b832c

Please sign in to comment.