Skip to content

Commit

Permalink
feat(repo): merge repos' EventLogs using a simple first attempt
Browse files Browse the repository at this point in the history
To merge repos' EventLogs, compare the Events timestamp by timestamp until we find a mismatch.
Treat that mismatch as a conflict if the two events cannot be resolved; otherwise tell the merging
peers that they have updates that need to be added to their EventLogs.
  • Loading branch information
dustmop committed May 10, 2018
1 parent 52285c1 commit e5997bb
Show file tree
Hide file tree
Showing 3 changed files with 336 additions and 3 deletions.
110 changes: 110 additions & 0 deletions repo/actions/merge_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package actions

import (
	"fmt"

	"github.com/qri-io/qri/repo"
)

type (
	// MergeResultSet describes the outcome of merging a collection of
	// EventLogs. It holds one MergeResultEntry per participating peer.
	MergeResultSet struct {
		peers []MergeResultEntry
	}

	// MergeResultEntry describes what a single peer has to do to bring its
	// EventLog up to date: how many conflicts and how many updates it has.
	MergeResultEntry struct {
		conflicts int
		updates   int
	}
)

// Peer returns the MergeResultEntry of the peer at index i.
// It panics if i is outside the range of stored peers.
func (s MergeResultSet) Peer(i int) MergeResultEntry {
	entry := s.peers[i]
	return entry
}

// NumConflicts reports how many conflicts were counted for this peer.
func (e MergeResultEntry) NumConflicts() int {
	n := e.conflicts
	return n
}

// NumUpdates reports how many updates were counted for this peer.
func (e MergeResultEntry) NumUpdates() int {
	n := e.updates
	return n
}

// MergeRepoEvents compares the EventLogs of two repos and counts, for each
// peer, the updates and conflicts that merging them would involve.
//
// In the returned set, index 0 refers to repo one and index 1 to repo two.
// Both logs are walked from the oldest event to the newest, pairing events
// that happened at the same instant. An event that has no partner in the
// other log is remembered as a possible conflict. Once one log is exhausted,
// each event remaining in the other log is counted as:
//   - an update for the opposite peer, if no possible conflict was seen;
//   - an update for both peers, if CanResolveEvents accepts the pair formed
//     with the possible conflict;
//   - a conflict for both peers otherwise.
//
// An error is returned, together with an empty MergeResultSet, when either
// repo does not implement repo.EventLog or when its events cannot be read.
func MergeRepoEvents(one repo.Repo, two repo.Repo) (MergeResultSet, error) {
	oneLog, ok := one.(repo.EventLog)
	if !ok {
		return MergeResultSet{}, fmt.Errorf("merging events: %T does not implement repo.EventLog", one)
	}
	twoLog, ok := two.(repo.EventLog)
	if !ok {
		return MergeResultSet{}, fmt.Errorf("merging events: %T does not implement repo.EventLog", two)
	}

	// TODO: Handle any length of EventLogs.
	oneEvents, err := oneLog.Events(100, 0)
	if err != nil {
		return MergeResultSet{}, fmt.Errorf("merging events: reading first event log: %v", err)
	}
	twoEvents, err := twoLog.Events(100, 0)
	if err != nil {
		return MergeResultSet{}, fmt.Errorf("merging events: reading second event log: %v", err)
	}

	resultSet := MergeResultSet{peers: make([]MergeResultEntry, 2)}

	var possibleConflictEvent *repo.Event
	// Events are stored in reverse timestamp order, so start at the end of
	// each slice to visit the oldest events first.
	i := len(oneEvents) - 1
	j := len(twoEvents) - 1
	for i >= 0 && j >= 0 {
		// TODO: It may be correct to find the first point of divergence, and then
		// check each pair-wise elements using CanResolveEvents. This loop, which works
		// like a zip, incorrectly assumes that a conflict can occur, followed by more
		// matching elements, but there's really no use case where that can happen.
		oneEv := oneEvents[i]
		twoEv := twoEvents[j]
		switch {
		case oneEv.Time.Before(twoEv.Time):
			// TODO: Handle more than one conflict at a time.
			possibleConflictEvent = oneEv
			i--
		case twoEv.Time.Before(oneEv.Time):
			possibleConflictEvent = twoEv
			j--
		default:
			// Neither event precedes the other, so they denote the same
			// instant. Handling this as the default arm guarantees that every
			// iteration advances, even when the two time.Time values are not
			// identical under ==.
			i--
			j--
		}
	}

	// Handle any leftovers. At most one of the two loops below runs, because
	// the loop above only stops once one of the logs is exhausted.
	for ; i >= 0; i-- {
		oneEv := oneEvents[i]
		switch {
		case possibleConflictEvent == nil:
			// TODO: Add data from oneEv into the update.
			resultSet.peers[1].updates++
		case CanResolveEvents(*possibleConflictEvent, *oneEv):
			resultSet.peers[0].updates++
			resultSet.peers[1].updates++
		default:
			resultSet.peers[0].conflicts++
			resultSet.peers[1].conflicts++
		}
	}
	for ; j >= 0; j-- {
		twoEv := twoEvents[j]
		switch {
		case possibleConflictEvent == nil:
			// TODO: Add data from twoEv into the update.
			resultSet.peers[0].updates++
		case CanResolveEvents(*possibleConflictEvent, *twoEv):
			resultSet.peers[0].updates++
			resultSet.peers[1].updates++
		default:
			resultSet.peers[0].conflicts++
			resultSet.peers[1].conflicts++
		}
	}
	return resultSet, nil
}

