Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

connect: ensure intention replication continues to work when the replication ACL token changes #6288

Merged
merged 1 commit into from
Aug 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions agent/consul/acl_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5314,6 +5314,18 @@ func retrieveTestToken(codec rpc.ClientCodec, masterToken string, datacenter str
return &out, nil
}

func deleteTestToken(codec rpc.ClientCodec, masterToken string, datacenter string, tokenAccessor string) error {
arg := structs.ACLTokenDeleteRequest{
Datacenter: datacenter,
TokenID: tokenAccessor,
WriteRequest: structs.WriteRequest{Token: masterToken},
}

var ignored string
err := msgpackrpc.CallWithCodec(codec, "ACL.TokenDelete", &arg, &ignored)
return err
}

func deleteTestPolicy(codec rpc.ClientCodec, masterToken string, datacenter string, policyID string) error {
arg := structs.ACLPolicyDeleteRequest{
Datacenter: datacenter,
Expand Down
6 changes: 4 additions & 2 deletions agent/consul/leader_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,13 +574,15 @@ func (s *Server) secondaryCARootWatch(stopCh <-chan struct{}) {
// the intentions there to the local state.
func (s *Server) replicateIntentions(stopCh <-chan struct{}) {
args := structs.DCSpecificRequest{
Datacenter: s.config.PrimaryDatacenter,
QueryOptions: structs.QueryOptions{Token: s.tokens.ReplicationToken()},
Datacenter: s.config.PrimaryDatacenter,
}

s.logger.Printf("[DEBUG] connect: starting Connect intention replication from primary datacenter %q", s.config.PrimaryDatacenter)

retryLoopBackoff(stopCh, func() error {
// Always use the latest replication token value in case it changed while looping.
args.QueryOptions.Token = s.tokens.ReplicationToken()

var remote structs.IndexedIntentions
if err := s.forwardDC("Intention.List", s.config.PrimaryDatacenter, &args, &remote); err != nil {
return err
Expand Down
56 changes: 47 additions & 9 deletions agent/consul/leader_connect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
tokenStore "github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
uuid "github.com/hashicorp/go-uuid"
Expand Down Expand Up @@ -463,28 +464,56 @@ func TestLeader_ReplicateIntentions(t *testing.T) {

assert := assert.New(t)
require := require.New(t)
dir1, s1 := testServer(t)
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()

testrpc.WaitForLeader(t, s1.RPC, "dc1")

s1.tokens.UpdateAgentToken("root", tokenStore.TokenSourceConfig)

// create some tokens
replToken1, err := upsertTestTokenWithPolicyRules(codec, "root", "dc1", `acl = "read"`)
require.NoError(err)

replToken2, err := upsertTestTokenWithPolicyRules(codec, "root", "dc1", `acl = "read"`)
require.NoError(err)

// dc2 as a secondary DC
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc1"
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLDefaultPolicy = "deny"
c.ACLTokenReplication = false
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()

s2.tokens.UpdateAgentToken("root", tokenStore.TokenSourceConfig)

// start out with one token
s2.tokens.UpdateReplicationToken(replToken1.SecretID, tokenStore.TokenSourceConfig)

// Create the WAN link
joinWAN(t, s2, s1)
testrpc.WaitForLeader(t, s2.RPC, "dc2")

// Create an intention in dc1
ixn := structs.IntentionRequest{
Datacenter: "dc1",
Op: structs.IntentionOpCreate,
Datacenter: "dc1",
WriteRequest: structs.WriteRequest{Token: "root"},
Op: structs.IntentionOpCreate,
Intention: &structs.Intention{
SourceNS: structs.IntentionDefaultNamespace,
SourceName: "test",
Expand All @@ -504,8 +533,9 @@ func TestLeader_ReplicateIntentions(t *testing.T) {
ixn.Intention.ID = reply
retry.Run(t, func(r *retry.R) {
req := &structs.IntentionQueryRequest{
Datacenter: "dc2",
IntentionID: ixn.Intention.ID,
Datacenter: "dc2",
QueryOptions: structs.QueryOptions{Token: "root"},
IntentionID: ixn.Intention.ID,
}
var resp structs.IndexedIntentions
r.Check(s2.RPC("Intention.Get", req, &resp))
Expand All @@ -519,6 +549,12 @@ func TestLeader_ReplicateIntentions(t *testing.T) {
// Sleep a bit so that the UpdatedAt field will definitely be different
time.Sleep(1 * time.Millisecond)

// delete underlying acl token being used for replication
require.NoError(deleteTestToken(codec, "root", "dc1", replToken1.AccessorID))

// switch to the other token
s2.tokens.UpdateReplicationToken(replToken2.SecretID, tokenStore.TokenSourceConfig)

// Update the intention in dc1
ixn.Op = structs.IntentionOpUpdate
ixn.Intention.ID = reply
Expand All @@ -530,8 +566,9 @@ func TestLeader_ReplicateIntentions(t *testing.T) {
var resp structs.IndexedIntentions
retry.Run(t, func(r *retry.R) {
req := &structs.IntentionQueryRequest{
Datacenter: "dc2",
IntentionID: ixn.Intention.ID,
Datacenter: "dc2",
QueryOptions: structs.QueryOptions{Token: "root"},
IntentionID: ixn.Intention.ID,
}
r.Check(s2.RPC("Intention.Get", req, &resp))
if len(resp.Intentions) != 1 {
Expand Down Expand Up @@ -559,8 +596,9 @@ func TestLeader_ReplicateIntentions(t *testing.T) {
// Wait for the delete to be replicated
retry.Run(t, func(r *retry.R) {
req := &structs.IntentionQueryRequest{
Datacenter: "dc2",
IntentionID: ixn.Intention.ID,
Datacenter: "dc2",
QueryOptions: structs.QueryOptions{Token: "root"},
IntentionID: ixn.Intention.ID,
}
var resp structs.IndexedIntentions
err := s2.RPC("Intention.Get", req, &resp)
Expand Down