Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/challenge issue #774

Merged
merged 28 commits into from
Aug 5, 2022
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
315c23e
Avoid dot importing
lpoli Jul 31, 2022
b050a7f
Fix connection leakage
lpoli Jul 31, 2022
8af5adb
Use goroutines to request validators
lpoli Jul 31, 2022
a8c432d
Add lock for allocation changes
lpoli Jul 31, 2022
1d58eab
Remove unnecessary check
lpoli Jul 31, 2022
1b18c48
Avoid returning error on blank allocation
lpoli Jul 31, 2022
9c9798a
Fix tag
lpoli Jul 31, 2022
a82d998
Fix tag
lpoli Jul 31, 2022
fb11e09
Merge branch 'staging' into fix/challenge-issue
lpoli Aug 1, 2022
31f14e8
Fix stat for empty allocation challenge pass
lpoli Aug 1, 2022
0eea088
Fix unlock issue
lpoli Aug 1, 2022
66e3547
Add error log and remove stupid comment
lpoli Aug 2, 2022
74421e4
Remove lock to check system-tests
lpoli Aug 2, 2022
669d74b
Commit/Rollback in defer function
lpoli Aug 2, 2022
f50d0c3
Undo lock comment
lpoli Aug 2, 2022
62664a3
Reduce transaction creation for select query
lpoli Aug 3, 2022
2fd0179
Avoid unrequired use of db transaction
lpoli Aug 3, 2022
dc51cf9
Rename logging variable
lpoli Aug 3, 2022
a7fcedb
Use function to update map
lpoli Aug 3, 2022
abf6292
Merge branch 'staging' into fix/challenge-issue
lpoli Aug 3, 2022
8109937
Update as per staging branch
lpoli Aug 3, 2022
662a196
Merge branch 'staging' into fix/challenge-issue
lpoli Aug 3, 2022
b44a40a
Fix nil pointer issue
lpoli Aug 4, 2022
d5e8eba
Fix pinter issue
lpoli Aug 4, 2022
2a0a3c1
chore(git): merge staging
cnlangzi Aug 5, 2022
d0b2104
Add status to challenges without validators
lpoli Aug 5, 2022
26a4a6c
Merge branch 'fix/challenge-issue' of https://github.com/0chain/blobb…
lpoli Aug 5, 2022
907f825
Add time taken log to submit challenge to BC
lpoli Aug 5, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion code/go/0chain.net/blobbercore/allocation/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,11 @@ func findAllocations(ctx context.Context, offset int64) (allocs []*Allocation, c

err = tx.Model(&Allocation{}).Where(query).Count(&count).Error
if err != nil {
Logger.Error(err.Error())
return
}

allocs = make([]*Allocation, 0) // have to make for the GROM (stupid GORM)
allocs = make([]*Allocation, 0)
err = tx.Model(&Allocation{}).
Where(query).
Limit(UPDATE_LIMIT).
Expand Down
134 changes: 65 additions & 69 deletions code/go/0chain.net/blobbercore/challenge/challenge.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ func syncOpenChallenges(ctx context.Context) {

var blobberChallenges BCChallengeResponse
blobberChallenges.Challenges = make([]*ChallengeEntity, 0)

startTime := time.Now()
retBytes, err := transaction.MakeSCRestAPICall(transaction.STORAGE_CONTRACT_ADDRESS, "/openchallenges", params, chain.GetServerChain())
retBytes, err := transaction.MakeSCRestAPICall(
transaction.STORAGE_CONTRACT_ADDRESS, "/openchallenges", params, chain.GetServerChain())

if err != nil {
logging.Logger.Error("[challenge]open: ", zap.Error(err))
Expand All @@ -54,6 +54,7 @@ func syncOpenChallenges(ctx context.Context) {
bytesReader := bytes.NewBuffer(retBytes)
d := json.NewDecoder(bytesReader)
d.UseNumber()

if err := d.Decode(&blobberChallenges); err != nil {
logging.Logger.Error("[challenge]json: ", zap.String("resp", string(retBytes)), zap.Error(err))
return
Expand Down Expand Up @@ -135,38 +136,38 @@ func processAccepted(ctx context.Context) {
}()

db := datastore.GetStore().GetDB()

challenges := make([]*ChallengeEntity, 0)
db.Where(ChallengeEntity{Status: Accepted}).Find(&challenges)
if len(challenges) > 0 {

startTime := time.Now()
swg := sizedwaitgroup.New(config.Configuration.ChallengeResolveNumWorkers)
startTime := time.Now()
for _, c := range challenges {
logging.Logger.Info("[challenge]process: ",
zap.String("challenge_id", c.ChallengeID),
zap.Time("created", c.CreatedAt))

swg := sizedwaitgroup.New(config.Configuration.ChallengeResolveNumWorkers)
for _, c := range challenges {
err := c.UnmarshalFields()
if err != nil {
logging.Logger.Info("[challenge]process: ",
zap.String("challenge_id", c.ChallengeID),
zap.Time("created", c.CreatedAt))
err := c.UnmarshalFields()
if err != nil {
logging.Logger.Error("[challenge]process: ",
zap.String("challenge_id", c.ChallengeID),
zap.Time("created", c.CreatedAt),
zap.String("validators", string(c.ValidatorsString)),
zap.String("lastCommitTxnList", string(c.LastCommitTxnList)),
zap.String("validationTickets", string(c.ValidationTicketsString)),
zap.String("ObjectPath", string(c.ObjectPathString)),
zap.Error(err))
continue
}
swg.Add()
go validateChallenge(&swg, c)
zap.Time("created", c.CreatedAt),
zap.String("validators", string(c.ValidatorsString)),
zap.String("lastCommitTxnList", string(c.LastCommitTxnList)),
zap.String("validationTickets", string(c.ValidationTicketsString)),
zap.String("ObjectPath", string(c.ObjectPathString)),
zap.Error(err))
continue
}
swg.Wait()

logging.Logger.Info("[challenge]elapsed:process ",
zap.Int("count", len(challenges)),
zap.String("save", time.Since(startTime).String()))
swg.Add()
go validateChallenge(&swg, c)
}
swg.Wait()

logging.Logger.Info("[challenge]elapsed:process ",
zap.Int("count", len(challenges)),
zap.String("save", time.Since(startTime).String()))
}

func validateChallenge(swg *sizedwaitgroup.SizedWaitGroup, c *ChallengeEntity) {
Expand All @@ -175,24 +176,23 @@ func validateChallenge(swg *sizedwaitgroup.SizedWaitGroup, c *ChallengeEntity) {
startTime := time.Now()

ctx := datastore.GetStore().CreateTransaction(context.TODO())
defer ctx.Done()
tx := datastore.GetStore().GetTransaction(ctx)

db := datastore.GetStore().GetTransaction(ctx)
if err := c.LoadValidationTickets(ctx); err != nil {
logging.Logger.Error("[challenge]validate: ",
zap.Any("challenge_id", c.ChallengeID),
zap.Time("created", c.CreatedAt),
zap.Error(err))
db.Rollback()
return
}
var err error
defer func() {
if err != nil {
tx.Rollback()
return
}
tx.Commit()
}()

if err := db.Commit().Error; err != nil {
logging.Logger.Error("[challenge]validate(Commit): ",
err = c.LoadValidationTickets(ctx)
if err != nil {
logging.Logger.Error("[challenge]validate: ",
zap.Any("challenge_id", c.ChallengeID),
zap.Time("created", c.CreatedAt),
zap.Error(err))
db.Rollback()
return
}

Expand All @@ -214,31 +214,29 @@ func commitProcessed(ctx context.Context) {
}
}()

db := datastore.GetStore().GetDB()
var challenges []*ChallengeEntity

db := datastore.GetStore().GetDB()
db.Where(ChallengeEntity{Status: Processed}).
Order("sequence").
Find(&challenges)

if len(challenges) > 0 {

startTime := time.Now()

swg := sizedwaitgroup.New(config.Configuration.ChallengeResolveNumWorkers)
for _, challenge := range challenges {
swg.Add()
go func(c *ChallengeEntity) {
defer swg.Done()
commitChallenge(c)
}(challenge)
}
swg.Wait()
startTime := time.Now()

logging.Logger.Info("[challenge]elapsed:commit ",
zap.Int("count", len(challenges)),
zap.String("save", time.Since(startTime).String()))
swg := sizedwaitgroup.New(config.Configuration.ChallengeResolveNumWorkers)
for _, challenge := range challenges {
swg.Add()
go func(c *ChallengeEntity) {
defer swg.Done()
commitChallenge(c)
}(challenge)
}
swg.Wait()

logging.Logger.Info("[challenge]elapsed:commit ",
zap.Int("count", len(challenges)),
zap.String("save", time.Since(startTime).String()))
swg.Wait()
}

func commitChallenge(c *ChallengeEntity) {
Expand All @@ -259,28 +257,26 @@ func commitChallenge(c *ChallengeEntity) {
zap.String("validationTickets", string(c.ValidationTicketsString)),
zap.String("ObjectPath", string(c.ObjectPathString)),
zap.Error(err))
return
}

ctx := datastore.GetStore().CreateTransaction(context.TODO())
defer ctx.Done()

db := datastore.GetStore().GetTransaction(ctx)
tx := datastore.GetStore().GetTransaction(ctx)
var err error
defer func() {
if err != nil {
tx.Rollback()
return
}
tx.Commit()
}()

if err := c.CommitChallenge(ctx, false); err != nil {
err = c.CommitChallenge(ctx, false)
if err != nil {
logging.Logger.Error("[challenge]commit",
zap.String("challenge_id", c.ChallengeID),
zap.Time("created", c.CreatedAt),
zap.Error(err))
db.Rollback()
return
}

if err := db.Commit().Error; err != nil {
logging.Logger.Warn("[challenge]commit",
zap.Any("challenge_id", c.ChallengeID),
zap.Time("created", c.CreatedAt),
zap.Error(err))
db.Rollback()
return
}

Expand Down
Loading