-
Notifications
You must be signed in to change notification settings - Fork 66
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(repo): merge repos' EventLogs using a simple first attempt
To merge repos' EventLogs, compare the Events timestamp by timestamp until we find a mismatch. Treat that mismatch as a conflict if the two events cannot be resolved, otherwise tell the merging peers that they updates that need to be added to their EventLogs.
- Loading branch information
Showing
3 changed files
with
336 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
package actions | ||
|
||
import ( | ||
"github.com/qri-io/qri/repo" | ||
) | ||
|
||
// MergeResultSet contains information about how to merge a collection of EventLogs. | ||
type MergeResultSet struct { | ||
peers []MergeResultEntry | ||
} | ||
|
||
// MergeResultEntry contains information about how a single peer should update its EventLog. | ||
type MergeResultEntry struct { | ||
conflicts int | ||
updates int | ||
} | ||
|
||
// Peer gets a MegeResultEntry for a single peer. | ||
func (s MergeResultSet) Peer(i int) MergeResultEntry { | ||
return s.peers[i] | ||
} | ||
|
||
// NumConflicts gets the number of conflicts. | ||
func (e MergeResultEntry) NumConflicts() int { | ||
return e.conflicts | ||
} | ||
|
||
// NumUpdates gets the number of updates. | ||
func (e MergeResultEntry) NumUpdates() int { | ||
return e.updates | ||
} | ||
|
||
// MergeRepoEvents tries to merge multiple EventLogs. | ||
func MergeRepoEvents(one repo.Repo, two repo.Repo) (MergeResultSet, error) { | ||
resultSet := MergeResultSet{} | ||
resultSet.peers = make([]MergeResultEntry, 2) | ||
oneLog := one.(repo.EventLog) | ||
twoLog := two.(repo.EventLog) | ||
// TODO: Handle any length of EventLogs. | ||
oneEvents, _ := oneLog.Events(100, 0) | ||
twoEvents, _ := twoLog.Events(100, 0) | ||
var possibleConflictEvent *repo.Event | ||
// Events are stored in reverse timestamp order. | ||
i := len(oneEvents) - 1 | ||
j := len(twoEvents) - 1 | ||
for i >= 0 && j >= 0 { | ||
// TODO: It may be correct to find the first point of divergence, and then | ||
// check each pair-wise elements using CanResolveEvents. This loop, which works | ||
// like a zip, incorrectly assumes that a conflict can occur, followed by more | ||
// matching elements, but there's really no use case where that can happen. | ||
oneEv := oneEvents[i] | ||
twoEv := twoEvents[j] | ||
if oneEv.Time == twoEv.Time { | ||
i-- | ||
j-- | ||
continue | ||
} else if oneEv.Time.Before(twoEv.Time) { | ||
// TODO: Handle more than one conflict at a time. | ||
possibleConflictEvent = oneEv | ||
i-- | ||
continue | ||
} else if twoEv.Time.Before(oneEv.Time) { | ||
possibleConflictEvent = twoEv | ||
j-- | ||
continue | ||
} | ||
} | ||
// Handle any leftovers. | ||
for i >= 0 { | ||
oneEv := oneEvents[i] | ||
if possibleConflictEvent == nil { | ||
// TODO: Add data from oneEv into the update. | ||
resultSet.peers[1].updates++ | ||
} else if CanResolveEvents(*possibleConflictEvent, *oneEv) { | ||
resultSet.peers[0].updates++ | ||
resultSet.peers[1].updates++ | ||
} else { | ||
resultSet.peers[0].conflicts++ | ||
resultSet.peers[1].conflicts++ | ||
} | ||
i-- | ||
} | ||
for j >= 0 { | ||
twoEv := twoEvents[j] | ||
if possibleConflictEvent == nil { | ||
// TODO: Add data from twoEv into the update. | ||
resultSet.peers[0].updates++ | ||
} else if CanResolveEvents(*possibleConflictEvent, *twoEv) { | ||
resultSet.peers[0].updates++ | ||
resultSet.peers[1].updates++ | ||
} else { | ||
resultSet.peers[0].conflicts++ | ||
resultSet.peers[1].conflicts++ | ||
} | ||
j-- | ||
} | ||
return resultSet, nil | ||
} | ||
|
||
// CanResolveEvents determines whether two Events can be resolved, or if they conflict. | ||
func CanResolveEvents(left repo.Event, right repo.Event) bool { | ||
// TODO: Handle more cases. | ||
if left.Type == repo.ETDsRenamed && right.Type == repo.ETDsRenamed { | ||
return false | ||
} else if left.Type == repo.ETDsRenamed || right.Type == repo.ETDsRenamed { | ||
return true | ||
} else { | ||
return false | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,204 @@ | ||
package actions | ||
|
||
import ( | ||
"github.com/qri-io/cafs" | ||
"github.com/qri-io/qri/repo" | ||
"github.com/qri-io/qri/repo/profile" | ||
peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer" | ||
"testing" | ||
) | ||
|
||
const refPath0 = "/ipfs/Qmaau1d1WjnQdTYfRYfFVsCS97cgD8ATyrKuNoEfexL7JZ" | ||
const refPath1 = "/ipfs/QmafgXF3u3QSWErQoZ2URmQp5PFG384htoE7J338nS2H7T" | ||
const refPath2 = "/ipfs/QmbNinL4ErzM73BxQSNf8q2vPmAM4v3MNQM2DmDqDjt47D" | ||
const refPath3 = "/ipfs/Qmc5do1bC3JH73MThEgKgKkgNKLmqrvh2uaE6919yDmUaa" | ||
|
||
const peerAID = "QmenK8PgcpM2KYzEKnGx1LyN1QXawswM2F6HktCbWKuC1b" | ||
const peerBID = "Qmf2fBLzCvFxs3vvZaoQyKSTv2rjApek4F1kzRxAfK6P4P" | ||
|
||
const profileAID = "QmSMZwvs3n4GBikZ7hKzT17c3bk65FmyZo2JqtJ16swqCv" | ||
const profileBID = "QmSk46nSD78YiuNojYMS8NW6hSCjH95JajqqzzoychZgef" | ||
|
||
func createReposAndLogs() (repo.Repo, repo.Repo, *repo.MemEventLog, *repo.MemEventLog) { | ||
aRepo, _ := repo.NewMemRepo(&profile.Profile{ | ||
ID: profile.ID(profileAID), | ||
Peername: "test-peer-0", | ||
}, cafs.NewMapstore(), profile.MemStore{}) | ||
bRepo, _ := repo.NewMemRepo(&profile.Profile{ | ||
ID: profile.ID(profileBID), | ||
Peername: "test-peer-0", | ||
}, cafs.NewMapstore(), profile.MemStore{}) | ||
aLog := aRepo.(*repo.MemRepo).MemEventLog | ||
bLog := bRepo.(*repo.MemRepo).MemEventLog | ||
return aRepo, bRepo, aLog, bLog | ||
} | ||
|
||
func TestNewChangesThatCanMerge(t *testing.T) { | ||
aRepo, bRepo, aLog, bLog := createReposAndLogs() | ||
|
||
// Going to make 3 updates after the initial creation, but dataset name never changes. | ||
ref := repo.DatasetRef{Peername: "test-peer-0", Name: "test-dataset-0", Path: refPath0} | ||
ref1 := repo.DatasetRef{Peername: "test-peer-0", Name: "test-dataset-0", Path: refPath1} | ||
ref2 := repo.DatasetRef{Peername: "test-peer-0", Name: "test-dataset-0", Path: refPath2} | ||
ref3 := repo.DatasetRef{Peername: "test-peer-0", Name: "test-dataset-0", Path: refPath3} | ||
|
||
peerAID, _ := peer.IDB58Decode(peerAID) | ||
peerBID, _ := peer.IDB58Decode(peerBID) | ||
|
||
// Events for A. | ||
aLog.LogEventDetails(repo.ETDsCreated, 1000, peerAID, ref, nil) | ||
aLog.LogEventDetails(repo.ETDsPinned, 1001, peerAID, ref, nil) | ||
aLog.LogEventDetails(repo.ETDsCreated, 1002, peerAID, ref1, nil) | ||
// Events for B (same exact events). | ||
bLog.LogEventDetails(repo.ETDsCreated, 1000, peerAID, ref, nil) | ||
bLog.LogEventDetails(repo.ETDsPinned, 1001, peerAID, ref, nil) | ||
bLog.LogEventDetails(repo.ETDsCreated, 1002, peerAID, ref1, nil) | ||
// New stuff on B, can be merged. | ||
bLog.LogEventDetails(repo.ETDsCreated, 1010, peerBID, ref2, nil) | ||
bLog.LogEventDetails(repo.ETDsCreated, 1011, peerBID, ref3, nil) | ||
|
||
resultSet, err := MergeRepoEvents(aRepo, bRepo) | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
if resultSet.Peer(0).NumConflicts() != 0 { | ||
t.Errorf("Expected no conflicts") | ||
} | ||
if resultSet.Peer(0).NumUpdates() != 2 { | ||
t.Errorf("Expected 2 updates for Peer A") | ||
} | ||
if resultSet.Peer(1).NumConflicts() != 0 { | ||
t.Errorf("Expected no conflicts") | ||
} | ||
if resultSet.Peer(1).NumUpdates() != 0 { | ||
t.Errorf("Expected 0 updates for Peer B") | ||
} | ||
} | ||
|
||
func TestBothMadeChanges(t *testing.T) { | ||
aRepo, bRepo, aLog, bLog := createReposAndLogs() | ||
|
||
// Going to make 1 update after the initial creation, then an update on each repo. | ||
ref := repo.DatasetRef{Peername: "test-peer-0", Name: "test-dataset-0", Path: refPath0} | ||
ref1 := repo.DatasetRef{Peername: "test-peer-0", Name: "test-dataset-0", Path: refPath1} | ||
ref2 := repo.DatasetRef{Peername: "test-peer-0", Name: "test-dataset-0", Path: refPath2} | ||
ref3 := repo.DatasetRef{Peername: "test-peer-0", Name: "test-dataset-0", Path: refPath3} | ||
|
||
peerAID, _ := peer.IDB58Decode(peerAID) | ||
peerBID, _ := peer.IDB58Decode(peerBID) | ||
|
||
// Events for A. | ||
aLog.LogEventDetails(repo.ETDsCreated, 1000, peerAID, ref, nil) | ||
aLog.LogEventDetails(repo.ETDsPinned, 1001, peerAID, ref, nil) | ||
aLog.LogEventDetails(repo.ETDsCreated, 1002, peerAID, ref1, nil) | ||
// Events for B (same exact events). | ||
bLog.LogEventDetails(repo.ETDsCreated, 1000, peerAID, ref, nil) | ||
bLog.LogEventDetails(repo.ETDsPinned, 1001, peerAID, ref, nil) | ||
bLog.LogEventDetails(repo.ETDsCreated, 1002, peerAID, ref1, nil) | ||
// New stuff on A. | ||
aLog.LogEventDetails(repo.ETDsCreated, 1010, peerAID, ref2, nil) | ||
// Also new stuff on B, this is a conflict. | ||
bLog.LogEventDetails(repo.ETDsCreated, 1020, peerBID, ref3, nil) | ||
|
||
resultSet, err := MergeRepoEvents(aRepo, bRepo) | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
if resultSet.Peer(0).NumConflicts() != 1 { | ||
t.Errorf("Expected 1 conflict") | ||
} | ||
if resultSet.Peer(0).NumUpdates() != 0 { | ||
t.Errorf("Expected 0 updates for Peer A, got %d", resultSet.Peer(0).NumUpdates()) | ||
} | ||
if resultSet.Peer(1).NumConflicts() != 1 { | ||
t.Errorf("Expected 1 conflict") | ||
} | ||
if resultSet.Peer(1).NumUpdates() != 0 { | ||
t.Errorf("Expected 0 updates for Peer B, got %d", resultSet.Peer(1).NumUpdates()) | ||
} | ||
} | ||
|
||
func TestDeleteAfterRename(t *testing.T) { | ||
aRepo, bRepo, aLog, bLog := createReposAndLogs() | ||
|
||
peerAID, _ := peer.IDB58Decode(peerAID) | ||
peerBID, _ := peer.IDB58Decode(peerBID) | ||
|
||
// Going to make 1 update after the initial creation, which is a rename. | ||
ref := repo.DatasetRef{Peername: "test-peer-0", Name: "test-dataset-0", Path: refPath0} | ||
ref1 := repo.DatasetRef{Peername: "test-peer-0", Name: "test-dataset-1", Path: refPath0} | ||
|
||
// Events for A | ||
aLog.LogEventDetails(repo.ETDsCreated, 1000, peerAID, ref, nil) | ||
aLog.LogEventDetails(repo.ETDsPinned, 1001, peerAID, ref, nil) | ||
// Events for B (same exact events). | ||
bLog.LogEventDetails(repo.ETDsCreated, 1000, peerAID, ref, nil) | ||
bLog.LogEventDetails(repo.ETDsPinned, 1001, peerAID, ref, nil) | ||
|
||
// B renames. | ||
bLog.LogEventDetails(repo.ETDsRenamed, 1010, peerBID, ref1, | ||
[2]string{"test-dataset-0", "test-dataset-1"}) | ||
// A deletes (should apply to new name). | ||
aLog.LogEventDetails(repo.ETDsDeleted, 1020, peerAID, ref, nil) | ||
|
||
resultSet, err := MergeRepoEvents(aRepo, bRepo) | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
if resultSet.Peer(0).NumConflicts() != 0 { | ||
t.Errorf("Expected no conflicts") | ||
} | ||
// Maybe, Peer A doesn't have any updates because it deleted the dataset, | ||
// so doesn't need to do any work. | ||
if resultSet.Peer(0).NumUpdates() != 1 { | ||
t.Errorf("Expected 1 updates for Peer A") | ||
} | ||
if resultSet.Peer(1).NumConflicts() != 0 { | ||
t.Errorf("Expected no conflicts") | ||
} | ||
if resultSet.Peer(1).NumUpdates() != 1 { | ||
t.Errorf("Expected 1 updates for Peer B") | ||
} | ||
} | ||
|
||
func TestRenameAndAddContent(t *testing.T) { | ||
aRepo, bRepo, aLog, bLog := createReposAndLogs() | ||
|
||
peerAID, _ := peer.IDB58Decode(peerAID) | ||
peerBID, _ := peer.IDB58Decode(peerBID) | ||
|
||
// Going to make 1 update after the initial creation, which is a rename. | ||
ref := repo.DatasetRef{Peername: "test-peer-0", Name: "test-dataset-0", Path: refPath0} | ||
ref1 := repo.DatasetRef{Peername: "test-peer-0", Name: "test-dataset-1", Path: refPath0} | ||
ref2 := repo.DatasetRef{Peername: "test-peer-0", Name: "test-dataset-0", Path: refPath2} | ||
|
||
// Events for A | ||
aLog.LogEventDetails(repo.ETDsCreated, 1000, peerAID, ref, nil) | ||
aLog.LogEventDetails(repo.ETDsPinned, 1001, peerAID, ref, nil) | ||
// Events for B (same exact events). | ||
bLog.LogEventDetails(repo.ETDsCreated, 1000, peerAID, ref, nil) | ||
bLog.LogEventDetails(repo.ETDsPinned, 1001, peerAID, ref, nil) | ||
|
||
// B renames. | ||
bLog.LogEventDetails(repo.ETDsRenamed, 1010, peerBID, ref1, | ||
[2]string{"test-dataset-0", "test-dataset-1"}) | ||
// A adds content. | ||
aLog.LogEventDetails(repo.ETDsCreated, 1020, peerAID, ref2, nil) | ||
|
||
resultSet, err := MergeRepoEvents(aRepo, bRepo) | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
if resultSet.Peer(0).NumConflicts() != 0 { | ||
t.Errorf("Expected no conflicts") | ||
} | ||
if resultSet.Peer(0).NumUpdates() != 1 { | ||
t.Errorf("Expected 1 updates for Peer A") | ||
} | ||
if resultSet.Peer(1).NumConflicts() != 0 { | ||
t.Errorf("Expected no conflicts") | ||
} | ||
if resultSet.Peer(1).NumUpdates() != 1 { | ||
t.Errorf("Expected 1 updates for Peer B") | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters