diff --git a/code/go/0chain.net/blobbercore/allocation/workers.go b/code/go/0chain.net/blobbercore/allocation/workers.go index ab72bc6ea..0675dbfb3 100644 --- a/code/go/0chain.net/blobbercore/allocation/workers.go +++ b/code/go/0chain.net/blobbercore/allocation/workers.go @@ -121,10 +121,11 @@ func findAllocations(ctx context.Context, offset int64) (allocs []*Allocation, c err = tx.Model(&Allocation{}).Where(query).Count(&count).Error if err != nil { + Logger.Error(err.Error()) return } - allocs = make([]*Allocation, 0) // have to make for the GROM (stupid GORM) + allocs = make([]*Allocation, 0) err = tx.Model(&Allocation{}). Where(query). Limit(UPDATE_LIMIT). diff --git a/code/go/0chain.net/blobbercore/challenge/protocol.go b/code/go/0chain.net/blobbercore/challenge/protocol.go index e5e429b0f..20826a1ed 100644 --- a/code/go/0chain.net/blobbercore/challenge/protocol.go +++ b/code/go/0chain.net/blobbercore/challenge/protocol.go @@ -5,6 +5,7 @@ import ( "encoding/json" "math" "math/rand" + "sync" "time" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation" @@ -13,9 +14,11 @@ import ( "github.com/0chain/blobber/code/go/0chain.net/blobbercore/writemarker" "github.com/0chain/blobber/code/go/0chain.net/core/chain" "github.com/0chain/blobber/code/go/0chain.net/core/common" - . "github.com/0chain/blobber/code/go/0chain.net/core/logging" + "github.com/0chain/blobber/code/go/0chain.net/core/lock" + "github.com/0chain/blobber/code/go/0chain.net/core/logging" "github.com/0chain/blobber/code/go/0chain.net/core/transaction" "github.com/0chain/blobber/code/go/0chain.net/core/util" + "github.com/remeh/sizedwaitgroup" "go.uber.org/zap" ) @@ -39,19 +42,19 @@ func (cr *ChallengeEntity) SubmitChallengeToBC(ctx context.Context) (*transactio err = txn.ExecuteSmartContract(transaction.STORAGE_CONTRACT_ADDRESS, transaction.CHALLENGE_RESPONSE, sn, 0) if err != nil { - Logger.Info("Failed submitting challenge to the mining network", zap.String("err:", err.Error())) + logging.Logger.Info("Failed submitting challenge to the mining network", zap.String("err:", err.Error())) return nil, err } - Logger.Info("Verifying challenge response to blockchain.", zap.String("txn", txn.Hash), zap.String("challenge_id", cr.ChallengeID)) + logging.Logger.Info("Verifying challenge response to blockchain.", zap.String("txn", txn.Hash), zap.String("challenge_id", cr.ChallengeID)) time.Sleep(transaction.SLEEP_FOR_TXN_CONFIRMATION * time.Second) t, err := transaction.VerifyTransaction(txn.Hash, chain.GetServerChain()) if err != nil { - Logger.Error("Error verifying the challenge response transaction", zap.String("err:", err.Error()), zap.String("txn", txn.Hash), zap.String("challenge_id", cr.ChallengeID)) + logging.Logger.Error("Error verifying the challenge response transaction", zap.String("err:", err.Error()), zap.String("txn", txn.Hash), zap.String("challenge_id", cr.ChallengeID)) return txn, err } - Logger.Info("Challenge committed and accepted", zap.Any("txn.hash", t.Hash), zap.Any("txn.output", t.TransactionOutput), zap.String("challenge_id", cr.ChallengeID)) + logging.Logger.Info("Challenge committed and accepted", zap.Any("txn.hash", t.Hash), zap.Any("txn.output", t.TransactionOutput), zap.String("challenge_id", cr.ChallengeID)) return t, nil } @@ -60,20 +63,24 @@ func (cr *ChallengeEntity) ErrorChallenge(ctx context.Context, err error) { cr.UpdatedAt = time.Now().UTC() if err := cr.Save(ctx); err != nil { - Logger.Error("[challenge]db: ", zap.String("challenge_id", cr.ChallengeID), zap.Error(err)) + logging.Logger.Error("[challenge]db: ", zap.String("challenge_id", cr.ChallengeID), zap.Error(err)) } } // LoadValidationTickets load validation tickets func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error { if len(cr.Validators) == 0 { + cr.Status = Cancelled cr.StatusMessage = "No validators assigned to the challenge" cr.UpdatedAt = time.Now().UTC() + logging.Logger.Error(cr.StatusMessage + "for challenge: " + cr.ChallengeID) + if err := cr.Save(ctx); err != nil { - Logger.Error("[challenge]db: ", zap.String("challenge_id", cr.ChallengeID), zap.Error(err)) + logging.Logger.Error("[challenge]db: ", zap.String("challenge_id", cr.ChallengeID), zap.Error(err)) + return err } - return common.NewError("no_validators", "No validators assigned to the challenge") + return nil } allocationObj, err := allocation.GetAllocationByID(ctx, cr.AllocationID) @@ -81,38 +88,46 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error { return err } + // Lock allocation changes from happening in handler.CommitWrite function + // This lock should be unlocked as soon as possible. We should not defer + // unlocking it as it will be locked for longer time and handler.CommitWrite + // will fail. + allocMu := lock.GetMutex(allocationObj.TableName(), allocationObj.ID) + allocMu.Lock() + wms, err := writemarker.GetWriteMarkersInRange(ctx, cr.AllocationID, cr.AllocationRoot, allocationObj.AllocationRoot) if err != nil { + allocMu.Unlock() return err } if len(wms) == 0 { + allocMu.Unlock() return common.NewError("write_marker_not_found", "Could find the writemarker for the given allocation root on challenge") } rootRef, err := reference.GetReference(ctx, cr.AllocationID, "/") + if err != nil { + allocMu.Unlock() + cr.ErrorChallenge(ctx, err) + return err + } + blockNum := int64(0) if rootRef.NumBlocks > 0 { r := rand.New(rand.NewSource(cr.RandomNumber)) blockNum = r.Int63n(rootRef.NumBlocks) blockNum++ cr.BlockNum = blockNum - } else { - err = common.NewError("allocation_is_blank", "Got a challenge for a blank allocation") - cr.ErrorChallenge(ctx, err) - return err } - if err != nil { - cr.ErrorChallenge(ctx, err) - return err - } - - Logger.Info("[challenge]rand: ", zap.Any("rootRef.NumBlocks", rootRef.NumBlocks), zap.Any("blockNum", blockNum), zap.Any("challenge_id", cr.ChallengeID), zap.Any("random_seed", cr.RandomNumber)) + logging.Logger.Info("[challenge]rand: ", zap.Any("rootRef.NumBlocks", rootRef.NumBlocks), zap.Any("blockNum", blockNum), zap.Any("challenge_id", cr.ChallengeID), zap.Any("random_seed", cr.RandomNumber)) objectPath, err := reference.GetObjectPath(ctx, cr.AllocationID, blockNum) if err != nil { + allocMu.Unlock() cr.ErrorChallenge(ctx, err) return err } + cr.RefID = objectPath.RefID cr.RespondedAllocationRoot = allocationObj.AllocationRoot cr.ObjectPath = objectPath @@ -131,7 +146,8 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error { if blockNum > 0 { if objectPath.Meta["type"] != reference.FILE { - Logger.Info("Block number to be challenged for file:", zap.Any("block", objectPath.FileBlockNum), zap.Any("meta", objectPath.Meta), zap.Any("obejct_path", objectPath)) + allocMu.Unlock() + logging.Logger.Info("Block number to be challenged for file:", zap.Any("block", objectPath.FileBlockNum), zap.Any("meta", objectPath.Meta), zap.Any("obejct_path", objectPath)) err = common.NewError("invalid_object_path", "Object path was not for a file") cr.ErrorChallenge(ctx, err) return err @@ -161,6 +177,7 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error { blockData, mt, err := filestore.GetFileStore().GetBlocksMerkleTreeForChallenge(cr.AllocationID, inputData, blockoffset) if err != nil { + allocMu.Unlock() cr.ErrorChallenge(ctx, err) return common.NewError("blockdata_not_found", err.Error()) } @@ -169,9 +186,11 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error { postData["chunk_size"] = objectPath.ChunkSize } + allocMu.Unlock() + postDataBytes, err := json.Marshal(postData) if err != nil { - Logger.Error("[db]form: " + err.Error()) + logging.Logger.Error("[db]form: " + err.Error()) cr.ErrorChallenge(ctx, err) return err } @@ -179,43 +198,72 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error { if cr.ValidationTickets == nil { cr.ValidationTickets = make([]*ValidationTicket, len(cr.Validators)) } + + accessMu := sync.Mutex{} + updateMapAndSlice := func(validatorID string, i int, vt *ValidationTicket) { + accessMu.Lock() + cr.ValidationTickets[i] = vt + if vt != nil { + responses[validatorID] = *vt + } else { + delete(responses, validatorID) + } + accessMu.Unlock() + } + + swg := sizedwaitgroup.New(10) for i, validator := range cr.Validators { if cr.ValidationTickets[i] != nil { exisitingVT := cr.ValidationTickets[i] - if len(exisitingVT.Signature) > 0 && exisitingVT.ChallengeID == cr.ChallengeID { + if exisitingVT.Signature != "" && exisitingVT.ChallengeID == cr.ChallengeID { continue } } url := validator.URL + VALIDATOR_URL - resp, err := util.SendPostRequest(url, postDataBytes, nil) - if err != nil { - Logger.Info("[challenge]post: ", zap.Any("error", err.Error())) - delete(responses, validator.ID) - cr.ValidationTickets[i] = nil - continue - } - var validationTicket ValidationTicket - err = json.Unmarshal(resp, &validationTicket) - if err != nil { - Logger.Error("[challenge]resp: ", zap.String("validator", validator.ID), zap.Any("resp", string(resp)), zap.Any("error", err.Error())) - delete(responses, validator.ID) - cr.ValidationTickets[i] = nil - continue - } - Logger.Info("[challenge]resp: Got response from the validator.", zap.Any("validator_response", validationTicket)) - verified, err := validationTicket.VerifySign() - if err != nil || !verified { - Logger.Error("[challenge]ticket: Validation ticket from validator could not be verified.", zap.String("validator", validator.ID)) - delete(responses, validator.ID) - cr.ValidationTickets[i] = nil - continue - } - responses[validator.ID] = validationTicket - cr.ValidationTickets[i] = &validationTicket + swg.Add() + go func(url, validatorID string, i int) { + defer swg.Done() + + resp, err := util.SendPostRequest(url, postDataBytes, nil) + if err != nil { + logging.Logger.Info("[challenge]post: ", zap.Any("error", err.Error())) + updateMapAndSlice(validatorID, i, nil) + return + } + var validationTicket ValidationTicket + err = json.Unmarshal(resp, &validationTicket) + if err != nil { + logging.Logger.Error( + "[challenge]resp: ", + zap.String("validator", + validatorID), + zap.Any("resp", string(resp)), + zap.Any("error", err.Error()), + ) + updateMapAndSlice(validatorID, i, nil) + return + } + logging.Logger.Info( + "[challenge]resp: Got response from the validator.", + zap.Any("validator_response", validationTicket), + ) + verified, err := validationTicket.VerifySign() + if err != nil || !verified { + logging.Logger.Error( + "[challenge]ticket: Validation ticket from validator could not be verified.", + zap.String("validator", validatorID), + ) + updateMapAndSlice(validatorID, i, nil) + return + } + updateMapAndSlice(validatorID, i, &validationTicket) + }(url, validator.ID, i) } + swg.Wait() + numSuccess := 0 numFailure := 0 @@ -225,21 +273,21 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error { if vt.Result { numSuccess++ } else { - Logger.Error("[challenge]ticket: "+vt.Message, zap.String("validator", vt.ValidatorID)) + logging.Logger.Error("[challenge]ticket: "+vt.Message, zap.String("validator", vt.ValidatorID)) numFailure++ } numValidatorsResponded++ } } - Logger.Info("[challenge]validator response stats", zap.Any("challenge_id", cr.ChallengeID), zap.Any("validator_responses", responses)) + logging.Logger.Info("[challenge]validator response stats", zap.Any("challenge_id", cr.ChallengeID), zap.Any("validator_responses", responses)) if numSuccess > (len(cr.Validators)/2) || numFailure > (len(cr.Validators)/2) || numValidatorsResponded == len(cr.Validators) { if numSuccess > (len(cr.Validators) / 2) { cr.Result = ChallengeSuccess } else { cr.Result = ChallengeFailure - Logger.Error("[challenge]validate: ", zap.String("challenge_id", cr.ChallengeID), zap.Any("block_num", cr.BlockNum), zap.Any("object_path", objectPath)) + logging.Logger.Error("[challenge]validate: ", zap.String("challenge_id", cr.ChallengeID), zap.Any("block_num", cr.BlockNum), zap.Any("object_path", objectPath)) } cr.Status = Processed @@ -255,7 +303,7 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error { func (cr *ChallengeEntity) CommitChallenge(ctx context.Context, verifyOnly bool) error { if len(cr.LastCommitTxnIDs) > 0 { for _, lastTxn := range cr.LastCommitTxnIDs { - Logger.Info("[challenge]commit: Verifying the transaction : " + lastTxn) + logging.Logger.Info("[challenge]commit: Verifying the transaction : " + lastTxn) t, err := transaction.VerifyTransaction(lastTxn, chain.GetServerChain()) if err == nil { cr.Status = Committed @@ -263,12 +311,14 @@ func (cr *ChallengeEntity) CommitChallenge(ctx context.Context, verifyOnly bool) cr.CommitTxnID = t.Hash cr.UpdatedAt = time.Now().UTC() if err := cr.Save(ctx); err != nil { - Logger.Error("[challenge]db: ", zap.String("challenge_id", cr.ChallengeID), zap.Error(err)) + logging.Logger.Error("[challenge]db: ", zap.String("challenge_id", cr.ChallengeID), zap.Error(err)) + } + if cr.RefID != 0 { + FileChallenged(ctx, cr.RefID, cr.Result, cr.CommitTxnID) } - FileChallenged(ctx, cr.RefID, cr.Result, cr.CommitTxnID) return nil } - Logger.Error("[challenge]trans: Error verifying the txn from BC."+lastTxn, zap.String("challenge_id", cr.ChallengeID), zap.Error(err)) + logging.Logger.Error("[challenge]trans: Error verifying the txn from BC."+lastTxn, zap.String("challenge_id", cr.ChallengeID), zap.Error(err)) } } @@ -276,7 +326,11 @@ func (cr *ChallengeEntity) CommitChallenge(ctx context.Context, verifyOnly bool) return nil } + now := time.Now() t, err := cr.SubmitChallengeToBC(ctx) + logging.Logger.Debug("[challenge]submit: Time taken to submit challenge: ", + zap.Any("time_taken", time.Since(now))) + if err != nil { if t != nil { cr.CommitTxnID = t.Hash @@ -284,7 +338,7 @@ func (cr *ChallengeEntity) CommitChallenge(ctx context.Context, verifyOnly bool) cr.UpdatedAt = time.Now().UTC() } cr.ErrorChallenge(ctx, err) - Logger.Error("[challenge]submit: Error while submitting challenge to BC.", zap.String("challenge_id", cr.ChallengeID), zap.Error(err)) + logging.Logger.Error("[challenge]submit: Error while submitting challenge to BC.", zap.String("challenge_id", cr.ChallengeID), zap.Error(err)) } else { cr.Status = Committed cr.StatusMessage = t.TransactionOutput @@ -293,6 +347,8 @@ func (cr *ChallengeEntity) CommitChallenge(ctx context.Context, verifyOnly bool) cr.UpdatedAt = time.Now().UTC() } err = cr.Save(ctx) - FileChallenged(ctx, cr.RefID, cr.Result, cr.CommitTxnID) + if cr.RefID != 0 { + FileChallenged(ctx, cr.RefID, cr.Result, cr.CommitTxnID) + } return err } diff --git a/code/go/0chain.net/blobbercore/handler/object_operation_handler.go b/code/go/0chain.net/blobbercore/handler/object_operation_handler.go index 8561387ec..71ef7544b 100644 --- a/code/go/0chain.net/blobbercore/handler/object_operation_handler.go +++ b/code/go/0chain.net/blobbercore/handler/object_operation_handler.go @@ -344,6 +344,7 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b return nil, common.NewError("invalid_parameters", "Invalid connection id passed") } + // Lock will compete with other CommitWrites and Challenge validation mutex := lock.GetMutex(allocationObj.TableName(), allocationID) mutex.Lock() defer mutex.Unlock() diff --git a/code/go/0chain.net/blobbercore/reference/ref.go b/code/go/0chain.net/blobbercore/reference/ref.go index 03b740820..71ae6a9b2 100644 --- a/code/go/0chain.net/blobbercore/reference/ref.go +++ b/code/go/0chain.net/blobbercore/reference/ref.go @@ -30,7 +30,7 @@ const ( type Ref struct { ID int64 `gorm:"column:id;primaryKey"` Type string `gorm:"column:type;size:1" dirlist:"type" filelist:"type"` - AllocationID string `gorm:"column:allocation_id;size:64;not null;index:idx_path_alloc,priority:1;index:idx_lookup_hash_alloc,priority:1"` + AllocationID string `gorm:"column:allocation_id;size:64;not null;index:idx_path_alloc,priority:1;index:idx_lookup_hash_alloc,priority:1" dirlist:"allocation_id" filelist:"allocation_id"` LookupHash string `gorm:"column:lookup_hash;size:64;not null;index:idx_lookup_hash_alloc,priority:2" dirlist:"lookup_hash" filelist:"lookup_hash"` Name string `gorm:"column:name;size:100;not null" dirlist:"name" filelist:"name"` Path string `gorm:"column:path;size:1000;not null;index:idx_path_alloc,priority:2;index:path_idx" dirlist:"path" filelist:"path"` diff --git a/code/go/0chain.net/validatorcore/storage/models.go b/code/go/0chain.net/validatorcore/storage/models.go index 22dc9e0d9..f646eaca6 100644 --- a/code/go/0chain.net/validatorcore/storage/models.go +++ b/code/go/0chain.net/validatorcore/storage/models.go @@ -43,7 +43,7 @@ type DirMetaData struct { Hash string `json:"hash" mapstructure:"hash"` PathHash string `json:"path_hash" mapstructure:"path_hash"` NumBlocks int64 `json:"num_of_blocks" mapstructure:"num_of_blocks"` - AllocationID string `json:"allocation_id"` + AllocationID string `json:"allocation_id" mapstructure:"allocation_id"` Children []ObjectEntity `json:"-"` } @@ -283,7 +283,7 @@ func (cr *ChallengeRequest) VerifyChallenge(challengeObj *Challenge, allocationO return common.NewError("challenge_validation_failed", "Failed to verify the object path."+err.Error()) } - if cr.WriteMarkers == nil || len(cr.WriteMarkers) == 0 { + if len(cr.WriteMarkers) == 0 { return common.NewError("challenge_validation_failed", "Invalid write marker") }