Skip to content

Commit

Permalink
Fix empty alloc cleanup (#1252)
Browse files Browse the repository at this point in the history
* Fix replace blobber

* Fix empty alloc cleanup
  • Loading branch information
Jayashsatolia403 authored Sep 19, 2023
1 parent c494c6f commit f1fc7d1
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 1 deletion.
3 changes: 3 additions & 0 deletions code/go/0chain.net/blobber/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ func setupConfig(configDir string, deploymentMode int) {
config.Configuration.UpdateAllocationsInterval =
viper.GetDuration("update_allocations_interval")

config.Configuration.FinalizeAllocationsInterval =
viper.GetDuration("finalize_allocations_interval")

config.Configuration.MaxAllocationDirFiles =
viper.GetInt("max_dirs_files")
if config.Configuration.MaxAllocationDirFiles < 50000 {
Expand Down
1 change: 1 addition & 0 deletions code/go/0chain.net/blobber/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func setupWorkers(ctx context.Context) {
readmarker.SetupWorkers(ctx)
writemarker.SetupWorkers(ctx)
allocation.StartUpdateWorker(ctx, config.Configuration.UpdateAllocationsInterval)
allocation.StartFinalizeWorker(ctx, config.Configuration.FinalizeAllocationsInterval)
allocation.SetupWorkers(ctx)
updateCCTWorker(ctx)
}
Expand Down
55 changes: 55 additions & 0 deletions code/go/0chain.net/blobbercore/allocation/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ func StartUpdateWorker(ctx context.Context, interval time.Duration) {
go UpdateWorker(ctx, interval)
}

func StartFinalizeWorker(ctx context.Context, interval time.Duration) {
go FinalizeAllocationsWorker(ctx, interval)
}

// UpdateWorker updates all not finalized and not cleaned allocations
// requesting SC through REST API. The worker required to fetch allocations
// updates in DB.
Expand Down Expand Up @@ -55,6 +59,30 @@ func UpdateWorker(ctx context.Context, interval time.Duration) {
}
}

func FinalizeAllocationsWorker(ctx context.Context, interval time.Duration) {
logging.Logger.Info("start finalize allocations worker")

var tk = time.NewTicker(interval)
defer tk.Stop()

var (
tick = tk.C
quit = ctx.Done()
)

for {
select {
case <-tick:
_ = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
finalizeExpiredAllocations(ctx)
return nil
})
case <-quit:
return
}
}
}

func waitOrQuit(ctx context.Context, d time.Duration) (quit bool) {
var tm = time.NewTimer(d)
defer tm.Stop()
Expand Down Expand Up @@ -157,6 +185,7 @@ func updateAllocation(ctx context.Context, a *Allocation, selfBlobberID string)
// send finalize allocation transaction
if shouldFinalize(sa) {
sendFinalizeAllocation(a.ID)
cleanupAllocation(ctx, a)
return
}

Expand All @@ -166,6 +195,18 @@ func updateAllocation(ctx context.Context, a *Allocation, selfBlobberID string)
}
}

func finalizeExpiredAllocations(ctx context.Context) {
var allocs, err = requestExpiredAllocations()
if err != nil {
logging.Logger.Error("requesting expired allocations from SC", zap.Error(err))
return
}

for _, allocID := range allocs {
sendFinalizeAllocation(allocID)
}
}

func requestAllocation(allocID string) (sa *transaction.StorageAllocation, err error) {
var b []byte
b, err = transaction.MakeSCRestAPICall(
Expand All @@ -181,6 +222,20 @@ func requestAllocation(allocID string) (sa *transaction.StorageAllocation, err e
return
}

func requestExpiredAllocations() (allocs []string, err error) {
var b []byte
b, err = transaction.MakeSCRestAPICall(
transaction.STORAGE_CONTRACT_ADDRESS,
"/expired-allocations",
map[string]string{"blobber_id": node.Self.ID},
chain.GetServerChain())
if err != nil {
return
}
err = json.Unmarshal(b, &allocs)
return
}

func updateAllocationInDB(ctx context.Context, a *Allocation, sa *transaction.StorageAllocation) (ua *Allocation, err error) {
var tx = datastore.GetStore().GetTransaction(ctx)
var changed bool = a.Tx != sa.Tx
Expand Down
4 changes: 3 additions & 1 deletion code/go/0chain.net/blobbercore/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func SetupDefaultConfig() {
viper.SetDefault("service_charge", 0.3)

viper.SetDefault("update_allocations_interval", time.Duration(-1))
viper.SetDefault("finalize_allocations_interval", time.Duration(-1))
}

/*SetupConfig - setup the configuration system */
Expand Down Expand Up @@ -101,7 +102,8 @@ type Config struct {
// WriteMarkerLockTimeout lock is released automatically if it is timeout
WriteMarkerLockTimeout time.Duration

UpdateAllocationsInterval time.Duration
UpdateAllocationsInterval time.Duration
FinalizeAllocationsInterval time.Duration

MaxAllocationDirFiles int

Expand Down
3 changes: 3 additions & 0 deletions config/0chain_blobber.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ price_worker_in_hours: 12
# update_allocations_interval used to refresh known allocation objects from SC
update_allocations_interval: 1m

#finalize_allocations_interval used to get and finalize empty allocations
finalize_allocations_interval: 24h

# maximum limit on the number of combined directories and files on each allocation
max_dirs_files: 50000

Expand Down
3 changes: 3 additions & 0 deletions docker.local/conductor-config/0chain_blobber.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ write_lock_timeout: 1m
# update_allocations_interval used to refresh known allocation objects from SC
update_allocations_interval: 1m

#finalize_allocations_interval used to get and finalize empty allocations
finalize_allocations_interval: 24h

# maximum limit on the number of combined directories and files on each allocation
max_dirs_files: 50000

Expand Down

0 comments on commit f1fc7d1

Please sign in to comment.