Skip to content

Commit

Permalink
Merge pull request #8333 from fanminshi/retrieve_keep_from_index
Browse files Browse the repository at this point in the history
mvcc: fix TestHashKVWhenCompacting hash mismatch
  • Loading branch information
Anthony Romano authored Aug 1, 2017
2 parents e0843c6 + df5a3d1 commit 585b1d7
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 55 deletions.
14 changes: 14 additions & 0 deletions mvcc/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type index interface {
Tombstone(key []byte, rev revision) error
RangeSince(key, end []byte, rev int64) []revision
Compact(rev int64) map[revision]struct{}
Keep(rev int64) map[revision]struct{}
Equal(b index) bool

Insert(ki *keyIndex)
Expand Down Expand Up @@ -179,6 +180,19 @@ func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
return available
}

// Keep finds all revisions to be kept for a Compaction at the given rev.
func (ti *treeIndex) Keep(rev int64) map[revision]struct{} {
available := make(map[revision]struct{})
ti.RLock()
defer ti.RUnlock()
ti.tree.Ascend(func(i btree.Item) bool {
keyi := i.(*keyIndex)
keyi.keep(rev, available)
return true
})
return available
}

func compactIndex(rev int64, available map[revision]struct{}, emptyki *[]*keyIndex) func(i btree.Item) bool {
return func(i btree.Item) bool {
keyi := i.(*keyIndex)
Expand Down
16 changes: 11 additions & 5 deletions mvcc/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func TestIndexRangeSince(t *testing.T) {
}
}

func TestIndexCompact(t *testing.T) {
func TestIndexCompactAndKeep(t *testing.T) {
maxRev := int64(20)
tests := []struct {
key []byte
Expand All @@ -215,7 +215,7 @@ func TestIndexCompact(t *testing.T) {
{[]byte("foo1"), false, revision{10, 1}, revision{10, 1}, 1},
}

// Continuous Compact
// Continuous Compact and Keep
ti := newTreeIndex()
for _, tt := range tests {
if tt.remove {
Expand All @@ -226,7 +226,10 @@ func TestIndexCompact(t *testing.T) {
}
for i := int64(1); i < maxRev; i++ {
am := ti.Compact(i)

keep := ti.Keep(i)
if !(reflect.DeepEqual(am, keep)) {
t.Errorf("#%d: compact keep %v != Keep keep %v", i, am, keep)
}
wti := &treeIndex{tree: btree.New(32)}
for _, tt := range tests {
if _, ok := am[tt.rev]; ok || tt.rev.GreaterThan(revision{main: i}) {
Expand All @@ -242,7 +245,7 @@ func TestIndexCompact(t *testing.T) {
}
}

// Once Compact
// Once Compact and Keep
for i := int64(1); i < maxRev; i++ {
ti := newTreeIndex()
for _, tt := range tests {
Expand All @@ -253,7 +256,10 @@ func TestIndexCompact(t *testing.T) {
}
}
am := ti.Compact(i)

keep := ti.Keep(i)
if !(reflect.DeepEqual(am, keep)) {
t.Errorf("#%d: compact keep %v != Keep keep %v", i, am, keep)
}
wti := &treeIndex{tree: btree.New(32)}
for _, tt := range tests {
if _, ok := am[tt.rev]; ok || tt.rev.GreaterThan(revision{main: i}) {
Expand Down
61 changes: 43 additions & 18 deletions mvcc/key_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,42 @@ func (ki *keyIndex) compact(atRev int64, available map[revision]struct{}) {
plog.Panicf("store.keyindex: unexpected compact on empty keyIndex %s", string(ki.key))
}

genIdx, revIndex := ki.doCompact(atRev, available)

g := &ki.generations[genIdx]
if !g.isEmpty() {
// remove the previous contents.
if revIndex != -1 {
g.revs = g.revs[revIndex:]
}
// remove any tombstone
if len(g.revs) == 1 && genIdx != len(ki.generations)-1 {
delete(available, g.revs[0])
genIdx++
}
}

// remove the previous generations.
ki.generations = ki.generations[genIdx:]
}

// keep finds the revision to be kept if compact is called at given atRev.
func (ki *keyIndex) keep(atRev int64, available map[revision]struct{}) {
if ki.isEmpty() {
return
}

genIdx, revIndex := ki.doCompact(atRev, available)
g := &ki.generations[genIdx]
if !g.isEmpty() {
// remove any tombstone
if revIndex == len(g.revs)-1 && genIdx != len(ki.generations)-1 {
delete(available, g.revs[revIndex])
}
}
}

func (ki *keyIndex) doCompact(atRev int64, available map[revision]struct{}) (genIdx int, revIndex int) {
// walk until reaching the first revision that has an revision smaller or equal to
// the atRev.
// add it to the available map
Expand All @@ -198,30 +234,19 @@ func (ki *keyIndex) compact(atRev int64, available map[revision]struct{}) {
return true
}

i, g := 0, &ki.generations[0]
genIdx, g := 0, &ki.generations[0]
// find first generation includes atRev or created after atRev
for i < len(ki.generations)-1 {
for genIdx < len(ki.generations)-1 {
if tomb := g.revs[len(g.revs)-1].main; tomb > atRev {
break
}
i++
g = &ki.generations[i]
genIdx++
g = &ki.generations[genIdx]
}

if !g.isEmpty() {
n := g.walk(f)
// remove the previous contents.
if n != -1 {
g.revs = g.revs[n:]
}
// remove any tombstone
if len(g.revs) == 1 && i != len(ki.generations)-1 {
delete(available, g.revs[0])
i++
}
}
// remove the previous generations.
ki.generations = ki.generations[i:]
revIndex = g.walk(f)

return genIdx, revIndex
}

func (ki *keyIndex) isEmpty() bool {
Expand Down
52 changes: 48 additions & 4 deletions mvcc/key_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func TestKeyIndexTombstone(t *testing.T) {
}
}

func TestKeyIndexCompact(t *testing.T) {
func TestKeyIndexCompactAndKeep(t *testing.T) {
tests := []struct {
compact int64

Expand Down Expand Up @@ -441,10 +441,19 @@ func TestKeyIndexCompact(t *testing.T) {
},
}

// Continuous Compaction
// Continuous Compaction and finding Keep
ki := newTestKeyIndex()
for i, tt := range tests {
am := make(map[revision]struct{})
kiclone := cloneKeyIndex(ki)
ki.keep(tt.compact, am)
if !reflect.DeepEqual(ki, kiclone) {
t.Errorf("#%d: ki = %+v, want %+v", i, ki, kiclone)
}
if !reflect.DeepEqual(am, tt.wam) {
t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam)
}
am = make(map[revision]struct{})
ki.compact(tt.compact, am)
if !reflect.DeepEqual(ki, tt.wki) {
t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki)
Expand All @@ -454,11 +463,20 @@ func TestKeyIndexCompact(t *testing.T) {
}
}

// Jump Compaction
// Jump Compaction and finding Keep
ki = newTestKeyIndex()
for i, tt := range tests {
if (i%2 == 0 && i < 6) || (i%2 == 1 && i > 6) {
am := make(map[revision]struct{})
kiclone := cloneKeyIndex(ki)
ki.keep(tt.compact, am)
if !reflect.DeepEqual(ki, kiclone) {
t.Errorf("#%d: ki = %+v, want %+v", i, ki, kiclone)
}
if !reflect.DeepEqual(am, tt.wam) {
t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam)
}
am = make(map[revision]struct{})
ki.compact(tt.compact, am)
if !reflect.DeepEqual(ki, tt.wki) {
t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki)
Expand All @@ -469,10 +487,19 @@ func TestKeyIndexCompact(t *testing.T) {
}
}

// Once Compaction
kiClone := newTestKeyIndex()
// Once Compaction and finding Keep
for i, tt := range tests {
ki := newTestKeyIndex()
am := make(map[revision]struct{})
ki.keep(tt.compact, am)
if !reflect.DeepEqual(ki, kiClone) {
t.Errorf("#%d: ki = %+v, want %+v", i, ki, kiClone)
}
if !reflect.DeepEqual(am, tt.wam) {
t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam)
}
am = make(map[revision]struct{})
ki.compact(tt.compact, am)
if !reflect.DeepEqual(ki, tt.wki) {
t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki)
Expand All @@ -483,6 +510,23 @@ func TestKeyIndexCompact(t *testing.T) {
}
}

func cloneKeyIndex(ki *keyIndex) *keyIndex {
generations := make([]generation, len(ki.generations))
for i, gen := range ki.generations {
generations[i] = *cloneGeneration(&gen)
}
return &keyIndex{ki.key, ki.modified, generations}
}

func cloneGeneration(g *generation) *generation {
if g.revs == nil {
return &generation{g.ver, g.created, nil}
}
tmp := make([]revision, len(g.revs))
copy(tmp, g.revs)
return &generation{g.ver, g.created, tmp}
}

// test that compact on version that higher than last modified version works well
func TestKeyIndexCompactOnFurtherRev(t *testing.T) {
ki := &keyIndex{key: []byte("foo")}
Expand Down
32 changes: 5 additions & 27 deletions mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ var (
ErrClosed = errors.New("mvcc: closed")

plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc")

emptyKeep = make(map[revision]struct{})
)

const (
Expand Down Expand Up @@ -101,12 +99,6 @@ type store struct {
fifoSched schedule.Scheduler

stopc chan struct{}

// keepMu protects keep
keepMu sync.RWMutex
// keep contains all revisions <= compactMainRev to be kept for the
// ongoing compaction; nil otherwise.
keep map[revision]struct{}
}

// NewStore returns a new store. It is useful to create a store inside
Expand Down Expand Up @@ -170,33 +162,25 @@ func (s *store) Hash() (hash uint32, revision int64, err error) {
}

func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) {
s.mu.Lock()
s.mu.RLock()
s.revMu.RLock()
compactRev, currentRev = s.compactMainRev, s.currentRev
s.revMu.RUnlock()

if rev > 0 && rev <= compactRev {
s.mu.Unlock()
s.mu.RUnlock()
return 0, 0, compactRev, ErrCompacted
} else if rev > 0 && rev > currentRev {
s.mu.Unlock()
s.mu.RUnlock()
return 0, currentRev, 0, ErrFutureRev
}

s.keepMu.Lock()
if s.keep == nil {
// ForceCommit ensures that txnRead begins after backend
// has committed all the changes from the prev completed compaction.
s.b.ForceCommit()
s.keep = emptyKeep
}
keep := s.keep
s.keepMu.Unlock()
keep := s.kvindex.Keep(rev)

tx := s.b.ReadTx()
tx.Lock()
defer tx.Unlock()
s.mu.Unlock()
s.mu.RUnlock()

if rev == 0 {
rev = currentRev
Expand Down Expand Up @@ -257,9 +241,6 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {
s.b.ForceCommit()

keep := s.kvindex.Compact(rev)
s.keepMu.Lock()
s.keep = keep
s.keepMu.Unlock()
ch := make(chan struct{})
var j = func(ctx context.Context) {
if ctx.Err() != nil {
Expand All @@ -271,9 +252,6 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {
return
}
close(ch)
s.keepMu.Lock()
s.keep = nil
s.keepMu.Unlock()
}

s.fifoSched.Schedule(j)
Expand Down
6 changes: 5 additions & 1 deletion mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ func TestHashKVWhenCompacting(t *testing.T) {
s := NewStore(b, &lease.FakeLessor{}, nil)
defer os.Remove(tmpPath)

rev := 1000
rev := 10000
for i := 2; i <= rev; i++ {
s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease)
}
Expand Down Expand Up @@ -767,6 +767,10 @@ func (i *fakeIndex) Compact(rev int64) map[revision]struct{} {
i.Recorder.Record(testutil.Action{Name: "compact", Params: []interface{}{rev}})
return <-i.indexCompactRespc
}
func (i *fakeIndex) Keep(rev int64) map[revision]struct{} {
i.Recorder.Record(testutil.Action{Name: "keep", Params: []interface{}{rev}})
return <-i.indexCompactRespc
}
func (i *fakeIndex) Equal(b index) bool { return false }

func (i *fakeIndex) Insert(ki *keyIndex) {
Expand Down

0 comments on commit 585b1d7

Please sign in to comment.