Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

agent: updates to the agent token trigger anti-entropy full syncs #6577

Merged
merged 2 commits into from
Oct 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions agent/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package agent
import (
"encoding/json"
"fmt"
"github.com/hashicorp/go-memdb"
"github.com/mitchellh/hashstructure"
"log"
"net/http"
"path/filepath"
"strconv"
"strings"

"github.com/hashicorp/go-memdb"
"github.com/mitchellh/hashstructure"

"github.com/hashicorp/consul/acl"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/debug"
Expand Down Expand Up @@ -1188,12 +1189,19 @@ func (s *HTTPServer) AgentToken(resp http.ResponseWriter, req *http.Request) (in

// Figure out the target token.
target := strings.TrimPrefix(req.URL.Path, "/v1/agent/token/")
triggerAntiEntropySync := false
switch target {
case "acl_token", "default":
s.agent.tokens.UpdateUserToken(args.Token, token_store.TokenSourceAPI)
changed := s.agent.tokens.UpdateUserToken(args.Token, token_store.TokenSourceAPI)
if changed {
triggerAntiEntropySync = true
}

case "acl_agent_token", "agent":
s.agent.tokens.UpdateAgentToken(args.Token, token_store.TokenSourceAPI)
changed := s.agent.tokens.UpdateAgentToken(args.Token, token_store.TokenSourceAPI)
if changed {
triggerAntiEntropySync = true
}

case "acl_agent_master_token", "agent_master":
s.agent.tokens.UpdateAgentMasterToken(args.Token, token_store.TokenSourceAPI)
Expand All @@ -1207,6 +1215,10 @@ func (s *HTTPServer) AgentToken(resp http.ResponseWriter, req *http.Request) (in
return nil, nil
}

if triggerAntiEntropySync {
s.agent.sync.SyncFull.Trigger()
}

if s.agent.config.ACLEnableTokenPersistence {
tokens := persistedTokens{}

Expand Down
108 changes: 108 additions & 0 deletions agent/agent_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/hashicorp/consul/agent/debug"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
tokenStore "github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
Expand Down Expand Up @@ -4153,6 +4154,113 @@ func TestAgent_Monitor_ACLDeny(t *testing.T) {
// here.
}

func TestAgent_TokenTriggersFullSync(t *testing.T) {
t.Parallel()

body := func(token string) io.Reader {
return jsonReader(&api.AgentToken{Token: token})
}

createNodePolicy := func(t *testing.T, a *TestAgent, policyName string) *structs.ACLPolicy {
policy := &structs.ACLPolicy{
Name: policyName,
Rules: `node_prefix "" { policy = "write" }`,
}

req, err := http.NewRequest("PUT", "/v1/acl/policy?token=root", jsonBody(policy))
require.NoError(t, err)

resp := httptest.NewRecorder()
obj, err := a.srv.ACLPolicyCreate(resp, req)
require.NoError(t, err)

policy, ok := obj.(*structs.ACLPolicy)
require.True(t, ok)
return policy
}

createNodeToken := func(t *testing.T, a *TestAgent, policyName string) *structs.ACLToken {
createNodePolicy(t, a, policyName)

token := &structs.ACLToken{
Description: "test",
Policies: []structs.ACLTokenPolicyLink{
structs.ACLTokenPolicyLink{Name: policyName},
},
}

req, err := http.NewRequest("PUT", "/v1/acl/token?token=root", jsonBody(token))
require.NoError(t, err)

resp := httptest.NewRecorder()
obj, err := a.srv.ACLTokenCreate(resp, req)
require.NoError(t, err)

token, ok := obj.(*structs.ACLToken)
require.True(t, ok)
return token
}

cases := []struct {
path string
tokenGetFn func(*token.Store) string
}{
{
path: "acl_agent_token",
tokenGetFn: (*token.Store).AgentToken,
},
{
path: "agent",
tokenGetFn: (*token.Store).AgentToken,
},
{
path: "acl_token",
tokenGetFn: (*token.Store).UserToken,
},
{
path: "default",
tokenGetFn: (*token.Store).UserToken,
},
}

for _, tt := range cases {
tt := tt
t.Run(tt.path, func(t *testing.T) {
url := fmt.Sprintf("/v1/agent/token/%s?token=root", tt.path)

a := NewTestAgent(t, t.Name(), TestACLConfig()+`
acl {
tokens {
default = ""
agent = ""
agent_master = ""
replication = ""
}
}
`)
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")

// create node policy and token
token := createNodeToken(t, a, "test")

req, err := http.NewRequest("PUT", url, body(token.SecretID))
require.NoError(t, err)

resp := httptest.NewRecorder()
_, err = a.srv.AgentToken(resp, req)
require.NoError(t, err)

require.Equal(t, http.StatusOK, resp.Code)
require.Equal(t, token.SecretID, tt.tokenGetFn(a.tokens))

testrpc.WaitForTestAgent(t, a.RPC, "dc1",
testrpc.WithToken("root"),
testrpc.WaitForAntiEntropySync())
})
}
}

func TestAgent_Token(t *testing.T) {
t.Parallel()

Expand Down
20 changes: 16 additions & 4 deletions agent/token/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,35 +52,47 @@ type Store struct {
}

// UpdateUserToken replaces the current user token in the store.
func (t *Store) UpdateUserToken(token string, source TokenSource) {
// Returns true if it was changed.
func (t *Store) UpdateUserToken(token string, source TokenSource) bool {
t.l.Lock()
changed := (t.userToken != token || t.userTokenSource != source)
t.userToken = token
t.userTokenSource = source
t.l.Unlock()
return changed
}

// UpdateAgentToken replaces the current agent token in the store.
func (t *Store) UpdateAgentToken(token string, source TokenSource) {
// Returns true if it was changed.
func (t *Store) UpdateAgentToken(token string, source TokenSource) bool {
t.l.Lock()
changed := (t.agentToken != token || t.agentTokenSource != source)
t.agentToken = token
t.agentTokenSource = source
t.l.Unlock()
return changed
}

// UpdateAgentMasterToken replaces the current agent master token in the store.
func (t *Store) UpdateAgentMasterToken(token string, source TokenSource) {
// Returns true if it was changed.
func (t *Store) UpdateAgentMasterToken(token string, source TokenSource) bool {
t.l.Lock()
changed := (t.agentMasterToken != token || t.agentMasterTokenSource != source)
t.agentMasterToken = token
t.agentMasterTokenSource = source
t.l.Unlock()
return changed
}

// UpdateReplicationToken replaces the current replication token in the store.
func (t *Store) UpdateReplicationToken(token string, source TokenSource) {
// Returns true if it was changed.
func (t *Store) UpdateReplicationToken(token string, source TokenSource) bool {
t.l.Lock()
changed := (t.replicationToken != token || t.replicationTokenSource != source)
t.replicationToken = token
t.replicationTokenSource = source
t.l.Unlock()
return changed
}

// UserToken returns the best token to use for user operations.
Expand Down
14 changes: 10 additions & 4 deletions agent/token/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,16 @@ func TestStore_RegularTokens(t *testing.T) {
t.Parallel()

s := new(Store)
s.UpdateUserToken(tt.set.user, tt.set.userSource)
s.UpdateAgentToken(tt.set.agent, tt.set.agentSource)
s.UpdateReplicationToken(tt.set.repl, tt.set.replSource)
s.UpdateAgentMasterToken(tt.set.master, tt.set.masterSource)
require.True(t, s.UpdateUserToken(tt.set.user, tt.set.userSource))
require.True(t, s.UpdateAgentToken(tt.set.agent, tt.set.agentSource))
require.True(t, s.UpdateReplicationToken(tt.set.repl, tt.set.replSource))
require.True(t, s.UpdateAgentMasterToken(tt.set.master, tt.set.masterSource))

// If they don't change then they return false.
require.False(t, s.UpdateUserToken(tt.set.user, tt.set.userSource))
require.False(t, s.UpdateAgentToken(tt.set.agent, tt.set.agentSource))
require.False(t, s.UpdateReplicationToken(tt.set.repl, tt.set.replSource))
require.False(t, s.UpdateAgentMasterToken(tt.set.master, tt.set.masterSource))

require.Equal(t, tt.effective.user, s.UserToken())
require.Equal(t, tt.effective.agent, s.AgentToken())
Expand Down
22 changes: 19 additions & 3 deletions testrpc/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,34 @@ func WaitUntilNoLeader(t *testing.T, rpc rpcFn, dc string) {
}

type waitOption struct {
Token string
Token string
WaitForAntiEntropySync bool
}

func WithToken(token string) waitOption {
return waitOption{Token: token}
}

func WaitForAntiEntropySync() waitOption {
return waitOption{WaitForAntiEntropySync: true}
}

// WaitForTestAgent ensures we have a node with serfHealth check registered
func WaitForTestAgent(t *testing.T, rpc rpcFn, dc string, options ...waitOption) {
var nodes structs.IndexedNodes
var checks structs.IndexedHealthChecks

// first extra arg is an optional acl token
var token string
var (
token string
waitForAntiEntropySync bool
)
for _, opt := range options {
if opt.Token != "" {
token = opt.Token
}
if opt.WaitForAntiEntropySync {
waitForAntiEntropySync = true
}
}

retry.Run(t, func(r *retry.R) {
Expand All @@ -73,6 +83,12 @@ func WaitForTestAgent(t *testing.T, rpc rpcFn, dc string, options ...waitOption)
r.Fatalf("No registered nodes")
}

if waitForAntiEntropySync {
if len(nodes.Nodes[0].TaggedAddresses) == 0 {
r.Fatalf("Not synced via anti entropy yet")
}
}

// This assumes that there is a single agent per dc, typically a TestAgent
nodeReq := &structs.NodeSpecificRequest{
Datacenter: dc,
Expand Down