Skip to content

Commit

Permalink
Merge branch 'sprint-1.11' into fix/empty-alloc-cleanup
Browse files Browse the repository at this point in the history
# Conflicts:
#	code/go/0chain.net/blobbercore/allocation/workers.go
  • Loading branch information
Jayashsatolia403 committed Sep 19, 2023
2 parents 23a788d + c494c6f commit ed51d79
Show file tree
Hide file tree
Showing 39 changed files with 851 additions and 506 deletions.
3 changes: 3 additions & 0 deletions code/go/0chain.net/blobber/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ func setupConfig(configDir string, deploymentMode int) {
config.Configuration.MinConfirmation = 100
}

config.Configuration.BlockLimitDaily = viper.GetInt64("rate_limiters.block_limit_daily")
config.Configuration.BlockLimitRequest = viper.GetInt64("rate_limiters.block_limit_request")

transaction.MinConfirmation = config.Configuration.MinConfirmation

fmt.Print(" [OK]\n")
Expand Down
40 changes: 30 additions & 10 deletions code/go/0chain.net/blobbercore/allocation/allocationchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
"github.com/0chain/gosdk/constants"
"github.com/remeh/sizedwaitgroup"

"go.uber.org/zap"
"gorm.io/gorm"
Expand All @@ -29,7 +30,7 @@ const (

// AllocationChangeProcessor request transaction of file operation. it is president in postgres, and can be rebuilt for next http reqeust(eg CommitHandler)
type AllocationChangeProcessor interface {
CommitToFileStore(ctx context.Context) error
CommitToFileStore(ctx context.Context, mut *sync.Mutex) error
DeleteTempFile() error
ApplyChange(ctx context.Context, rootRef *reference.Ref, change *AllocationChange, allocationRoot string,
ts common.Timestamp, fileIDMeta map[string]string) (*reference.Ref, error)
Expand Down Expand Up @@ -204,31 +205,50 @@ func (cc *AllocationChangeCollector) ComputeProperties() {
}

func (cc *AllocationChangeCollector) ApplyChanges(ctx context.Context, allocationRoot string,
ts common.Timestamp, fileIDMeta map[string]string) error {
ts common.Timestamp, fileIDMeta map[string]string) (*reference.Ref, error) {
rootRef, err := cc.GetRootRef(ctx)
logging.Logger.Info("GetRootRef", zap.Any("rootRef", rootRef))
if err != nil {
return err
return rootRef, err
}
for idx, change := range cc.Changes {
changeProcessor := cc.AllocationChanges[idx]
_, err := changeProcessor.ApplyChange(ctx, rootRef, change, allocationRoot, ts, fileIDMeta)
if err != nil {
return err
return rootRef, err
}
}
_, err = rootRef.CalculateHash(ctx, true)
return err
collector := reference.NewCollector(len(cc.Changes))
_, err = rootRef.CalculateHash(ctx, true, collector)
if err != nil {
return rootRef, err
}
err = collector.Finalize(ctx)
return rootRef, err
}

func (a *AllocationChangeCollector) CommitToFileStore(ctx context.Context) error {

commitCtx, cancel := context.WithCancel(ctx)
defer cancel()
// Can be configured at runtime, this number will depend on the number of active allocations
swg := sizedwaitgroup.New(5)
mut := &sync.Mutex{}
for _, change := range a.AllocationChanges {
err := change.CommitToFileStore(ctx)
if err != nil {
return err
select {
case <-commitCtx.Done():
return fmt.Errorf("commit to filestore failed")
default:
}
swg.Add()
go func(change AllocationChangeProcessor) {
err := change.CommitToFileStore(ctx, mut)
if err != nil && !errors.Is(common.ErrFileWasDeleted, err) {
cancel()
}
swg.Done()
}(change)
}
swg.Wait()
return nil
}

Expand Down
Loading

0 comments on commit ed51d79

Please sign in to comment.