diff --git a/consul/config.go b/consul/config.go index 9cb1944cbcd7..4e834b178ccc 100644 --- a/consul/config.go +++ b/consul/config.go @@ -158,6 +158,29 @@ type Config struct { // "allow" can be used to allow all requests. This is not recommended. ACLDownPolicy string + // TombstoneTTL is used to control how long KV tombstones are retained. + // This provides a window of time where the X-Consul-Index is monotonic. + // Outside this window, the index may not be monotonic. This is a result + // of a few trade-offs: + // 1) The index is defined by the data view and not globally. This is a + // performance optimization that prevents any write from incrementing the + // index for all data views. + // 2) Tombstones are not kept indefinitely, since otherwise storage required + // is also monotonic. This prevents deletes from reducing the disk space + // used. + // In theory, neither of these is an intrinsic limitation; however, for the + // purposes of building a practical system, they are reasonable trade-offs. + // + // It is also possible to set this to an incredibly long time, thereby + // simulating infinite retention. This is not recommended, however. + // + TombstoneTTL time.Duration + + // TombstoneTTLGranularity is used to control how granular the timers are + // for the Tombstone GC. This is used to batch the GC of many keys together + // to reduce overhead. It is unlikely a user would ever need to tune this. + TombstoneTTLGranularity time.Duration + + // ServerUp callback can be used to trigger a notification that // a Consul server is now up and known about. ServerUp func() @@ -205,17 +228,19 @@ func DefaultConfig() *Config { } conf := &Config{ - Datacenter: DefaultDC, - NodeName: hostname, - RPCAddr: DefaultRPCAddr, - RaftConfig: raft.DefaultConfig(), - SerfLANConfig: serf.DefaultConfig(), - SerfWANConfig: serf.DefaultConfig(), - ReconcileInterval: 60 * time.Second, - ProtocolVersion: ProtocolVersionMax, - ACLTTL: 30 * time.Second, - ACLDefaultPolicy: "allow", - ACLDownPolicy: "extend-cache", + Datacenter: DefaultDC, + NodeName: hostname, + RPCAddr: DefaultRPCAddr, + RaftConfig: raft.DefaultConfig(), + SerfLANConfig: serf.DefaultConfig(), + SerfWANConfig: serf.DefaultConfig(), + ReconcileInterval: 60 * time.Second, + ProtocolVersion: ProtocolVersionMax, + ACLTTL: 30 * time.Second, + ACLDefaultPolicy: "allow", + ACLDownPolicy: "extend-cache", + TombstoneTTL: 15 * time.Minute, + TombstoneTTLGranularity: 30 * time.Second, } // Increase our reap interval to 3 days instead of 24h.
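The two fields added above are the only tuning knobs for tombstone retention: TombstoneTTL bounds how long the X-Consul-Index stays monotonic after a delete, while TombstoneTTLGranularity controls how coarsely expirations are batched into shared timers. A minimal sketch of overriding the defaults from an embedding program follows; the import path matches this repository, but the helper name tombstoneTunedConfig and the chosen durations are illustrative assumptions, not part of this change.

    package main

    import (
        "fmt"
        "time"

        "github.com/hashicorp/consul/consul"
    )

    // tombstoneTunedConfig is a hypothetical helper showing how the new
    // fields compose with DefaultConfig(); the values are illustrative.
    func tombstoneTunedConfig() *consul.Config {
        conf := consul.DefaultConfig()
        // Retain tombstones longer than the 15 minute default, widening the
        // window in which list indexes stay monotonic at the cost of storage.
        conf.TombstoneTTL = time.Hour
        // Batch GC timers into one-minute buckets to cut down on timers and
        // Raft reap transactions; most deployments never need to touch this.
        conf.TombstoneTTLGranularity = time.Minute
        return conf
    }

    func main() {
        conf := tombstoneTunedConfig()
        fmt.Printf("tombstone TTL=%s granularity=%s\n",
            conf.TombstoneTTL, conf.TombstoneTTLGranularity)
    }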
diff --git a/consul/fsm.go b/consul/fsm.go index 903137044824..7980b98856c2 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -8,6 +8,7 @@ import ( "log" "time" + "github.com/armon/go-metrics" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/raft" @@ -21,6 +22,7 @@ type consulFSM struct { logger *log.Logger path string state *StateStore + gc *TombstoneGC } // consulSnapshot is used to provide a snapshot of the current @@ -38,7 +40,7 @@ type snapshotHeader struct { } // NewFSMPath is used to construct a new FSM with a blank state -func NewFSM(path string, logOutput io.Writer) (*consulFSM, error) { +func NewFSM(gc *TombstoneGC, path string, logOutput io.Writer) (*consulFSM, error) { // Create a temporary path for the state store tmpPath, err := ioutil.TempDir(path, "state") if err != nil { @@ -46,7 +48,7 @@ func NewFSM(path string, logOutput io.Writer) (*consulFSM, error) { } // Create a state store - state, err := NewStateStorePath(tmpPath, logOutput) + state, err := NewStateStorePath(gc, tmpPath, logOutput) if err != nil { return nil, err } @@ -56,6 +58,7 @@ func NewFSM(path string, logOutput io.Writer) (*consulFSM, error) { logger: log.New(logOutput, "", log.LstdFlags), path: path, state: state, + gc: gc, } return fsm, nil } @@ -83,6 +86,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} { return c.applySessionOperation(buf[1:], log.Index) case structs.ACLRequestType: return c.applyACLOperation(buf[1:], log.Index) + case structs.TombstoneRequestType: + return c.applyTombstoneOperation(buf[1:], log.Index) default: panic(fmt.Errorf("failed to apply request: %#v", buf)) } @@ -97,6 +102,7 @@ func (c *consulFSM) decodeRegister(buf []byte, index uint64) interface{} { } func (c *consulFSM) applyRegister(req *structs.RegisterRequest, index uint64) interface{} { + defer metrics.MeasureSince([]string{"consul", "fsm", "register"}, time.Now()) // Apply all updates in a single transaction if err := c.state.EnsureRegistration(index, req); err != nil { c.logger.Printf("[INFO] consul.fsm: EnsureRegistration failed: %v", err) @@ -106,6 +112,7 @@ func (c *consulFSM) applyRegister(req *structs.RegisterRequest, index uint64) in } func (c *consulFSM) applyDeregister(buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"consul", "fsm", "deregister"}, time.Now()) var req structs.DeregisterRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) @@ -136,6 +143,7 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} { if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } + defer metrics.MeasureSince([]string{"consul", "fsm", "kvs", string(req.Op)}, time.Now()) switch req.Op { case structs.KVSSet: return c.state.KVSSet(index, &req.DirEnt) @@ -176,6 +184,7 @@ func (c *consulFSM) applySessionOperation(buf []byte, index uint64) interface{} if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } + defer metrics.MeasureSince([]string{"consul", "fsm", "session", string(req.Op)}, time.Now()) switch req.Op { case structs.SessionCreate: if err := c.state.SessionCreate(index, &req.Session); err != nil { @@ -196,6 +205,7 @@ func (c *consulFSM) applyACLOperation(buf []byte, index uint64) interface{} { if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } + defer metrics.MeasureSince([]string{"consul", 
"fsm", "acl", string(req.Op)}, time.Now()) switch req.Op { case structs.ACLForceSet: fallthrough @@ -213,6 +223,21 @@ func (c *consulFSM) applyACLOperation(buf []byte, index uint64) interface{} { } } +func (c *consulFSM) applyTombstoneOperation(buf []byte, index uint64) interface{} { + var req structs.TombstoneRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + defer metrics.MeasureSince([]string{"consul", "fsm", "tombstone", string(req.Op)}, time.Now()) + switch req.Op { + case structs.TombstoneReap: + return c.state.ReapTombstones(req.ReapIndex) + default: + c.logger.Printf("[WARN] consul.fsm: Invalid Tombstone operation '%s'", req.Op) + return fmt.Errorf("Invalid Tombstone operation '%s'", req.Op) + } +} + func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) { defer func(start time.Time) { c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start)) @@ -236,7 +261,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { } // Create a new state store - state, err := NewStateStorePath(tmpPath, c.logOutput) + state, err := NewStateStorePath(c.gc, tmpPath, c.logOutput) if err != nil { return err } @@ -299,6 +324,15 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { return err } + case structs.TombstoneRequestType: + var req structs.DirEntry + if err := dec.Decode(&req); err != nil { + return err + } + if err := c.state.TombstoneRestore(&req); err != nil { + return err + } + default: return fmt.Errorf("Unrecognized msg type: %v", msgType) } @@ -308,6 +342,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { } func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { + defer metrics.MeasureSince([]string{"consul", "fsm", "persist"}, time.Now()) // Register the nodes encoder := codec.NewEncoder(sink, msgpackHandle) @@ -339,6 +374,11 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { sink.Cancel() return err } + + if err := s.persistTombstones(sink, encoder); err != nil { + sink.Cancel() + return err + } return nil } @@ -444,6 +484,33 @@ func (s *consulSnapshot) persistKV(sink raft.SnapshotSink, } } +func (s *consulSnapshot) persistTombstones(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + streamCh := make(chan interface{}, 256) + errorCh := make(chan error) + go func() { + if err := s.state.TombstoneDump(streamCh); err != nil { + errorCh <- err + } + }() + + for { + select { + case raw := <-streamCh: + if raw == nil { + return nil + } + sink.Write([]byte{byte(structs.TombstoneRequestType)}) + if err := encoder.Encode(raw); err != nil { + return err + } + + case err := <-errorCh: + return err + } + } +} + func (s *consulSnapshot) Release() { s.state.Close() } diff --git a/consul/fsm_test.go b/consul/fsm_test.go index db01580e4226..3a069ff46278 100644 --- a/consul/fsm_test.go +++ b/consul/fsm_test.go @@ -42,7 +42,7 @@ func TestFSM_RegisterNode(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -82,7 +82,7 @@ func TestFSM_RegisterNode_Service(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -139,7 +139,7 @@ func TestFSM_DeregisterService(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err 
!= nil { t.Fatalf("err: %v", err) } @@ -198,7 +198,7 @@ func TestFSM_DeregisterCheck(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -257,7 +257,7 @@ func TestFSM_DeregisterNode(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -328,7 +328,7 @@ func TestFSM_SnapshotRestore(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -357,6 +357,12 @@ func TestFSM_SnapshotRestore(t *testing.T) { acl := &structs.ACL{ID: generateUUID(), Name: "User Token"} fsm.state.ACLSet(10, acl) + fsm.state.KVSSet(11, &structs.DirEntry{ + Key: "/remove", + Value: []byte("foo"), + }) + fsm.state.KVSDelete(12, "/remove") + // Snapshot snap, err := fsm.Snapshot() if err != nil { @@ -372,7 +378,7 @@ func TestFSM_SnapshotRestore(t *testing.T) { } // Try to restore on a new FSM - fsm2, err := NewFSM(path, os.Stderr) + fsm2, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -446,6 +452,15 @@ func TestFSM_SnapshotRestore(t *testing.T) { if idx <= 1 { t.Fatalf("bad index: %d", idx) } + + // Verify tombstones are restored + _, res, err := fsm.state.tombstoneTable.Get("id", "/remove") + if err != nil { + t.Fatalf("err: %v", err) + } + if len(res) != 1 { + t.Fatalf("bad: %v", res) + } } func TestFSM_KVSSet(t *testing.T) { @@ -453,7 +468,7 @@ func TestFSM_KVSSet(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -492,7 +507,7 @@ func TestFSM_KVSDelete(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -542,7 +557,7 @@ func TestFSM_KVSDeleteTree(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -593,7 +608,7 @@ func TestFSM_KVSCheckAndSet(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -654,7 +669,7 @@ func TestFSM_SessionCreate_Destroy(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -738,7 +753,7 @@ func TestFSM_KVSLock(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -787,7 +802,7 @@ func TestFSM_KVSUnlock(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -854,7 +869,7 @@ func TestFSM_ACL_Set_Delete(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -925,3 +940,46 @@ func TestFSM_ACL_Set_Delete(t *testing.T) { t.Fatalf("should 
be destroyed") } } + +func TestFSM_TombstoneReap(t *testing.T) { + path, err := ioutil.TempDir("", "fsm") + if err != nil { + t.Fatalf("err: %v", err) + } + fsm, err := NewFSM(nil, path, os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + defer fsm.Close() + + // Create some tombstones + fsm.state.KVSSet(11, &structs.DirEntry{ + Key: "/remove", + Value: []byte("foo"), + }) + fsm.state.KVSDelete(12, "/remove") + + // Create a new reap request + req := structs.TombstoneRequest{ + Datacenter: "dc1", + Op: structs.TombstoneReap, + ReapIndex: 12, + } + buf, err := structs.Encode(structs.TombstoneRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if err, ok := resp.(error); ok { + t.Fatalf("resp: %v", err) + } + + // Verify the tombstones are gone + _, res, err := fsm.state.tombstoneTable.Get("id") + if err != nil { + t.Fatalf("err: %v", err) + } + if len(res) != 0 { + t.Fatalf("bad: %v", res) + } +} diff --git a/consul/issue_test.go b/consul/issue_test.go index 77bca7ce9873..5676c6a1d563 100644 --- a/consul/issue_test.go +++ b/consul/issue_test.go @@ -15,7 +15,7 @@ func TestHealthCheckRace(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go index 53ed238be15b..e4b5a36b6f59 100644 --- a/consul/kvs_endpoint.go +++ b/consul/kvs_endpoint.go @@ -135,13 +135,14 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e &reply.QueryMeta, state.QueryTables("KVSList"), func() error { - index, ent, err := state.KVSList(args.Key) + tombIndex, index, ent, err := state.KVSList(args.Key) if err != nil { return err } if acl != nil { ent = FilterDirEnt(acl, ent) } + if len(ent) == 0 { // Must provide non-zero index to prevent blocking // Index 1 is impossible anyways (due to Raft internals) @@ -150,7 +151,6 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e } else { reply.Index = index } - reply.Entries = nil } else { // Determine the maximum affected index var maxIndex uint64 @@ -159,7 +159,9 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e maxIndex = e.ModifyIndex } } - + if tombIndex > maxIndex { + maxIndex = tombIndex + } reply.Index = maxIndex reply.Entries = ent } diff --git a/consul/leader.go b/consul/leader.go index 7c9484b8f3f2..c0871120435c 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -50,16 +50,15 @@ func (s *Server) monitorLeadership() { // leaderLoop runs as long as we are the leader to run various // maintence activities func (s *Server) leaderLoop(stopCh chan struct{}) { + // Ensure we revoke leadership on stepdown + defer s.revokeLeadership() + // Fire a user event indicating a new leader payload := []byte(s.config.NodeName) if err := s.serfLAN.UserEvent(newLeaderEvent, payload, false); err != nil { s.logger.Printf("[WARN] consul: failed to broadcast new leader event: %v", err) } - // Clear the session timers on either shutdown or step down, since we - // are no longer responsible for session expirations. 
- defer s.clearAllSessionTimers() - // Reconcile channel is only used once initial reconcile // has succeeded var reconcileCh chan serf.Member @@ -112,6 +111,8 @@ WAIT: goto RECONCILE case member := <-reconcileCh: s.reconcileMember(member) + case index := <-s.tombstoneGC.ExpireCh(): + go s.reapTombstones(index) } } } @@ -121,6 +122,14 @@ WAIT: // previously inflight transactions have been commited and that our // state is up-to-date. func (s *Server) establishLeadership() error { + // Hint the tombstone expiration timer. When we freshly establish leadership + // we become the authoritative timer, and so we need to start the clock + // on any pending GC events. + s.tombstoneGC.SetEnabled(true) + lastIndex := s.raft.LastIndex() + s.tombstoneGC.Hint(lastIndex) + s.logger.Printf("[DEBUG] consul: reset tombstone GC to index %d", lastIndex) + // Setup ACLs if we are the leader and need to if err := s.initializeACL(); err != nil { s.logger.Printf("[ERR] consul: ACL initialization failed: %v", err) @@ -144,6 +153,21 @@ func (s *Server) establishLeadership() error { return nil } +// revokeLeadership is invoked once we step down as leader. +// This is used to cleanup any state that may be specific to a leader. +func (s *Server) revokeLeadership() error { + // Disable the tombstone GC, since it is only useful as a leader + s.tombstoneGC.SetEnabled(false) + + // Clear the session timers on either shutdown or step down, since we + // are no longer responsible for session expirations. + if err := s.clearAllSessionTimers(); err != nil { + s.logger.Printf("[ERR] consul: Clearing session timers failed: %v", err) + return err + } + return nil +} + // initializeACL is used to setup the ACLs if we are the leader // and need to do this. func (s *Server) initializeACL() error { @@ -518,3 +542,24 @@ func (s *Server) removeConsulServer(m serf.Member, port int) error { } return nil } + +// reapTombstones is invoked by the current leader to manage garbage +// collection of tombstones. When a key is deleted, we trigger a tombstone +// GC clock. Once the expiration is reached, this routine is invoked +// to clear all tombstones before this index. This must be replicated +// through Raft to ensure consistency. We do this outside the leader loop +// to avoid blocking. 
+func (s *Server) reapTombstones(index uint64) { + defer metrics.MeasureSince([]string{"consul", "leader", "reapTombstones"}, time.Now()) + req := structs.TombstoneRequest{ + Datacenter: s.config.Datacenter, + Op: structs.TombstoneReap, + ReapIndex: index, + WriteRequest: structs.WriteRequest{Token: s.config.ACLToken}, + } + _, err := s.raftApply(structs.TombstoneRequestType, &req) + if err != nil { + s.logger.Printf("[ERR] consul: failed to reap tombstones up to %d: %v", + index, err) + } +} diff --git a/consul/leader_test.go b/consul/leader_test.go index 1bd910858bda..7c51dc32a637 100644 --- a/consul/leader_test.go +++ b/consul/leader_test.go @@ -436,3 +436,127 @@ func TestLeader_MultiBootstrap(t *testing.T) { } } } + +func TestLeader_TombstoneGC_Reset(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, s2 := testServerDCBootstrap(t, "dc1", false) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + dir3, s3 := testServerDCBootstrap(t, "dc1", false) + defer os.RemoveAll(dir3) + defer s3.Shutdown() + servers := []*Server{s1, s2, s3} + + // Try to join + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.BindPort) + if _, err := s2.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + if _, err := s3.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + + for _, s := range servers { + testutil.WaitForResult(func() (bool, error) { + peers, _ := s.raftPeers.Peers() + return len(peers) == 3, nil + }, func(err error) { + t.Fatalf("should have 3 peers") + }) + } + + var leader *Server + for _, s := range servers { + if s.IsLeader() { + leader = s + break + } + } + if leader == nil { + t.Fatalf("Should have a leader") + } + + // Check that the leader has a pending GC expiration + if !leader.tombstoneGC.PendingExpiration() { + t.Fatalf("should have pending expiration") + } + + // Kill the leader + leader.Shutdown() + time.Sleep(100 * time.Millisecond) + + // Wait for a new leader + leader = nil + testutil.WaitForResult(func() (bool, error) { + for _, s := range servers { + if s.IsLeader() { + leader = s + return true, nil + } + } + return false, nil + }, func(err error) { + t.Fatalf("should have leader") + }) + + // Check that the new leader has a pending GC expiration + testutil.WaitForResult(func() (bool, error) { + return leader.tombstoneGC.PendingExpiration(), nil + }, func(err error) { + t.Fatalf("should have pending expiration") + }) +} + +func TestLeader_ReapTombstones(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.TombstoneTTL = 50 * time.Millisecond + c.TombstoneTTLGranularity = 10 * time.Millisecond + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + client := rpcClient(t, s1) + testutil.WaitForLeader(t, client.Call, "dc1") + + // Create a KV entry + arg := structs.KVSRequest{ + Datacenter: "dc1", + Op: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "test", + Value: []byte("test"), + }, + } + var out bool + if err := client.Call("KVS.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Delete the KV entry (tombstoned) + arg.Op = structs.KVSDelete + if err := client.Call("KVS.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure we have a tombstone + _, res, err := s1.fsm.State().tombstoneTable.Get("id") + if err != nil { + t.Fatalf("err: %v", err) + } + if len(res) == 0 { + t.Fatalf("missing tombstones") + } + + // Check that the new leader has a pending GC expiration + 
testutil.WaitForResult(func() (bool, error) { + _, res, err := s1.fsm.State().tombstoneTable.Get("id") + return len(res) == 0, err + }, func(err error) { + t.Fatalf("err: %v", err) + }) +} diff --git a/consul/mdb_table.go b/consul/mdb_table.go index 23856fc5f6b2..37eb52503842 100644 --- a/consul/mdb_table.go +++ b/consul/mdb_table.go @@ -389,10 +389,32 @@ func (t *MDBTable) GetTxn(tx *MDBTxn, index string, parts ...string) ([]interfac // Accumulate the results var results []interface{} - err = idx.iterate(tx, key, func(encRowId, res []byte) bool { + err = idx.iterate(tx, key, func(encRowId, res []byte) (bool, bool) { obj := t.Decoder(res) results = append(results, obj) - return false + return false, false + }) + + return results, err +} + +// GetTxnLimit is like GetTxn limits the maximum number of +// rows it will return +func (t *MDBTable) GetTxnLimit(tx *MDBTxn, limit int, index string, parts ...string) ([]interface{}, error) { + // Get the associated index + idx, key, err := t.getIndex(index, parts) + if err != nil { + return nil, err + } + + // Accumulate the results + var results []interface{} + num := 0 + err = idx.iterate(tx, key, func(encRowId, res []byte) (bool, bool) { + num++ + obj := t.Decoder(res) + results = append(results, obj) + return false, num == limit }) return results, err @@ -412,10 +434,10 @@ func (t *MDBTable) StreamTxn(stream chan<- interface{}, tx *MDBTxn, index string } // Stream the results - err = idx.iterate(tx, key, func(encRowId, res []byte) bool { + err = idx.iterate(tx, key, func(encRowId, res []byte) (bool, bool) { obj := t.Decoder(res) stream <- obj - return false + return false, false }) return err @@ -508,7 +530,7 @@ func (t *MDBTable) innerDeleteWithIndex(tx *MDBTxn, idx *MDBIndex, key []byte) ( }() // Delete everything as we iterate - err = idx.iterate(tx, key, func(encRowId, res []byte) bool { + err = idx.iterate(tx, key, func(encRowId, res []byte) (bool, bool) { // Get the object obj := t.Decoder(res) @@ -542,7 +564,7 @@ func (t *MDBTable) innerDeleteWithIndex(tx *MDBTxn, idx *MDBIndex, key []byte) ( // Delete the object num++ - return true + return true, false }) if err != nil { return 0, err @@ -644,7 +666,7 @@ func (i *MDBIndex) keyFromParts(parts ...string) []byte { // and invoking the cb with each row. 
We dereference the rowid, // and only return the object row func (i *MDBIndex) iterate(tx *MDBTxn, prefix []byte, - cb func(encRowId, res []byte) bool) error { + cb func(encRowId, res []byte) (bool, bool)) error { table := tx.dbis[i.table.Name] // If virtual, use the correct DBI @@ -667,8 +689,9 @@ func (i *MDBIndex) iterate(tx *MDBTxn, prefix []byte, var key, encRowId, objBytes []byte first := true + shouldStop := false shouldDelete := false - for { + for !shouldStop { if first && len(prefix) > 0 { first = false key, encRowId, err = cursor.Get(prefix, mdb.SET_RANGE) @@ -708,7 +731,8 @@ func (i *MDBIndex) iterate(tx *MDBTxn, prefix []byte, } // Invoke the cb - if shouldDelete = cb(encRowId, objBytes); shouldDelete { + shouldDelete, shouldStop = cb(encRowId, objBytes) + if shouldDelete { if err := cursor.Del(0); err != nil { return fmt.Errorf("delete failed: %v", err) } diff --git a/consul/mdb_table_test.go b/consul/mdb_table_test.go index e70c9131f09b..73e4001d12e1 100644 --- a/consul/mdb_table_test.go +++ b/consul/mdb_table_test.go @@ -2,12 +2,13 @@ package consul import ( "bytes" - "github.com/armon/gomdb" - "github.com/hashicorp/go-msgpack/codec" "io/ioutil" "os" "reflect" "testing" + + "github.com/armon/gomdb" + "github.com/hashicorp/go-msgpack/codec" ) type MockData struct { @@ -970,3 +971,78 @@ func TestMDBTableStream(t *testing.T) { t.Fatalf("bad index: %d", idx) } } + +func TestMDBTableGetTxnLimit(t *testing.T) { + dir, env := testMDBEnv(t) + defer os.RemoveAll(dir) + defer env.Close() + + table := &MDBTable{ + Env: env, + Name: "test", + Indexes: map[string]*MDBIndex{ + "id": &MDBIndex{ + Unique: true, + Fields: []string{"Key"}, + }, + "name": &MDBIndex{ + Fields: []string{"First", "Last"}, + }, + "country": &MDBIndex{ + Fields: []string{"Country"}, + }, + }, + Encoder: MockEncoder, + Decoder: MockDecoder, + } + if err := table.Init(); err != nil { + t.Fatalf("err: %v", err) + } + + objs := []*MockData{ + &MockData{ + Key: "1", + First: "Kevin", + Last: "Smith", + Country: "USA", + }, + &MockData{ + Key: "2", + First: "Kevin", + Last: "Wang", + Country: "USA", + }, + &MockData{ + Key: "3", + First: "Bernardo", + Last: "Torres", + Country: "Mexico", + }, + } + + // Insert some mock objects + for idx, obj := range objs { + if err := table.Insert(obj); err != nil { + t.Fatalf("err: %v", err) + } + if err := table.SetLastIndex(uint64(idx + 1)); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Start a readonly txn + tx, err := table.StartTxn(true, nil) + if err != nil { + panic(err) + } + defer tx.Abort() + + // Verify with some gets + res, err := table.GetTxnLimit(tx, 2, "id") + if err != nil { + t.Fatalf("err: %v", err) + } + if len(res) != 2 { + t.Fatalf("expect 2 result: %#v", res) + } +} diff --git a/consul/server.go b/consul/server.go index 9cdfa0b53b71..88096a175a23 100644 --- a/consul/server.go +++ b/consul/server.go @@ -134,6 +134,10 @@ type Server struct { sessionTimers map[string]*time.Timer sessionTimersLock sync.Mutex + // tombstoneGC is used to track the pending GC invocations + // for the KV tombstones + tombstoneGC *TombstoneGC + shutdown bool shutdownCh chan struct{} shutdownLock sync.Mutex @@ -189,6 +193,12 @@ func NewServer(config *Config) (*Server, error) { // Create a logger logger := log.New(config.LogOutput, "", log.LstdFlags) + // Create the tombstone GC + gc, err := NewTombstoneGC(config.TombstoneTTL, config.TombstoneTTLGranularity) + if err != nil { + return nil, err + } + // Create server s := &Server{ config: config, @@ -201,6 +211,7 @@ func 
NewServer(config *Config) (*Server, error) { remoteConsuls: make(map[string][]*serverParts), rpcServer: rpc.NewServer(), rpcTLS: incomingTLS, + tombstoneGC: gc, shutdownCh: make(chan struct{}), } @@ -320,7 +331,7 @@ func (s *Server) setupRaft() error { // Create the FSM var err error - s.fsm, err = NewFSM(statePath, s.config.LogOutput) + s.fsm, err = NewFSM(s.tombstoneGC, statePath, s.config.LogOutput) if err != nil { return err } diff --git a/consul/state_store.go b/consul/state_store.go index 273311435fc9..825c0131ae21 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -20,6 +20,7 @@ const ( dbServices = "services" dbChecks = "checks" dbKVS = "kvs" + dbTombstone = "tombstones" dbSessions = "sessions" dbSessionChecks = "sessionChecks" dbACLs = "acls" @@ -54,6 +55,7 @@ type StateStore struct { serviceTable *MDBTable checkTable *MDBTable kvsTable *MDBTable + tombstoneTable *MDBTable sessionTable *MDBTable sessionCheckTable *MDBTable aclTable *MDBTable @@ -76,6 +78,10 @@ type StateStore struct { // is never questioned. lockDelay map[string]time.Time lockDelayLock sync.RWMutex + + // GC is when we create tombstones to track their time-to-live. + // The GC is consumed upstream to manage clearing of tombstones. + gc *TombstoneGC } // StateSnapshot is used to provide a point-in-time snapshot @@ -102,18 +108,18 @@ func (s *StateSnapshot) Close() error { } // NewStateStore is used to create a new state store -func NewStateStore(logOutput io.Writer) (*StateStore, error) { +func NewStateStore(gc *TombstoneGC, logOutput io.Writer) (*StateStore, error) { // Create a new temp dir path, err := ioutil.TempDir("", "consul") if err != nil { return nil, err } - return NewStateStorePath(path, logOutput) + return NewStateStorePath(gc, path, logOutput) } // NewStateStorePath is used to create a new state store at a given path // The path is cleared on closing. 
-func NewStateStorePath(path string, logOutput io.Writer) (*StateStore, error) { +func NewStateStorePath(gc *TombstoneGC, path string, logOutput io.Writer) (*StateStore, error) { // Open the env env, err := mdb.NewEnv() if err != nil { @@ -126,6 +132,7 @@ func NewStateStorePath(path string, logOutput io.Writer) (*StateStore, error) { env: env, watch: make(map[*MDBTable]*NotifyGroup), lockDelay: make(map[string]time.Time), + gc: gc, } // Ensure we can initialize @@ -283,6 +290,29 @@ func (s *StateStore) initialize() error { }, } + s.tombstoneTable = &MDBTable{ + Name: dbTombstone, + Indexes: map[string]*MDBIndex{ + "id": &MDBIndex{ + Unique: true, + Fields: []string{"Key"}, + }, + "id_prefix": &MDBIndex{ + Virtual: true, + RealIndex: "id", + Fields: []string{"Key"}, + IdxFunc: DefaultIndexPrefixFunc, + }, + }, + Decoder: func(buf []byte) interface{} { + out := new(structs.DirEntry) + if err := structs.Decode(buf, out); err != nil { + panic(err) + } + return out + }, + } + s.sessionTable = &MDBTable{ Name: dbSessions, Indexes: map[string]*MDBIndex{ @@ -340,7 +370,8 @@ func (s *StateStore) initialize() error { // Store the set of tables s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable, - s.kvsTable, s.sessionTable, s.sessionCheckTable, s.aclTable} + s.kvsTable, s.tombstoneTable, s.sessionTable, s.sessionCheckTable, + s.aclTable} for _, table := range s.tables { table.Env = s.env table.Encoder = encoder @@ -1084,18 +1115,45 @@ func (s *StateStore) KVSGet(key string) (uint64, *structs.DirEntry, error) { } // KVSList is used to list all KV entries with a prefix -func (s *StateStore) KVSList(prefix string) (uint64, structs.DirEntries, error) { - idx, res, err := s.kvsTable.Get("id_prefix", prefix) +func (s *StateStore) KVSList(prefix string) (uint64, uint64, structs.DirEntries, error) { + tables := MDBTables{s.kvsTable, s.tombstoneTable} + tx, err := tables.StartTxn(true) + if err != nil { + return 0, 0, nil, err + } + defer tx.Abort() + + idx, err := tables.LastIndexTxn(tx) + if err != nil { + return 0, 0, nil, err + } + + res, err := s.kvsTable.GetTxn(tx, "id_prefix", prefix) + if err != nil { + return 0, 0, nil, err + } ents := make(structs.DirEntries, len(res)) for idx, r := range res { ents[idx] = r.(*structs.DirEntry) } - return idx, ents, err + + // Check for the higest index in the tombstone table + var maxIndex uint64 + res, err = s.tombstoneTable.GetTxn(tx, "id_prefix", prefix) + for _, r := range res { + ent := r.(*structs.DirEntry) + if ent.ModifyIndex > maxIndex { + maxIndex = ent.ModifyIndex + } + } + + return maxIndex, idx, ents, err } // KVSListKeys is used to list keys with a prefix, and up to a given seperator func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, error) { - tx, err := s.kvsTable.StartTxn(true, nil) + tables := MDBTables{s.kvsTable, s.tombstoneTable} + tx, err := tables.StartTxn(true) if err != nil { return 0, nil, err } @@ -1115,6 +1173,7 @@ func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, er // Aggregate the stream stream := make(chan interface{}, 128) + streamTomb := make(chan interface{}, 128) done := make(chan struct{}) var keys []string var maxIndex uint64 @@ -1148,18 +1207,31 @@ func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, er keys = append(keys, ent.Key) } } + + // Handle the tombstones for any index updates + for raw := range streamTomb { + ent := raw.(*structs.DirEntry) + if ent.ModifyIndex > maxIndex { + maxIndex = ent.ModifyIndex + } + } close(done) 
}() // Start the stream, and wait for completion - err = s.kvsTable.StreamTxn(stream, tx, "id_prefix", prefix) + if err = s.kvsTable.StreamTxn(stream, tx, "id_prefix", prefix); err != nil { + return 0, nil, err + } + if err := s.tombstoneTable.StreamTxn(streamTomb, tx, "id_prefix", prefix); err != nil { + return 0, nil, err + } <-done // Use the maxIndex if we have any keys if maxIndex != 0 { idx = maxIndex } - return idx, keys, err + return idx, keys, nil } // KVSDelete is used to delete a KVS entry @@ -1177,25 +1249,64 @@ func (s *StateStore) KVSDeleteTree(index uint64, prefix string) error { // kvsDeleteWithIndex does a delete with either the id or id_prefix func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex string, parts ...string) error { - // Start a new txn - tx, err := s.kvsTable.StartTxn(false, nil) + tx, err := s.tables.StartTxn(false) if err != nil { return err } defer tx.Abort() - - num, err := s.kvsTable.DeleteTxn(tx, tableIndex, parts...) - if err != nil { + if err := s.kvsDeleteWithIndexTxn(index, tx, tableIndex, parts...); err != nil { return err } + return tx.Commit() +} + +// kvsDeleteWithIndexTxn does a delete within an existing transaction +func (s *StateStore) kvsDeleteWithIndexTxn(index uint64, tx *MDBTxn, tableIndex string, parts ...string) error { + num := 0 + for { + // Get some number of entries to delete + pairs, err := s.kvsTable.GetTxnLimit(tx, 128, tableIndex, parts...) + if err != nil { + return err + } + + // Create the tombstones and delete + for _, raw := range pairs { + ent := raw.(*structs.DirEntry) + ent.ModifyIndex = index // Update the index + ent.Value = nil // Reduce storage required + ent.Session = "" + if err := s.tombstoneTable.InsertTxn(tx, ent); err != nil { + return err + } + if num, err := s.kvsTable.DeleteTxn(tx, "id", ent.Key); err != nil { + return err + } else if num != 1 { + return fmt.Errorf("Failed to delete key '%s'", ent.Key) + } + } + + // Increment the total number + num += len(pairs) + if len(pairs) == 0 { + break + } + } if num > 0 { if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { return err } - tx.Defer(func() { s.watch[s.kvsTable].Notify() }) + tx.Defer(func() { + s.watch[s.kvsTable].Notify() + if s.gc != nil { + // If GC is configured, then we hint that this index + // required expiration. + s.gc.Hint(index) + } + }) } - return tx.Commit() + return nil } // KVSCheckAndSet is used to perform an atomic check-and-set @@ -1319,6 +1430,72 @@ func (s *StateStore) kvsSet( return true, tx.Commit() } +// ReapTombstones is used to delete all the tombstones with a ModifyTime +// less than or equal to the given index. This is used to prevent unbounded +// storage growth of the tombstones. +func (s *StateStore) ReapTombstones(index uint64) error { + tx, err := s.tombstoneTable.StartTxn(false, nil) + if err != nil { + return fmt.Errorf("failed to start txn: %v", err) + } + defer tx.Abort() + + // Scan the tombstone table for all the entries that are + // eligble for GC. This could be improved by indexing on + // ModifyTime and doing a less-than-equals scan, however + // we don't currently support numeric indexes internally. + // Luckily, this is a low frequency operation. 
+ var toDelete []string + streamCh := make(chan interface{}, 128) + doneCh := make(chan struct{}) + go func() { + defer close(doneCh) + for raw := range streamCh { + ent := raw.(*structs.DirEntry) + if ent.ModifyIndex <= index { + toDelete = append(toDelete, ent.Key) + } + } + }() + if err := s.tombstoneTable.StreamTxn(streamCh, tx, "id"); err != nil { + s.logger.Printf("[ERR] consul.state: failed to scan tombstones: %v", err) + return fmt.Errorf("failed to scan tombstones: %v", err) + } + <-doneCh + + // Delete each tombstone + if len(toDelete) > 0 { + s.logger.Printf("[DEBUG] consul.state: reaping %d tombstones up to %d", len(toDelete), index) + } + for _, key := range toDelete { + num, err := s.tombstoneTable.DeleteTxn(tx, "id", key) + if err != nil { + s.logger.Printf("[ERR] consul.state: failed to delete tombstone: %v", err) + return fmt.Errorf("failed to delete tombstone: %v", err) + } + if num != 1 { + return fmt.Errorf("failed to delete tombstone '%s'", key) + } + } + return tx.Commit() +} + +// TombstoneRestore is used to restore a tombstone. +// It should only be used when doing a restore. +func (s *StateStore) TombstoneRestore(d *structs.DirEntry) error { + // Start a new txn + tx, err := s.tombstoneTable.StartTxn(false, nil) + if err != nil { + return err + } + defer tx.Abort() + + if err := s.tombstoneTable.InsertTxn(tx, d); err != nil { + return err + } + return tx.Commit() +} + // SessionCreate is used to create a new session. The // ID will be populated on a successful return func (s *StateStore) SessionCreate(index uint64, session *structs.Session) error { @@ -1537,7 +1714,7 @@ func (s *StateStore) invalidateSession(index uint64, tx *MDBTxn, id string) erro if session.Behavior == structs.SessionKeysDelete { // delete the keys held by the session - if err := s.deleteKeys(index, tx, id); err != nil { + if err := s.kvsDeleteWithIndexTxn(index, tx, "session", id); err != nil { return err } @@ -1618,23 +1795,6 @@ func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn, return nil } -// deleteKeys is used to delete all the keys created by a session -// within a given txn. All tables should be locked in the tx. -func (s *StateStore) deleteKeys(index uint64, tx *MDBTxn, id string) error { - num, err := s.kvsTable.DeleteTxn(tx, "session", id) - if err != nil { - return err - } - - if num > 0 { - if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { - return err - } - tx.Defer(func() { s.watch[s.kvsTable].Notify() }) - } - return nil -} - // ACLSet is used to create or update an ACL entry func (s *StateStore) ACLSet(index uint64, acl *structs.ACL) error { // Check for an ID @@ -1802,6 +1962,13 @@ func (s *StateSnapshot) KVSDump(stream chan<- interface{}) error { return s.store.kvsTable.StreamTxn(stream, s.tx, "id") } +// TombstoneDump is used to dump all tombstone entries. It takes a channel and streams +// back *struct.DirEntry objects. This will block and should be invoked +// in a goroutine. 
+func (s *StateSnapshot) TombstoneDump(stream chan<- interface{}) error { + return s.store.tombstoneTable.StreamTxn(stream, s.tx, "id") +} + // SessionList is used to list all the open sessions func (s *StateSnapshot) SessionList() ([]*structs.Session, error) { res, err := s.store.sessionTable.GetTxn(s.tx, "id") diff --git a/consul/state_store_test.go b/consul/state_store_test.go index 888b4a343249..c115939f0417 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -11,7 +11,7 @@ import ( ) func testStateStore() (*StateStore, error) { - return NewStateStore(os.Stderr) + return NewStateStore(nil, os.Stderr) } func TestEnsureRegistration(t *testing.T) { @@ -688,23 +688,32 @@ func TestStoreSnapshot(t *testing.T) { if err := store.KVSSet(15, d); err != nil { t.Fatalf("err: %v", err) } + d = &structs.DirEntry{Key: "/web/c", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(16, d); err != nil { + t.Fatalf("err: %v", err) + } + // Create a tombstone + // TODO: Change to /web/c causes failure? + if err := store.KVSDelete(17, "/web/a"); err != nil { + t.Fatalf("err: %v", err) + } // Add some sessions session := &structs.Session{ID: generateUUID(), Node: "foo"} - if err := store.SessionCreate(16, session); err != nil { + if err := store.SessionCreate(18, session); err != nil { t.Fatalf("err: %v", err) } session = &structs.Session{ID: generateUUID(), Node: "bar"} - if err := store.SessionCreate(17, session); err != nil { + if err := store.SessionCreate(19, session); err != nil { t.Fatalf("err: %v", err) } d.Session = session.ID - if ok, err := store.KVSLock(18, d); err != nil || !ok { + if ok, err := store.KVSLock(20, d); err != nil || !ok { t.Fatalf("err: %v", err) } session = &structs.Session{ID: generateUUID(), Node: "bar", TTL: "60s"} - if err := store.SessionCreate(19, session); err != nil { + if err := store.SessionCreate(21, session); err != nil { t.Fatalf("err: %v", err) } @@ -713,7 +722,7 @@ func TestStoreSnapshot(t *testing.T) { Name: "User token", Type: structs.ACLTypeClient, } - if err := store.ACLSet(20, a1); err != nil { + if err := store.ACLSet(21, a1); err != nil { t.Fatalf("err: %v", err) } @@ -722,7 +731,7 @@ func TestStoreSnapshot(t *testing.T) { Name: "User token", Type: structs.ACLTypeClient, } - if err := store.ACLSet(21, a2); err != nil { + if err := store.ACLSet(22, a2); err != nil { t.Fatalf("err: %v", err) } @@ -734,7 +743,7 @@ func TestStoreSnapshot(t *testing.T) { defer snap.Close() // Check the last nodes - if idx := snap.LastIndex(); idx != 21 { + if idx := snap.LastIndex(); idx != 22 { t.Fatalf("bad: %v", idx) } @@ -786,7 +795,29 @@ func TestStoreSnapshot(t *testing.T) { } <-doneCh if len(ents) != 2 { - t.Fatalf("missing KVS entries!") + t.Fatalf("missing KVS entries! %#v", ents) + } + + // Check we have the tombstone entries + streamCh = make(chan interface{}, 64) + doneCh = make(chan struct{}) + ents = nil + go func() { + for { + obj := <-streamCh + if obj == nil { + close(doneCh) + return + } + ents = append(ents, obj.(*structs.DirEntry)) + } + }() + if err := snap.TombstoneDump(streamCh); err != nil { + t.Fatalf("err: %v", err) + } + <-doneCh + if len(ents) != 1 { + t.Fatalf("missing tombstone entries!") } // Check there are 3 sessions @@ -818,13 +849,13 @@ func TestStoreSnapshot(t *testing.T) { } // Make some changes! 
- if err := store.EnsureService(22, "foo", &structs.NodeService{"db", "db", []string{"slave"}, 8000}); err != nil { + if err := store.EnsureService(23, "foo", &structs.NodeService{"db", "db", []string{"slave"}, 8000}); err != nil { t.Fatalf("err: %v", err) } - if err := store.EnsureService(23, "bar", &structs.NodeService{"db", "db", []string{"master"}, 8000}); err != nil { + if err := store.EnsureService(24, "bar", &structs.NodeService{"db", "db", []string{"master"}, 8000}); err != nil { t.Fatalf("err: %v", err) } - if err := store.EnsureNode(24, structs.Node{"baz", "127.0.0.3"}); err != nil { + if err := store.EnsureNode(25, structs.Node{"baz", "127.0.0.3"}); err != nil { t.Fatalf("err: %v", err) } checkAfter := &structs.HealthCheck{ @@ -834,16 +865,16 @@ func TestStoreSnapshot(t *testing.T) { Status: structs.HealthCritical, ServiceID: "db", } - if err := store.EnsureCheck(26, checkAfter); err != nil { + if err := store.EnsureCheck(27, checkAfter); err != nil { t.Fatalf("err: %v", err) } - if err := store.KVSDelete(26, "/web/b"); err != nil { + if err := store.KVSDelete(28, "/web/b"); err != nil { t.Fatalf("err: %v", err) } // Nuke an ACL - if err := store.ACLDelete(27, a1.ID); err != nil { + if err := store.ACLDelete(29, a1.ID); err != nil { t.Fatalf("err: %v", err) } @@ -897,6 +928,28 @@ func TestStoreSnapshot(t *testing.T) { t.Fatalf("missing KVS entries!") } + // Check we have the tombstone entries + streamCh = make(chan interface{}, 64) + doneCh = make(chan struct{}) + ents = nil + go func() { + for { + obj := <-streamCh + if obj == nil { + close(doneCh) + return + } + ents = append(ents, obj.(*structs.DirEntry)) + } + }() + if err := snap.TombstoneDump(streamCh); err != nil { + t.Fatalf("err: %v", err) + } + <-doneCh + if len(ents) != 1 { + t.Fatalf("missing tombstone entries!") + } + // Check there are 3 sessions sessions, err = snap.SessionList() if err != nil { @@ -1413,6 +1466,15 @@ func TestKVSDelete(t *testing.T) { } defer store.Close() + ttl := 10 * time.Millisecond + gran := 5 * time.Millisecond + gc, err := NewTombstoneGC(ttl, gran) + if err != nil { + t.Fatalf("err: %v", err) + } + gc.SetEnabled(true) + store.gc = gc + // Create the entry d := &structs.DirEntry{Key: "/foo", Flags: 42, Value: []byte("test")} if err := store.KVSSet(1000, d); err != nil { @@ -1435,6 +1497,25 @@ func TestKVSDelete(t *testing.T) { if d != nil { t.Fatalf("bad: %v", d) } + + // Check tombstone exists + _, res, err := store.tombstoneTable.Get("id", "/foo") + if err != nil { + t.Fatalf("err: %v", err) + } + if res == nil || res[0].(*structs.DirEntry).ModifyIndex != 1020 { + t.Fatalf("bad: %#v", d) + } + + // Check that we get a delete + select { + case idx := <-gc.ExpireCh(): + if idx != 1020 { + t.Fatalf("bad %d", idx) + } + case <-time.After(20 * time.Millisecond): + t.Fatalf("should expire") + } } func TestKVSCheckAndSet(t *testing.T) { @@ -1508,7 +1589,7 @@ func TestKVS_List(t *testing.T) { defer store.Close() // Should not exist - idx, ents, err := store.KVSList("/web") + _, idx, ents, err := store.KVSList("/web") if err != nil { t.Fatalf("err: %v", err) } @@ -1534,7 +1615,7 @@ func TestKVS_List(t *testing.T) { } // Should list - idx, ents, err = store.KVSList("/web") + _, idx, ents, err = store.KVSList("/web") if err != nil { t.Fatalf("err: %v", err) } @@ -1556,6 +1637,55 @@ func TestKVS_List(t *testing.T) { } } +func TestKVSList_TombstoneIndex(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + // Create the entries + d 
:= &structs.DirEntry{Key: "/web/a", Value: []byte("test")} + if err := store.KVSSet(1000, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/web/b", Value: []byte("test")} + if err := store.KVSSet(1001, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/web/c", Value: []byte("test")} + if err := store.KVSSet(1002, d); err != nil { + t.Fatalf("err: %v", err) + } + + // Nuke the last node + err = store.KVSDeleteTree(1003, "/web/c") + if err != nil { + t.Fatalf("err: %v", err) + } + + // Add another node + d = &structs.DirEntry{Key: "/other", Value: []byte("test")} + if err := store.KVSSet(1004, d); err != nil { + t.Fatalf("err: %v", err) + } + + // List should properly reflect tombstoned value + tombIdx, idx, ents, err := store.KVSList("/web") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1004 { + t.Fatalf("bad: %v", idx) + } + if tombIdx != 1003 { + t.Fatalf("bad: %v", idx) + } + if len(ents) != 2 { + t.Fatalf("bad: %v", ents) + } +} + func TestKVS_ListKeys(t *testing.T) { store, err := testStateStore() if err != nil { @@ -1730,6 +1860,68 @@ func TestKVS_ListKeys_Index(t *testing.T) { } } +func TestKVS_ListKeys_TombstoneIndex(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + // Create the entries + d := &structs.DirEntry{Key: "/foo/a", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1000, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/bar/b", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1001, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/baz/c", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1002, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/other/d", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1003, d); err != nil { + t.Fatalf("err: %v", err) + } + if err := store.KVSDelete(1004, "/baz/c"); err != nil { + t.Fatalf("err: %v", err) + } + + idx, keys, err := store.KVSListKeys("/foo", "") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1000 { + t.Fatalf("bad: %v", idx) + } + if len(keys) != 1 { + t.Fatalf("bad: %v", keys) + } + + idx, keys, err = store.KVSListKeys("/ba", "") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1004 { + t.Fatalf("bad: %v", idx) + } + if len(keys) != 1 { + t.Fatalf("bad: %v", keys) + } + + idx, keys, err = store.KVSListKeys("/nope", "") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1004 { + t.Fatalf("bad: %v", idx) + } + if len(keys) != 0 { + t.Fatalf("bad: %v", keys) + } +} + func TestKVSDeleteTree(t *testing.T) { store, err := testStateStore() if err != nil { @@ -1737,6 +1929,15 @@ func TestKVSDeleteTree(t *testing.T) { } defer store.Close() + ttl := 10 * time.Millisecond + gran := 5 * time.Millisecond + gc, err := NewTombstoneGC(ttl, gran) + if err != nil { + t.Fatalf("err: %v", err) + } + gc.SetEnabled(true) + store.gc = gc + // Should not exist err = store.KVSDeleteTree(1000, "/web") if err != nil { @@ -1764,16 +1965,134 @@ func TestKVSDeleteTree(t *testing.T) { } // Nothing should list - idx, ents, err := store.KVSList("/web") + tombIdx, idx, ents, err := store.KVSList("/web") if err != nil { t.Fatalf("err: %v", err) } if idx != 1010 { t.Fatalf("bad: %v", idx) } + if tombIdx != 1010 { + t.Fatalf("bad: %v", idx) + } if len(ents) != 0 { t.Fatalf("bad: %v", ents) } + + // Check tombstones exists + _, res, err := 
store.tombstoneTable.Get("id_prefix", "/web") + if err != nil { + t.Fatalf("err: %v", err) + } + if len(res) != 3 { + t.Fatalf("bad: %#v", d) + } + for _, r := range res { + if r.(*structs.DirEntry).ModifyIndex != 1010 { + t.Fatalf("bad: %#v", r) + } + } + + // Check that we get a delete + select { + case idx := <-gc.ExpireCh(): + if idx != 1010 { + t.Fatalf("bad %d", idx) + } + case <-time.After(20 * time.Millisecond): + t.Fatalf("should expire") + } +} + +func TestReapTombstones(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + ttl := 10 * time.Millisecond + gran := 5 * time.Millisecond + gc, err := NewTombstoneGC(ttl, gran) + if err != nil { + t.Fatalf("err: %v", err) + } + gc.SetEnabled(true) + store.gc = gc + + // Should not exist + err = store.KVSDeleteTree(1000, "/web") + if err != nil { + t.Fatalf("err: %v", err) + } + + // Create the entries + d := &structs.DirEntry{Key: "/web/a", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1000, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/web/b", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1001, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/web/sub/c", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1002, d); err != nil { + t.Fatalf("err: %v", err) + } + + // Nuke just a + err = store.KVSDelete(1010, "/web/a") + if err != nil { + t.Fatalf("err: %v", err) + } + + // Nuke the web tree + err = store.KVSDeleteTree(1020, "/web") + if err != nil { + t.Fatalf("err: %v", err) + } + + // Do a reap, should be a noop + if err := store.ReapTombstones(1000); err != nil { + t.Fatalf("err: %v", err) + } + + // Check tombstones exists + _, res, err := store.tombstoneTable.Get("id_prefix", "/web") + if err != nil { + t.Fatalf("err: %v", err) + } + if len(res) != 3 { + t.Fatalf("bad: %#v", d) + } + + // Do a reap, should remove just /web/a + if err := store.ReapTombstones(1010); err != nil { + t.Fatalf("err: %v", err) + } + + // Check tombstones exists + _, res, err = store.tombstoneTable.Get("id_prefix", "/web") + if err != nil { + t.Fatalf("err: %v", err) + } + if len(res) != 2 { + t.Fatalf("bad: %#v", d) + } + + // Do a reap, should remove them all + if err := store.ReapTombstones(1025); err != nil { + t.Fatalf("err: %v", err) + } + + // Check no tombstones exists + _, res, err = store.tombstoneTable.Get("id_prefix", "/web") + if err != nil { + t.Fatalf("err: %v", err) + } + if len(res) != 0 { + t.Fatalf("bad: %#v", d) + } } func TestSessionCreate(t *testing.T) { diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 2072780f3696..4b9263c54b97 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -23,6 +23,7 @@ const ( KVSRequestType SessionRequestType ACLRequestType + TombstoneRequestType ) const ( @@ -531,6 +532,24 @@ type EventFireResponse struct { QueryMeta } +type TombstoneOp string + +const ( + TombstoneReap TombstoneOp = "reap" +) + +// TombstoneRequest is used to trigger a reaping of the tombstones +type TombstoneRequest struct { + Datacenter string + Op TombstoneOp + ReapIndex uint64 + WriteRequest +} + +func (r *TombstoneRequest) RequestDatacenter() string { + return r.Datacenter +} + // msgpackHandle is a shared handle for encoding/decoding of structs var msgpackHandle = &codec.MsgpackHandle{} diff --git a/consul/tombstone_gc.go b/consul/tombstone_gc.go new file mode 100644 index 000000000000..8a238409cdc2 --- /dev/null +++ 
b/consul/tombstone_gc.go @@ -0,0 +1,150 @@ +package consul + +import ( + "fmt" + "sync" + "time" +) + +// TombstoneGC is used to track creation of tombstones +// so that they can be garbage collected after their TTL +// expires. The tombstones allow queries to provide monotonic +// index values within the TTL window. The GC is used to +// prevent monotonic growth in storage usage. This is a trade off +// between the length of the TTL and the storage overhead. +// +// In practice, this is required to fix the issue of delete +// visibility. When data is deleted from the KV store, the +// "latest" row can go backwards if the newest row is removed. +// The tombstones provide a way to ensure time doesn't move +// backwards within some interval. +// +type TombstoneGC struct { + ttl time.Duration + granularity time.Duration + + // enabled controls if we actually setup any timers. + enabled bool + + // expires maps the time of expiration to the highest + // tombstone value that should be expired. + expires map[time.Time]*expireInterval + + // expireCh is used to stream expiration + expireCh chan uint64 + + // lock is used to ensure safe access to all the fields + lock sync.Mutex +} + +// expireInterval is used to track the maximum index +// to expire in a given interval with a timer +type expireInterval struct { + maxIndex uint64 + timer *time.Timer +} + +// NewTombstoneGC is used to construct a new TombstoneGC given +// a TTL for tombstones and a tracking granularity. Longer TTLs +// ensure correct behavior for more time, but use more storage. +// A shorter granularity increases the number of Raft transactions +// and reduce how far past the TTL we perform GC. +func NewTombstoneGC(ttl, granularity time.Duration) (*TombstoneGC, error) { + // Sanity check the inputs + if ttl <= 0 || granularity <= 0 { + return nil, fmt.Errorf("Tombstone TTL and granularity must be positive") + } + + t := &TombstoneGC{ + ttl: ttl, + granularity: granularity, + enabled: false, + expires: make(map[time.Time]*expireInterval), + expireCh: make(chan uint64, 1), + } + return t, nil +} + +// ExpireCh is used to return a channel that streams the next index +// that should be expired +func (t *TombstoneGC) ExpireCh() <-chan uint64 { + return t.expireCh +} + +// SetEnabled is used to control if the tombstone GC is +// enabled. Should only be enabled by the leader node. +func (t *TombstoneGC) SetEnabled(enabled bool) { + t.lock.Lock() + defer t.lock.Unlock() + if enabled == t.enabled { + return + } + + // Stop all the timers and clear + if !enabled { + for _, exp := range t.expires { + exp.timer.Stop() + } + t.expires = make(map[time.Time]*expireInterval) + } + + // Update the status + t.enabled = enabled +} + +// Hint is used to indicate that keys at the given index have been +// deleted, and that their GC should be scheduled. 
+func (t *TombstoneGC) Hint(index uint64) { + expires := t.nextExpires() + + t.lock.Lock() + defer t.lock.Unlock() + if !t.enabled { + return + } + + // Check for an existing expiration timer + exp, ok := t.expires[expires] + if ok { + // Increment the highest index to be expired at that time + if index > exp.maxIndex { + exp.maxIndex = index + } + return + } + + // Create new expiration time + t.expires[expires] = &expireInterval{ + maxIndex: index, + timer: time.AfterFunc(expires.Sub(time.Now()), func() { + t.expireTime(expires) + }), + } +} + +// PendingExpiration is used to check if any expirations are pending +func (t *TombstoneGC) PendingExpiration() bool { + t.lock.Lock() + defer t.lock.Unlock() + return len(t.expires) > 0 +} + +// nextExpires is used to calculate the next expiration time +func (t *TombstoneGC) nextExpires() time.Time { + expires := time.Now().Add(t.ttl) + remain := expires.UnixNano() % int64(t.granularity) + adj := expires.Add(t.granularity - time.Duration(remain)) + return adj +} + +// expireTime is used to expire the entries at the given time +func (t *TombstoneGC) expireTime(expires time.Time) { + // Get the maximum index and clear the entry + t.lock.Lock() + exp := t.expires[expires] + delete(t.expires, expires) + t.lock.Unlock() + + // Notify the expires channel + t.expireCh <- exp.maxIndex +} diff --git a/consul/tombstone_gc_test.go b/consul/tombstone_gc_test.go new file mode 100644 index 000000000000..a9014fa1019a --- /dev/null +++ b/consul/tombstone_gc_test.go @@ -0,0 +1,104 @@ +package consul + +import ( + "testing" + "time" +) + +func TestTombstoneGC_invalid(t *testing.T) { + _, err := NewTombstoneGC(0, 0) + if err == nil { + t.Fatalf("should fail") + } + + _, err = NewTombstoneGC(time.Second, 0) + if err == nil { + t.Fatalf("should fail") + } + + _, err = NewTombstoneGC(0, time.Second) + if err == nil { + t.Fatalf("should fail") + } +} + +func TestTombstoneGC(t *testing.T) { + ttl := 20 * time.Millisecond + gran := 5 * time.Millisecond + gc, err := NewTombstoneGC(ttl, gran) + if err != nil { + t.Fatalf("err: %v", err) + } + gc.SetEnabled(true) + + if gc.PendingExpiration() { + t.Fatalf("should not be pending") + } + + start := time.Now() + gc.Hint(100) + + time.Sleep(2 * gran) + start2 := time.Now() + gc.Hint(120) + gc.Hint(125) + + if !gc.PendingExpiration() { + t.Fatalf("should be pending") + } + + select { + case index := <-gc.ExpireCh(): + end := time.Now() + if end.Sub(start) < ttl { + t.Fatalf("expired early") + } + if index != 100 { + t.Fatalf("bad index: %d", index) + } + + case <-time.After(ttl * 2): + t.Fatalf("should get expiration") + } + + select { + case index := <-gc.ExpireCh(): + end := time.Now() + if end.Sub(start2) < ttl { + t.Fatalf("expired early") + } + if index != 125 { + t.Fatalf("bad index: %d", index) + } + + case <-time.After(ttl * 2): + t.Fatalf("should get expiration") + } +} + +func TestTombstoneGC_Expire(t *testing.T) { + ttl := 10 * time.Millisecond + gran := 5 * time.Millisecond + gc, err := NewTombstoneGC(ttl, gran) + if err != nil { + t.Fatalf("err: %v", err) + } + gc.SetEnabled(true) + + if gc.PendingExpiration() { + t.Fatalf("should not be pending") + } + + gc.Hint(100) + gc.SetEnabled(false) + + if gc.PendingExpiration() { + t.Fatalf("should not be pending") + } + + select { + case <-gc.ExpireCh(): + t.Fatalf("should be reset") + case <-time.After(20 * time.Millisecond): + } +}
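Taken together, the GC surface introduced here is small: NewTombstoneGC builds the tracker, SetEnabled gates the timers to the leader, Hint records the highest deleted index per granularity bucket, and ExpireCh feeds reapTombstones. The sketch below strings those calls together the same way the leader loop and the tests above do; it is illustrative only, assuming it sits in package consul, and the exampleTombstoneFlow name and the hinted index are made up for the example.

    package consul

    import (
        "fmt"
        "time"
    )

    // exampleTombstoneFlow is a hypothetical walkthrough of the leader-side
    // lifecycle: enable the GC, hint a deleted index, then consume the
    // expiration that fires roughly one TTL (rounded up to the granularity)
    // later so a TombstoneReap can be replicated through Raft.
    func exampleTombstoneFlow() error {
        gc, err := NewTombstoneGC(15*time.Minute, 30*time.Second)
        if err != nil {
            return err
        }
        // Only the current leader enables the timers; followers keep this off
        // so a single authoritative clock drives the reaping.
        gc.SetEnabled(true)

        // A KV delete committed at Raft index 1020 hints the GC.
        gc.Hint(1020)

        // The leader loop would select on this channel and call
        // reapTombstones(index), which applies a TombstoneReap via Raft.
        index := <-gc.ExpireCh()
        fmt.Printf("reap tombstones up to index %d\n", index)
        return nil
    }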