Skip to content

Commit

Permalink
repair in batches (#1347)
Browse files Browse the repository at this point in the history
* repair in batches

* fix lint

* fix unit test

* fix batch size

---------

Co-authored-by: Yury <yuderbasov@gmail.com>
  • Loading branch information
Hitenjain14 and dabasov authored Jan 4, 2024
1 parent 2e55c3f commit 4ed3f42
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 164 deletions.
49 changes: 30 additions & 19 deletions zboxcore/sdk/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,13 +224,16 @@ type OperationRequest struct {
DestName string // Required only for rename operation
DestPath string // Required for copy and move operation
IsUpdate bool
IsRepair bool // Required for repair operation
IsWebstreaming bool

// Required for uploads
Workdir string
FileMeta FileMeta
FileReader io.Reader
Opts []ChunkedUploadOption
Workdir string
FileMeta FileMeta
FileReader io.Reader
Mask *zboxutil.Uint128 // Required for delete repair operation
DownloadFile bool // Required for upload repair operation
Opts []ChunkedUploadOption
}

func GetReadPriceRange() (PriceRange, error) {
Expand Down Expand Up @@ -377,9 +380,7 @@ func (a *Allocation) UploadFile(workdir, localpath string, remotepath string,
return a.StartChunkedUpload(workdir, localpath, remotepath, status, false, false, "", false, false)
}

func (a *Allocation) RepairFile(file sys.File, remotepath string,
status StatusCallback, mask zboxutil.Uint128, ref *fileref.FileRef) error {

func (a *Allocation) RepairFile(file sys.File, remotepath string, statusCallback StatusCallback, mask zboxutil.Uint128, ref *fileref.FileRef) *OperationRequest {
idr, _ := homedir.Dir()
if Workdir != "" {
idr = Workdir
Expand All @@ -395,22 +396,26 @@ func (a *Allocation) RepairFile(file sys.File, remotepath string,
if ref.EncryptedKey != "" {
opts = []ChunkedUploadOption{
WithMask(mask),
WithStatusCallback(status),
WithEncrypt(true),
WithStatusCallback(statusCallback),
WithEncryptedPoint(ref.EncryptedKeyPoint),
}
} else {
opts = []ChunkedUploadOption{
WithMask(mask),
WithStatusCallback(status),
WithStatusCallback(statusCallback),
}
}
connectionID := zboxutil.NewConnectionId()
chunkedUpload, err := CreateChunkedUpload(idr, a, fileMeta, file, false, true, false, connectionID, opts...)
if err != nil {
return err
op := &OperationRequest{
OperationType: constants.FileOperationInsert,
IsRepair: true,
RemotePath: remotepath,
Workdir: idr,
FileMeta: fileMeta,
Opts: opts,
FileReader: file,
}
return chunkedUpload.Start()
return op
}

// UpdateFileWithThumbnail [Deprecated]please use CreateChunkedUpload
Expand Down Expand Up @@ -795,7 +800,7 @@ func (a *Allocation) RepairRequired(remotepath string) (zboxutil.Uint128, zboxut
return found, deleteMask, !found.Equals(uploadMask), fileRef, nil
}

func (a *Allocation) DoMultiOperation(operations []OperationRequest) error {
func (a *Allocation) DoMultiOperation(operations []OperationRequest, opts ...MultiOperationOption) error {
if len(operations) == 0 {
return nil
}
Expand All @@ -816,6 +821,9 @@ func (a *Allocation) DoMultiOperation(operations []OperationRequest) error {
consensusThresh: a.consensusThreshold,
fullconsensus: a.fullconsensus,
}
for _, opt := range opts {
opt(&mo)
}
previousPaths := make(map[string]bool)
connectionErrors := make([]error, len(mo.allocationObj.Blobbers))

Expand Down Expand Up @@ -883,13 +891,16 @@ func (a *Allocation) DoMultiOperation(operations []OperationRequest) error {
operation = NewMoveOperation(op.RemotePath, op.DestPath, mo.operationMask, mo.maskMU, mo.consensusThresh, mo.fullconsensus, mo.ctx)

case constants.FileOperationInsert:
operation, newConnectionID, err = NewUploadOperation(op.Workdir, mo.allocationObj, mo.connectionID, op.FileMeta, op.FileReader, false, op.IsWebstreaming, op.Opts...)
operation, newConnectionID, err = NewUploadOperation(op.Workdir, mo.allocationObj, mo.connectionID, op.FileMeta, op.FileReader, false, op.IsWebstreaming, op.IsRepair, op.DownloadFile, op.Opts...)

case constants.FileOperationDelete:
operation = NewDeleteOperation(op.RemotePath, mo.operationMask, mo.maskMU, mo.consensusThresh, mo.fullconsensus, mo.ctx)

if op.Mask != nil {
operation = NewDeleteOperation(op.RemotePath, *op.Mask, mo.maskMU, mo.consensusThresh, mo.fullconsensus, mo.ctx)
} else {
operation = NewDeleteOperation(op.RemotePath, mo.operationMask, mo.maskMU, mo.consensusThresh, mo.fullconsensus, mo.ctx)
}
case constants.FileOperationUpdate:
operation, newConnectionID, err = NewUploadOperation(op.Workdir, mo.allocationObj, mo.connectionID, op.FileMeta, op.FileReader, true, op.IsWebstreaming, op.Opts...)
operation, newConnectionID, err = NewUploadOperation(op.Workdir, mo.allocationObj, mo.connectionID, op.FileMeta, op.FileReader, true, op.IsWebstreaming, op.IsRepair, op.DownloadFile, op.Opts...)

case constants.FileOperationCreateDir:
operation = NewDirOperation(op.RemotePath, mo.operationMask, mo.maskMU, mo.consensusThresh, mo.fullconsensus, mo.ctx)
Expand Down
91 changes: 52 additions & 39 deletions zboxcore/sdk/allocation_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,32 +647,32 @@ func TestAllocation_RepairFile(t *testing.T) {
ClientKey: mockClientKey,
}

setupHttpResponses := func(t *testing.T, testName string, numBlobbers, numCorrect int) {
require.True(t, numBlobbers >= numCorrect)
for i := 0; i < numBlobbers; i++ {
var hash string
if i < numCorrect {
hash = mockActualHash
}
frName := mockFileRefName + strconv.Itoa(i)
url := "http://TestAllocation_RepairFile" + testName + mockBlobberUrl + strconv.Itoa(i) + "/v1/file/meta"
mockClient.On("Do", mock.MatchedBy(func(req *http.Request) bool {
return strings.HasPrefix(req.URL.String(), url)
})).Return(&http.Response{
StatusCode: http.StatusOK,
Body: func(fileRefName, hash string) io.ReadCloser {
jsonFR, err := json.Marshal(&fileref.FileRef{
ActualFileHash: hash,
Ref: fileref.Ref{
Name: fileRefName,
},
})
require.NoError(t, err)
return ioutil.NopCloser(bytes.NewReader([]byte(jsonFR)))
}(frName, hash),
}, nil)
}
}
// setupHttpResponses := func(t *testing.T, testName string, numBlobbers, numCorrect int) {
// require.True(t, numBlobbers >= numCorrect)
// for i := 0; i < numBlobbers; i++ {
// var hash string
// if i < numCorrect {
// hash = mockActualHash
// }
// frName := mockFileRefName + strconv.Itoa(i)
// url := "http://TestAllocation_RepairFile" + testName + mockBlobberUrl + strconv.Itoa(i) + "/v1/file/meta"
// mockClient.On("Do", mock.MatchedBy(func(req *http.Request) bool {
// return strings.HasPrefix(req.URL.String(), url)
// })).Return(&http.Response{
// StatusCode: http.StatusOK,
// Body: func(fileRefName, hash string) io.ReadCloser {
// jsonFR, err := json.Marshal(&fileref.FileRef{
// ActualFileHash: hash,
// Ref: fileref.Ref{
// Name: fileRefName,
// },
// })
// require.NoError(t, err)
// return ioutil.NopCloser(bytes.NewReader([]byte(jsonFR)))
// }(frName, hash),
// }, nil)
// }
// }

setupHttpResponsesWithUpload := func(t *testing.T, testName string, numBlobbers, numCorrect int) {
require.True(t, numBlobbers >= numCorrect)
Expand Down Expand Up @@ -783,6 +783,18 @@ func TestAllocation_RepairFile(t *testing.T) {
return ioutil.NopCloser(bytes.NewReader(respBuf))
}(),
}, nil)

urlCreateConnection := "http://TestAllocation_RepairFile" + testName + mockBlobberUrl + strconv.Itoa(i) + zboxutil.CREATE_CONNECTION_ENDPOINT
urlCreateConnection = strings.TrimRight(urlCreateConnection, "/")
mockClient.On("Do", mock.MatchedBy(func(req *http.Request) bool {
return strings.HasPrefix(req.URL.String(), urlCreateConnection)
})).Return(&http.Response{
StatusCode: http.StatusOK,
Body: func() io.ReadCloser {
respBuf, _ := json.Marshal("connection_id")
return ioutil.NopCloser(bytes.NewReader(respBuf))
}(),
}, nil)
}
}

Expand All @@ -801,17 +813,17 @@ func TestAllocation_RepairFile(t *testing.T) {
wantRepair bool
errMsg string
}{
{
name: "Test_Repair_Not_Required_Failed",
parameters: parameters{
localPath: mockLocalPath,
remotePath: "/",
},
numBlobbers: 4,
numCorrect: 4,
setup: setupHttpResponses,
wantRepair: false,
},
// {
// name: "Test_Repair_Not_Required_Failed",
// parameters: parameters{
// localPath: mockLocalPath,
// remotePath: "/",
// },
// numBlobbers: 4,
// numCorrect: 4,
// setup: setupHttpResponses,
// wantRepair: false,
// },
{
name: "Test_Repair_Required_Success",
parameters: parameters{
Expand Down Expand Up @@ -845,13 +857,13 @@ func TestAllocation_RepairFile(t *testing.T) {
a.mutex = &sync.Mutex{}
a.initialized = true
sdkInitialized = true
setupMockAllocation(t, a)
for i := 0; i < tt.numBlobbers; i++ {
a.Blobbers = append(a.Blobbers, &blockchain.StorageNode{
ID: mockBlobberId + strconv.Itoa(i),
Baseurl: "http://TestAllocation_RepairFile" + tt.name + mockBlobberUrl + strconv.Itoa(i),
})
}
setupMockAllocation(t, a)
tt.setup(t, tt.name, tt.numBlobbers, tt.numCorrect)
found, _, isRequired, ref, err := a.RepairRequired(tt.parameters.remotePath)
require.Nil(err)
Expand All @@ -865,7 +877,8 @@ func TestAllocation_RepairFile(t *testing.T) {
require.Nil(err)
require.NotNil(sz)
ref.ActualSize = sz.Size()
err = a.RepairFile(f, tt.parameters.remotePath, tt.parameters.status, found, ref)
op := a.RepairFile(f, tt.parameters.remotePath, tt.parameters.status, found, ref)
err = a.DoMultiOperation([]OperationRequest{*op}, WithRepair())
if tt.wantErr {
require.NotNil(err)
} else {
Expand Down
1 change: 1 addition & 0 deletions zboxcore/sdk/allocation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2338,6 +2338,7 @@ func setupMockAllocation(t *testing.T, a *Allocation) {
a.downloadProgressMap = make(map[string]*DownloadRequest)
a.mutex = &sync.Mutex{}
a.FileOptions = uint16(63) // 0011 1111 All allowed
InitCommitWorker(a.Blobbers)
a.initialized = true
if a.DataShards != 0 {
a.fullconsensus, a.consensusThreshold = a.getConsensuses()
Expand Down
11 changes: 5 additions & 6 deletions zboxcore/sdk/commitworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,18 @@ func (commitreq *CommitRequest) processCommit() {
for _, change := range commitreq.changes {
paths = append(paths, change.GetAffectedPath()...)
}
if len(paths) == 0 {
l.Logger.Info("Nothing to commit")
commitreq.result = SuccessCommitResult()
return
}
var req *http.Request
var lR ReferencePathResult
req, err := zboxutil.NewReferencePathRequest(commitreq.blobber.Baseurl, commitreq.allocationID, commitreq.allocationTx, paths)
if err != nil {
l.Logger.Error("Creating ref path req", err)
return
}
if len(paths) == 0 {
l.Logger.Info("Nothing to commit")
commitreq.result = SuccessCommitResult()
return
}

ctx, cncl := context.WithTimeout(context.Background(), (time.Second * 30))
err = zboxutil.HttpDo(ctx, cncl, req, func(resp *http.Response, err error) error {
if err != nil {
Expand Down
71 changes: 43 additions & 28 deletions zboxcore/sdk/multi_operation_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ const (

var BatchSize = 6

type MultiOperationOption func(mo *MultiOperation)

func WithRepair() MultiOperationOption {
return func(mo *MultiOperation) {
mo.Consensus.consensusThresh = 0
mo.isRepair = true
}
}

type Operationer interface {
Process(allocObj *Allocation, connectionID string) ([]fileref.RefEntity, zboxutil.Uint128, error)
buildChange(refs []fileref.RefEntity, uid uuid.UUID) []allocationchange.AllocationChange
Expand All @@ -48,7 +57,8 @@ type MultiOperation struct {
operationMask zboxutil.Uint128
maskMU *sync.Mutex
Consensus
changes [][]allocationchange.AllocationChange
changes [][]allocationchange.AllocationChange
isRepair bool
}

func (mo *MultiOperation) createConnectionObj(blobberIdx int) (err error) {
Expand Down Expand Up @@ -220,37 +230,40 @@ func (mo *MultiOperation) Process() error {
}
logger.Logger.Info("[writemarkerLocked]", time.Since(start).Milliseconds())
start = time.Now()
status, err := mo.allocationObj.CheckAllocStatus()
if err != nil {
logger.Logger.Error("Error checking allocation status", err)
writeMarkerMutex.Unlock(mo.ctx, mo.operationMask, mo.allocationObj.Blobbers, time.Minute, mo.connectionID) //nolint: errcheck
return fmt.Errorf("Check allocation status failed: %s", err.Error())
}
if status == Repair {
logger.Logger.Info("Repairing allocation")
writeMarkerMutex.Unlock(mo.ctx, mo.operationMask, mo.allocationObj.Blobbers, time.Minute, mo.connectionID) //nolint: errcheck
statusBar := NewRepairBar(mo.allocationObj.ID)
if statusBar == nil {
status := Commit
if !mo.isRepair {
status, err = mo.allocationObj.CheckAllocStatus()
if err != nil {
logger.Logger.Error("Error checking allocation status", err)
writeMarkerMutex.Unlock(mo.ctx, mo.operationMask, mo.allocationObj.Blobbers, time.Minute, mo.connectionID) //nolint: errcheck
return fmt.Errorf("Check allocation status failed: %s", err.Error())
}
if status == Repair {
logger.Logger.Info("Repairing allocation")
writeMarkerMutex.Unlock(mo.ctx, mo.operationMask, mo.allocationObj.Blobbers, time.Minute, mo.connectionID) //nolint: errcheck
statusBar := NewRepairBar(mo.allocationObj.ID)
if statusBar == nil {
for _, op := range mo.operations {
op.Error(mo.allocationObj, 0, ErrRetryOperation)
}
return ErrRetryOperation
}
statusBar.wg.Add(1)
err = mo.allocationObj.RepairAlloc(statusBar)
if err != nil {
return err
}
statusBar.wg.Wait()
if statusBar.success {
l.Logger.Info("Repair success")
} else {
l.Logger.Error("Repair failed")
}
for _, op := range mo.operations {
op.Error(mo.allocationObj, 0, ErrRetryOperation)
}
return ErrRetryOperation
}
statusBar.wg.Add(1)
err = mo.allocationObj.RepairAlloc(statusBar)
if err != nil {
return err
}
statusBar.wg.Wait()
if statusBar.success {
l.Logger.Info("Repair success")
} else {
l.Logger.Error("Repair failed")
}
for _, op := range mo.operations {
op.Error(mo.allocationObj, 0, ErrRetryOperation)
}
return ErrRetryOperation
}
defer writeMarkerMutex.Unlock(mo.ctx, mo.operationMask, mo.allocationObj.Blobbers, time.Minute, mo.connectionID) //nolint: errcheck
if status != Commit {
Expand Down Expand Up @@ -293,7 +306,9 @@ func (mo *MultiOperation) Process() error {
if commitReq.result != nil {
if commitReq.result.Success {
l.Logger.Info("Commit success", commitReq.blobber.Baseurl)
rollbackMask = rollbackMask.Or(zboxutil.NewUint128(1).Lsh(commitReq.blobberInd))
if !mo.isRepair {
rollbackMask = rollbackMask.Or(zboxutil.NewUint128(1).Lsh(commitReq.blobberInd))
}
mo.consensus++
} else {
l.Logger.Info("Commit failed", commitReq.blobber.Baseurl, commitReq.result.ErrorMessage)
Expand Down
Loading

0 comments on commit 4ed3f42

Please sign in to comment.