// CanResolveEvents reports whether two Events can be reconciled (true) or
// have to be treated as a conflict (false).
//
// For now a pair is resolvable only when exactly one of the two events is a
// rename; two renames, or two events that are both not renames, conflict.
func CanResolveEvents(left repo.Event, right repo.Event) bool {
	// TODO: Handle more cases.
	leftIsRename := left.Type == repo.ETDsRenamed
	rightIsRename := right.Type == repo.ETDsRenamed
	// Exclusive or: true when the two flags differ.
	return leftIsRename != rightIsRename
}
204 changes: 204 additions & 0 deletions repo/actions/merge_events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package actions

import (
"github.com/qri-io/cafs"
"github.com/qri-io/qri/repo"
"github.com/qri-io/qri/repo/profile"
peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer"
"testing"
)

const refPath0 = "/ipfs/Qmaau1d1WjnQdTYfRYfFVsCS97cgD8ATyrKuNoEfexL7JZ"
const refPath1 = "/ipfs/QmafgXF3u3QSWErQoZ2URmQp5PFG384htoE7J338nS2H7T"
const refPath2 = "/ipfs/QmbNinL4ErzM73BxQSNf8q2vPmAM4v3MNQM2DmDqDjt47D"
const refPath3 = "/ipfs/Qmc5do1bC3JH73MThEgKgKkgNKLmqrvh2uaE6919yDmUaa"

const peerAID = "QmenK8PgcpM2KYzEKnGx1LyN1QXawswM2F6HktCbWKuC1b"
const peerBID = "Qmf2fBLzCvFxs3vvZaoQyKSTv2rjApek4F1kzRxAfK6P4P"

const profileAID = "QmSMZwvs3n4GBikZ7hKzT17c3bk65FmyZo2JqtJ16swqCv"
const profileBID = "QmSk46nSD78YiuNojYMS8NW6hSCjH95JajqqzzoychZgef"

// createReposAndLogs builds two in-memory repos, A and B, and returns them
// together with their in-memory event logs, in the order
// (repo A, repo B, log A, log B).
func createReposAndLogs() (repo.Repo, repo.Repo, *repo.MemEventLog, *repo.MemEventLog) {
	// newRepo creates a fresh in-memory repo for the given profile ID. The
	// construction error is deliberately dropped, as this is a test fixture.
	newRepo := func(id string) repo.Repo {
		pro := &profile.Profile{
			ID:       profile.ID(id),
			Peername: "test-peer-0",
		}
		r, _ := repo.NewMemRepo(pro, cafs.NewMapstore(), profile.MemStore{})
		return r
	}

	aRepo := newRepo(profileAID)
	bRepo := newRepo(profileBID)
	return aRepo, bRepo, aRepo.(*repo.MemRepo).MemEventLog, bRepo.(*repo.MemRepo).MemEventLog
}

