Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: parallel repair #131

Closed
25 changes: 24 additions & 1 deletion extendeddatacrossword.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"sync"

"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -83,6 +84,7 @@ func (eds *ExtendedDataSquare) solveCrossword(
// Track if a single iteration of this loop made progress
progressMade := false

var trackerMutex sync.Mutex
rahulghangas marked this conversation as resolved.
Show resolved Hide resolved
// Loop through every row and column, attempt to rebuild each row or column if incomplete
for i := 0; i < int(eds.width); i++ {
i := i
Expand All @@ -93,23 +95,34 @@ func (eds *ExtendedDataSquare) solveCrossword(
return err
}

trackerMutex.Lock()
defer trackerMutex.Unlock()
solved = solved && solvedRow
progressMade = progressMade || progressMadeRow
return nil
})

}
if err := errs.Wait(); err != nil {
return err
}

for i := 0; i < int(eds.width); i++ {
i := i

errs.Go(func() error {
solvedCol, progressMadeCol, err := eds.solveCrosswordCol(i, rowRoots, colRoots)
if err != nil {
return err
}

trackerMutex.Lock()
defer trackerMutex.Unlock()
solved = solved && solvedCol
progressMade = progressMade || progressMadeCol
return nil
})
}

if err := errs.Wait(); err != nil {
return err
}
Expand Down Expand Up @@ -173,18 +186,23 @@ func (eds *ExtendedDataSquare) solveCrosswordRow(
if col[r] != nil {
continue // not newly completed
}

eds.rowColMutex[c].Lock()
col[r] = rebuiltShares[c]
if noMissingData(col) { // not completed
err := eds.verifyAgainstColRoots(colRoots, uint(c), col)
if err != nil {
return false, false, err
}
}
eds.rowColMutex[c].Unlock()
}

// Insert rebuilt shares into square.
for c, s := range rebuiltShares {
eds.rowColMutex[c].Lock()
eds.setCell(uint(r), uint(c), s)
eds.rowColMutex[c].Unlock()
}

return true, true, nil
Expand Down Expand Up @@ -235,18 +253,23 @@ func (eds *ExtendedDataSquare) solveCrosswordCol(
if row[c] != nil {
continue // not newly completed
}

eds.rowColMutex[r].Lock()
row[c] = rebuiltShares[r]
if noMissingData(row) { // not completed
err := eds.verifyAgainstRowRoots(rowRoots, uint(r), row)
if err != nil {
return false, false, err
}
}
eds.rowColMutex[r].Unlock()
}

// Insert rebuilt shares into square.
for r, s := range rebuiltShares {
eds.rowColMutex[r].Lock()
eds.setCell(uint(r), uint(c), s)
eds.rowColMutex[r].Unlock()
}

return true, true, nil
Expand Down
6 changes: 4 additions & 2 deletions extendeddatasquare.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"context"
"errors"
"sync"

"golang.org/x/sync/errgroup"
)
Expand All @@ -14,6 +15,7 @@ type ExtendedDataSquare struct {
*dataSquare
codec Codec
originalDataWidth uint
rowColMutex []sync.Mutex
}

// ComputeExtendedDataSquare computes the extended data square for some chunks of data.
Expand All @@ -31,7 +33,7 @@ func ComputeExtendedDataSquare(
return nil, err
}

eds := ExtendedDataSquare{dataSquare: ds, codec: codec}
eds := ExtendedDataSquare{dataSquare: ds, codec: codec, rowColMutex: make([]sync.Mutex, ds.width)}
err = eds.erasureExtendSquare(codec)
if err != nil {
return nil, err
Expand All @@ -55,7 +57,7 @@ func ImportExtendedDataSquare(
return nil, err
}

eds := ExtendedDataSquare{dataSquare: ds, codec: codec}
eds := ExtendedDataSquare{dataSquare: ds, codec: codec, rowColMutex: make([]sync.Mutex, ds.width)}
if eds.width%2 != 0 {
return nil, errors.New("square width must be even")
}
Expand Down