Skip to content

Commit 385d97c

Browse files
committed
chores
1 parent 4bf4196 commit 385d97c

File tree

8 files changed

+114
-123
lines changed

8 files changed

+114
-123
lines changed

database/migrate/migrations/00027_ blob_upload.sql

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,33 @@
22
-- +goose StatementBegin
33

44
CREATE TABLE blob_upload (
5-
batch_index BIGINT NOT NULL,
5+
batch_index BIGINT NOT NULL,
66

7-
platform TEXT NOT NULL,
8-
status SMALLINT NOT NULL,
9-
updated_at TIMESTAMP NOT NULL DEFAULT now(),
7+
platform SMALLINT NOT NULL,
8+
status SMALLINT NOT NULL,
9+
10+
-- metadata
11+
updated_at TIMESTAMP NOT NULL DEFAULT now(),
12+
deleted_at TIMESTAMP(0) DEFAULT NULL
1013

1114
PRIMARY KEY (batch_index, platform),
1215
FOREIGN KEY (batch_index) REFERENCES batch(index)
1316
);
1417

1518
COMMENT ON COLUMN blob_upload.status IS 'undefined, pending, uploaded, failed';
1619

20+
CREATE INDEX IF NOT EXISTS idx_blob_upload_batch_index ON blob_upload(batch_index) WHERE deleted_at IS NULL;
21+
22+
CREATE INDEX IF NOT EXISTS idx_blob_upload_platform ON blob_upload(platform) WHERE deleted_at IS NULL;
23+
24+
CREATE INDEX IF NOT EXISTS idx_blob_upload_status ON blob_upload(status) WHERE deleted_at IS NULL;
25+
26+
CREATE INDEX IF NOT EXISTS idx_blob_upload_updated_at ON blob_upload(updated_at) WHERE deleted_at IS NULL;
27+
28+
CREATE INDEX IF NOT EXISTS idx_blob_upload_status_platform ON blob_upload(status, platform) WHERE deleted_at IS NULL;
29+
30+
CREATE INDEX IF NOT EXISTS idx_blob_upload_batch_index_status_platform ON blob_upload(batch_index, status, platform) WHERE deleted_at IS NULL;
31+
1732
-- +goose StatementEnd
1833

1934
-- +goose Down

database/migrate/migrations/00028_add_blob_upload_indexes.sql

Lines changed: 0 additions & 20 deletions
This file was deleted.

rollup/Makefile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,17 @@ mock_abi:
1111
rollup_bins: ## Builds the Rollup bins.
1212
go build -o $(PWD)/build/bin/gas_oracle ./cmd/gas_oracle/
1313
go build -o $(PWD)/build/bin/rollup_relayer ./cmd/rollup_relayer/
14+
go build -o $(PWD)/build/bin/blob_uploader ./cmd/blob_uploader/
1415

1516
gas_oracle: ## Builds the gas_oracle bin
1617
go build -o $(PWD)/build/bin/gas_oracle ./cmd/gas_oracle/
1718

1819
rollup_relayer: ## Builds the rollup_relayer bin
1920
go build -o $(PWD)/build/bin/rollup_relayer ./cmd/rollup_relayer/
2021

22+
blob_uploader: ## Builds the blob_uploader bin
23+
go build -o $(PWD)/build/bin/blob_uploader ./cmd/blob_uploader/
24+
2125
test:
2226
go test -v -race -coverprofile=coverage.txt -covermode=atomic -p 1 $(PWD)/...
2327

rollup/cmd/blob_uploader/app/app.go

Lines changed: 6 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99

1010
"github.com/prometheus/client_golang/prometheus"
1111
"github.com/scroll-tech/da-codec/encoding"
12-
"github.com/scroll-tech/go-ethereum/ethclient"
1312
"github.com/scroll-tech/go-ethereum/log"
1413
"github.com/urfave/cli/v2"
1514

@@ -19,9 +18,7 @@ import (
1918
"scroll-tech/common/version"
2019

2120
"scroll-tech/rollup/internal/config"
22-
"scroll-tech/rollup/internal/controller/relayer"
23-
"scroll-tech/rollup/internal/controller/watcher"
24-
rutils "scroll-tech/rollup/internal/utils"
21+
"scroll-tech/rollup/internal/controller/blob_uploader"
2522
)
2623

