Skip to content

Commit

Permalink
refactor: Improve rollback on peer P2P collection error (#1461)
Browse files Browse the repository at this point in the history
## Relevant issue(s)

Resolves #1389 

## Description

This PR improves the rollback of P2P collection manipulation errors. It
ensures that a rollback is applied if the transaction commit fails.
  • Loading branch information
fredcarle authored May 9, 2023
1 parent b6bd97a commit 5df70af
Showing 1 changed file with 56 additions and 47 deletions.
103 changes: 56 additions & 47 deletions net/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,26 @@ type EvtPubSub struct {
Peer peer.ID
}

// rollbackAddPubSubTopics removes the given topics from the pubsub system.
func (p *Peer) rollbackAddPubSubTopics(topics []string, cause error) error {
for _, topic := range topics {
if err := p.server.removePubSubTopic(topic); err != nil {
return errors.WithStack(err, errors.NewKV("Cause", cause))
}
}
return cause
}

// rollbackRemovePubSubTopics adds back the given topics from the pubsub system.
func (p *Peer) rollbackRemovePubSubTopics(topics []string, cause error) error {
for _, topic := range topics {
if err := p.server.addPubSubTopic(topic, true); err != nil {
return errors.WithStack(err, errors.NewKV("Cause", cause))
}
}
return cause
}

// AddP2PCollections adds the given collectionIDs to the pubsup topics.
//
// It will error if any of the given collectionIDs are invalid, in such a case some of the
Expand All @@ -778,11 +798,13 @@ func (p *Peer) AddP2PCollections(collections []string) error {
store := p.db.WithTxn(txn)

// first let's make sure the collections actually exists
storeCollections := []client.Collection{}
for _, col := range collections {
_, err := store.GetCollectionBySchemaID(p.ctx, col)
storeCol, err := store.GetCollectionBySchemaID(p.ctx, col)
if err != nil {
return err
}
storeCollections = append(storeCollections, storeCol)
}

// Ensure we can add all the collections to the store on the transaction
Expand All @@ -799,42 +821,34 @@ func (p *Peer) AddP2PCollections(collections []string) error {
for _, col := range collections {
err = p.server.addPubSubTopic(col, true)
if err != nil {
for _, topic := range addedTopics {
e := p.server.removePubSubTopic(topic)
if e != nil {
return errors.WithStack(e, errors.NewKV("Cause", err))
}
}
return err
return p.rollbackAddPubSubTopics(addedTopics, err)
}
addedTopics = append(addedTopics, col)
}

// If adding the collection topics succeeds, we remove the collections' documents
// After adding the collection topics, we remove the collections' documents
// from the pubsub topics to avoid receiving duplicate events.
for _, col := range collections {
c, err := store.GetCollectionBySchemaID(p.ctx, col)
if err != nil {
return err
}
keyChan, err := c.GetAllDocKeys(p.ctx)
removedTopics := []string{}
for _, col := range storeCollections {
keyChan, err := col.GetAllDocKeys(p.ctx)
if err != nil {
return err
}
for key := range keyChan {
err := p.server.removePubSubTopic(key.Key.String())
if err != nil {
log.Info(
p.ctx,
"Failed to remove doc from pubsub topic",
logging.NewKV("DocKey", key.Key.String()),
logging.NewKV("Cause", err),
)
return p.rollbackRemovePubSubTopics(removedTopics, err)
}
removedTopics = append(removedTopics, key.Key.String())
}
}

return txn.Commit(p.ctx)
if err = txn.Commit(p.ctx); err != nil {
err = p.rollbackRemovePubSubTopics(removedTopics, err)
return p.rollbackAddPubSubTopics(addedTopics, err)
}

return nil
}

// RemoveP2PCollections removes the given collectionIDs from the pubsup topics.
Expand All @@ -852,11 +866,13 @@ func (p *Peer) RemoveP2PCollections(collections []string) error {
store := p.db.WithTxn(txn)

// first let's make sure the collections actually exists
storeCollections := []client.Collection{}
for _, col := range collections {
_, err := store.GetCollectionBySchemaID(p.ctx, col)
storeCol, err := store.GetCollectionBySchemaID(p.ctx, col)
if err != nil {
return err
}
storeCollections = append(storeCollections, storeCol)
}

// Ensure we can remove all the collections to the store on the transaction
Expand All @@ -873,62 +889,55 @@ func (p *Peer) RemoveP2PCollections(collections []string) error {
for _, col := range collections {
err = p.server.removePubSubTopic(col)
if err != nil {
for _, topic := range removedTopics {
e := p.server.addPubSubTopic(topic, true)
if e != nil {
return errors.WithStack(e, errors.NewKV("Cause", err))
}
}
return err
return p.rollbackRemovePubSubTopics(removedTopics, err)
}
removedTopics = append(removedTopics, col)
}

// If removing the collection topics succeeds, we add back the collections' documents
// After removing the collection topics, we add back the collections' documents
// to the pubsub topics.
for _, col := range collections {
c, err := store.GetCollectionBySchemaID(p.ctx, col)
if err != nil {
return err
}
keyChan, err := c.GetAllDocKeys(p.ctx)
addedTopics := []string{}
for _, col := range storeCollections {
keyChan, err := col.GetAllDocKeys(p.ctx)
if err != nil {
return err
}
for key := range keyChan {
err := p.server.addPubSubTopic(key.Key.String(), true)
if err != nil {
log.Info(
p.ctx,
"Failed to add doc to pubsub topic",
logging.NewKV("DocKey", key.Key.String()),
logging.NewKV("Cause", err),
)
return p.rollbackAddPubSubTopics(addedTopics, err)
}
addedTopics = append(addedTopics, key.Key.String())
}
}

return txn.Commit(p.ctx)
if err = txn.Commit(p.ctx); err != nil {
err = p.rollbackAddPubSubTopics(addedTopics, err)
return p.rollbackRemovePubSubTopics(removedTopics, err)
}

return nil
}

// GetAllP2PCollections gets all the collectionIDs from the pubsup topics
// GetAllP2PCollections gets all the collectionIDs that have been added to the
// pubsub topics from the system store.
func (p *Peer) GetAllP2PCollections() ([]client.P2PCollection, error) {
txn, err := p.db.NewTxn(p.ctx, false)
if err != nil {
return nil, err
}
defer txn.Discard(p.ctx)
store := p.db.WithTxn(txn)

collections, err := p.db.GetAllP2PCollections(p.ctx)
if err != nil {
txn.Discard(p.ctx)
return nil, err
}

p2pCols := []client.P2PCollection{}
for _, colID := range collections {
col, err := store.GetCollectionBySchemaID(p.ctx, colID)
if err != nil {
txn.Discard(p.ctx)
return nil, err
}
p2pCols = append(p2pCols, client.P2PCollection{
Expand Down

0 comments on commit 5df70af

Please sign in to comment.