Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/challenge issue #774

Merged
merged 28 commits into from
Aug 5, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
315c23e
Avoid dot importing
lpoli Jul 31, 2022
b050a7f
Fix connection leakage
lpoli Jul 31, 2022
8af5adb
Use goroutines to request validators
lpoli Jul 31, 2022
a8c432d
Add lock for allocation changes
lpoli Jul 31, 2022
1d58eab
Remove unnecessary check
lpoli Jul 31, 2022
1b18c48
Avoid returning error on blank allocation
lpoli Jul 31, 2022
9c9798a
Fix tag
lpoli Jul 31, 2022
a82d998
Fix tag
lpoli Jul 31, 2022
fb11e09
Merge branch 'staging' into fix/challenge-issue
lpoli Aug 1, 2022
31f14e8
Fix stat for empty allocation challenge pass
lpoli Aug 1, 2022
0eea088
Fix unlock issue
lpoli Aug 1, 2022
66e3547
Add error log and remove stupid comment
lpoli Aug 2, 2022
74421e4
Remove lock to check system-tests
lpoli Aug 2, 2022
669d74b
Commit/Rollback in defer function
lpoli Aug 2, 2022
f50d0c3
Undo lock comment
lpoli Aug 2, 2022
62664a3
Reduce transaction creation for select query
lpoli Aug 3, 2022
2fd0179
Avoid unrequired use of db transaction
lpoli Aug 3, 2022
dc51cf9
Rename logging variable
lpoli Aug 3, 2022
a7fcedb
Use function to update map
lpoli Aug 3, 2022
abf6292
Merge branch 'staging' into fix/challenge-issue
lpoli Aug 3, 2022
8109937
Update as per staging branch
lpoli Aug 3, 2022
662a196
Merge branch 'staging' into fix/challenge-issue
lpoli Aug 3, 2022
b44a40a
Fix nil pointer issue
lpoli Aug 4, 2022
d5e8eba
Fix pinter issue
lpoli Aug 4, 2022
2a0a3c1
chore(git): merge staging
cnlangzi Aug 5, 2022
d0b2104
Add status to challenges without validators
lpoli Aug 5, 2022
26a4a6c
Merge branch 'fix/challenge-issue' of https://github.com/0chain/blobb…
lpoli Aug 5, 2022
907f825
Add time taken log to submit challenge to BC
lpoli Aug 5, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions code/go/0chain.net/blobbercore/challenge/challenge.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/0chain/blobber/code/go/0chain.net/core/transaction"
"github.com/remeh/sizedwaitgroup"
"go.uber.org/zap"
"gorm.io/gorm"