// TestNewChangesThatCanMerge covers the case where B's log is A's log plus
// two newer events: A should be told about 2 updates, B about none, and
// neither peer should see a conflict.
func TestNewChangesThatCanMerge(t *testing.T) {
	aRepo, bRepo, aLog, bLog := createReposAndLogs()

	// Going to make 3 updates after the initial creation, but dataset name never changes.
	ref := repo.DatasetRef{Peername: "test-peer-0", Name: "test-dataset-0", Path: refPath0}
	ref1 := repo.DatasetRef{Peername: "test-peer-0", Name: "test-dataset-0", Path: refPath1}
	ref2 := repo.DatasetRef{Peername: "test-peer-0", Name: "test-dataset-0", Path: refPath2}
	ref3 := repo.DatasetRef{Peername: "test-peer-0", Name: "test-dataset-0", Path: refPath3}

	// Use distinct local names so the package-level ID constants are not shadowed.
	aID, err := peer.IDB58Decode(peerAID)
	if err != nil {
		t.Fatalf("decoding peer A ID: %v", err)
	}
	bID, err := peer.IDB58Decode(peerBID)
	if err != nil {
		t.Fatalf("decoding peer B ID: %v", err)
	}

	// Events for A.
	aLog.LogEventDetails(repo.ETDsCreated, 1000, aID, ref, nil)
	aLog.LogEventDetails(repo.ETDsPinned, 1001, aID, ref, nil)
	aLog.LogEventDetails(repo.ETDsCreated, 1002, aID, ref1, nil)
	// Events for B (same exact events).
	bLog.LogEventDetails(repo.ETDsCreated, 1000, aID, ref, nil)
	bLog.LogEventDetails(repo.ETDsPinned, 1001, aID, ref, nil)
	bLog.LogEventDetails(repo.ETDsCreated, 1002, aID, ref1, nil)
	// New stuff on B, can be merged.
	bLog.LogEventDetails(repo.ETDsCreated, 1010, bID, ref2, nil)
	bLog.LogEventDetails(repo.ETDsCreated, 1011, bID, ref3, nil)

	resultSet, err := MergeRepoEvents(aRepo, bRepo)
	if err != nil {
		// Fail through the testing framework rather than exiting the process.
		t.Fatal(err)
	}
	if resultSet.Peer(0).NumConflicts() != 0 {
		t.Errorf("Expected no conflicts")
	}
	if resultSet.Peer(0).NumUpdates() != 2 {
		t.Errorf("Expected 2 updates for Peer A")
	}
	if resultSet.Peer(1).NumConflicts() != 0 {
		t.Errorf("Expected no conflicts")
	}
	if resultSet.Peer(1).NumUpdates() != 0 {
		t.Errorf("Expected 0 updates for Peer B")
	}
}

// TestBothMadeChanges covers the case where A and B share a common history
// and then each adds a new dataset version: both peers should see exactly one
// conflict and no updates.
func TestBothMadeChanges(t *testing.T) {
	aRepo, bRepo, aLog, bLog := createReposAndLogs()

	// Going to make 1 update after the initial creation, then an update on each repo.
	ref := repo.DatasetRef{Peername: "test-peer-0", Name: "test-dataset-0", Path: refPath0}
	ref1 := repo.DatasetRef{Peername: "test-peer-0", Name: "test-dataset-0", Path: refPath1}
	ref2 := repo.DatasetRef{Peername: "test-peer-0", Name: "test-dataset-0", Path: refPath2}
	ref3 := repo.DatasetRef{Peername: "test-peer-0", Name: "test-dataset-0", Path: refPath3}

	// Use distinct local names so the package-level ID constants are not shadowed.
	aID, err := peer.IDB58Decode(peerAID)
	if err != nil {
		t.Fatalf("decoding peer A ID: %v", err)
	}
	bID, err := peer.IDB58Decode(peerBID)
	if err != nil {
		t.Fatalf("decoding peer B ID: %v", err)
	}

	// Events for A.
	aLog.LogEventDetails(repo.ETDsCreated, 1000, aID, ref, nil)
	aLog.LogEventDetails(repo.ETDsPinned, 1001, aID, ref, nil)
	aLog.LogEventDetails(repo.ETDsCreated, 1002, aID, ref1, nil)
	// Events for B (same exact events).
	bLog.LogEventDetails(repo.ETDsCreated, 1000, aID, ref, nil)
	bLog.LogEventDetails(repo.ETDsPinned, 1001, aID, ref, nil)
	bLog.LogEventDetails(repo.ETDsCreated, 1002, aID, ref1, nil)
	// New stuff on A.
	aLog.LogEventDetails(repo.ETDsCreated, 1010, aID, ref2, nil)
	// Also new stuff on B, this is a conflict.
	bLog.LogEventDetails(repo.ETDsCreated, 1020, bID, ref3, nil)

	resultSet, err := MergeRepoEvents(aRepo, bRepo)
	if err != nil {
		// Fail through the testing framework rather than exiting the process.
		t.Fatal(err)
	}
	if resultSet.Peer(0).NumConflicts() != 1 {
		t.Errorf("Expected 1 conflict")
	}
	if resultSet.Peer(0).NumUpdates() != 0 {
		t.Errorf("Expected 0 updates for Peer A, got %d", resultSet.Peer(0).NumUpdates())
	}
	if resultSet.Peer(1).NumConflicts() != 1 {
		t.Errorf("Expected 1 conflict")
	}
	if resultSet.Peer(1).NumUpdates() != 0 {
		t.Errorf("Expected 0 updates for Peer B, got %d", resultSet.Peer(1).NumUpdates())
	}
}

