Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/loop break #1322

Merged
merged 8 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions code/go/0chain.net/blobbercore/challenge/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config"
Expand Down Expand Up @@ -106,6 +107,7 @@ type ChallengeEntity struct {
RoundCreatedAt int64 `gorm:"round_created_at" json:"round_created_at"`
CreatedAt common.Timestamp `gorm:"created_at" json:"created"`
UpdatedAt time.Time `gorm:"updated_at;type:timestamp without time zone;not null;default:current_timestamp" json:"-"`
statusMutex *sync.Mutex `gorm:"-" json:"-"`
}

func (ChallengeEntity) TableName() string {
Expand Down
6 changes: 6 additions & 0 deletions code/go/0chain.net/blobbercore/challenge/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ func (cr *ChallengeEntity) CancelChallenge(ctx context.Context, errReason error)
cancellation := time.Now()
db := datastore.GetStore().GetTransaction(ctx)
deleteChallenge(cr.RoundCreatedAt)
cr.statusMutex.Lock()
cr.Status = Cancelled
cr.statusMutex.Unlock()
cr.StatusMessage = errReason.Error()
cr.UpdatedAt = cancellation.UTC()
if err := db.Save(cr).Error; err != nil {
Expand Down Expand Up @@ -311,7 +313,9 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {
accessMu.RUnlock()
if numSuccess > (len(cr.Validators) / 2) {
cr.Result = ChallengeSuccess
cr.statusMutex.Lock()
cr.Status = Processed
cr.statusMutex.Unlock()
cr.UpdatedAt = time.Now().UTC()
} else {
cr.CancelChallenge(ctx, ErrNoConsensusChallenge)
Expand Down Expand Up @@ -378,7 +382,9 @@ func IsEntityNotFoundError(err error) bool {
}

func (cr *ChallengeEntity) SaveChallengeResult(ctx context.Context, t *transaction.Transaction, toAdd bool) {
cr.statusMutex.Lock()
cr.Status = Committed
cr.statusMutex.Unlock()
cr.StatusMessage = t.TransactionOutput
cr.CommitTxnID = t.Hash
cr.UpdatedAt = time.Now().UTC()
Expand Down
21 changes: 19 additions & 2 deletions code/go/0chain.net/blobbercore/challenge/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func processChallenge(ctx context.Context, it *ChallengeEntity) {
func commitOnChainWorker(ctx context.Context) {
defer func() {
if r := recover(); r != nil {
logging.Logger.Error("[commitWorker]challenge", zap.Any("err", r))
logging.Logger.Error("[commitWorker]challenge recovery", zap.Any("err", r))
}
}()
wg := sync.WaitGroup{}
Expand Down Expand Up @@ -216,17 +216,33 @@ func getBatch(batchSize int) (chall []ChallengeEntity) {
return
}

var toClean []int64
it := challengeMap.Iterator()
for it.Next() {
if len(chall) >= batchSize {
break
}
ticket := it.Value().(*ChallengeEntity)
if ticket.Status != Processed {
ticket.statusMutex.Lock()
switch ticket.Status {
case Committed:
case Cancelled:
logging.Logger.Warn("committing_challenge_tickets: ticket with the final status, ignore it", zap.String("challenge_id", ticket.ChallengeID))
toClean = append(toClean, ticket.RoundCreatedAt)
continue
case Accepted:
//reached the tail of challenges
break
case Processed:
default:
}
chall = append(chall, *ticket)
ticket.statusMutex.Unlock()
}
for _, r := range toClean {
challengeMap.Remove(r)
}

return
}

Expand All @@ -247,6 +263,7 @@ func (it *ChallengeEntity) createChallenge(ctx context.Context) bool {
logging.Logger.Info("createChallenge", zap.String("challenge_id", it.ChallengeID), zap.String("status", "already exists"))
return false
}
it.statusMutex = &sync.Mutex{}
challengeMap.Put(it.RoundCreatedAt, it)
return true
}
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ github.com/0chain/common v0.0.6-0.20230127095721-8df4d1d72565 h1:z+DtCR8mBsjPnEs
github.com/0chain/common v0.0.6-0.20230127095721-8df4d1d72565/go.mod h1:UyDC8Qyl5z9lGkCnf9RHJPMektnFX8XtCJZHXCCVj8E=
github.com/0chain/errors v1.0.3 h1:QQZPFxTfnMcRdt32DXbzRQIfGWmBsKoEdszKQDb0rRM=
github.com/0chain/errors v1.0.3/go.mod h1:xymD6nVgrbgttWwkpSCfLLEJbFO6iHGQwk/yeSuYkIc=
github.com/0chain/gosdk v1.10.1-0.20231031062218-5a85b2b91102 h1:xb3pgOMIfApo6LMSuTHuNjM11oLRegq0Sq3jJvY4z80=
github.com/0chain/gosdk v1.10.1-0.20231031062218-5a85b2b91102/go.mod h1:d3YLgnSKvEtZ6I9taIQotb6xcMvUKvd01JnkXS+O1jk=
github.com/0chain/gosdk v1.10.1-0.20231109152515-7c8549873ea5 h1:aOigkqa22jF/c7VLWUFCDhJf0kiNG8wr/ybLP2Q4YEE=
github.com/0chain/gosdk v1.10.1-0.20231109152515-7c8549873ea5/go.mod h1:d3YLgnSKvEtZ6I9taIQotb6xcMvUKvd01JnkXS+O1jk=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
Expand Down