Skip to content

Commit

Permalink
Merge pull request #1394 from 0chain/sprint-1.13
Browse files Browse the repository at this point in the history
Sprint 1.13
  • Loading branch information
dabasov authored Feb 28, 2024
2 parents dc9d4c4 + ebbb3d9 commit c2bd391
Show file tree
Hide file tree
Showing 23 changed files with 571 additions and 429 deletions.
38 changes: 11 additions & 27 deletions code/go/0chain.net/blobbercore/allocation/allocationchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
"github.com/0chain/gosdk/constants"
"github.com/remeh/sizedwaitgroup"
"golang.org/x/sync/errgroup"

"go.uber.org/zap"
"gorm.io/gorm"
Expand Down Expand Up @@ -236,35 +236,19 @@ func (cc *AllocationChangeCollector) ApplyChanges(ctx context.Context, allocatio
}

func (a *AllocationChangeCollector) CommitToFileStore(ctx context.Context) error {
commitCtx, cancel := context.WithCancel(ctx)
defer cancel()
// Can be configured at runtime, this number will depend on the number of active allocations
swg := sizedwaitgroup.New(5)
// Limit can be configured at runtime, this number will depend on the number of active allocations
eg, _ := errgroup.WithContext(ctx)
eg.SetLimit(5)
mut := &sync.Mutex{}
var (
commitError error
errorMutex sync.Mutex
)
for _, change := range a.AllocationChanges {
select {
case <-commitCtx.Done():
return fmt.Errorf("commit to filestore failed: %s", commitError.Error())
default:
}
swg.Add()
go func(change AllocationChangeProcessor) {
err := change.CommitToFileStore(ctx, mut)
if err != nil && !errors.Is(common.ErrFileWasDeleted, err) {
cancel()
errorMutex.Lock()
commitError = err
errorMutex.Unlock()
}
swg.Done()
}(change)
allocChange := change
eg.Go(func() error {
return allocChange.CommitToFileStore(ctx, mut)
})
}
swg.Wait()
return commitError
logging.Logger.Info("Waiting for commit to filestore", zap.String("allocation_id", a.AllocationID))

return eg.Wait()
}

func (a *AllocationChangeCollector) DeleteChanges(ctx context.Context) {
Expand Down
8 changes: 8 additions & 0 deletions code/go/0chain.net/blobbercore/allocation/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func (mfs *MockFileStore) WriteFile(allocID, connID string,
}, nil
}

func (mfs *MockFileStore) WriteDataToTree(allocID, connID, fileName, filePathHash string, hasher *filestore.CommitHasher) error {
return nil
}

func (mfs *MockFileStore) CommitWrite(allocID, connID string, fileData *filestore.FileInputData) (bool, error) {
return true, nil
}
Expand Down Expand Up @@ -64,6 +68,10 @@ func (mfs *MockFileStore) GetFilePathSize(allocID, contentHash, thumbHash string
return 0, 0, nil
}

func (mfs *MockFileStore) GetTempFilePath(allocID, connID, fileName, filePathHash string) string {
return ""
}

func (mfs *MockFileStore) GetBlocksMerkleTreeForChallenge(cir *filestore.ChallengeReadBlockInput) (*filestore.ChallengeResponse, error) {
return nil, nil
}
Expand Down
Loading

0 comments on commit c2bd391

Please sign in to comment.