Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding KV tombstones to fix non-monotonic index on deletes #577

Merged
merged 30 commits into from
Jan 5, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
1d0aba2
consul: Adding TombstoneGC config
armon Dec 1, 2014
139568b
consul: Create tombstones before key deletes
armon Dec 1, 2014
a5ddae0
consul: Rename TombstoneGC to TombstoneTTL
armon Dec 11, 2014
b54aa8a
consul: Adding TombstoneGC to track TTLs
armon Dec 11, 2014
99e2416
consul: Support reset of tombstone GC
armon Dec 11, 2014
b79be04
consul: Adding GetTxnLimit to MDBTable
armon Dec 11, 2014
019d511
consul: Fixing tombstone creation and hinting of GC
armon Dec 11, 2014
103112b
consul: Fixing accidental commit of transaction
armon Dec 11, 2014
10604a6
consul: Thread Tombstone GC through
armon Dec 11, 2014
3bcf957
consul: Adding PendingExpiration
armon Dec 15, 2014
1120e6f
consul: Leader should reset the tombstone GC clock
armon Dec 15, 2014
d4b1f36
consul: Adding new request to reap tombstones
armon Dec 15, 2014
76e5237
consul: Generate a raft operation to reap tombstones
armon Dec 15, 2014
bcb10cf
consul: TombstoneReapRequestType -> TombstoneRequestType
armon Dec 15, 2014
bf74361
consul: First pass at tombstone reaping
armon Dec 15, 2014
4492ad0
consul: Persist tombstones
armon Dec 15, 2014
089b765
consul: Test tombstone creation
armon Dec 18, 2014
a4dad44
consul: Testing tombstone reaping
armon Dec 18, 2014
2a388aa
consul: Testing tombstone snapshot
armon Dec 18, 2014
c22f172
consul: Test FSM restore of tombstones
armon Dec 18, 2014
55ac84c
consul: Test FSM Reap operations
armon Dec 18, 2014
e02e8a4
consul: Testing leader issue of reap command
armon Dec 19, 2014
eb2df41
consul: Mesure time for reapTombstones
armon Dec 19, 2014
b429264
consul: Ensure KVS List handles tombstones
armon Dec 19, 2014
e7abf17
consul: List Keys should handle tombstones
armon Dec 19, 2014
62d6fe5
consul: Reverting some index compute logic
armon Dec 19, 2014
ce6cbab
consul: Improve log message
armon Dec 19, 2014
d842b6d
consul: Disable tombstones as follower
armon Jan 5, 2015
3d45d76
consul: Fixing the KVS tests
armon Jan 5, 2015
73db8cd
consul: Adding more useful metrics
armon Jan 5, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 36 additions & 11 deletions consul/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,29 @@ type Config struct {
// "allow" can be used to allow all requests. This is not recommended.
ACLDownPolicy string

// TombstoneTTL is used to control how long KV tombstones are retained.
// This provides a window of time where the X-Consul-Index is monotonic.
// Outside this window, the index may not be monotonic. This is a result
// of a few trade offs:
// 1) The index is defined by the data view and not globally. This is a
// performance optimization that prevents any write from incrementing the
// index for all data views.
// 2) Tombstones are not kept indefinitely, since otherwise storage required
// is also monotonic. This prevents deletes from reducing the disk space
// used.
// In theory, neither of these are intrinsic limitations, however for the
// purposes of building a practical system, they are reaonable trade offs.
//
// It is also possible to set this to an incredibly long time, thereby
// simulating infinite retention. This is not recommended however.
//
TombstoneTTL time.Duration

// TombstoneTTLGranularity is used to control how granular the timers are
// for the Tombstone GC. This is used to batch the GC of many keys together
// to reduce overhead. It is unlikely a user would ever need to tune this.
TombstoneTTLGranularity time.Duration

// ServerUp callback can be used to trigger a notification that
// a Consul server is now up and known about.
ServerUp func()
Expand Down Expand Up @@ -205,17 +228,19 @@ func DefaultConfig() *Config {
}

conf := &Config{
Datacenter: DefaultDC,
NodeName: hostname,
RPCAddr: DefaultRPCAddr,
RaftConfig: raft.DefaultConfig(),
SerfLANConfig: serf.DefaultConfig(),
SerfWANConfig: serf.DefaultConfig(),
ReconcileInterval: 60 * time.Second,
ProtocolVersion: ProtocolVersionMax,
ACLTTL: 30 * time.Second,
ACLDefaultPolicy: "allow",
ACLDownPolicy: "extend-cache",
Datacenter: DefaultDC,
NodeName: hostname,
RPCAddr: DefaultRPCAddr,
RaftConfig: raft.DefaultConfig(),
SerfLANConfig: serf.DefaultConfig(),
SerfWANConfig: serf.DefaultConfig(),
ReconcileInterval: 60 * time.Second,
ProtocolVersion: ProtocolVersionMax,
ACLTTL: 30 * time.Second,
ACLDefaultPolicy: "allow",
ACLDownPolicy: "extend-cache",
TombstoneTTL: 15 * time.Minute,
TombstoneTTLGranularity: 30 * time.Second,
}

// Increase our reap interval to 3 days instead of 24h.
Expand Down
73 changes: 70 additions & 3 deletions consul/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"log"
"time"

"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/raft"
Expand All @@ -21,6 +22,7 @@ type consulFSM struct {
logger *log.Logger
path string
state *StateStore
gc *TombstoneGC
}

// consulSnapshot is used to provide a snapshot of the current
Expand All @@ -38,15 +40,15 @@ type snapshotHeader struct {
}

// NewFSMPath is used to construct a new FSM with a blank state
func NewFSM(path string, logOutput io.Writer) (*consulFSM, error) {
func NewFSM(gc *TombstoneGC, path string, logOutput io.Writer) (*consulFSM, error) {
// Create a temporary path for the state store
tmpPath, err := ioutil.TempDir(path, "state")
if err != nil {
return nil, err
}

// Create a state store
state, err := NewStateStorePath(tmpPath, logOutput)
state, err := NewStateStorePath(gc, tmpPath, logOutput)
if err != nil {
return nil, err
}
Expand All @@ -56,6 +58,7 @@ func NewFSM(path string, logOutput io.Writer) (*consulFSM, error) {
logger: log.New(logOutput, "", log.LstdFlags),
path: path,
state: state,
gc: gc,
}
return fsm, nil
}
Expand Down Expand Up @@ -83,6 +86,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
return c.applySessionOperation(buf[1:], log.Index)
case structs.ACLRequestType:
return c.applyACLOperation(buf[1:], log.Index)
case structs.TombstoneRequestType:
return c.applyTombstoneOperation(buf[1:], log.Index)
default:
panic(fmt.Errorf("failed to apply request: %#v", buf))
}
Expand All @@ -97,6 +102,7 @@ func (c *consulFSM) decodeRegister(buf []byte, index uint64) interface{} {
}

func (c *consulFSM) applyRegister(req *structs.RegisterRequest, index uint64) interface{} {
defer metrics.MeasureSince([]string{"consul", "fsm", "register"}, time.Now())
// Apply all updates in a single transaction
if err := c.state.EnsureRegistration(index, req); err != nil {
c.logger.Printf("[INFO] consul.fsm: EnsureRegistration failed: %v", err)
Expand All @@ -106,6 +112,7 @@ func (c *consulFSM) applyRegister(req *structs.RegisterRequest, index uint64) in
}

func (c *consulFSM) applyDeregister(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"consul", "fsm", "deregister"}, time.Now())
var req structs.DeregisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
Expand Down Expand Up @@ -136,6 +143,7 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} {
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "kvs", string(req.Op)}, time.Now())
switch req.Op {
case structs.KVSSet:
return c.state.KVSSet(index, &req.DirEnt)
Expand Down Expand Up @@ -176,6 +184,7 @@ func (c *consulFSM) applySessionOperation(buf []byte, index uint64) interface{}
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "session", string(req.Op)}, time.Now())
switch req.Op {
case structs.SessionCreate:
if err := c.state.SessionCreate(index, &req.Session); err != nil {
Expand All @@ -196,6 +205,7 @@ func (c *consulFSM) applyACLOperation(buf []byte, index uint64) interface{} {
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "acl", string(req.Op)}, time.Now())
switch req.Op {
case structs.ACLForceSet:
fallthrough
Expand All @@ -213,6 +223,21 @@ func (c *consulFSM) applyACLOperation(buf []byte, index uint64) interface{} {
}
}

func (c *consulFSM) applyTombstoneOperation(buf []byte, index uint64) interface{} {
var req structs.TombstoneRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "tombstone", string(req.Op)}, time.Now())
switch req.Op {
case structs.TombstoneReap:
return c.state.ReapTombstones(req.ReapIndex)
default:
c.logger.Printf("[WARN] consul.fsm: Invalid Tombstone operation '%s'", req.Op)
return fmt.Errorf("Invalid Tombstone operation '%s'", req.Op)
}
}

