@@ -2,14 +2,17 @@ package blob_uploader
 
 import (
 	"context"
-	"crypto/sha256"
 	"fmt"
 
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/scroll-tech/da-codec/encoding"
+	"github.com/scroll-tech/go-ethereum/common"
+	"github.com/scroll-tech/go-ethereum/crypto/kzg4844"
 	"github.com/scroll-tech/go-ethereum/log"
 	"gorm.io/gorm"
 
 	"scroll-tech/common/types"
+	"scroll-tech/common/utils"
 
 	"scroll-tech/rollup/internal/config"
 	"scroll-tech/rollup/internal/orm"
@@ -23,6 +26,8 @@ type BlobUploader struct {
 
 	s3Uploader *S3Uploader
 	batchOrm   *orm.Batch
+	chunkOrm   *orm.Chunk
+	l2BlockOrm *orm.L2Block
 
 	metrics *blobUploaderMetrics
 }
@@ -43,6 +48,8 @@ func NewBlobUploader(ctx context.Context, db *gorm.DB, cfg *config.BlobUploaderC
 		cfg:        cfg,
 		s3Uploader: s3Uploader,
 		batchOrm:   orm.NewBatch(db),
+		chunkOrm:   orm.NewChunk(db),
+		l2BlockOrm: orm.NewL2Block(db),
 	}
 
 	blobUploader.metrics = initblobUploaderMetrics(reg)
@@ -52,18 +59,94 @@ func NewBlobUploader(ctx context.Context, db *gorm.DB, cfg *config.BlobUploaderC
 
 func (b *BlobUploader) UploadBlobToS3() {
 	// get un-uploaded batches from database in ascending order by their index.
-	dbBatches, err := b.batchOrm.GetFirstUnuploadedAndFailedBatch(b.ctx, types.BlobStoragePlatformS3)
+	dbBatch, err := b.batchOrm.GetFirstUnuploadedAndFailedBatch(b.ctx, types.BlobStoragePlatformS3)
 	if err != nil {
 		log.Error("Failed to fetch unuploaded batch", "err", err)
 		return
 	}
 
 	// nothing to do if we don't have any pending batches
-	if dbBatches == nil {
+	if dbBatch == nil {
 		return
 	}
 
-	// upload data to s3 bucket
-	b.s3Uploader.UploadData(b.ctx, )
+	// construct blob
+	var blob *kzg4844.Blob
+	codecVersion := encoding.CodecVersion(dbBatch.CodecVersion)
+	switch codecVersion {
+	case encoding.CodecV7:
+		blob, err = b.constructBlobCodecV7(dbBatch)
+		if err != nil {
+			log.Error("failed to construct blob payload for codec V7", "codecVersion", codecVersion, "batch index", dbBatch.Index, "err", err)
+			return
+		}
+	default:
+		log.Error("unsupported codec version in UploadBlobToS3", "codecVersion", codecVersion, "batch index", dbBatch.Index)
+		return
+	}
+
+	// calculate versioned blob hash
+	versionedBlobHash, err := utils.CalculateVersionedBlobHash(*blob)
+	if err != nil {
+		log.Error("failed to calculate versioned blob hash", "batch index", dbBatch.Index, "err", err)
+		return
+	}
+
+	// upload blob data to s3 bucket
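+	// the S3 object key is the hex-encoded versioned blob hash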
+	key := common.Bytes2Hex(versionedBlobHash[:])
+	err = b.s3Uploader.UploadData(b.ctx, blob[:], key)
+	if err != nil {
+		log.Error("failed to upload blob data to AWS S3", "batch index", dbBatch.Index, "versioned blob hash", key, "err", err)
+		return
+	}
+
+	// update db status
+
 }
 
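+// constructBlobCodecV7 rebuilds the blob of a CodecV7 batch by re-encoding its chunks' L2 blocks with the batch's codec.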
+func (b *BlobUploader) constructBlobCodecV7(dbBatch *orm.Batch) (*kzg4844.Blob, error) {
+	// load the chunks that make up this batch
+	dbChunks, err := b.chunkOrm.GetChunksInRange(b.ctx, dbBatch.StartChunkIndex, dbBatch.EndChunkIndex)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get chunks in range: %w", err)
+	}
+
+	// check codec version consistency and collect all blocks of the batch
+	var batchBlocks []*encoding.Block
+	for _, dbChunk := range dbChunks {
+		if dbBatch.CodecVersion != dbChunk.CodecVersion {
+			return nil, fmt.Errorf("batch codec version is different from chunk codec version, batch index: %d, chunk index: %d, batch codec version: %d, chunk codec version: %d", dbBatch.Index, dbChunk.Index, dbBatch.CodecVersion, dbChunk.CodecVersion)
+		}
+
+		blocks, err := b.l2BlockOrm.GetL2BlocksInRange(b.ctx, dbChunk.StartBlockNumber, dbChunk.EndBlockNumber)
+		if err != nil {
+			return nil, fmt.Errorf("failed to get blocks in range for batch %d: %w", dbBatch.Index, err)
+		}
+
+		batchBlocks = append(batchBlocks, blocks...)
+	}
+
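+	// rebuild the encoding.Batch and re-encode it with the batch's codec to recover the blob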
+	encodingBatch := &encoding.Batch{
+		Index:                  dbBatch.Index,
+		ParentBatchHash:        common.HexToHash(dbBatch.ParentBatchHash),
+		PrevL1MessageQueueHash: common.HexToHash(dbBatch.PrevL1MessageQueueHash),
+		PostL1MessageQueueHash: common.HexToHash(dbBatch.PostL1MessageQueueHash),
+		Blocks:                 batchBlocks,
+	}
+
+	version := encoding.CodecVersion(dbBatch.CodecVersion)
+	codec, err := encoding.CodecFromVersion(version)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get codec from version %d, err: %w", dbBatch.CodecVersion, err)
+	}
+
+	daBatch, err := codec.NewDABatch(encodingBatch)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create DA batch: %w", err)
+	}
+
+	return daBatch.Blob(), nil
+}