Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds ACL replication. #2237

Merged
merged 18 commits into from
Aug 9, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions acl/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

// FaultFunc is a function used to fault in the parent,
// rules for an ACL given it's ID
// rules for an ACL given its ID
type FaultFunc func(id string) (string, string, error)

// aclEntry allows us to store the ACL with it's policy ID
Expand All @@ -21,19 +21,32 @@ type aclEntry struct {
// Cache is used to implement policy and ACL caching
type Cache struct {
faultfn FaultFunc
aclCache *lru.Cache // Cache id -> acl
policyCache *lru.Cache // Cache policy -> acl
ruleCache *lru.Cache // Cache rules -> policy
aclCache *lru.TwoQueueCache // Cache id -> acl
policyCache *lru.TwoQueueCache // Cache policy -> acl
ruleCache *lru.TwoQueueCache // Cache rules -> policy
}

// NewCache constructs a new policy and ACL cache of a given size
func NewCache(size int, faultfn FaultFunc) (*Cache, error) {
if size <= 0 {
return nil, fmt.Errorf("Must provide positive cache size")
}
rc, _ := lru.New(size)
pc, _ := lru.New(size)
ac, _ := lru.New(size)

rc, err := lru.New2Q(size)
if err != nil {
return nil, err
}

pc, err := lru.New2Q(size)
if err != nil {
return nil, err
}

ac, err := lru.New2Q(size)
if err != nil {
return nil, err
}

c := &Cache{
faultfn: faultfn,
aclCache: ac,
Expand All @@ -46,7 +59,7 @@ func NewCache(size int, faultfn FaultFunc) (*Cache, error) {
// GetPolicy is used to get a potentially cached policy set.
// If not cached, it will be parsed, and then cached.
func (c *Cache) GetPolicy(rules string) (*Policy, error) {
return c.getPolicy(c.ruleID(rules), rules)
return c.getPolicy(RuleID(rules), rules)
}

// getPolicy is an internal method to get a cached policy,
Expand All @@ -66,8 +79,8 @@ func (c *Cache) getPolicy(id, rules string) (*Policy, error) {

}

// ruleID is used to generate an ID for a rule
func (c *Cache) ruleID(rules string) string {
// RuleID is used to generate an ID for a rule
func RuleID(rules string) string {
return fmt.Sprintf("%x", md5.Sum([]byte(rules)))
}

Expand Down Expand Up @@ -112,7 +125,7 @@ func (c *Cache) GetACL(id string) (ACL, error) {
if err != nil {
return nil, err
}
ruleID := c.ruleID(rules)
ruleID := RuleID(rules)

// Check for a compiled ACL
policyID := c.policyID(parentID, ruleID)
Expand Down
44 changes: 37 additions & 7 deletions acl/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

func TestCache_GetPolicy(t *testing.T) {
c, err := NewCache(1, nil)
c, err := NewCache(2, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand All @@ -24,11 +24,23 @@ func TestCache_GetPolicy(t *testing.T) {
t.Fatalf("should be cached")
}

// Cache a new policy
// Work with some new policies to evict the original one
_, err = c.GetPolicy(testSimplePolicy)
if err != nil {
t.Fatalf("err: %v", err)
}
_, err = c.GetPolicy(testSimplePolicy)
if err != nil {
t.Fatalf("err: %v", err)
}
_, err = c.GetPolicy(testSimplePolicy2)
if err != nil {
t.Fatalf("err: %v", err)
}
_, err = c.GetPolicy(testSimplePolicy2)
if err != nil {
t.Fatalf("err: %v", err)
}

// Test invalidation of p
p3, err := c.GetPolicy("")
Expand All @@ -44,12 +56,13 @@ func TestCache_GetACL(t *testing.T) {
policies := map[string]string{
"foo": testSimplePolicy,
"bar": testSimplePolicy2,
"baz": testSimplePolicy3,
}
faultfn := func(id string) (string, string, error) {
return "deny", policies[id], nil
}

c, err := NewCache(1, faultfn)
c, err := NewCache(2, faultfn)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -80,6 +93,18 @@ func TestCache_GetACL(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
_, err = c.GetACL("bar")
if err != nil {
t.Fatalf("err: %v", err)
}
_, err = c.GetACL("baz")
if err != nil {
t.Fatalf("err: %v", err)
}
_, err = c.GetACL("baz")
if err != nil {
t.Fatalf("err: %v", err)
}

acl3, err := c.GetACL("foo")
if err != nil {
Expand All @@ -100,7 +125,7 @@ func TestCache_ClearACL(t *testing.T) {
return "deny", policies[id], nil
}

c, err := NewCache(1, faultfn)
c, err := NewCache(16, faultfn)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -135,7 +160,7 @@ func TestCache_Purge(t *testing.T) {
return "deny", policies[id], nil
}

c, err := NewCache(1, faultfn)
c, err := NewCache(16, faultfn)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -167,7 +192,7 @@ func TestCache_GetACLPolicy(t *testing.T) {
faultfn := func(id string) (string, string, error) {
return "deny", policies[id], nil
}
c, err := NewCache(1, faultfn)
c, err := NewCache(16, faultfn)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -220,7 +245,7 @@ func TestCache_GetACL_Parent(t *testing.T) {
return "", "", nil
}

c, err := NewCache(1, faultfn)
c, err := NewCache(16, faultfn)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -296,3 +321,8 @@ key "bar/" {
policy = "read"
}
`
var testSimplePolicy3 = `
key "baz/" {
policy = "read"
}
`
17 changes: 17 additions & 0 deletions command/agent/acl_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,20 @@ func (s *HTTPServer) ACLList(resp http.ResponseWriter, req *http.Request) (inter
}
return out.ACLs, nil
}

func (s *HTTPServer) ACLReplicationStatus(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Note that we do not forward to the ACL DC here. This is a query for
// any DC that's doing replication.
args := structs.DCSpecificRequest{}
s.parseSource(req, &args.Source)
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}

// Make the request.
var out structs.ACLReplicationStatus
if err := s.agent.RPC("ACL.ReplicationStatus", &args, &out); err != nil {
return nil, err
}
return out, nil
}
15 changes: 15 additions & 0 deletions command/agent/acl_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,18 @@ func TestACLList(t *testing.T) {
}
})
}

func TestACLReplicationStatus(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
req, err := http.NewRequest("GET", "/v1/acl/replication", nil)
resp := httptest.NewRecorder()
obj, err := srv.ACLReplicationStatus(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
_, ok := obj.(structs.ACLReplicationStatus)
if !ok {
t.Fatalf("should work")
}
})
}
3 changes: 3 additions & 0 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,9 @@ func (a *Agent) consulConfig() *consul.Config {
if a.config.ACLDownPolicy != "" {
base.ACLDownPolicy = a.config.ACLDownPolicy
}
if a.config.ACLReplicationToken != "" {
base.ACLReplicationToken = a.config.ACLReplicationToken
}
if a.config.SessionTTLMinRaw != "" {
base.SessionTTLMin = a.config.SessionTTLMin
}
Expand Down
9 changes: 9 additions & 0 deletions command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,12 @@ type Config struct {
// this acts like deny.
ACLDownPolicy string `mapstructure:"acl_down_policy"`

// ACLReplicationToken is used to fetch ACLs from the ACLDatacenter in
// order to replicate them locally. Setting this to a non-empty value
// also enables replication. Replication is only available in datacenters
// other than the ACLDatacenter.
ACLReplicationToken string `mapstructure:"acl_replication_token" json:"-"`

// Watches are used to monitor various endpoints and to invoke a
// handler to act appropriately. These are managed entirely in the
// agent layer using the standard APIs.
Expand Down Expand Up @@ -1319,6 +1325,9 @@ func MergeConfig(a, b *Config) *Config {
if b.ACLDefaultPolicy != "" {
result.ACLDefaultPolicy = b.ACLDefaultPolicy
}
if b.ACLReplicationToken != "" {
result.ACLReplicationToken = b.ACLReplicationToken
}
if len(b.Watches) != 0 {
result.Watches = append(result.Watches, b.Watches...)
}
Expand Down
7 changes: 6 additions & 1 deletion command/agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,8 @@ func TestDecodeConfig(t *testing.T) {
// ACLs
input = `{"acl_token": "1234", "acl_datacenter": "dc2",
"acl_ttl": "60s", "acl_down_policy": "deny",
"acl_default_policy": "deny", "acl_master_token": "2345"}`
"acl_default_policy": "deny", "acl_master_token": "2345",
"acl_replication_token": "8675309"}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
if err != nil {
t.Fatalf("err: %s", err)
Expand All @@ -646,6 +647,9 @@ func TestDecodeConfig(t *testing.T) {
if config.ACLDefaultPolicy != "deny" {
t.Fatalf("bad: %#v", config)
}
if config.ACLReplicationToken != "8675309" {
t.Fatalf("bad: %#v", config)
}

// Watches
input = `{"watches": [{"type":"keyprefix", "prefix":"foo/", "handler":"foobar"}]}`
Expand Down Expand Up @@ -1432,6 +1436,7 @@ func TestMergeConfig(t *testing.T) {
ACLTTLRaw: "15s",
ACLDownPolicy: "deny",
ACLDefaultPolicy: "deny",
ACLReplicationToken: "8765309",
Watches: []map[string]interface{}{
map[string]interface{}{
"type": "keyprefix",
Expand Down
2 changes: 2 additions & 0 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,13 +257,15 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/acl/info/", s.wrap(s.ACLGet))
s.mux.HandleFunc("/v1/acl/clone/", s.wrap(s.ACLClone))
s.mux.HandleFunc("/v1/acl/list", s.wrap(s.ACLList))
s.mux.HandleFunc("/v1/acl/replication", s.wrap(s.ACLReplicationStatus))
} else {
s.mux.HandleFunc("/v1/acl/create", s.wrap(aclDisabled))
s.mux.HandleFunc("/v1/acl/update", s.wrap(aclDisabled))
s.mux.HandleFunc("/v1/acl/destroy/", s.wrap(aclDisabled))
s.mux.HandleFunc("/v1/acl/info/", s.wrap(aclDisabled))
s.mux.HandleFunc("/v1/acl/clone/", s.wrap(aclDisabled))
s.mux.HandleFunc("/v1/acl/list", s.wrap(aclDisabled))
s.mux.HandleFunc("/v1/acl/replication", s.wrap(aclDisabled))
}

s.mux.HandleFunc("/v1/query", s.wrap(s.PreparedQueryGeneral))
Expand Down
Loading