Skip to content

Commit

Permalink
add prev size and add vm to rollback
Browse files Browse the repository at this point in the history
  • Loading branch information
Hitenjain14 committed Jul 21, 2024
1 parent 23e7eb9 commit 7e3540d
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 114 deletions.
25 changes: 14 additions & 11 deletions code/go/0chain.net/blobbercore/allocation/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,26 @@ type Allocation struct {
Tx string `gorm:"column:tx;size:64;not null;unique;index:idx_unique_allocations_tx,unique"`
TotalSize int64 `gorm:"column:size;not null;default:0"`
UsedSize int64 `gorm:"column:used_size;not null;default:0"`
PrevUsedSize int64 `gorm:"column:prev_used_size;not null;default:0"`
OwnerID string `gorm:"column:owner_id;size:64;not null"`
OwnerPublicKey string `gorm:"column:owner_public_key;size:512;not null"`
RepairerID string `gorm:"column:repairer_id;size:64;not null"`
Expiration common.Timestamp `gorm:"column:expiration_date;not null"`
// AllocationRoot allcation_root of last write_marker
AllocationRoot string `gorm:"column:allocation_root;size:64;not null;default:''"`
FileMetaRoot string `gorm:"column:file_meta_root;size:64;not null;default:''"`
BlobberSize int64 `gorm:"column:blobber_size;not null;default:0"`
BlobberSizeUsed int64 `gorm:"column:blobber_size_used;not null;default:0"`
LatestRedeemedWM string `gorm:"column:latest_redeemed_write_marker;size:64"`
LastRedeemedSeq int64 `gorm:"column:last_redeemed_sequence;default:0"`
IsRedeemRequired bool `gorm:"column:is_redeem_required"`
TimeUnit time.Duration `gorm:"column:time_unit;not null;default:172800000000000"`
StartTime common.Timestamp `gorm:"column:start_time;not null"`
AllocationRoot string `gorm:"column:allocation_root;size:64;not null;default:''"`
FileMetaRoot string `gorm:"column:file_meta_root;size:64;not null;default:''"`
BlobberSize int64 `gorm:"column:blobber_size;not null;default:0"`
BlobberSizeUsed int64 `gorm:"column:blobber_size_used;not null;default:0"`
PrevBlobberSizeUsed int64 `gorm:"column:prev_blobber_size_used;not null;default:0"`
LatestRedeemedWM string `gorm:"column:latest_redeemed_write_marker;size:64"`
LastRedeemedSeq int64 `gorm:"column:last_redeemed_sequence;default:0"`
IsRedeemRequired bool `gorm:"column:is_redeem_required"`
TimeUnit time.Duration `gorm:"column:time_unit;not null;default:172800000000000"`
StartTime common.Timestamp `gorm:"column:start_time;not null"`
// Ending and cleaning
CleanedUp bool `gorm:"column:cleaned_up;not null;default:false"`
Finalized bool `gorm:"column:finalized;not null;default:false"`
CleanedUp bool `gorm:"column:cleaned_up;not null;default:false"`
Finalized bool `gorm:"column:finalized;not null;default:false"`
AllocationVersion int64 `gorm:"column:allocation_version;not null;default:0"`

// FileOptions to define file restrictions on an allocation for third-parties
// default 00000000 for all crud operations suggesting only owner has the below listed abilities.
Expand Down
4 changes: 4 additions & 0 deletions code/go/0chain.net/blobbercore/blobberhttp/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,7 @@ type LatestWriteMarkerResult struct {
PrevWM *writemarker.WriteMarker `json:"prev_write_marker"`
Version string `json:"version"`
}

type LatestVersionMarkerResult struct {
VersionMarker *writemarker.VersionMarker `json:"version_marker"`
}
94 changes: 23 additions & 71 deletions code/go/0chain.net/blobbercore/handler/object_operation_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,19 +606,26 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b
if err != nil {
return nil, common.NewError("db_error", fmt.Sprintf("Error while saving version marker: %s", err.Error()))
}

allocationObj.PrevBlobberSizeUsed = allocationObj.BlobberSizeUsed
allocationObj.PrevUsedSize = allocationObj.UsedSize
allocationObj.BlobberSizeUsed += connectionObj.Size
allocationObj.UsedSize += connectionObj.Size

updateMap := map[string]interface{}{
"used_size": allocationObj.UsedSize,
"blobber_size_used": allocationObj.BlobberSizeUsed,
"is_redeem_required": true,
"allocation_version": versionMarker.Version,
"prev_used_size": allocationObj.PrevUsedSize,
"prev_blobber_size": allocationObj.PrevBlobberSizeUsed,
}
updateOption := func(a *allocation.Allocation) {
a.IsRedeemRequired = true
a.BlobberSizeUsed = allocationObj.BlobberSizeUsed
a.UsedSize = allocationObj.UsedSize
a.AllocationVersion = versionMarker.Version
a.PrevUsedSize = allocationObj.PrevUsedSize
a.PrevBlobberSizeUsed = allocationObj.PrevBlobberSizeUsed
}

if err = allocation.Repo.UpdateAllocation(ctx, allocationObj, updateMap, updateOption); err != nil {
Expand Down Expand Up @@ -1237,19 +1244,14 @@ func (fsh *StorageHandler) Rollback(ctx context.Context, r *http.Request) (*blob
return nil, common.NewError("invalid_parameters", "Invalid allocation id passed."+err.Error())
}

if allocationObj.AllocationRoot == "" {
Logger.Error("Allocation root is not set", zap.String("allocation_id", allocationObj.ID))
return nil, common.NewError("invalid_parameters", "Allocation root is not set")
if allocationObj.AllocationVersion == 0 {
Logger.Error("Allocation version is 0", zap.String("allocation_id", allocationObj.ID))
return nil, common.NewError("invalid_parameters", "Allocation version is not set")
}

elapsedAllocation := time.Since(startTime)

allocationID := allocationObj.ID
connectionID, ok := common.GetField(r, "connection_id")
if !ok {
return nil, common.NewError("invalid_parameters", "Invalid connection id passed")
}

elapsedGetLock := time.Since(startTime) - elapsedAllocation

if clientID == "" || clientKey == "" {
Expand All @@ -1260,50 +1262,19 @@ func (fsh *StorageHandler) Rollback(ctx context.Context, r *http.Request) (*blob
return nil, common.NewError("invalid_operation", "Operation needs to be performed by the owner of the allocation")
}

writeMarkerString := r.FormValue("write_marker")
writeMarker := writemarker.WriteMarker{}
err = json.Unmarshal([]byte(writeMarkerString), &writeMarker)
versionMarkerString := r.FormValue("version_marker")
versionMarker := writemarker.VersionMarker{}
err = json.Unmarshal([]byte(versionMarkerString), &versionMarker)
if err != nil {
return nil, common.NewErrorf("invalid_parameters",
"Invalid parameters. Error parsing the writemarker for commit: %v",
err)
}

var result blobberhttp.CommitResult
err = versionMarker.Verify(allocationID, allocationObj.OwnerPublicKey)

var latestWriteMarkerEntity *writemarker.WriteMarkerEntity
latestWriteMarkerEntity, err = writemarker.GetWriteMarkerEntity(ctx,
allocationObj.AllocationRoot)
if err != nil {
return nil, common.NewErrorf("latest_write_marker_read_error",
"Error reading the latest write marker for allocation: %v", err)
}
if latestWriteMarkerEntity == nil {
return nil, common.NewError("latest_write_marker_not_found",
"Latest write marker not found for allocation")
}

writemarkerEntity := &writemarker.WriteMarkerEntity{}
writemarkerEntity.WM = writeMarker

err = writemarkerEntity.VerifyRollbackMarker(ctx, allocationObj, latestWriteMarkerEntity)
if err != nil {
return nil, common.NewError("write_marker_verification_failed", "Verification of the write marker failed: "+err.Error())
}

if writemarkerEntity.WM.ChainLength > config.Configuration.MaxChainLength {
return nil, common.NewError("chain_length_exceeded", "Chain length exceeded")
}

elapsedVerifyWM := time.Since(startTime) - elapsedAllocation - elapsedGetLock

var clientIDForWriteRedeem = writeMarker.ClientID

if err := writePreRedeem(ctx, allocationObj, &writeMarker, clientIDForWriteRedeem); err != nil {
return nil, err
}

elapsedWritePreRedeem := time.Since(startTime) - elapsedAllocation - elapsedGetLock - elapsedVerifyWM
elapsedWritePreRedeem := time.Since(startTime) - elapsedAllocation - elapsedGetLock
timeoutCtx, cancel := context.WithTimeout(ctx, 45*time.Second)
defer cancel()
c := datastore.GetStore().CreateTransaction(timeoutCtx)
Expand All @@ -1313,7 +1284,7 @@ func (fsh *StorageHandler) Rollback(ctx context.Context, r *http.Request) (*blob
txn.Rollback()
return nil, common.NewError("allocation_rollback_error", "Error applying the rollback for allocation: "+err.Error())
}
elapsedApplyRollback := time.Since(startTime) - elapsedAllocation - elapsedGetLock - elapsedVerifyWM - elapsedWritePreRedeem
elapsedApplyRollback := time.Since(startTime) - elapsedAllocation - elapsedGetLock - elapsedWritePreRedeem

//get allocation root and ref
rootRef, err := reference.GetLimitedRefFieldsByPath(c, allocationID, "/", []string{"hash", "file_meta_hash", "is_precommit"})
Expand All @@ -1326,38 +1297,22 @@ func (fsh *StorageHandler) Rollback(ctx context.Context, r *http.Request) (*blob
}

Logger.Info("rollback_root_ref", zap.Any("root_ref", rootRef))
fileMetaRoot := rootRef.FileMetaHash
if fileMetaRoot != writeMarker.FileMetaRoot {
if latestWriteMarkerEntity != nil {
result.WriteMarker = latestWriteMarkerEntity
}
result.Success = false
result.ErrorMessage = "File meta root in the write marker does not match the calculated file meta root." +
" Expected hash: " + fileMetaRoot + "; Got: " + writeMarker.FileMetaRoot
txn.Rollback()
return &result, common.NewError("file_meta_root_mismatch", result.ErrorMessage)
}

writemarkerEntity.ConnectionID = connectionID
writemarkerEntity.ClientPublicKey = clientKey
Logger.Info("rollback_writemarker", zap.Any("writemarker", writemarkerEntity.WM))

alloc, err := allocation.Repo.GetByIdAndLock(c, allocationID)
Logger.Info("[rollback]Lock Allocation", zap.Bool("is_redeem_required", alloc.IsRedeemRequired), zap.String("allocation_root", alloc.AllocationRoot), zap.String("latest_wm_redeemed", alloc.LatestRedeemedWM))
if err != nil {
txn.Rollback()
return &result, common.NewError("allocation_read_error", "Error reading the allocation object")
}

alloc.BlobberSizeUsed -= latestWriteMarkerEntity.WM.Size
alloc.UsedSize -= latestWriteMarkerEntity.WM.Size
alloc.FileMetaRoot = fileMetaRoot
alloc.BlobberSizeUsed = alloc.PrevBlobberSizeUsed
alloc.UsedSize = alloc.PrevUsedSize
alloc.IsRedeemRequired = true
alloc.AllocationVersion = versionMarker.Version
updateMap := map[string]interface{}{
"blobber_size_used": alloc.BlobberSizeUsed,
"used_size": alloc.UsedSize,
"file_meta_root": alloc.FileMetaRoot,
"is_redeem_required": true,
"allocation_version": versionMarker.Version,
}

updateOption := func(a *allocation.Allocation) {
Expand All @@ -1367,8 +1322,7 @@ func (fsh *StorageHandler) Rollback(ctx context.Context, r *http.Request) (*blob
a.FileMetaRoot = alloc.FileMetaRoot
a.IsRedeemRequired = alloc.IsRedeemRequired
}
writemarkerEntity.Latest = true
err = txn.Create(writemarkerEntity).Error
err = txn.Create(versionMarker).Error
if err != nil {
txn.Rollback()
return &result, common.NewError("write_marker_error", "Error persisting the write marker "+err.Error())
Expand All @@ -1387,9 +1341,8 @@ func (fsh *StorageHandler) Rollback(ctx context.Context, r *http.Request) (*blob
Logger.Error("Error committing the rollback for allocation", zap.Error(err))
}

elapsedCommitRollback := time.Since(startTime) - elapsedAllocation - elapsedGetLock - elapsedVerifyWM - elapsedWritePreRedeem
elapsedCommitRollback := time.Since(startTime) - elapsedAllocation - elapsedGetLock - elapsedWritePreRedeem
result.AllocationRoot = allocationObj.AllocationRoot
result.WriteMarker = writemarkerEntity
result.Success = true
result.ErrorMessage = ""
commitOperation := "rollback"
Expand All @@ -1398,7 +1351,6 @@ func (fsh *StorageHandler) Rollback(ctx context.Context, r *http.Request) (*blob
zap.String("alloc_id", allocationID),
zap.Duration("get_alloc", elapsedAllocation),
zap.Duration("get-lock", elapsedGetLock),
zap.Duration("verify-wm", elapsedVerifyWM),
zap.Duration("write-pre-redeem", elapsedWritePreRedeem),
zap.Duration("apply-rollback", elapsedApplyRollback),
zap.Duration("total", time.Since(startTime)),
Expand Down
41 changes: 10 additions & 31 deletions code/go/0chain.net/blobbercore/handler/storage_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ func (fsh *StorageHandler) ListEntities(ctx context.Context, r *http.Request) (*
return &result, nil
}

func (fsh *StorageHandler) GetLatestWriteMarker(ctx context.Context, r *http.Request) (*blobberhttp.LatestWriteMarkerResult, error) {
func (fsh *StorageHandler) GetLatestWriteMarker(ctx context.Context, r *http.Request) (*blobberhttp.LatestVersionMarkerResult, error) {
clientID := ctx.Value(constants.ContextKeyClient).(string)
if clientID == "" {
return nil, common.NewError("invalid_operation", "Operation needs to be performed by the owner of the allocation")
Expand All @@ -439,41 +439,20 @@ func (fsh *StorageHandler) GetLatestWriteMarker(ctx context.Context, r *http.Req
return nil, common.NewError("invalid_signature", "could not verify the allocation owner")
}

var latestWM *writemarker.WriteMarkerEntity
var prevWM *writemarker.WriteMarkerEntity
if allocationObj.AllocationRoot == "" {
latestWM = nil
} else {
latestWM, err = writemarker.GetWriteMarkerEntity(ctx, allocationObj.AllocationRoot)
var vm *writemarker.VersionMarker
if allocationObj.AllocationVersion != 0 {
vm, err = writemarker.GetVersionMarker(ctx, allocationId, allocationObj.AllocationVersion)
if err != nil {
Logger.Error("[latest_write_marker]", zap.String("allocation_root", allocationObj.AllocationRoot), zap.String("allocation_id", allocationObj.ID))
return nil, common.NewError("latest_write_marker_read_error", "Error reading the latest write marker for allocation. "+err.Error())
}
if latestWM == nil {
Logger.Info("[latest_write_marker]", zap.String("allocation_root", allocationObj.AllocationRoot), zap.String("allocation_id", allocationObj.ID))
return nil, common.NewError("latest_write_marker_read_error", "Latest write marker not found for allocation.")
}
if latestWM.WM.PreviousAllocationRoot != "" {
prevWM, err = writemarker.GetWriteMarkerEntity(ctx, latestWM.WM.PreviousAllocationRoot)
if err != nil {
return nil, common.NewError("latest_write_marker_read_error", "Error reading the previous write marker for allocation."+err.Error())
}
return nil, common.NewError("latest_write_marker_read_error", "Error reading the latest write marker for allocation."+err.Error())
}
} else {
vm = &writemarker.VersionMarker{}
}

var result blobberhttp.LatestWriteMarkerResult
result.Version = writemarker.MARKER_VERSION
if latestWM != nil {
if latestWM.Status == writemarker.Committed {
latestWM.WM.ChainLength = 0 // start a new chain
}
result.LatestWM = &latestWM.WM
result := &blobberhttp.LatestVersionMarkerResult{
VersionMarker: vm,
}
if prevWM != nil {
result.PrevWM = &prevWM.WM
}

return &result, nil
return result, nil
}

func (fsh *StorageHandler) GetReferencePath(ctx context.Context, r *http.Request) (*blobberhttp.ReferencePathResult, error) {
Expand Down
11 changes: 10 additions & 1 deletion code/go/0chain.net/blobbercore/writemarker/version_marker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (

type VersionMarker struct {
ID int64 `gorm:"column:sequence;primaryKey"`
ClientID string `gorm:"client_id" json:"client_id"`
BlobberID string `gorm:"blobber_id" json:"blobber_id"`
AllocationID string `gorm:"allocation_id" json:"allocation_id"`
Version int64 `gorm:"version" json:"version"`
Timestamp int64 `gorm:"timestamp" json:"timestamp"`
Expand All @@ -33,6 +35,13 @@ func GetCurrentVersion(ctx context.Context, allocationID string) (*VersionMarker
return &vm, err
}

func GetVersionMarker(ctx context.Context, allocationID string, version int64) (*VersionMarker, error) {
db := datastore.GetStore().GetTransaction(ctx)
var vm VersionMarker
err := db.Where("allocation_id = ? and version = ?", allocationID, version).Order("id DESC").Take(&vm).Error
return &vm, err
}

func (vm *VersionMarker) Verify(allocationID, clientPubKey string) error {
if vm.AllocationID != allocationID {
return common.NewError("version_marker_validation_failed", "Invalid allocation id")
Expand All @@ -56,5 +65,5 @@ func (vm *VersionMarker) Verify(allocationID, clientPubKey string) error {
}

func (vm *VersionMarker) GetHashData() string {
return fmt.Sprintf("%s:%d:%d", vm.AllocationID, vm.Version, vm.Timestamp)
return fmt.Sprintf("%s:%s:%s:%d:%d", vm.AllocationID, vm.ClientID, vm.BlobberID, vm.Version, vm.Timestamp)
}
1 change: 1 addition & 0 deletions goose/migrations/1698861371_full_db_snapshot.sql
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ CREATE TABLE allocations (
finalized boolean DEFAULT false NOT NULL,
file_options integer DEFAULT 63 NOT NULL,
start_time bigint NOT NULL
allocation_version bigint DEFAULT 0 NOT NULL
);


Expand Down
4 changes: 4 additions & 0 deletions goose/migrations/1721021811_version_marker.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
CREATE TABLE version_markers(
id bigint NOT NULL,
allocation_id character varying(64) NOT NULL,
blobber_id character varying(64) NOT NULL,
client_id character varying(64) NOT NULL,
"version" bigint NOT NULL,
"timestamp" bigint NOT NULL,
signature character varying(64),
Expand Down Expand Up @@ -43,4 +45,6 @@ ALTER TABLE ONLY version_markers
ADD CONSTRAINT version_markers_pkey PRIMARY KEY (id);


CREATE INDEX version_markers_allocation_id_idx ON version_markers USING btree (allocation_id,version);

-- +goose StatementEnd

0 comments on commit 7e3540d

Please sign in to comment.