// TestDeleteAfterRename covers the case where B renames a dataset and A
// afterwards deletes it under the old name: the rename makes the pair
// resolvable, so each peer should get one update and no conflicts.
func TestDeleteAfterRename(t *testing.T) {
	aRepo, bRepo, aLog, bLog := createReposAndLogs()

	// Use distinct local names so the package-level ID constants are not shadowed.
	aID, err := peer.IDB58Decode(peerAID)
	if err != nil {
		t.Fatalf("decoding peer A ID: %v", err)
	}
	bID, err := peer.IDB58Decode(peerBID)
	if err != nil {
		t.Fatalf("decoding peer B ID: %v", err)
	}

	// Going to make 1 update after the initial creation, which is a rename.
	ref := repo.DatasetRef{Peername: "test-peer-0", Name: "test-dataset-0", Path: refPath0}
	ref1 := repo.DatasetRef{Peername: "test-peer-0", Name: "test-dataset-1", Path: refPath0}

	// Events for A
	aLog.LogEventDetails(repo.ETDsCreated, 1000, aID, ref, nil)
	aLog.LogEventDetails(repo.ETDsPinned, 1001, aID, ref, nil)
	// Events for B (same exact events).
	bLog.LogEventDetails(repo.ETDsCreated, 1000, aID, ref, nil)
	bLog.LogEventDetails(repo.ETDsPinned, 1001, aID, ref, nil)

	// B renames.
	bLog.LogEventDetails(repo.ETDsRenamed, 1010, bID, ref1,
		[2]string{"test-dataset-0", "test-dataset-1"})
	// A deletes (should apply to new name).
	aLog.LogEventDetails(repo.ETDsDeleted, 1020, aID, ref, nil)

	resultSet, err := MergeRepoEvents(aRepo, bRepo)
	if err != nil {
		// Fail through the testing framework rather than exiting the process.
		t.Fatal(err)
	}
	if resultSet.Peer(0).NumConflicts() != 0 {
		t.Errorf("Expected no conflicts")
	}
	// Maybe, Peer A doesn't have any updates because it deleted the dataset,
	// so doesn't need to do any work.
	if resultSet.Peer(0).NumUpdates() != 1 {
		t.Errorf("Expected 1 updates for Peer A")
	}
	if resultSet.Peer(1).NumConflicts() != 0 {
		t.Errorf("Expected no conflicts")
	}
	if resultSet.Peer(1).NumUpdates() != 1 {
		t.Errorf("Expected 1 updates for Peer B")
	}
}