2724
var app *cli.App
@@ -67,36 +64,12 @@ func action(ctx *cli.Context) error {
6764
registry := prometheus.DefaultRegisterer
6865
observability.Server(ctx, db)
6966

70-
// Init l2geth connection
71-
l2client, err := ethclient.Dial(cfg.L2Config.Endpoint)
72-
if err != nil {
73-
log.Crit("failed to connect l2 geth", "config file", cfgFile, "error", err)
74-
}
75-
76-
genesisPath := ctx.String(utils.Genesis.Name)
77-
genesis, err := utils.ReadGenesis(genesisPath)
78-
if err != nil {
79-
log.Crit("failed to read genesis", "genesis file", genesisPath, "error", err)
80-
}
81-
8267
// sanity check config
83-
if cfg.L2Config.RelayerConfig.BatchSubmission == nil {
84-
log.Crit("cfg.L2Config.RelayerConfig.BatchSubmission must not be nil")
85-
}
86-
if cfg.L2Config.RelayerConfig.BatchSubmission.MinBatches < 1 {
87-
log.Crit("cfg.L2Config.RelayerConfig.SenderConfig.BatchSubmission.MinBatches must be at least 1")
88-
}
89-
if cfg.L2Config.RelayerConfig.BatchSubmission.MaxBatches < 1 {
90-
log.Crit("cfg.L2Config.RelayerConfig.SenderConfig.BatchSubmission.MaxBatches must be at least 1")
91-
}
92-
if cfg.L2Config.BatchProposerConfig.MaxChunksPerBatch <= 0 {
93-
log.Crit("cfg.L2Config.BatchProposerConfig.MaxChunksPerBatch must be greater than 0")
94-
}
95-
if cfg.L2Config.ChunkProposerConfig.MaxL2GasPerChunk <= 0 {
96-
log.Crit("cfg.L2Config.ChunkProposerConfig.MaxL2GasPerChunk must be greater than 0")
68+
if cfg.L2Config.BlobUploaderConfig == nil {
69+
log.Crit("cfg.L2Config.BlobUploaderConfig must not be nil")
9770
}
9871

99-
l2relayer, err := relayer.NewLayer2Relayer(ctx.Context, l2client, db, cfg.L2Config.RelayerConfig, genesis.Config, relayer.ServiceTypeL2RollupRelayer, registry)
72+
blobUploader, err := blob_uploader.NewBlobUploader(ctx.Context, db, cfg.L2Config.BlobUploaderConfig, registry)
10073
if err != nil {
10174
log.Crit("failed to create l2 relayer", "config file", cfgFile, "error", err)
10275
}
@@ -106,31 +79,7 @@ func action(ctx *cli.Context) error {
10679
log.Crit("min codec version must be greater than or equal to CodecV7", "minCodecVersion", minCodecVersion)
10780
}
10881

109-
chunkProposer := watcher.NewChunkProposer(subCtx, cfg.L2Config.ChunkProposerConfig, minCodecVersion, genesis.Config, db, registry)
110-
batchProposer := watcher.NewBatchProposer(subCtx, cfg.L2Config.BatchProposerConfig, minCodecVersion, genesis.Config, db, registry)
111-
bundleProposer := watcher.NewBundleProposer(subCtx, cfg.L2Config.BundleProposerConfig, minCodecVersion, genesis.Config, db, registry)
112-
113-
l2watcher := watcher.NewL2WatcherClient(subCtx, l2client, cfg.L2Config.Confirmations, cfg.L2Config.L2MessageQueueAddress, cfg.L2Config.WithdrawTrieRootSlot, genesis.Config, db, registry)
114-
115-
// Watcher loop to fetch missing blocks
116-
go utils.LoopWithContext(subCtx, 2*time.Second, func(ctx context.Context) {
117-
number, loopErr := rutils.GetLatestConfirmedBlockNumber(ctx, l2client, cfg.L2Config.Confirmations)
118-
if loopErr != nil {
119-
log.Error("failed to get block number", "err", loopErr)
120-
return
121-
}
122-
l2watcher.TryFetchRunningMissingBlocks(number)
123-
})
124-
125-
go utils.Loop(subCtx, time.Duration(cfg.L2Config.ChunkProposerConfig.ProposeIntervalMilliseconds)*time.Millisecond, chunkProposer.TryProposeChunk)
126-
127-
go utils.Loop(subCtx, time.Duration(cfg.L2Config.BatchProposerConfig.ProposeIntervalMilliseconds)*time.Millisecond, batchProposer.TryProposeBatch)
128-
129-
go utils.Loop(subCtx, 10*time.Second, bundleProposer.TryProposeBundle)
130-
131-
go utils.Loop(subCtx, 2*time.Second, l2relayer.ProcessPendingBatches)
132-
133-
go utils.Loop(subCtx, 15*time.Second, l2relayer.ProcessPendingBundles)
82+
go utils.Loop(subCtx, 2*time.Second, blobUploader.UploadBlobToS3)
13483

13584
// Finish start all blob-uploader functions.
13685
log.Info("Start blob-uploader successfully", "version", version.Version)
@@ -145,7 +94,7 @@ func action(ctx *cli.Context) error {
14594
return nil
14695
}
14796

148-
// Run rollup relayer cmd instance.
97+
// Run blob uploader cmd instance.
14998
func Run() {
15099
if err := app.Run(os.Args); err != nil {
151100
_, _ = fmt.Fprintln(os.Stderr, err)

rollup/conf/config.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,15 @@
104104
"bundle_proposer_config": {
105105
"max_batch_num_per_bundle": 20,
106106
"bundle_timeout_sec": 36000
107+
},
108+
"blob_uploader_config": {
109+
"start_batch": 0,
110+
"aws_s3_config": {
111+
"bucket": "blob-data",
112+
"region": "us-west-2",
113+
"access_key": "ACCESSKEY",
114+
"secret_key": "SECRETKEY"
115+
}
107116
}
108117
},
109118
"db_config": {

rollup/internal/config/l2.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ type BundleProposerConfig struct {
5151

5252
// BlobUploaderConfig loads blob_uploader configuration items.
5353
type BlobUploaderConfig struct {
54-
AWSS3Config *AWSS3Config `json:"aws_s3_config"`
54+
StartBatch uint64 `json:"start_batch"`
55+
AWSS3Config *AWSS3Config `json:"aws_s3_config"`
5556
}
5657

5758
// AWSS3Config loads s3_uploader configuration items.
@@ -60,4 +61,4 @@ type AWSS3Config struct {
6061
Region string `json:"region"`
6162
AccessKey string `json:"access_key"`
6263
SecretKey string `json:"secret_key"`
63-
}
64+
}

rollup/internal/controller/blob_uploader/blob_uploader.go

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@ type BlobUploader struct {
2525
cfg *config.BlobUploaderConfig
2626

2727
s3Uploader *S3Uploader
28-
batchOrm *orm.Batch
29-
chunkOrm *orm.Chunk
30-
l2BlockOrm *orm.L2Block
28+
29+
blobUploadOrm *orm.BlobUpload
30+
batchOrm *orm.Batch
31+
chunkOrm *orm.Chunk
32+
l2BlockOrm *orm.L2Block
3133

3234
metrics *blobUploaderMetrics
3335
}
@@ -44,12 +46,13 @@ func NewBlobUploader(ctx context.Context, db *gorm.DB, cfg *config.BlobUploaderC
4446
}
4547

4648
blobUploader := &BlobUploader{
47-
ctx: ctx,
48-
cfg: cfg,
49-
s3Uploader: s3Uploader,
50-
batchOrm: orm.NewBatch(db),
51-
chunkOrm: orm.NewChunk(db),
52-
l2BlockOrm: orm.NewL2Block(db),
49+
ctx: ctx,
50+
cfg: cfg,
51+
s3Uploader: s3Uploader,
52+
batchOrm: orm.NewBatch(db),
53+
chunkOrm: orm.NewChunk(db),
54+
l2BlockOrm: orm.NewL2Block(db),
55+
blobUploadOrm: orm.NewBlobUpload(db),
5356
}
5457

5558
blobUploader.metrics = initblobUploaderMetrics(reg)
@@ -85,10 +88,10 @@ func (b *BlobUploader) UploadBlobToS3() {
8588
return
8689
}
8790

88-
// calculate versioned blob hash
91+
// calculate versioned blob hash
8992
versionedBlobHash, err := utils.CalculateVersionedBlobHash(*blob)
9093
if err != nil {
91-
log.Error("failed to versioned blob hash", "batch index", dbBatch.Index, "err", err)
94+
log.Error("failed to calculate versioned blob hash", "batch index", dbBatch.Index, "err", err)
9295
return
9396
}
9497

@@ -97,11 +100,21 @@ func (b *BlobUploader) UploadBlobToS3() {
97100
err = b.s3Uploader.UploadData(b.ctx, blob[:], key)
98101
if err != nil {
99102
log.Error("failed to upload blob data to AWS S3", "batch index", dbBatch.Index, "versioned blob hash", key, "err", err)
103+
// Update status to failed
104+
if err = b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); err != nil {
105+
log.Error("failed to update blob upload status to failed", "batch index", dbBatch.Index, "err", err)
106+
}
107+
return
108+
}
109+
110+
// Update status to uploaded
111+
if err = b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusUploaded); err != nil {
112+
log.Error("failed to update blob upload status to uploaded", "batch index", dbBatch.Index, "err", err)
100113
return
101114
}
102115

103-
// update db status
104-
116+
b.metrics.rollupBlobUploaderUploadToS3Total.Inc()
117+
log.Info("Successfully uploaded blob to S3", "batch index", dbBatch.Index, "versioned blob hash", key)
105118
}
106119

107120
func (b *BlobUploader) constructBlobCodecV7(dbBatch *orm.Batch) (*kzg4844.Blob, error) {

0 commit comments

Comments
 (0)