Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions database/migrate/migrations/00027_ blob_upload.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

CREATE TABLE blob_upload (
batch_index BIGINT NOT NULL,
batch_hash VARCHAR NOT NULL,

platform SMALLINT NOT NULL,
status SMALLINT NOT NULL,
Expand All @@ -13,20 +14,15 @@ CREATE TABLE blob_upload (
deleted_at TIMESTAMP(0) DEFAULT NULL
);

CREATE UNIQUE INDEX IF NOT EXISTS batch_index_platform_uindex
ON blob_upload(batch_index, platform) WHERE deleted_at IS NULL;
CREATE UNIQUE INDEX IF NOT EXISTS batch_index_batch_hash_platform_uindex
ON blob_upload(batch_index, batch_hash, platform) WHERE deleted_at IS NULL;

COMMENT ON COLUMN blob_upload.status IS 'undefined, pending, uploaded, failed';

CREATE INDEX IF NOT EXISTS idx_blob_upload_batch_index ON blob_upload(batch_index) WHERE deleted_at IS NULL;

CREATE INDEX IF NOT EXISTS idx_blob_upload_platform ON blob_upload(platform) WHERE deleted_at IS NULL;

CREATE INDEX IF NOT EXISTS idx_blob_upload_status ON blob_upload(status) WHERE deleted_at IS NULL;

CREATE INDEX IF NOT EXISTS idx_blob_upload_status_platform ON blob_upload(status, platform) WHERE deleted_at IS NULL;

CREATE INDEX IF NOT EXISTS idx_blob_upload_batch_index_status_platform ON blob_upload(batch_index, status, platform) WHERE deleted_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_blob_upload_batch_index_batch_hash_status_platform
ON blob_upload(batch_index, batch_hash, status, platform) WHERE deleted_at IS NULL;

-- +goose StatementEnd

Expand Down
64 changes: 59 additions & 5 deletions rollup/internal/controller/blob_uploader/blob_uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package blob_uploader

import (
"context"
"errors"
"fmt"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -67,7 +68,7 @@ func (b *BlobUploader) UploadBlobToS3() {
}

// get un-uploaded batches from database in ascending order by their index.
dbBatch, err := b.batchOrm.GetFirstUnuploadedBatchByPlatform(b.ctx, b.cfg.StartBatch, types.BlobStoragePlatformS3)
dbBatch, err := b.GetFirstUnuploadedBatchByPlatform(b.ctx, b.cfg.StartBatch, types.BlobStoragePlatformS3)
if err != nil {
log.Error("Failed to fetch unuploaded batch", "err", err)
return
Expand All @@ -85,7 +86,7 @@ func (b *BlobUploader) UploadBlobToS3() {
if err != nil {
log.Error("failed to construct constructBlobCodec payload ", "codecVersion", codecVersion, "batch index", dbBatch.Index, "err", err)
b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc()
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, dbBatch.Hash, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
log.Error("failed to update blob upload status to failed", "batch index", dbBatch.Index, "err", updateErr)
}
return
Expand All @@ -97,7 +98,7 @@ func (b *BlobUploader) UploadBlobToS3() {
log.Error("failed to calculate versioned blob hash", "batch index", dbBatch.Index, "err", err)
b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc()
// update status to failed
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, dbBatch.Hash, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
log.Error("failed to update blob upload status to failed", "batch index", dbBatch.Index, "err", updateErr)
}
return
Expand All @@ -110,14 +111,14 @@ func (b *BlobUploader) UploadBlobToS3() {
log.Error("failed to upload blob data to AWS S3", "batch index", dbBatch.Index, "versioned blob hash", key, "err", err)
b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc()
// update status to failed
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, dbBatch.Hash, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
log.Error("failed to update blob upload status to failed", "batch index", dbBatch.Index, "err", updateErr)
}
return
}

// update status to uploaded
if err = b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusUploaded); err != nil {
if err = b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, dbBatch.Hash, types.BlobStoragePlatformS3, types.BlobUploadStatusUploaded); err != nil {
log.Error("failed to update blob upload status to uploaded", "batch index", dbBatch.Index, "err", err)
b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc()
return
Expand Down Expand Up @@ -195,3 +196,56 @@ func (b *BlobUploader) constructBlobCodec(dbBatch *orm.Batch) (*kzg4844.Blob, er

return daBatch.Blob(), nil
}

// GetFirstUnuploadedBatchByPlatform retrieves the first batch that either hasn't been uploaded to corresponding blob storage service
// The batch must have a commit_tx_hash (committed).
// Returns (nil, nil) when the candidate batch is not yet proposed or has no commit_tx_hash recorded.
func (b *BlobUploader) GetFirstUnuploadedBatchByPlatform(ctx context.Context, startBatch uint64, platform types.BlobStoragePlatform) (*orm.Batch, error) {
	// Candidate index: one past the last batch recorded as uploaded for this platform,
	// or startBatch when nothing has been uploaded yet.
	batchIndex, err := b.blobUploadOrm.GetNextBatchIndexToUploadByPlatform(ctx, startBatch, platform)
	if err != nil {
		return nil, err
	}

	var batch *orm.Batch
	for {
		var err error
		batch, err = b.batchOrm.GetBatchByIndex(ctx, batchIndex)
		if err != nil {
			if errors.Is(err, gorm.ErrRecordNotFound) {
				// The candidate batch has not been proposed yet; nothing to upload for now.
				log.Debug("got batch not proposed for blob uploading", "batch_index", batchIndex, "platform", platform.String())
				return nil, nil
			}
			return nil, err
		}

		// to check if the parent batch uploaded
		// if no, there is a batch revert happened, we need to fallback to upload previous batch
		// skip the check if the parent batch is genesis batch (batchIndex <= 1) or we are
		// already at the configured start batch and cannot walk back any further.
		if batchIndex <= 1 || batchIndex == startBatch {
			break
		}
		// Look for an uploaded blob_upload row whose hash matches this batch's parent hash;
		// a mismatch (no row) means the previously uploaded parent was reverted.
		fields := map[string]interface{}{
			"batch_index = ?": batchIndex - 1,
			"batch_hash = ?":  batch.ParentBatchHash,
			"platform = ?":    platform,
			"status = ?":      types.BlobUploadStatusUploaded,
		}
		blobUpload, err := b.blobUploadOrm.GetBlobUploads(ctx, fields, nil, 1)
		if err != nil {
			return nil, err
		}

		if len(blobUpload) == 0 {
			// Parent not uploaded under the expected hash: step back one index and re-check.
			batchIndex--
			continue
		}

		break
	}

	// No commit_tx_hash yet: the batch is not committed, so it is not ready for upload.
	if len(batch.CommitTxHash) == 0 {
		log.Debug("got batch not committed for blob uploading", "batch_index", batchIndex, "platform", platform.String())
		return nil, nil
	}

	return batch, nil
}
38 changes: 0 additions & 38 deletions rollup/internal/orm/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,44 +263,6 @@ func (o *Batch) GetBatchByIndex(ctx context.Context, index uint64) (*Batch, erro
return &batch, nil
}

// GetFirstUnuploadedBatchByPlatform retrieves the first batch that either hasn't been uploaded to corresponding blob storage service
// The batch must have a commit_tx_hash (committed).
func (o *Batch) GetFirstUnuploadedBatchByPlatform(ctx context.Context, startBatch uint64, platform types.BlobStoragePlatform) (*Batch, error) {
	// Find the most recent blob_upload row marked uploaded for this platform; the
	// candidate batch is the one right after it. With no uploads yet, start from startBatch.
	var latest BlobUpload
	query := o.db.WithContext(ctx).
		Model(&BlobUpload{}).
		Where("platform = ? AND status = ?", platform, types.BlobUploadStatusUploaded).
		Order("batch_index DESC").
		Limit(1)

	batchIndex := startBatch
	switch err := query.First(&latest).Error; {
	case err == nil:
		batchIndex = latest.BatchIndex + 1
	case !errors.Is(err, gorm.ErrRecordNotFound):
		return nil, fmt.Errorf("Batch.GetFirstUnuploadedBatchByPlatform error: %w", err)
	}

	batch, err := o.GetBatchByIndex(ctx, batchIndex)
	if err != nil {
		if errors.Is(err, gorm.ErrRecordNotFound) {
			// Candidate batch not proposed yet; nothing to report.
			log.Debug("got batch not proposed for blob uploading", "batch_index", batchIndex, "platform", platform.String())
			return nil, nil
		}
		return nil, fmt.Errorf("Batch.GetFirstUnuploadedBatchByPlatform error: %w", err)
	}

	// A batch without a commit_tx_hash is not committed yet, so it is not eligible.
	if len(batch.CommitTxHash) == 0 {
		log.Debug("got batch not committed for blob uploading", "batch_index", batchIndex, "platform", platform.String())
		return nil, nil
	}

	return batch, nil
}

// InsertBatch inserts a new batch into the database.
func (o *Batch) InsertBatch(ctx context.Context, batch *encoding.Batch, codecVersion encoding.CodecVersion, metrics rutils.BatchMetrics, dbTX ...*gorm.DB) (*Batch, error) {
if batch == nil {
Expand Down
91 changes: 78 additions & 13 deletions rollup/internal/orm/blob_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package orm

import (
	"context"
	"errors"
	"fmt"
	"time"

	"gorm.io/gorm"
	"gorm.io/gorm/clause"

	"scroll-tech/common/types"
)
Expand All @@ -16,8 +16,9 @@ type BlobUpload struct {
db *gorm.DB `gorm:"-"`

// blob upload
BatchIndex uint64 `json:"batch_index" gorm:"column:batch_index;primaryKey"`
Platform int16 `json:"platform" gorm:"column:platform;primaryKey"`
BatchIndex uint64 `json:"batch_index" gorm:"column:batch_index"`
BatchHash string `json:"batch_hash" gorm:"column:batch_hash"`
Platform int16 `json:"platform" gorm:"column:platform"`
Status int16 `json:"status" gorm:"column:status"`

// metadata
Expand All @@ -36,23 +37,87 @@ func (*BlobUpload) TableName() string {
return "blob_upload"
}

// GetNextBatchIndexToUploadByPlatform retrieves the next batch index that hasn't been uploaded to corresponding blob storage service
func (o *BlobUpload) GetNextBatchIndexToUploadByPlatform(ctx context.Context, startBatch uint64, platform types.BlobStoragePlatform) (uint64, error) {
	// Fetch the highest batch index already marked uploaded for this platform.
	var last BlobUpload
	err := o.db.WithContext(ctx).
		Model(&BlobUpload{}).
		Where("platform = ? AND status = ?", platform, types.BlobUploadStatusUploaded).
		Order("batch_index DESC").
		Limit(1).
		First(&last).Error
	if err == nil {
		// Continue right after the last uploaded batch.
		return last.BatchIndex + 1, nil
	}
	if errors.Is(err, gorm.ErrRecordNotFound) {
		// Nothing uploaded yet: begin from the configured start batch.
		return startBatch, nil
	}
	return 0, fmt.Errorf("BlobUpload.GetNextBatchIndexToUploadByPlatform error: %w", err)
}

// GetBlobUploads retrieves the selected blob uploads from the database, filtered by the
// given fields, ordered by orderByList (with a final ascending batch_index ordering
// always appended), and capped at limit rows when limit > 0.
func (o *BlobUpload) GetBlobUploads(ctx context.Context, fields map[string]interface{}, orderByList []string, limit int) ([]*BlobUpload, error) {
	query := o.db.WithContext(ctx).Model(&BlobUpload{})

	for condition, value := range fields {
		query = query.Where(condition, value)
	}

	for _, orderBy := range orderByList {
		query = query.Order(orderBy)
	}

	if limit > 0 {
		query = query.Limit(limit)
	}

	// Deterministic tie-break: always append ascending batch_index ordering.
	query = query.Order("batch_index ASC")

	var records []*BlobUpload
	if err := query.Find(&records).Error; err != nil {
		return nil, fmt.Errorf("BlobUpload.GetBlobUploads error: %w", err)
	}

	return records, nil
}

// InsertOrUpdateBlobUpload inserts a new blob upload record or updates the existing one.
func (o *BlobUpload) InsertOrUpdateBlobUpload(ctx context.Context, batchIndex uint64, platform types.BlobStoragePlatform, status types.BlobUploadStatus, dbTX ...*gorm.DB) error {
func (o *BlobUpload) InsertOrUpdateBlobUpload(ctx context.Context, batchIndex uint64, batchHash string, platform types.BlobStoragePlatform, status types.BlobUploadStatus, dbTX ...*gorm.DB) error {
db := o.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
blobUpload := &BlobUpload{
BatchIndex: batchIndex,
Platform: int16(platform),
Status: int16(status),

var existing BlobUpload
err := db.Where("batch_index = ? AND batch_hash = ? AND platform = ? AND deleted_at IS NULL",
batchIndex, batchHash, int16(platform),
).First(&existing).Error

if errors.Is(err, gorm.ErrRecordNotFound) {
newRecord := BlobUpload{
BatchIndex: batchIndex,
BatchHash: batchHash,
Platform: int16(platform),
Status: int16(status),
}
if err := db.Create(&newRecord).Error; err != nil {
return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload insert error: %w, batch index: %v, batch_hash: %v, platform: %v", err, batchIndex, batchHash, platform)
}
return nil
} else if err != nil {
return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload query error: %w, batch index: %v, batch_hash: %v, platform: %v", err, batchIndex, batchHash, platform)
}
if err := db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "batch_index"}, {Name: "platform"}},
DoUpdates: clause.AssignmentColumns([]string{"status"}),
}).Create(blobUpload).Error; err != nil {
return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload error: %w, batch index: %v, platform: %v", err, batchIndex, platform)

if err := db.Model(&existing).Update("status", int16(status)).Error; err != nil {
return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload update error: %w, batch index: %v, batch_hash: %v, platform: %v", err, batchIndex, batchHash, platform)
}

return nil
}
Loading