diff --git a/code/go/0chain.net/blobbercore/challenge/entity.go b/code/go/0chain.net/blobbercore/challenge/entity.go index 411b69760..ab95dec3e 100644 --- a/code/go/0chain.net/blobbercore/challenge/entity.go +++ b/code/go/0chain.net/blobbercore/challenge/entity.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "sync" "time" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/config" @@ -106,6 +107,7 @@ type ChallengeEntity struct { RoundCreatedAt int64 `gorm:"round_created_at" json:"round_created_at"` CreatedAt common.Timestamp `gorm:"created_at" json:"created"` UpdatedAt time.Time `gorm:"updated_at;type:timestamp without time zone;not null;default:current_timestamp" json:"-"` + statusMutex *sync.Mutex `gorm:"-" json:"-"` } func (ChallengeEntity) TableName() string { diff --git a/code/go/0chain.net/blobbercore/challenge/protocol.go b/code/go/0chain.net/blobbercore/challenge/protocol.go index e1c80ff9d..affb1c613 100644 --- a/code/go/0chain.net/blobbercore/challenge/protocol.go +++ b/code/go/0chain.net/blobbercore/challenge/protocol.go @@ -49,7 +49,9 @@ func (cr *ChallengeEntity) CancelChallenge(ctx context.Context, errReason error) cancellation := time.Now() db := datastore.GetStore().GetTransaction(ctx) deleteChallenge(cr.RoundCreatedAt) + cr.statusMutex.Lock() cr.Status = Cancelled + cr.statusMutex.Unlock() cr.StatusMessage = errReason.Error() cr.UpdatedAt = cancellation.UTC() if err := db.Save(cr).Error; err != nil { @@ -311,7 +313,9 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error { accessMu.RUnlock() if numSuccess > (len(cr.Validators) / 2) { cr.Result = ChallengeSuccess + cr.statusMutex.Lock() cr.Status = Processed + cr.statusMutex.Unlock() cr.UpdatedAt = time.Now().UTC() } else { cr.CancelChallenge(ctx, ErrNoConsensusChallenge) @@ -378,7 +382,9 @@ func IsEntityNotFoundError(err error) bool { } func (cr *ChallengeEntity) SaveChallengeResult(ctx context.Context, t *transaction.Transaction, toAdd bool) { + cr.statusMutex.Lock() cr.Status = Committed + cr.statusMutex.Unlock() cr.StatusMessage = t.TransactionOutput cr.CommitTxnID = t.Hash cr.UpdatedAt = time.Now().UTC() diff --git a/code/go/0chain.net/blobbercore/challenge/worker.go b/code/go/0chain.net/blobbercore/challenge/worker.go index 937cea7cd..c4a913661 100644 --- a/code/go/0chain.net/blobbercore/challenge/worker.go +++ b/code/go/0chain.net/blobbercore/challenge/worker.go @@ -154,7 +154,7 @@ func processChallenge(ctx context.Context, it *ChallengeEntity) { func commitOnChainWorker(ctx context.Context) { defer func() { if r := recover(); r != nil { - logging.Logger.Error("[commitWorker]challenge", zap.Any("err", r)) + logging.Logger.Error("[commitWorker]challenge recovery", zap.Any("err", r)) } }() wg := sync.WaitGroup{} @@ -216,17 +216,33 @@ func getBatch(batchSize int) (chall []ChallengeEntity) { return } + var toClean []int64 it := challengeMap.Iterator() for it.Next() { if len(chall) >= batchSize { break } ticket := it.Value().(*ChallengeEntity) - if ticket.Status != Processed { + ticket.statusMutex.Lock() + switch ticket.Status { + case Committed: + case Cancelled: + logging.Logger.Warn("committing_challenge_tickets: ticket with the final status, ignore it", zap.String("challenge_id", ticket.ChallengeID)) + toClean = append(toClean, ticket.RoundCreatedAt) + continue + case Accepted: + //reached the tail of challenges break + case Processed: + default: } chall = append(chall, *ticket) + ticket.statusMutex.Unlock() } + for _, r := range toClean { + challengeMap.Remove(r) + } + return } @@ -247,6 +263,7 @@ func (it *ChallengeEntity) createChallenge(ctx context.Context) bool { logging.Logger.Info("createChallenge", zap.String("challenge_id", it.ChallengeID), zap.String("status", "already exists")) return false } + it.statusMutex = &sync.Mutex{} challengeMap.Put(it.RoundCreatedAt, it) return true } diff --git a/go.sum b/go.sum index 11b216f17..7cd9c50af 100644 --- a/go.sum +++ b/go.sum @@ -40,8 +40,6 @@ github.com/0chain/common v0.0.6-0.20230127095721-8df4d1d72565 h1:z+DtCR8mBsjPnEs github.com/0chain/common v0.0.6-0.20230127095721-8df4d1d72565/go.mod h1:UyDC8Qyl5z9lGkCnf9RHJPMektnFX8XtCJZHXCCVj8E= github.com/0chain/errors v1.0.3 h1:QQZPFxTfnMcRdt32DXbzRQIfGWmBsKoEdszKQDb0rRM= github.com/0chain/errors v1.0.3/go.mod h1:xymD6nVgrbgttWwkpSCfLLEJbFO6iHGQwk/yeSuYkIc= -github.com/0chain/gosdk v1.10.1-0.20231031062218-5a85b2b91102 h1:xb3pgOMIfApo6LMSuTHuNjM11oLRegq0Sq3jJvY4z80= -github.com/0chain/gosdk v1.10.1-0.20231031062218-5a85b2b91102/go.mod h1:d3YLgnSKvEtZ6I9taIQotb6xcMvUKvd01JnkXS+O1jk= github.com/0chain/gosdk v1.10.1-0.20231109152515-7c8549873ea5 h1:aOigkqa22jF/c7VLWUFCDhJf0kiNG8wr/ybLP2Q4YEE= github.com/0chain/gosdk v1.10.1-0.20231109152515-7c8549873ea5/go.mod h1:d3YLgnSKvEtZ6I9taIQotb6xcMvUKvd01JnkXS+O1jk= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=