func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
defer func(start time.Time) {
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start))
Expand All @@ -236,7 +261,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
}

// Create a new state store
state, err := NewStateStorePath(tmpPath, c.logOutput)
state, err := NewStateStorePath(c.gc, tmpPath, c.logOutput)
if err != nil {
return err
}
Expand Down Expand Up @@ -299,6 +324,15 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
return err
}

case structs.TombstoneRequestType:
var req structs.DirEntry
if err := dec.Decode(&req); err != nil {
return err
}
if err := c.state.TombstoneRestore(&req); err != nil {
return err
}

default:
return fmt.Errorf("Unrecognized msg type: %v", msgType)
}
Expand All @@ -308,6 +342,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
}

func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
defer metrics.MeasureSince([]string{"consul", "fsm", "persist"}, time.Now())
// Register the nodes
encoder := codec.NewEncoder(sink, msgpackHandle)

Expand Down Expand Up @@ -339,6 +374,11 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
sink.Cancel()
return err
}

if err := s.persistTombstones(sink, encoder); err != nil {
sink.Cancel()
return err
}
return nil
}

Expand Down Expand Up @@ -444,6 +484,33 @@ func (s *consulSnapshot) persistKV(sink raft.SnapshotSink,
}
}

func (s *consulSnapshot) persistTombstones(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
streamCh := make(chan interface{}, 256)
errorCh := make(chan error)
go func() {
if err := s.state.TombstoneDump(streamCh); err != nil {
errorCh <- err
}
}()

for {
select {
case raw := <-streamCh:
if raw == nil {
return nil
}
sink.Write([]byte{byte(structs.TombstoneRequestType)})
if err := encoder.Encode(raw); err != nil {
return err
}

case err := <-errorCh:
return err
}
}
}

func (s *consulSnapshot) Release() {
s.state.Close()
}
Loading