From 947f91d5460b5a5f700427ba0f4ff4a6ca4dd48f Mon Sep 17 00:00:00 2001 From: Morty Date: Wed, 11 Jun 2025 02:56:46 +0800 Subject: [PATCH 01/13] feat(blob-uploader): blob_upload table add batch hash --- .../migrate/migrations/00027_ blob_upload.sql | 10 ++- .../controller/blob_uploader/blob_uploader.go | 59 ++++++++++++++++-- rollup/internal/orm/batch.go | 38 ------------ rollup/internal/orm/blob_upload.go | 61 +++++++++++++++++-- 4 files changed, 118 insertions(+), 50 deletions(-) diff --git a/database/migrate/migrations/00027_ blob_upload.sql b/database/migrate/migrations/00027_ blob_upload.sql index a771ac0e1e..0db7f698e9 100644 --- a/database/migrate/migrations/00027_ blob_upload.sql +++ b/database/migrate/migrations/00027_ blob_upload.sql @@ -3,6 +3,7 @@ CREATE TABLE blob_upload ( batch_index BIGINT NOT NULL, + batch_hash VARCHAR NOT NULL, platform SMALLINT NOT NULL, status SMALLINT NOT NULL, @@ -13,20 +14,23 @@ CREATE TABLE blob_upload ( deleted_at TIMESTAMP(0) DEFAULT NULL ); -CREATE UNIQUE INDEX IF NOT EXISTS batch_index_platform_uindex -ON blob_upload(batch_index, platform) WHERE deleted_at IS NULL; +CREATE UNIQUE INDEX IF NOT EXISTS batch_index_batch_hash_platform_uindex +ON blob_upload(batch_index, batch_hash, platform) WHERE deleted_at IS NULL; COMMENT ON COLUMN blob_upload.status IS 'undefined, pending, uploaded, failed'; CREATE INDEX IF NOT EXISTS idx_blob_upload_batch_index ON blob_upload(batch_index) WHERE deleted_at IS NULL; +CREATE INDEX IF NOT EXISTS idx_blob_upload_batch_hash ON blob_upload(batch_hash) WHERE deleted_at IS NULL; + CREATE INDEX IF NOT EXISTS idx_blob_upload_platform ON blob_upload(platform) WHERE deleted_at IS NULL; CREATE INDEX IF NOT EXISTS idx_blob_upload_status ON blob_upload(status) WHERE deleted_at IS NULL; CREATE INDEX IF NOT EXISTS idx_blob_upload_status_platform ON blob_upload(status, platform) WHERE deleted_at IS NULL; -CREATE INDEX IF NOT EXISTS idx_blob_upload_batch_index_status_platform ON blob_upload(batch_index, status, platform) WHERE deleted_at IS NULL; +CREATE INDEX IF NOT EXISTS idx_blob_upload_batch_index_batch_hash_status_platform +ON blob_upload(batch_index, batch_hash, status, platform) WHERE deleted_at IS NULL; -- +goose StatementEnd diff --git a/rollup/internal/controller/blob_uploader/blob_uploader.go b/rollup/internal/controller/blob_uploader/blob_uploader.go index 3b6c6b7591..2d114f4dee 100644 --- a/rollup/internal/controller/blob_uploader/blob_uploader.go +++ b/rollup/internal/controller/blob_uploader/blob_uploader.go @@ -2,6 +2,7 @@ package blob_uploader import ( "context" + "errors" "fmt" "github.com/prometheus/client_golang/prometheus" @@ -67,7 +68,7 @@ func (b *BlobUploader) UploadBlobToS3() { } // get un-uploaded batches from database in ascending order by their index. - dbBatch, err := b.batchOrm.GetFirstUnuploadedBatchByPlatform(b.ctx, b.cfg.StartBatch, types.BlobStoragePlatformS3) + dbBatch, err := b.GetFirstUnuploadedBatchByPlatform(b.ctx, b.cfg.StartBatch, types.BlobStoragePlatformS3) if err != nil { log.Error("Failed to fetch unuploaded batch", "err", err) return @@ -85,7 +86,7 @@ func (b *BlobUploader) UploadBlobToS3() { if err != nil { log.Error("failed to construct constructBlobCodec payload ", "codecVersion", codecVersion, "batch index", dbBatch.Index, "err", err) b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc() - if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil { + if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, dbBatch.Hash, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil { log.Error("failed to update blob upload status to failed", "batch index", dbBatch.Index, "err", updateErr) } return @@ -97,7 +98,7 @@ func (b *BlobUploader) UploadBlobToS3() { log.Error("failed to calculate versioned blob hash", "batch index", dbBatch.Index, "err", err) b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc() // update status to failed - if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil { + if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, dbBatch.Hash, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil { log.Error("failed to update blob upload status to failed", "batch index", dbBatch.Index, "err", updateErr) } return @@ -110,14 +111,14 @@ func (b *BlobUploader) UploadBlobToS3() { log.Error("failed to upload blob data to AWS S3", "batch index", dbBatch.Index, "versioned blob hash", key, "err", err) b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc() // update status to failed - if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil { + if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, dbBatch.Hash, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil { log.Error("failed to update blob upload status to failed", "batch index", dbBatch.Index, "err", updateErr) } return } // update status to uploaded - if err = b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusUploaded); err != nil { + if err = b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, dbBatch.Hash, types.BlobStoragePlatformS3, types.BlobUploadStatusUploaded); err != nil { log.Error("failed to update blob upload status to uploaded", "batch index", dbBatch.Index, "err", err) b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc() return @@ -195,3 +196,51 @@ func (b *BlobUploader) constructBlobCodec(dbBatch *orm.Batch) (*kzg4844.Blob, er return daBatch.Blob(), nil } + +// GetFirstUnuploadedBatchByPlatform retrieves the first batch that either hasn't been uploaded to corresponding blob storage service +// The batch must have a commit_tx_hash (committed). +func (b *BlobUploader) GetFirstUnuploadedBatchByPlatform(ctx context.Context, startBatch uint64, platform types.BlobStoragePlatform) (*orm.Batch, error) { + batchIndex, err := b.blobUploadOrm.GetFirstUnuploadedBatchIndexByPlatform(ctx, startBatch, platform) + if err != nil { + return nil, err + } + + var batch *orm.Batch + for { + var err error + batch, err = b.batchOrm.GetBatchByIndex(ctx, batchIndex) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + log.Debug("got batch not proposed for blob uploading", "batch_index", batchIndex, "platform", platform.String()) + return nil, nil + } + return nil, err + } + + // to check if the parent batch uploaded + // if no, there is a batch revert happened, we need to fallback to upload previous batch + fields := map[string]interface{}{ + "batch_index = ?": batchIndex - 1, + "batch_hash = ?": batch.ParentBatchHash, + "platform = ?": platform, + "status = ?": types.BlobUploadStatusUploaded, + } + blobUpload, err := b.blobUploadOrm.GetBlobUploads(ctx, fields, nil, 1) + if err != nil { + return nil, err + } + + if len(blobUpload) == 0 { + batchIndex-- + continue + } + break + } + + if len(batch.CommitTxHash) == 0 { + log.Debug("got batch not committed for blob uploading", "batch_index", batchIndex, "platform", platform.String()) + return nil, nil + } + + return batch, nil +} diff --git a/rollup/internal/orm/batch.go b/rollup/internal/orm/batch.go index 5e036ac6f7..584792fe18 100644 --- a/rollup/internal/orm/batch.go +++ b/rollup/internal/orm/batch.go @@ -263,44 +263,6 @@ func (o *Batch) GetBatchByIndex(ctx context.Context, index uint64) (*Batch, erro return &batch, nil } -// GetFirstUnuploadedBatchByPlatform retrieves the first batch that either hasn't been uploaded to corresponding blob storage service -// The batch must have a commit_tx_hash (committed). -func (o *Batch) GetFirstUnuploadedBatchByPlatform(ctx context.Context, startBatch uint64, platform types.BlobStoragePlatform) (*Batch, error) { - db := o.db.WithContext(ctx) - db = db.Model(&BlobUpload{}) - db = db.Where("platform = ? AND status = ?", platform, types.BlobUploadStatusUploaded) - db = db.Order("batch_index DESC") - db = db.Limit(1) - - var blobUpload BlobUpload - var batchIndex uint64 - if err := db.First(&blobUpload).Error; err != nil { - if errors.Is(err, gorm.ErrRecordNotFound) { - batchIndex = startBatch - } else { - return nil, fmt.Errorf("Batch.GetFirstUnuploadedBatchByPlatform error: %w", err) - } - } else { - batchIndex = blobUpload.BatchIndex + 1 - } - - batch, err := o.GetBatchByIndex(ctx, batchIndex) - if err != nil { - if errors.Is(err, gorm.ErrRecordNotFound) { - log.Debug("got batch not proposed for blob uploading", "batch_index", batchIndex, "platform", platform.String()) - return nil, nil - } - return nil, fmt.Errorf("Batch.GetFirstUnuploadedBatchByPlatform error: %w", err) - } - - if len(batch.CommitTxHash) == 0 { - log.Debug("got batch not committed for blob uploading", "batch_index", batchIndex, "platform", platform.String()) - return nil, nil - } - - return batch, nil -} - // InsertBatch inserts a new batch into the database. func (o *Batch) InsertBatch(ctx context.Context, batch *encoding.Batch, codecVersion encoding.CodecVersion, metrics rutils.BatchMetrics, dbTX ...*gorm.DB) (*Batch, error) { if batch == nil { diff --git a/rollup/internal/orm/blob_upload.go b/rollup/internal/orm/blob_upload.go index 79daa3838a..07645f2f50 100644 --- a/rollup/internal/orm/blob_upload.go +++ b/rollup/internal/orm/blob_upload.go @@ -2,6 +2,7 @@ package orm import ( "context" + "errors" "fmt" "time" @@ -16,8 +17,9 @@ type BlobUpload struct { db *gorm.DB `gorm:"-"` // blob upload - BatchIndex uint64 `json:"batch_index" gorm:"column:batch_index;primaryKey"` - Platform int16 `json:"platform" gorm:"column:platform;primaryKey"` + BatchIndex uint64 `json:"batch_index" gorm:"column:batch_index"` + BatchHash string `json:"batch_hash" gorm:"column:batch_hash"` + Platform int16 `json:"platform" gorm:"column:platform"` Status int16 `json:"status" gorm:"column:status"` // metadata @@ -36,8 +38,58 @@ func (*BlobUpload) TableName() string { return "blob_upload" } +// GetFirstUnuploadedBatchIndexByPlatform retrieves the first batch index that either hasn't been uploaded to corresponding blob storage service +func (o *BlobUpload) GetFirstUnuploadedBatchIndexByPlatform(ctx context.Context, startBatch uint64, platform types.BlobStoragePlatform) (uint64, error) { + db := o.db.WithContext(ctx) + db = db.Model(&BlobUpload{}) + db = db.Where("platform = ? AND status = ?", platform, types.BlobUploadStatusUploaded) + db = db.Order("batch_index DESC") + db = db.Limit(1) + + var blobUpload BlobUpload + var batchIndex uint64 + if err := db.First(&blobUpload).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + batchIndex = startBatch + } else { + return 0, fmt.Errorf("Batch.GetFirstUnuploadedBatchIndexByPlatform error: %w", err) + } + } else { + batchIndex = blobUpload.BatchIndex + 1 + } + + return batchIndex, nil +} + +// GetBlobUpload retrieves the selected blob uploads from the database. +func (o *BlobUpload) GetBlobUploads(ctx context.Context, fields map[string]interface{}, orderByList []string, limit int) ([]*BlobUpload, error) { + db := o.db.WithContext(ctx) + db = db.Model(&BlobUpload{}) + + for key, value := range fields { + db = db.Where(key, value) + } + + for _, orderBy := range orderByList { + db = db.Order(orderBy) + } + + if limit > 0 { + db = db.Limit(limit) + } + + db = db.Order("batch_index ASC") + + var blobUploads []*BlobUpload + if err := db.Find(&blobUploads).Error; err != nil { + return nil, fmt.Errorf("BlobUpload.GetBlobUploads error: %w", err) + } + + return blobUploads, nil +} + // InsertOrUpdateBlobUpload inserts a new blob upload record or updates the existing one. -func (o *BlobUpload) InsertOrUpdateBlobUpload(ctx context.Context, batchIndex uint64, platform types.BlobStoragePlatform, status types.BlobUploadStatus, dbTX ...*gorm.DB) error { +func (o *BlobUpload) InsertOrUpdateBlobUpload(ctx context.Context, batchIndex uint64, batchHash string, platform types.BlobStoragePlatform, status types.BlobUploadStatus, dbTX ...*gorm.DB) error { db := o.db if len(dbTX) > 0 && dbTX[0] != nil { db = dbTX[0] @@ -45,11 +97,12 @@ func (o *BlobUpload) InsertOrUpdateBlobUpload(ctx context.Context, batchIndex ui db = db.WithContext(ctx) blobUpload := &BlobUpload{ BatchIndex: batchIndex, + BatchHash: batchHash, Platform: int16(platform), Status: int16(status), } if err := db.Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "batch_index"}, {Name: "platform"}}, + Columns: []clause.Column{{Name: "batch_index"}, {Name: "batch_hash"}, {Name: "platform"}}, DoUpdates: clause.AssignmentColumns([]string{"status"}), }).Create(blobUpload).Error; err != nil { return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload error: %w, batch index: %v, platform: %v", err, batchIndex, platform) From b2553366332d72b148d5ac730796bd4b45651df9 Mon Sep 17 00:00:00 2001 From: Morty Date: Wed, 11 Jun 2025 03:59:43 +0800 Subject: [PATCH 02/13] fix fields --- rollup/internal/controller/blob_uploader/blob_uploader.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rollup/internal/controller/blob_uploader/blob_uploader.go b/rollup/internal/controller/blob_uploader/blob_uploader.go index 2d114f4dee..2f934b3e73 100644 --- a/rollup/internal/controller/blob_uploader/blob_uploader.go +++ b/rollup/internal/controller/blob_uploader/blob_uploader.go @@ -222,8 +222,8 @@ func (b *BlobUploader) GetFirstUnuploadedBatchByPlatform(ctx context.Context, st fields := map[string]interface{}{ "batch_index = ?": batchIndex - 1, "batch_hash = ?": batch.ParentBatchHash, - "platform = ?": platform, - "status = ?": types.BlobUploadStatusUploaded, + "platform = ?": int16(platform), + "status = ?": int16(types.BlobUploadStatusUploaded), } blobUpload, err := b.blobUploadOrm.GetBlobUploads(ctx, fields, nil, 1) if err != nil { From e6b84a2f54b1fdeeb092f422a0c502fd7e75c534 Mon Sep 17 00:00:00 2001 From: Morty Date: Wed, 11 Jun 2025 04:47:33 +0800 Subject: [PATCH 03/13] fix logic --- .../controller/blob_uploader/blob_uploader.go | 29 ++++++++++--------- rollup/internal/orm/blob_upload.go | 2 +- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/rollup/internal/controller/blob_uploader/blob_uploader.go b/rollup/internal/controller/blob_uploader/blob_uploader.go index 2f934b3e73..e662be5fd2 100644 --- a/rollup/internal/controller/blob_uploader/blob_uploader.go +++ b/rollup/internal/controller/blob_uploader/blob_uploader.go @@ -219,21 +219,24 @@ func (b *BlobUploader) GetFirstUnuploadedBatchByPlatform(ctx context.Context, st // to check if the parent batch uploaded // if no, there is a batch revert happened, we need to fallback to upload previous batch - fields := map[string]interface{}{ - "batch_index = ?": batchIndex - 1, - "batch_hash = ?": batch.ParentBatchHash, - "platform = ?": int16(platform), - "status = ?": int16(types.BlobUploadStatusUploaded), - } - blobUpload, err := b.blobUploadOrm.GetBlobUploads(ctx, fields, nil, 1) - if err != nil { - return nil, err + if batchIndex > 0 { + fields := map[string]interface{}{ + "batch_index = ?": batchIndex - 1, + "batch_hash = ?": batch.ParentBatchHash, + "platform = ?": platform, + "status = ?": types.BlobUploadStatusUploaded, + } + blobUpload, err := b.blobUploadOrm.GetBlobUploads(ctx, fields, nil, 1) + if err != nil { + return nil, err + } + + if len(blobUpload) == 0 { + batchIndex-- + continue + } } - if len(blobUpload) == 0 { - batchIndex-- - continue - } break } diff --git a/rollup/internal/orm/blob_upload.go b/rollup/internal/orm/blob_upload.go index 07645f2f50..75c37db800 100644 --- a/rollup/internal/orm/blob_upload.go +++ b/rollup/internal/orm/blob_upload.go @@ -38,7 +38,7 @@ func (*BlobUpload) TableName() string { return "blob_upload" } -// GetFirstUnuploadedBatchIndexByPlatform retrieves the first batch index that either hasn't been uploaded to corresponding blob storage service +// GetFirstUnuploadedBatchIndexByPlatform retrieves the first batch index that hasn't been uploaded to corresponding blob storage service func (o *BlobUpload) GetFirstUnuploadedBatchIndexByPlatform(ctx context.Context, startBatch uint64, platform types.BlobStoragePlatform) (uint64, error) { db := o.db.WithContext(ctx) db = db.Model(&BlobUpload{}) From ba627eb934a5db45b34bada34582023c56bdd7a6 Mon Sep 17 00:00:00 2001 From: Morty Date: Wed, 11 Jun 2025 05:10:27 +0800 Subject: [PATCH 04/13] fix logic --- rollup/internal/controller/blob_uploader/blob_uploader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rollup/internal/controller/blob_uploader/blob_uploader.go b/rollup/internal/controller/blob_uploader/blob_uploader.go index e662be5fd2..98a46def30 100644 --- a/rollup/internal/controller/blob_uploader/blob_uploader.go +++ b/rollup/internal/controller/blob_uploader/blob_uploader.go @@ -240,7 +240,7 @@ func (b *BlobUploader) GetFirstUnuploadedBatchByPlatform(ctx context.Context, st break } - if len(batch.CommitTxHash) == 0 { + if batchIndex > 0 && len(batch.CommitTxHash) == 0 { log.Debug("got batch not committed for blob uploading", "batch_index", batchIndex, "platform", platform.String()) return nil, nil } From 7e64a781cc7c7461a839d95570d04a4a4ce7145b Mon Sep 17 00:00:00 2001 From: Morty Date: Wed, 11 Jun 2025 05:21:54 +0800 Subject: [PATCH 05/13] fix logic --- .../controller/blob_uploader/blob_uploader.go | 36 ++++++++++--------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/rollup/internal/controller/blob_uploader/blob_uploader.go b/rollup/internal/controller/blob_uploader/blob_uploader.go index 98a46def30..1100cb9bcc 100644 --- a/rollup/internal/controller/blob_uploader/blob_uploader.go +++ b/rollup/internal/controller/blob_uploader/blob_uploader.go @@ -219,28 +219,30 @@ func (b *BlobUploader) GetFirstUnuploadedBatchByPlatform(ctx context.Context, st // to check if the parent batch uploaded // if no, there is a batch revert happened, we need to fallback to upload previous batch - if batchIndex > 0 { - fields := map[string]interface{}{ - "batch_index = ?": batchIndex - 1, - "batch_hash = ?": batch.ParentBatchHash, - "platform = ?": platform, - "status = ?": types.BlobUploadStatusUploaded, - } - blobUpload, err := b.blobUploadOrm.GetBlobUploads(ctx, fields, nil, 1) - if err != nil { - return nil, err - } - - if len(blobUpload) == 0 { - batchIndex-- - continue - } + // skip the check if the parent batch is genesis batch + if batchIndex <= 1 { + break + } + fields := map[string]interface{}{ + "batch_index = ?": batchIndex - 1, + "batch_hash = ?": batch.ParentBatchHash, + "platform = ?": platform, + "status = ?": types.BlobUploadStatusUploaded, + } + blobUpload, err := b.blobUploadOrm.GetBlobUploads(ctx, fields, nil, 1) + if err != nil { + return nil, err + } + + if len(blobUpload) == 0 { + batchIndex-- + continue } break } - if batchIndex > 0 && len(batch.CommitTxHash) == 0 { + if len(batch.CommitTxHash) == 0 { log.Debug("got batch not committed for blob uploading", "batch_index", batchIndex, "platform", platform.String()) return nil, nil } From 2195a7aa112ad6dc89c1f2341ee6e2ddf4c1d8ff Mon Sep 17 00:00:00 2001 From: Morty Date: Wed, 11 Jun 2025 05:32:53 +0800 Subject: [PATCH 06/13] fix logic --- rollup/internal/orm/blob_upload.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rollup/internal/orm/blob_upload.go b/rollup/internal/orm/blob_upload.go index 75c37db800..cc89b2959f 100644 --- a/rollup/internal/orm/blob_upload.go +++ b/rollup/internal/orm/blob_upload.go @@ -42,7 +42,7 @@ func (*BlobUpload) TableName() string { func (o *BlobUpload) GetFirstUnuploadedBatchIndexByPlatform(ctx context.Context, startBatch uint64, platform types.BlobStoragePlatform) (uint64, error) { db := o.db.WithContext(ctx) db = db.Model(&BlobUpload{}) - db = db.Where("platform = ? AND status = ?", platform, types.BlobUploadStatusUploaded) + db = db.Where("platform = ? AND status = ? AND deleted_at IS NULL", platform, types.BlobUploadStatusUploaded) db = db.Order("batch_index DESC") db = db.Limit(1) @@ -103,6 +103,7 @@ func (o *BlobUpload) InsertOrUpdateBlobUpload(ctx context.Context, batchIndex ui } if err := db.Clauses(clause.OnConflict{ Columns: []clause.Column{{Name: "batch_index"}, {Name: "batch_hash"}, {Name: "platform"}}, + Where: clause.Where{Exprs: []clause.Expression{clause.Eq{Column: "deleted_at", Value: nil}}}, DoUpdates: clause.AssignmentColumns([]string{"status"}), }).Create(blobUpload).Error; err != nil { return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload error: %w, batch index: %v, platform: %v", err, batchIndex, platform) From a2d59cc1d49d407210e08d0d97c93cc56807f998 Mon Sep 17 00:00:00 2001 From: Morty Date: Wed, 11 Jun 2025 05:34:32 +0800 Subject: [PATCH 07/13] fix logic --- rollup/internal/orm/blob_upload.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rollup/internal/orm/blob_upload.go b/rollup/internal/orm/blob_upload.go index cc89b2959f..d71da99ca3 100644 --- a/rollup/internal/orm/blob_upload.go +++ b/rollup/internal/orm/blob_upload.go @@ -103,7 +103,7 @@ func (o *BlobUpload) InsertOrUpdateBlobUpload(ctx context.Context, batchIndex ui } if err := db.Clauses(clause.OnConflict{ Columns: []clause.Column{{Name: "batch_index"}, {Name: "batch_hash"}, {Name: "platform"}}, - Where: clause.Where{Exprs: []clause.Expression{clause.Eq{Column: "deleted_at", Value: nil}}}, + Where: clause.Where{Exprs: []clause.Expression{clause.Eq{Column: "blob_upload.deleted_at", Value: nil}}}, DoUpdates: clause.AssignmentColumns([]string{"status"}), }).Create(blobUpload).Error; err != nil { return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload error: %w, batch index: %v, platform: %v", err, batchIndex, platform) From fc2be9800b9d0fb48b3f15e42e55ad854b7512bf Mon Sep 17 00:00:00 2001 From: Morty Date: Wed, 11 Jun 2025 15:05:41 +0800 Subject: [PATCH 08/13] fix logic --- rollup/internal/orm/blob_upload.go | 35 ++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/rollup/internal/orm/blob_upload.go b/rollup/internal/orm/blob_upload.go index d71da99ca3..127c89fb8b 100644 --- a/rollup/internal/orm/blob_upload.go +++ b/rollup/internal/orm/blob_upload.go @@ -7,7 +7,6 @@ import ( "time" "gorm.io/gorm" - "gorm.io/gorm/clause" "scroll-tech/common/types" ) @@ -95,18 +94,30 @@ func (o *BlobUpload) InsertOrUpdateBlobUpload(ctx context.Context, batchIndex ui db = dbTX[0] } db = db.WithContext(ctx) - blobUpload := &BlobUpload{ - BatchIndex: batchIndex, - BatchHash: batchHash, - Platform: int16(platform), - Status: int16(status), + + var existing BlobUpload + err := db.Where("batch_index = ? AND batch_hash = ? AND platform = ? AND deleted_at IS NULL", + batchIndex, batchHash, int16(platform), + ).First(&existing).Error + + if errors.Is(err, gorm.ErrRecordNotFound) { + newRecord := BlobUpload{ + BatchIndex: batchIndex, + BatchHash: batchHash, + Platform: int16(platform), + Status: int16(status), + } + if err := db.Create(&newRecord).Error; err != nil { + return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload insert error: %w, batch index: %v, batch_hash: %v, platform: %v", err, batchIndex, batchHash, platform) + } + return nil + } else if err != nil { + return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload query error: %w, batch index: %v, batch_hash: %v, platform: %v", err, batchIndex, batchHash, platform) } - if err := db.Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "batch_index"}, {Name: "batch_hash"}, {Name: "platform"}}, - Where: clause.Where{Exprs: []clause.Expression{clause.Eq{Column: "blob_upload.deleted_at", Value: nil}}}, - DoUpdates: clause.AssignmentColumns([]string{"status"}), - }).Create(blobUpload).Error; err != nil { - return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload error: %w, batch index: %v, platform: %v", err, batchIndex, platform) + + if err := db.Model(&existing).Update("status", int16(status)).Error; err != nil { + return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload update error: %w, batch index: %v, batch_hash: %v, platform: %v", err, batchIndex, batchHash, platform) } + return nil } From dabcfca359920a9d1238e50f7141cc9536d0bab9 Mon Sep 17 00:00:00 2001 From: Morty Date: Wed, 11 Jun 2025 15:19:37 +0800 Subject: [PATCH 09/13] address comment --- rollup/internal/orm/blob_upload.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rollup/internal/orm/blob_upload.go b/rollup/internal/orm/blob_upload.go index 127c89fb8b..1c5aaa7001 100644 --- a/rollup/internal/orm/blob_upload.go +++ b/rollup/internal/orm/blob_upload.go @@ -51,7 +51,7 @@ func (o *BlobUpload) GetFirstUnuploadedBatchIndexByPlatform(ctx context.Context, if errors.Is(err, gorm.ErrRecordNotFound) { batchIndex = startBatch } else { - return 0, fmt.Errorf("Batch.GetFirstUnuploadedBatchIndexByPlatform error: %w", err) + return 0, fmt.Errorf("BlobUpload.GetFirstUnuploadedBatchIndexByPlatform error: %w", err) } } else { batchIndex = blobUpload.BatchIndex + 1 From 9b026894ffa1aa4aed77ce75f655db661a65a739 Mon Sep 17 00:00:00 2001 From: Morty Date: Wed, 11 Jun 2025 15:34:03 +0800 Subject: [PATCH 10/13] rename function GetFirstUnuploadedBatchByPlatform --- rollup/internal/controller/blob_uploader/blob_uploader.go | 2 +- rollup/internal/orm/blob_upload.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/rollup/internal/controller/blob_uploader/blob_uploader.go b/rollup/internal/controller/blob_uploader/blob_uploader.go index 1100cb9bcc..b19f180ba3 100644 --- a/rollup/internal/controller/blob_uploader/blob_uploader.go +++ b/rollup/internal/controller/blob_uploader/blob_uploader.go @@ -200,7 +200,7 @@ func (b *BlobUploader) constructBlobCodec(dbBatch *orm.Batch) (*kzg4844.Blob, er // GetFirstUnuploadedBatchByPlatform retrieves the first batch that either hasn't been uploaded to corresponding blob storage service // The batch must have a commit_tx_hash (committed). func (b *BlobUploader) GetFirstUnuploadedBatchByPlatform(ctx context.Context, startBatch uint64, platform types.BlobStoragePlatform) (*orm.Batch, error) { - batchIndex, err := b.blobUploadOrm.GetFirstUnuploadedBatchIndexByPlatform(ctx, startBatch, platform) + batchIndex, err := b.blobUploadOrm.GetNextBatchIndexToUploadByPlatform(ctx, startBatch, platform) if err != nil { return nil, err } diff --git a/rollup/internal/orm/blob_upload.go b/rollup/internal/orm/blob_upload.go index 1c5aaa7001..8165608ae2 100644 --- a/rollup/internal/orm/blob_upload.go +++ b/rollup/internal/orm/blob_upload.go @@ -37,8 +37,8 @@ func (*BlobUpload) TableName() string { return "blob_upload" } -// GetFirstUnuploadedBatchIndexByPlatform retrieves the first batch index that hasn't been uploaded to corresponding blob storage service -func (o *BlobUpload) GetFirstUnuploadedBatchIndexByPlatform(ctx context.Context, startBatch uint64, platform types.BlobStoragePlatform) (uint64, error) { +// GetNextBatchIndexToUploadByPlatform retrieves the next batch index that hasn't been uploaded to corresponding blob storage service +func (o *BlobUpload) GetNextBatchIndexToUploadByPlatform(ctx context.Context, startBatch uint64, platform types.BlobStoragePlatform) (uint64, error) { db := o.db.WithContext(ctx) db = db.Model(&BlobUpload{}) db = db.Where("platform = ? AND status = ? AND deleted_at IS NULL", platform, types.BlobUploadStatusUploaded) @@ -51,7 +51,7 @@ func (o *BlobUpload) GetFirstUnuploadedBatchIndexByPlatform(ctx context.Context, if errors.Is(err, gorm.ErrRecordNotFound) { batchIndex = startBatch } else { - return 0, fmt.Errorf("BlobUpload.GetFirstUnuploadedBatchIndexByPlatform error: %w", err) + return 0, fmt.Errorf("BlobUpload.GetNextBatchIndexToUploadByPlatform error: %w", err) } } else { batchIndex = blobUpload.BatchIndex + 1 From 9db96046b8e97d871b5a09425c8de233df7ce977 Mon Sep 17 00:00:00 2001 From: Morty Date: Wed, 11 Jun 2025 15:57:54 +0800 Subject: [PATCH 11/13] remove used blob_upload index --- database/migrate/migrations/00027_ blob_upload.sql | 8 -------- 1 file changed, 8 deletions(-) diff --git a/database/migrate/migrations/00027_ blob_upload.sql b/database/migrate/migrations/00027_ blob_upload.sql index 0db7f698e9..bc0dee7518 100644 --- a/database/migrate/migrations/00027_ blob_upload.sql +++ b/database/migrate/migrations/00027_ blob_upload.sql @@ -19,14 +19,6 @@ ON blob_upload(batch_index, batch_hash, platform) WHERE deleted_at IS NULL; COMMENT ON COLUMN blob_upload.status IS 'undefined, pending, uploaded, failed'; -CREATE INDEX IF NOT EXISTS idx_blob_upload_batch_index ON blob_upload(batch_index) WHERE deleted_at IS NULL; - -CREATE INDEX IF NOT EXISTS idx_blob_upload_batch_hash ON blob_upload(batch_hash) WHERE deleted_at IS NULL; - -CREATE INDEX IF NOT EXISTS idx_blob_upload_platform ON blob_upload(platform) WHERE deleted_at IS NULL; - -CREATE INDEX IF NOT EXISTS idx_blob_upload_status ON blob_upload(status) WHERE deleted_at IS NULL; - CREATE INDEX IF NOT EXISTS idx_blob_upload_status_platform ON blob_upload(status, platform) WHERE deleted_at IS NULL; CREATE INDEX IF NOT EXISTS idx_blob_upload_batch_index_batch_hash_status_platform From 48107c9ce774079dc12b782a97e2ffc1750c435d Mon Sep 17 00:00:00 2001 From: Morty Date: Wed, 11 Jun 2025 16:00:09 +0800 Subject: [PATCH 12/13] remove deleted_at IS NULL --- rollup/internal/orm/blob_upload.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rollup/internal/orm/blob_upload.go b/rollup/internal/orm/blob_upload.go index 8165608ae2..391be5f31e 100644 --- a/rollup/internal/orm/blob_upload.go +++ b/rollup/internal/orm/blob_upload.go @@ -41,7 +41,7 @@ func (*BlobUpload) TableName() string { func (o *BlobUpload) GetNextBatchIndexToUploadByPlatform(ctx context.Context, startBatch uint64, platform types.BlobStoragePlatform) (uint64, error) { db := o.db.WithContext(ctx) db = db.Model(&BlobUpload{}) - db = db.Where("platform = ? AND status = ? AND deleted_at IS NULL", platform, types.BlobUploadStatusUploaded) + db = db.Where("platform = ? AND status = ?", platform, types.BlobUploadStatusUploaded) db = db.Order("batch_index DESC") db = db.Limit(1) From 90440f0183a2e1ac0d8d27efe03d49cd5e8182ac Mon Sep 17 00:00:00 2001 From: Morty Date: Wed, 11 Jun 2025 16:42:45 +0800 Subject: [PATCH 13/13] handle edge case --- rollup/internal/controller/blob_uploader/blob_uploader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rollup/internal/controller/blob_uploader/blob_uploader.go b/rollup/internal/controller/blob_uploader/blob_uploader.go index b19f180ba3..0ee70ec4ed 100644 --- a/rollup/internal/controller/blob_uploader/blob_uploader.go +++ b/rollup/internal/controller/blob_uploader/blob_uploader.go @@ -220,7 +220,7 @@ func (b *BlobUploader) GetFirstUnuploadedBatchByPlatform(ctx context.Context, st // to check if the parent batch uploaded // if no, there is a batch revert happened, we need to fallback to upload previous batch // skip the check if the parent batch is genesis batch - if batchIndex <= 1 { + if batchIndex <= 1 || batchIndex == startBatch { break } fields := map[string]interface{}{