"github.com/0chain/blobber/code/go/0chain.net/core/logging"
)
Expand All @@ -36,7 +35,8 @@ func syncOpenChallenges(ctx context.Context) {

var blobberChallenges BCChallengeResponse
blobberChallenges.Challenges = make([]*ChallengeEntity, 0)
retBytes, err := transaction.MakeSCRestAPICall(transaction.STORAGE_CONTRACT_ADDRESS, "/openchallenges", params, chain.GetServerChain())
retBytes, err := transaction.MakeSCRestAPICall(
transaction.STORAGE_CONTRACT_ADDRESS, "/openchallenges", params, chain.GetServerChain())

if err != nil {
logging.Logger.Error("[challenge]open: ", zap.Error(err))
Expand All @@ -46,6 +46,7 @@ func syncOpenChallenges(ctx context.Context) {
bytesReader := bytes.NewBuffer(retBytes)
d := json.NewDecoder(bytesReader)
d.UseNumber()

if err := d.Decode(&blobberChallenges); err != nil {
logging.Logger.Error("[challenge]json: ", zap.String("resp", string(retBytes)), zap.Error(err))
return
Expand All @@ -70,12 +71,14 @@ func saveNewChallenge(c *ChallengeEntity, ctx context.Context) {
}
}()

db := datastore.GetStore().GetDB()
if Exists(db, c.ChallengeID) {
ctx = datastore.GetStore().CreateTransaction(ctx)
tx := datastore.GetStore().GetTransaction(ctx)

if Exists(tx, c.ChallengeID) {
return
}

lastChallengeID, err := getLastChallengeID(db)
lastChallengeID, err := getLastChallengeID(tx)

if err != nil {
logging.Logger.Error("[challenge]add(get_latest_challenge_id): ", zap.Error(err))
Expand All @@ -98,15 +101,18 @@ func saveNewChallenge(c *ChallengeEntity, ctx context.Context) {
zap.String("challenge_id", c.ChallengeID),
zap.Time("created", c.CreatedAt))

if err := db.Transaction(func(tx *gorm.DB) error {
return c.SaveWith(tx)
}); err != nil {
if err = c.SaveWith(tx); err != nil {
logging.Logger.Error("[challenge]add: ",
zap.String("challenge_id", c.ChallengeID),
zap.Time("created", c.CreatedAt),
zap.Error(err))

tx.Rollback()
return
}

tx.Commit()

}

// processAccepted read accepted challenge from db, and send them to validator to pass challenge
Expand Down
139 changes: 87 additions & 52 deletions code/go/0chain.net/blobbercore/challenge/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"math"
"math/rand"
"sync"
"time"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation"
Expand All @@ -13,9 +14,11 @@ import (
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/writemarker"
"github.com/0chain/blobber/code/go/0chain.net/core/chain"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
. "github.com/0chain/blobber/code/go/0chain.net/core/logging"
"github.com/0chain/blobber/code/go/0chain.net/core/lock"
zlogger "github.com/0chain/blobber/code/go/0chain.net/core/logging"
"github.com/0chain/blobber/code/go/0chain.net/core/transaction"
"github.com/0chain/blobber/code/go/0chain.net/core/util"
"github.com/remeh/sizedwaitgroup"

"go.uber.org/zap"
)
Expand All @@ -39,19 +42,19 @@ func (cr *ChallengeEntity) SubmitChallengeToBC(ctx context.Context) (*transactio

err = txn.ExecuteSmartContract(transaction.STORAGE_CONTRACT_ADDRESS, transaction.CHALLENGE_RESPONSE, sn, 0)
if err != nil {
Logger.Info("Failed submitting challenge to the mining network", zap.String("err:", err.Error()))
zlogger.Logger.Info("Failed submitting challenge to the mining network", zap.String("err:", err.Error()))
return nil, err
}

Logger.Info("Verifying challenge response to blockchain.", zap.String("txn", txn.Hash), zap.String("challenge_id", cr.ChallengeID))
zlogger.Logger.Info("Verifying challenge response to blockchain.", zap.String("txn", txn.Hash), zap.String("challenge_id", cr.ChallengeID))
time.Sleep(transaction.SLEEP_FOR_TXN_CONFIRMATION * time.Second)

t, err := transaction.VerifyTransaction(txn.Hash, chain.GetServerChain())
if err != nil {
Logger.Error("Error verifying the challenge response transaction", zap.String("err:", err.Error()), zap.String("txn", txn.Hash), zap.String("challenge_id", cr.ChallengeID))
zlogger.Logger.Error("Error verifying the challenge response transaction", zap.String("err:", err.Error()), zap.String("txn", txn.Hash), zap.String("challenge_id", cr.ChallengeID))
return txn, err
}
Logger.Info("Challenge committed and accepted", zap.Any("txn.hash", t.Hash), zap.Any("txn.output", t.TransactionOutput), zap.String("challenge_id", cr.ChallengeID))
zlogger.Logger.Info("Challenge committed and accepted", zap.Any("txn.hash", t.Hash), zap.Any("txn.output", t.TransactionOutput), zap.String("challenge_id", cr.ChallengeID))
return t, nil
}

Expand All @@ -60,7 +63,7 @@ func (cr *ChallengeEntity) ErrorChallenge(ctx context.Context, err error) {
cr.UpdatedAt = time.Now().UTC()

if err := cr.Save(ctx); err != nil {
Logger.Error("[challenge]db: ", zap.String("challenge_id", cr.ChallengeID), zap.Error(err))
zlogger.Logger.Error("[challenge]db: ", zap.String("challenge_id", cr.ChallengeID), zap.Error(err))
}
}

Expand All @@ -71,7 +74,7 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {
cr.UpdatedAt = time.Now().UTC()

if err := cr.Save(ctx); err != nil {
Logger.Error("[challenge]db: ", zap.String("challenge_id", cr.ChallengeID), zap.Error(err))
zlogger.Logger.Error("[challenge]db: ", zap.String("challenge_id", cr.ChallengeID), zap.Error(err))
}
return common.NewError("no_validators", "No validators assigned to the challenge")
}
Expand All @@ -81,6 +84,10 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {
return err
}

// Lock allocation changes from happening in handler.CommitWrite function
allocMu := lock.GetMutex(allocationObj.TableName(), allocationObj.ID)
allocMu.Lock()

wms, err := writemarker.GetWriteMarkersInRange(ctx, cr.AllocationID, cr.AllocationRoot, allocationObj.AllocationRoot)
if err != nil {
return err
Expand All @@ -90,29 +97,26 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {
}

rootRef, err := reference.GetReference(ctx, cr.AllocationID, "/")
if err != nil {
cr.ErrorChallenge(ctx, err)
return err
}

blockNum := int64(0)
if rootRef.NumBlocks > 0 {
r := rand.New(rand.NewSource(cr.RandomNumber))
blockNum = r.Int63n(rootRef.NumBlocks)
blockNum++
cr.BlockNum = blockNum
} else {
err = common.NewError("allocation_is_blank", "Got a challenge for a blank allocation")
cr.ErrorChallenge(ctx, err)
return err
}

if err != nil {
cr.ErrorChallenge(ctx, err)
return err
}

Logger.Info("[challenge]rand: ", zap.Any("rootRef.NumBlocks", rootRef.NumBlocks), zap.Any("blockNum", blockNum), zap.Any("challenge_id", cr.ChallengeID), zap.Any("random_seed", cr.RandomNumber))
zlogger.Logger.Info("[challenge]rand: ", zap.Any("rootRef.NumBlocks", rootRef.NumBlocks), zap.Any("blockNum", blockNum), zap.Any("challenge_id", cr.ChallengeID), zap.Any("random_seed", cr.RandomNumber))
objectPath, err := reference.GetObjectPath(ctx, cr.AllocationID, blockNum)
if err != nil {
cr.ErrorChallenge(ctx, err)
return err
}

cr.RefID = objectPath.RefID
cr.RespondedAllocationRoot = allocationObj.AllocationRoot
cr.ObjectPath = objectPath
Expand All @@ -131,7 +135,7 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {

if blockNum > 0 {
if objectPath.Meta["type"] != reference.FILE {
Logger.Info("Block number to be challenged for file:", zap.Any("block", objectPath.FileBlockNum), zap.Any("meta", objectPath.Meta), zap.Any("obejct_path", objectPath))
zlogger.Logger.Info("Block number to be challenged for file:", zap.Any("block", objectPath.FileBlockNum), zap.Any("meta", objectPath.Meta), zap.Any("obejct_path", objectPath))
err = common.NewError("invalid_object_path", "Object path was not for a file")
cr.ErrorChallenge(ctx, err)
return err
Expand Down Expand Up @@ -169,53 +173,84 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {
postData["chunk_size"] = objectPath.ChunkSize
}

allocMu.Unlock()

postDataBytes, err := json.Marshal(postData)
if err != nil {
Logger.Error("[db]form: " + err.Error())
zlogger.Logger.Error("[db]form: " + err.Error())
cr.ErrorChallenge(ctx, err)
return err
}
responses := make(map[string]ValidationTicket)
if cr.ValidationTickets == nil {
cr.ValidationTickets = make([]*ValidationTicket, len(cr.Validators))
}

swg := sizedwaitgroup.New(10)
accessMu := sync.Mutex{}
for i, validator := range cr.Validators {
if cr.ValidationTickets[i] != nil {
exisitingVT := cr.ValidationTickets[i]
if len(exisitingVT.Signature) > 0 && exisitingVT.ChallengeID == cr.ChallengeID {
if exisitingVT.Signature != "" && exisitingVT.ChallengeID == cr.ChallengeID {
continue
}
}

url := validator.URL + VALIDATOR_URL

resp, err := util.SendPostRequest(url, postDataBytes, nil)
if err != nil {
Logger.Info("[challenge]post: ", zap.Any("error", err.Error()))
delete(responses, validator.ID)
cr.ValidationTickets[i] = nil
continue
}
var validationTicket ValidationTicket
err = json.Unmarshal(resp, &validationTicket)
if err != nil {
Logger.Error("[challenge]resp: ", zap.String("validator", validator.ID), zap.Any("resp", string(resp)), zap.Any("error", err.Error()))
delete(responses, validator.ID)
cr.ValidationTickets[i] = nil
continue
}
Logger.Info("[challenge]resp: Got response from the validator.", zap.Any("validator_response", validationTicket))
verified, err := validationTicket.VerifySign()
if err != nil || !verified {
Logger.Error("[challenge]ticket: Validation ticket from validator could not be verified.", zap.String("validator", validator.ID))
delete(responses, validator.ID)
cr.ValidationTickets[i] = nil
continue
}
responses[validator.ID] = validationTicket
cr.ValidationTickets[i] = &validationTicket
swg.Add()
go func(url, validatorID string, i int) {
defer swg.Done()

resp, err := util.SendPostRequest(url, postDataBytes, nil)
if err != nil {
zlogger.Logger.Info("[challenge]post: ", zap.Any("error", err.Error()))
delete(responses, validatorID)
cr.ValidationTickets[i] = nil
return
}
var validationTicket ValidationTicket
err = json.Unmarshal(resp, &validationTicket)
if err != nil {
zlogger.Logger.Error(
"[challenge]resp: ",
zap.String("validator",
validatorID),
zap.Any("resp", string(resp)),
zap.Any("error", err.Error()),
)
accessMu.Lock()
delete(responses, validatorID)
cr.ValidationTickets[i] = nil
accessMu.Unlock()
return
}
zlogger.Logger.Info(
"[challenge]resp: Got response from the validator.",
zap.Any("validator_response", validationTicket),
)
verified, err := validationTicket.VerifySign()
if err != nil || !verified {
zlogger.Logger.Error(
"[challenge]ticket: Validation ticket from validator could not be verified.",
zap.String("validator",
validatorID),
)
accessMu.Lock()
delete(responses, validatorID)
cr.ValidationTickets[i] = nil
accessMu.Unlock()
return
}
accessMu.Lock()
responses[validatorID] = validationTicket
cr.ValidationTickets[i] = &validationTicket
accessMu.Unlock()
}(url, validator.ID, i)
}

swg.Wait()

numSuccess := 0
numFailure := 0

Expand All @@ -225,21 +260,21 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {
if vt.Result {
numSuccess++
} else {
Logger.Error("[challenge]ticket: "+vt.Message, zap.String("validator", vt.ValidatorID))
zlogger.Logger.Error("[challenge]ticket: "+vt.Message, zap.String("validator", vt.ValidatorID))
numFailure++
}
numValidatorsResponded++
}
}

Logger.Info("[challenge]validator response stats", zap.Any("challenge_id", cr.ChallengeID), zap.Any("validator_responses", responses))
zlogger.Logger.Info("[challenge]validator response stats", zap.Any("challenge_id", cr.ChallengeID), zap.Any("validator_responses", responses))
if numSuccess > (len(cr.Validators)/2) || numFailure > (len(cr.Validators)/2) || numValidatorsResponded == len(cr.Validators) {
if numSuccess > (len(cr.Validators) / 2) {
cr.Result = ChallengeSuccess
} else {
cr.Result = ChallengeFailure

Logger.Error("[challenge]validate: ", zap.String("challenge_id", cr.ChallengeID), zap.Any("block_num", cr.BlockNum), zap.Any("object_path", objectPath))
zlogger.Logger.Error("[challenge]validate: ", zap.String("challenge_id", cr.ChallengeID), zap.Any("block_num", cr.BlockNum), zap.Any("object_path", objectPath))
}

cr.Status = Processed
Expand All @@ -255,20 +290,20 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {
func (cr *ChallengeEntity) CommitChallenge(ctx context.Context, verifyOnly bool) error {
if len(cr.LastCommitTxnIDs) > 0 {
for _, lastTxn := range cr.LastCommitTxnIDs {
Logger.Info("[challenge]commit: Verifying the transaction : " + lastTxn)
zlogger.Logger.Info("[challenge]commit: Verifying the transaction : " + lastTxn)
t, err := transaction.VerifyTransaction(lastTxn, chain.GetServerChain())
if err == nil {
cr.Status = Committed
cr.StatusMessage = t.TransactionOutput
cr.CommitTxnID = t.Hash
cr.UpdatedAt = time.Now().UTC()
if err := cr.Save(ctx); err != nil {
Logger.Error("[challenge]db: ", zap.String("challenge_id", cr.ChallengeID), zap.Error(err))
zlogger.Logger.Error("[challenge]db: ", zap.String("challenge_id", cr.ChallengeID), zap.Error(err))
}
FileChallenged(ctx, cr.RefID, cr.Result, cr.CommitTxnID)
return nil
}
Logger.Error("[challenge]trans: Error verifying the txn from BC."+lastTxn, zap.String("challenge_id", cr.ChallengeID), zap.Error(err))
zlogger.Logger.Error("[challenge]trans: Error verifying the txn from BC."+lastTxn, zap.String("challenge_id", cr.ChallengeID), zap.Error(err))
}
}

Expand All @@ -284,7 +319,7 @@ func (cr *ChallengeEntity) CommitChallenge(ctx context.Context, verifyOnly bool)
cr.UpdatedAt = time.Now().UTC()
}
cr.ErrorChallenge(ctx, err)
Logger.Error("[challenge]submit: Error while submitting challenge to BC.", zap.String("challenge_id", cr.ChallengeID), zap.Error(err))
zlogger.Logger.Error("[challenge]submit: Error while submitting challenge to BC.", zap.String("challenge_id", cr.ChallengeID), zap.Error(err))
} else {
cr.Status = Committed
cr.StatusMessage = t.TransactionOutput
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b
return nil, common.NewError("invalid_parameters", "Invalid connection id passed")
}

// Lock will compete with other CommitWrites and Challenge validation
mutex := lock.GetMutex(allocationObj.TableName(), allocationID)
mutex.Lock()
defer mutex.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion code/go/0chain.net/blobbercore/reference/ref.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
type Ref struct {
ID int64 `gorm:"column:id;primaryKey"`
Type string `gorm:"column:type;size:1" dirlist:"type" filelist:"type"`
AllocationID string `gorm:"column:allocation_id;size:64;not null;index:idx_path_alloc,priority:1;index:idx_lookup_hash_alloc,priority:1"`
AllocationID string `gorm:"column:allocation_id;size:64;not null;index:idx_path_alloc,priority:1;index:idx_lookup_hash_alloc,priority:1" dirlist:"allocation_id" filelist:"allocation_id"`
LookupHash string `gorm:"column:lookup_hash;size:64;not null;index:idx_lookup_hash_alloc,priority:2" dirlist:"lookup_hash" filelist:"lookup_hash"`
Name string `gorm:"column:name;size:100;not null" dirlist:"name" filelist:"name"`
Path string `gorm:"column:path;size:1000;not null;index:idx_path_alloc,priority:2;index:path_idx" dirlist:"path" filelist:"path"`
Expand Down
4 changes: 2 additions & 2 deletions code/go/0chain.net/validatorcore/storage/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type DirMetaData struct {
Hash string `json:"hash" mapstructure:"hash"`
PathHash string `json:"path_hash" mapstructure:"path_hash"`
NumBlocks int64 `json:"num_of_blocks" mapstructure:"num_of_blocks"`
AllocationID string `json:"allocation_id"`
AllocationID string `json:"allocation_id" mapstructure:"allocation_id"`
Children []ObjectEntity `json:"-"`
}

Expand Down Expand Up @@ -283,7 +283,7 @@ func (cr *ChallengeRequest) VerifyChallenge(challengeObj *Challenge, allocationO
return common.NewError("challenge_validation_failed", "Failed to verify the object path."+err.Error())
}

if cr.WriteMarkers == nil || len(cr.WriteMarkers) == 0 {
if len(cr.WriteMarkers) == 0 {
return common.NewError("challenge_validation_failed", "Invalid write marker")
}

Expand Down