From a400d43df7832c8c529b4f8461e65d53fe8bc707 Mon Sep 17 00:00:00 2001 From: Murali Reddy Date: Tue, 30 Apr 2019 10:53:41 +0530 Subject: [PATCH 1/7] resolve inconsistencies detected during ring merge Fixes #1962 --- ipam/ring/ring.go | 59 ++++++++++++++++++++++++++++++---------- ipam/ring/ring_test.go | 62 +++++++++++++++++++++++++++++++++++++----- 2 files changed, 100 insertions(+), 21 deletions(-) diff --git a/ipam/ring/ring.go b/ipam/ring/ring.go index bdbd8d8c41..30edfd0076 100644 --- a/ipam/ring/ring.go +++ b/ipam/ring/ring.go @@ -238,7 +238,7 @@ func (r *Ring) Merge(gossip Ring) (bool, error) { return false, ErrDifferentRange } - result, updated, err := r.Entries.merge(gossip.Entries, r.Peer) + result, updated, err := r.Entries.merge(gossip.Entries, r.Peer, r) if err != nil { return false, err @@ -260,7 +260,7 @@ func (r *Ring) Merge(gossip Ring) (bool, error) { // entries belonging to ourPeer. Returns the merged entries and an // indication whether the merge resulted in any changes, i.e. the // result differs from the original. -func (es entries) merge(other entries, ourPeer mesh.PeerName) (result entries, updated bool, err error) { +func (es entries) merge(other entries, ourPeer mesh.PeerName, r *Ring) (result entries, updated bool, err error) { var mine, theirs *entry var previousOwner *mesh.PeerName addToResult := func(e entry) { result = append(result, &e) } @@ -270,6 +270,14 @@ func (es entries) merge(other entries, ourPeer mesh.PeerName) (result entries, u previousOwner = nil } + checkEntryHasAllocations := func(current, next *entry) bool { + size := r.distance(current.Token, next.Token) + if current.Free < size { + return true + } + return false + } + // i is index into es; j is index into other var i, j int for i < len(es) && j < len(other) { @@ -282,29 +290,52 @@ func (es entries) merge(other entries, ourPeer mesh.PeerName) (result entries, u case mine.Token > theirs.Token: // insert, checking that a range owned by us hasn't been split if previousOwner != nil && *previousOwner == ourPeer && theirs.Peer != ourPeer { - err = errEntryInMyRange(theirs) - return + // check we have no allocations in the range that got split + if checkEntryHasAllocations(es.entry(i-1), mine) { + err = errEntryInMyRange(theirs) + return + } } addTheirs(*theirs) j++ case mine.Token == theirs.Token: + common.Log.Debugln(fmt.Sprintf("[ring %s]: Merge token=%s mine.Peer=%s theirs.Peer=%s mine.Version=%s theirs.Version=%s", ourPeer, mine.Token, mine.Peer, theirs.Peer, fmt.Sprint(mine.Version), fmt.Sprint(theirs.Version))) // merge switch { case mine.Version >= theirs.Version: if mine.Version == theirs.Version && !mine.Equal(theirs) { - err = errInconsistentEntry(mine, theirs) - return + if mine.Peer == ourPeer { + // if we own the entry and has allocations + if checkEntryHasAllocations(mine, es.entry(i+1)) { + err = errInconsistentEntry(mine, theirs) + return + } + } + // tie-break here, pick the entry with the highest free count + if mine.Free >= theirs.Free { + addToResult(*mine) + previousOwner = &mine.Peer + } else { + addTheirs(*theirs) + } + } else { + addToResult(*mine) + previousOwner = &mine.Peer } - addToResult(*mine) - previousOwner = &mine.Peer - common.Log.Debugln(fmt.Sprintf("[ring %s]: Merge token=%s mine.Peer=%s theirs.Peer=%s mine.Version=%s theirs.Version=%s", ourPeer, mine.Token, mine.Peer, theirs.Peer, fmt.Sprint(mine.Version), fmt.Sprint(theirs.Version))) case mine.Version < theirs.Version: - if mine.Peer == ourPeer { // We shouldn't receive updates to our own tokens - err = errNewerVersion(mine, theirs) - return + if mine.Peer == ourPeer { + // We received update to our own tokens accept the received entry + // if either it belongs to a different peer and we do not have allocations + // in the range effectively given away, or it belongs to our own peer + if theirs.Peer != ourPeer && !checkEntryHasAllocations(mine, es.entry(i+1)) || theirs.Peer == ourPeer { + addTheirs(*theirs) + } else { + err = errNewerVersion(mine, theirs) + return + } + } else { + addTheirs(*theirs) } - addTheirs(*theirs) - common.Log.Debugln(fmt.Sprintf("[ring %s]: Merge token=%s mine.Peer=%s theirs.Peer=%s mine.Version=%s theirs.Version=%s", ourPeer, mine.Token, mine.Peer, theirs.Peer, fmt.Sprint(mine.Version), fmt.Sprint(theirs.Version))) } i++ j++ diff --git a/ipam/ring/ring_test.go b/ipam/ring/ring_test.go index ca95744d48..d80ec2db53 100644 --- a/ipam/ring/ring_test.go +++ b/ipam/ring/ring_test.go @@ -219,6 +219,61 @@ func TestMergeSimple(t *testing.T) { require.Equal(t, ring2.Entries, ring1.Entries) } +func TestMergeWithConflicts(t *testing.T) { + + // received update to entry with a token and version identical to an entry we have + // but but holding different content, if there are no allocations, resolve the + // conflict by picking entry one with high free count + ring1 := NewRing(start, end, peer1name) + ring2 := NewRing(start, end, peer2name) + ring1.Entries = []*entry{{Token: start, Peer: peer1name, Free: 256, Version: 1}} + ring2.Entries = []*entry{{Token: start, Peer: peer2name, Free: 256, Version: 1}} + require.NoError(t, merge(ring1, ring2)) + + // received update to entry with a token and version identical to an entry we have + // but but holding different content, if there are allocations, then reject the recieved + // entry + ring1 = NewRing(start, end, peer1name) + ring2 = NewRing(start, end, peer2name) + ring1.Entries = []*entry{{Token: start, Peer: peer1name, Free: 128, Version: 1}} + ring2.Entries = []*entry{{Token: start, Peer: peer2name, Free: 256, Version: 1}} + require.Error(t, merge(ring1, ring2), "Expected error") + + // received an entry with update to one of our own tokens and with new version, + // accept the received entry if its still going to belong to us + ring1 = NewRing(start, end, peer1name) + ring2 = NewRing(start, end, peer2name) + ring1.Entries = []*entry{{Token: start, Peer: peer1name}} + ring2.Entries = []*entry{{Token: start, Peer: peer1name, Version: 1}} + require.NoError(t, merge(ring1, ring2)) + + // received an entry with update to one of our own tokens and with new version, + // but belongs to a different peer, accept the received entry if we do not + // have allocations in the range + ring1 = NewRing(start, end, peer1name) + ring2 = NewRing(start, end, peer2name) + ring1.Entries = []*entry{{Token: start, Peer: peer1name, Free: 256}} + ring2.Entries = []*entry{{Token: start, Peer: peer2name, Free: 256, Version: 1}} + require.NoError(t, merge(ring1, ring2)) + + // received an entry with update to one of our own tokens and with new version, + // reject received entry if we have allocations in the range + ring1 = NewRing(start, end, peer1name) + ring2 = NewRing(start, end, peer2name) + ring1.Entries = []*entry{{Token: start, Peer: peer1name, Free: 128}} + ring2.Entries = []*entry{{Token: start, Peer: peer2name, Version: 1}} + require.Error(t, merge(ring1, ring2), "Expected error") + + // we receive an entry that splits one of our ranges, giving some of it + // away to another peer. accept the entry provided that we have no allocations + // in the range that got given away. + ring1 = NewRing(start, end, peer1name) + ring2 = NewRing(start, end, peer2name) + ring1.Entries = []*entry{{Token: start, Peer: peer1name, Free: 256, Version: 1}} + ring2.Entries = []*entry{{Token: start, Peer: peer1name, Free: 128, Version: 2}, {Token: middle, Peer: peer2name, Free: 128}} + require.NoError(t, merge(ring1, ring2)) +} + func TestMergeErrors(t *testing.T) { // Cannot Merge in an invalid ring ring1 := NewRing(start, end, peer1name) @@ -231,13 +286,6 @@ func TestMergeErrors(t *testing.T) { ring2.Entries = []*entry{} require.True(t, merge(ring1, ring2) == ErrDifferentRange, "Expected ErrDifferentRange") - // Cannot Merge newer version of entry I own - ring2 = NewRing(start, end, peer2name) - ring1.Entries = []*entry{{Token: start, Peer: peer1name}} - ring2.Entries = []*entry{{Token: start, Peer: peer1name, Version: 1}} - fmt.Println(merge(ring1, ring2)) - require.Error(t, merge(ring1, ring2), "Expected error") - // Cannot Merge two entries with same version but different hosts ring1.Entries = []*entry{{Token: start, Peer: peer1name}} ring2.Entries = []*entry{{Token: start, Peer: peer2name}} From 11c55a3a07dbf59736f4783974aa7977ab085cb0 Mon Sep 17 00:00:00 2001 From: Murali Reddy Date: Tue, 30 Apr 2019 10:57:41 +0530 Subject: [PATCH 2/7] fix lint --- ipam/ring/ring_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ipam/ring/ring_test.go b/ipam/ring/ring_test.go index d80ec2db53..3a79869ca8 100644 --- a/ipam/ring/ring_test.go +++ b/ipam/ring/ring_test.go @@ -231,7 +231,7 @@ func TestMergeWithConflicts(t *testing.T) { require.NoError(t, merge(ring1, ring2)) // received update to entry with a token and version identical to an entry we have - // but but holding different content, if there are allocations, then reject the recieved + // but but holding different content, if there are allocations, then reject the received // entry ring1 = NewRing(start, end, peer1name) ring2 = NewRing(start, end, peer2name) From 89db74338e9476c7d58f7cbf8f0c04c408d21738 Mon Sep 17 00:00:00 2001 From: Murali Reddy Date: Thu, 9 May 2019 16:05:22 +0530 Subject: [PATCH 3/7] during merge we receive update to an entry that ring owns, and has a newer version, set our version to the one received plus one, effectively imposing our existing entry --- ipam/ring/ring.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/ipam/ring/ring.go b/ipam/ring/ring.go index 30edfd0076..945074afc9 100644 --- a/ipam/ring/ring.go +++ b/ipam/ring/ring.go @@ -326,9 +326,14 @@ func (es entries) merge(other entries, ourPeer mesh.PeerName, r *Ring) (result e if mine.Peer == ourPeer { // We received update to our own tokens accept the received entry // if either it belongs to a different peer and we do not have allocations - // in the range effectively given away, or it belongs to our own peer - if theirs.Peer != ourPeer && !checkEntryHasAllocations(mine, es.entry(i+1)) || theirs.Peer == ourPeer { + // in the range effectively given away, or it belongs to our own peer, in + // which case we should set our version to the one received plus one, + // effectively imposing our existing entry. + if theirs.Peer != ourPeer && !checkEntryHasAllocations(mine, es.entry(i+1)) { addTheirs(*theirs) + } else if theirs.Peer == ourPeer { + mine.Version = theirs.Version + 1 + addToResult(*mine) } else { err = errNewerVersion(mine, theirs) return @@ -359,6 +364,16 @@ func (es entries) merge(other entries, ourPeer mesh.PeerName, r *Ring) (result e addTheirs(*theirs) } + // reset the free space for entries if there is invalid free space + // due to accepting an unexpected update from the peers + for i := 0; i < len(result); i++ { + distance := r.distance(result.entry(i).Token, result.entry(i+1).Token) + if result.entry(i).Peer == r.Peer && result.entry(i).Free > distance { + // case that can arise when a range that we own that got split and had no allocations + result.entry(i).Free = distance + } + } + return } From 51d7f455c98b5689850658db0ec12ff3d00f5c59 Mon Sep 17 00:00:00 2001 From: Murali Reddy Date: Fri, 10 May 2019 17:23:45 +0530 Subject: [PATCH 4/7] move code to reset the entry free space from entries.merge() to Ring.Merge() --- ipam/ring/ring.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/ipam/ring/ring.go b/ipam/ring/ring.go index 945074afc9..2ab8291b3d 100644 --- a/ipam/ring/ring.go +++ b/ipam/ring/ring.go @@ -244,6 +244,16 @@ func (r *Ring) Merge(gossip Ring) (bool, error) { return false, err } + // reset the free space for entries if there is invalid free space + // due to accepting an unexpected update from the peers + for i := 0; i < len(result); i++ { + distance := r.distance(result.entry(i).Token, result.entry(i+1).Token) + if result.entry(i).Peer == r.Peer && result.entry(i).Free > distance { + // case that can arise when a range that we own that got split and had no allocations + result.entry(i).Free = distance + } + } + if err := r.checkEntries(result); err != nil { return false, fmt.Errorf("Merge of incoming data causes: %s", err) } @@ -364,16 +374,6 @@ func (es entries) merge(other entries, ourPeer mesh.PeerName, r *Ring) (result e addTheirs(*theirs) } - // reset the free space for entries if there is invalid free space - // due to accepting an unexpected update from the peers - for i := 0; i < len(result); i++ { - distance := r.distance(result.entry(i).Token, result.entry(i+1).Token) - if result.entry(i).Peer == r.Peer && result.entry(i).Free > distance { - // case that can arise when a range that we own that got split and had no allocations - result.entry(i).Free = distance - } - } - return } From 2d912263b134470a5be281d8304dd6fef442f411 Mon Sep 17 00:00:00 2001 From: Murali Reddy Date: Mon, 13 May 2019 12:43:19 +0530 Subject: [PATCH 5/7] add ring debug log during merge() --- ipam/ring/ring.go | 1 + 1 file changed, 1 insertion(+) diff --git a/ipam/ring/ring.go b/ipam/ring/ring.go index 2ab8291b3d..bb293a99da 100644 --- a/ipam/ring/ring.go +++ b/ipam/ring/ring.go @@ -292,6 +292,7 @@ func (es entries) merge(other entries, ourPeer mesh.PeerName, r *Ring) (result e var i, j int for i < len(es) && j < len(other) { mine, theirs = es[i], other[j] + common.Log.Debugln(fmt.Sprintf("[ring %s]: Merge mine.Token=%s theirs.Token=%s mine.Peer=%s theirs.Peer=%s mine.Version=%s theirs.Version=%s", ourPeer, mine.Token, theirs.Token, mine.Peer, theirs.Peer, fmt.Sprint(mine.Version), fmt.Sprint(theirs.Version))) switch { case mine.Token < theirs.Token: addToResult(*mine) From 5e69e8a7b486e99bda6d27985644c5b44b13d717 Mon Sep 17 00:00:00 2001 From: Murali Reddy Date: Mon, 13 May 2019 20:02:36 +0530 Subject: [PATCH 6/7] check for allocations in the range that got transffered when its split --- ipam/ring/ring.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ipam/ring/ring.go b/ipam/ring/ring.go index bb293a99da..b77bc37c35 100644 --- a/ipam/ring/ring.go +++ b/ipam/ring/ring.go @@ -302,7 +302,7 @@ func (es entries) merge(other entries, ourPeer mesh.PeerName, r *Ring) (result e // insert, checking that a range owned by us hasn't been split if previousOwner != nil && *previousOwner == ourPeer && theirs.Peer != ourPeer { // check we have no allocations in the range that got split - if checkEntryHasAllocations(es.entry(i-1), mine) { + if checkEntryHasAllocations(theirs, mine) { err = errEntryInMyRange(theirs) return } From 90ccc4a014a3fada596a31366fd1b1a498573dfa Mon Sep 17 00:00:00 2001 From: Murali Reddy Date: Fri, 7 Jun 2019 18:12:35 +0530 Subject: [PATCH 7/7] use space to check if entry in a range has allocations while doing ring merge --- ipam/allocator.go | 6 +++++- ipam/ring/ring.go | 20 ++++++-------------- ipam/ring/ring_test.go | 20 +++++++++++++++++++- 3 files changed, 30 insertions(+), 16 deletions(-) diff --git a/ipam/allocator.go b/ipam/allocator.go index bfff91f0fe..9a63339925 100644 --- a/ipam/allocator.go +++ b/ipam/allocator.go @@ -814,6 +814,10 @@ func (alloc *Allocator) sendRingUpdate(dest mesh.PeerName) { alloc.gossip.GossipUnicast(dest, msg) } +func (alloc *Allocator) checkRangeHasAllocations(r address.Range) bool { + return alloc.space.NumFreeAddressesInRange(r) != r.Size() +} + func (alloc *Allocator) update(sender mesh.PeerName, msg []byte) error { reader := bytes.NewReader(msg) decoder := gob.NewDecoder(reader) @@ -832,7 +836,7 @@ func (alloc *Allocator) update(sender mesh.PeerName, msg []byte) error { // If someone sent us a ring, merge it into ours. Note this will move us // out of the awaiting-consensus state if we didn't have a ring already. case data.Ring != nil: - updated, err := alloc.ring.Merge(*data.Ring) + updated, err := alloc.ring.Merge(*data.Ring, alloc.checkRangeHasAllocations) switch err { case nil: if updated { diff --git a/ipam/ring/ring.go b/ipam/ring/ring.go index b77bc37c35..aeaa548251 100644 --- a/ipam/ring/ring.go +++ b/ipam/ring/ring.go @@ -213,7 +213,7 @@ func (r *Ring) GrantRangeToHost(start, end address.Address, peer mesh.PeerName) // Merge the given ring into this ring and indicate whether this ring // got updated as a result. -func (r *Ring) Merge(gossip Ring) (bool, error) { +func (r *Ring) Merge(gossip Ring, checkRangeHasAllocations func(r address.Range) bool) (bool, error) { r.assertInvariants() defer r.trackUpdates()() @@ -238,7 +238,7 @@ func (r *Ring) Merge(gossip Ring) (bool, error) { return false, ErrDifferentRange } - result, updated, err := r.Entries.merge(gossip.Entries, r.Peer, r) + result, updated, err := r.Entries.merge(gossip.Entries, r.Peer, r, checkRangeHasAllocations) if err != nil { return false, err @@ -270,7 +270,7 @@ func (r *Ring) Merge(gossip Ring) (bool, error) { // entries belonging to ourPeer. Returns the merged entries and an // indication whether the merge resulted in any changes, i.e. the // result differs from the original. -func (es entries) merge(other entries, ourPeer mesh.PeerName, r *Ring) (result entries, updated bool, err error) { +func (es entries) merge(other entries, ourPeer mesh.PeerName, r *Ring, checkRangeHasAllocations func(r address.Range) bool) (result entries, updated bool, err error) { var mine, theirs *entry var previousOwner *mesh.PeerName addToResult := func(e entry) { result = append(result, &e) } @@ -280,14 +280,6 @@ func (es entries) merge(other entries, ourPeer mesh.PeerName, r *Ring) (result e previousOwner = nil } - checkEntryHasAllocations := func(current, next *entry) bool { - size := r.distance(current.Token, next.Token) - if current.Free < size { - return true - } - return false - } - // i is index into es; j is index into other var i, j int for i < len(es) && j < len(other) { @@ -302,7 +294,7 @@ func (es entries) merge(other entries, ourPeer mesh.PeerName, r *Ring) (result e // insert, checking that a range owned by us hasn't been split if previousOwner != nil && *previousOwner == ourPeer && theirs.Peer != ourPeer { // check we have no allocations in the range that got split - if checkEntryHasAllocations(theirs, mine) { + if checkRangeHasAllocations(address.Range{Start: theirs.Token, End: mine.Token}) { err = errEntryInMyRange(theirs) return } @@ -317,7 +309,7 @@ func (es entries) merge(other entries, ourPeer mesh.PeerName, r *Ring) (result e if mine.Version == theirs.Version && !mine.Equal(theirs) { if mine.Peer == ourPeer { // if we own the entry and has allocations - if checkEntryHasAllocations(mine, es.entry(i+1)) { + if checkRangeHasAllocations(address.Range{Start: mine.Token, End: es.entry(i + 1).Token}) { err = errInconsistentEntry(mine, theirs) return } @@ -340,7 +332,7 @@ func (es entries) merge(other entries, ourPeer mesh.PeerName, r *Ring) (result e // in the range effectively given away, or it belongs to our own peer, in // which case we should set our version to the one received plus one, // effectively imposing our existing entry. - if theirs.Peer != ourPeer && !checkEntryHasAllocations(mine, es.entry(i+1)) { + if theirs.Peer != ourPeer && !checkRangeHasAllocations(address.Range{Start: mine.Token, End: es.entry(i + 1).Token}) { addTheirs(*theirs) } else if theirs.Peer == ourPeer { mine.Version = theirs.Version + 1 diff --git a/ipam/ring/ring_test.go b/ipam/ring/ring_test.go index 3a79869ca8..859a411e58 100644 --- a/ipam/ring/ring_test.go +++ b/ipam/ring/ring_test.go @@ -32,7 +32,25 @@ func ParseIP(s string) address.Address { } func merge(r1, r2 *Ring) error { - _, err := r1.Merge(*r2) + distance := func(start, end address.Address) address.Count { + if end > start { + return address.Count(end - start) + } + return address.Count((r1.End - start) + (end - r1.Start)) + } + checkEntryHasAllocations := func(r address.Range) bool { + size := distance(r.Start, r.End) + entry, found := r1.Entries.get(r.Start) + if !found { + return false + } + if entry.Free < size { + return true + } + return false + } + + _, err := r1.Merge(*r2, checkEntryHasAllocations) return err }