// TestRenameAndAddContent covers the case where B renames a dataset and A
// afterwards adds a new version under the old name: the rename makes the pair
// resolvable, so each peer should get one update and no conflicts.
func TestRenameAndAddContent(t *testing.T) {
	aRepo, bRepo, aLog, bLog := createReposAndLogs()

	// Use distinct local names so the package-level ID constants are not shadowed.
	aID, err := peer.IDB58Decode(peerAID)
	if err != nil {
		t.Fatalf("decoding peer A ID: %v", err)
	}
	bID, err := peer.IDB58Decode(peerBID)
	if err != nil {
		t.Fatalf("decoding peer B ID: %v", err)
	}

	// Going to make 1 update after the initial creation, which is a rename.
	ref := repo.DatasetRef{Peername: "test-peer-0", Name: "test-dataset-0", Path: refPath0}
	ref1 := repo.DatasetRef{Peername: "test-peer-0", Name: "test-dataset-1", Path: refPath0}
	ref2 := repo.DatasetRef{Peername: "test-peer-0", Name: "test-dataset-0", Path: refPath2}

	// Events for A
	aLog.LogEventDetails(repo.ETDsCreated, 1000, aID, ref, nil)
	aLog.LogEventDetails(repo.ETDsPinned, 1001, aID, ref, nil)
	// Events for B (same exact events).
	bLog.LogEventDetails(repo.ETDsCreated, 1000, aID, ref, nil)
	bLog.LogEventDetails(repo.ETDsPinned, 1001, aID, ref, nil)

	// B renames.
	bLog.LogEventDetails(repo.ETDsRenamed, 1010, bID, ref1,
		[2]string{"test-dataset-0", "test-dataset-1"})
	// A adds content.
	aLog.LogEventDetails(repo.ETDsCreated, 1020, aID, ref2, nil)

	resultSet, err := MergeRepoEvents(aRepo, bRepo)
	if err != nil {
		// Fail through the testing framework rather than exiting the process.
		t.Fatal(err)
	}
	if resultSet.Peer(0).NumConflicts() != 0 {
		t.Errorf("Expected no conflicts")
	}
	if resultSet.Peer(0).NumUpdates() != 1 {
		t.Errorf("Expected 1 updates for Peer A")
	}
	if resultSet.Peer(1).NumConflicts() != 0 {
		t.Errorf("Expected no conflicts")
	}
	if resultSet.Peer(1).NumUpdates() != 1 {
		t.Errorf("Expected 1 updates for Peer B")
	}
}
25 changes: 22 additions & 3 deletions repo/events.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package repo

import (
peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer"
"sort"
"time"
)
Expand All @@ -14,9 +15,11 @@ type EventLog interface {

// Event is a single entry in an EventLog: what happened, when it happened,
// which dataset it concerns, and which peer it is attributed to.
type Event struct {
	// Time is the instant at which the event occurred.
	Time time.Time
	// Type classifies the event.
	Type EventType
	// Ref is the dataset reference the event applies to.
	Ref DatasetRef
	// PeerID is the ID of the peer recorded with the event.
	PeerID peer.ID
	// Params carries optional, event-specific details and may be nil.
	// NOTE(review): callers appear to pass the old and new dataset names as
	// a [2]string for rename events - confirm before relying on the shape.
	Params interface{}
}

// EventType classifies types of events that can be logged
Expand Down Expand Up @@ -54,6 +57,22 @@ func (log *MemEventLog) LogEvent(t EventType, ref DatasetRef) error {
return nil
}

// LogEventDetails adds an entry to the log, taking every detail of the event
// from its arguments.
//
// when is a Unix timestamp in seconds. After insertion the log is ordered
// newest first. A stable sort is used so that events sharing a timestamp keep
// a well-defined relative order: the entry just added comes before older
// entries with the same time, and existing entries are not reshuffled.
//
// The returned error is always nil.
// TODO: Update LogEvent to work like this, update callers.
func (log *MemEventLog) LogEventDetails(t EventType, when int64, peerID peer.ID, ref DatasetRef, params interface{}) error {
	e := &Event{
		Time:   time.Unix(when, 0),
		Type:   t,
		Ref:    ref,
		PeerID: peerID,
		Params: params,
	}
	// Prepend the new event, then restore reverse-chronological order.
	logs := append([]*Event{e}, *log...)
	sort.SliceStable(logs, func(i, j int) bool { return logs[i].Time.After(logs[j].Time) })
	*log = logs
	return nil
}

// Events grabs a set of Events from the store
func (log MemEventLog) Events(limit, offset int) ([]*Event, error) {
if offset > len(log) {
Expand Down

0 comments on commit e5997bb

Please sign in to comment.