@@ -2,6 +2,7 @@ package blob_uploader
22
33import (
44 "context"
5+ "errors"
56 "fmt"
67
78 "github.com/prometheus/client_golang/prometheus"
@@ -67,7 +68,7 @@ func (b *BlobUploader) UploadBlobToS3() {
6768 }
6869
6970 // get un-uploaded batches from database in ascending order by their index.
70- dbBatch , err := b .batchOrm . GetFirstUnuploadedBatchByPlatform (b .ctx , b .cfg .StartBatch , types .BlobStoragePlatformS3 )
71+ dbBatch , err := b .GetFirstUnuploadedBatchByPlatform (b .ctx , b .cfg .StartBatch , types .BlobStoragePlatformS3 )
7172 if err != nil {
7273 log .Error ("Failed to fetch unuploaded batch" , "err" , err )
7374 return
@@ -85,7 +86,7 @@ func (b *BlobUploader) UploadBlobToS3() {
8586 if err != nil {
8687 log .Error ("failed to construct constructBlobCodec payload " , "codecVersion" , codecVersion , "batch index" , dbBatch .Index , "err" , err )
8788 b .metrics .rollupBlobUploaderUploadToS3FailedTotal .Inc ()
88- if updateErr := b .blobUploadOrm .InsertOrUpdateBlobUpload (b .ctx , dbBatch .Index , types .BlobStoragePlatformS3 , types .BlobUploadStatusFailed ); updateErr != nil {
89+ if updateErr := b .blobUploadOrm .InsertOrUpdateBlobUpload (b .ctx , dbBatch .Index , dbBatch . Hash , types .BlobStoragePlatformS3 , types .BlobUploadStatusFailed ); updateErr != nil {
8990 log .Error ("failed to update blob upload status to failed" , "batch index" , dbBatch .Index , "err" , updateErr )
9091 }
9192 return
@@ -97,7 +98,7 @@ func (b *BlobUploader) UploadBlobToS3() {
9798 log .Error ("failed to calculate versioned blob hash" , "batch index" , dbBatch .Index , "err" , err )
9899 b .metrics .rollupBlobUploaderUploadToS3FailedTotal .Inc ()
99100 // update status to failed
100- if updateErr := b .blobUploadOrm .InsertOrUpdateBlobUpload (b .ctx , dbBatch .Index , types .BlobStoragePlatformS3 , types .BlobUploadStatusFailed ); updateErr != nil {
101+ if updateErr := b .blobUploadOrm .InsertOrUpdateBlobUpload (b .ctx , dbBatch .Index , dbBatch . Hash , types .BlobStoragePlatformS3 , types .BlobUploadStatusFailed ); updateErr != nil {
101102 log .Error ("failed to update blob upload status to failed" , "batch index" , dbBatch .Index , "err" , updateErr )
102103 }
103104 return
@@ -110,14 +111,14 @@ func (b *BlobUploader) UploadBlobToS3() {
110111 log .Error ("failed to upload blob data to AWS S3" , "batch index" , dbBatch .Index , "versioned blob hash" , key , "err" , err )
111112 b .metrics .rollupBlobUploaderUploadToS3FailedTotal .Inc ()
112113 // update status to failed
113- if updateErr := b .blobUploadOrm .InsertOrUpdateBlobUpload (b .ctx , dbBatch .Index , types .BlobStoragePlatformS3 , types .BlobUploadStatusFailed ); updateErr != nil {
114+ if updateErr := b .blobUploadOrm .InsertOrUpdateBlobUpload (b .ctx , dbBatch .Index , dbBatch . Hash , types .BlobStoragePlatformS3 , types .BlobUploadStatusFailed ); updateErr != nil {
114115 log .Error ("failed to update blob upload status to failed" , "batch index" , dbBatch .Index , "err" , updateErr )
115116 }
116117 return
117118 }
118119
119120 // update status to uploaded
120- if err = b .blobUploadOrm .InsertOrUpdateBlobUpload (b .ctx , dbBatch .Index , types .BlobStoragePlatformS3 , types .BlobUploadStatusUploaded ); err != nil {
121+ if err = b .blobUploadOrm .InsertOrUpdateBlobUpload (b .ctx , dbBatch .Index , dbBatch . Hash , types .BlobStoragePlatformS3 , types .BlobUploadStatusUploaded ); err != nil {
121122 log .Error ("failed to update blob upload status to uploaded" , "batch index" , dbBatch .Index , "err" , err )
122123 b .metrics .rollupBlobUploaderUploadToS3FailedTotal .Inc ()
123124 return
@@ -195,3 +196,51 @@ func (b *BlobUploader) constructBlobCodec(dbBatch *orm.Batch) (*kzg4844.Blob, er
195196
196197 return daBatch .Blob (), nil
197198}
199+
200+ // GetFirstUnuploadedBatchByPlatform retrieves the first batch that either hasn't been uploaded to corresponding blob storage service
201+ // The batch must have a commit_tx_hash (committed).
202+ func (b * BlobUploader ) GetFirstUnuploadedBatchByPlatform (ctx context.Context , startBatch uint64 , platform types.BlobStoragePlatform ) (* orm.Batch , error ) {
203+ batchIndex , err := b .blobUploadOrm .GetFirstUnuploadedBatchIndexByPlatform (ctx , startBatch , platform )
204+ if err != nil {
205+ return nil , err
206+ }
207+
208+ var batch * orm.Batch
209+ for {
210+ var err error
211+ batch , err = b .batchOrm .GetBatchByIndex (ctx , batchIndex )
212+ if err != nil {
213+ if errors .Is (err , gorm .ErrRecordNotFound ) {
214+ log .Debug ("got batch not proposed for blob uploading" , "batch_index" , batchIndex , "platform" , platform .String ())
215+ return nil , nil
216+ }
217+ return nil , err
218+ }
219+
220+ // to check if the parent batch uploaded
221+ // if no, there is a batch revert happened, we need to fallback to upload previous batch
222+ fields := map [string ]interface {}{
223+ "batch_index = ?" : batchIndex - 1 ,
224+ "batch_hash = ?" : batch .ParentBatchHash ,
225+ "platform = ?" : platform ,
226+ "status = ?" : types .BlobUploadStatusUploaded ,
227+ }
228+ blobUpload , err := b .blobUploadOrm .GetBlobUploads (ctx , fields , nil , 1 )
229+ if err != nil {
230+ return nil , err
231+ }
232+
233+ if len (blobUpload ) == 0 {
234+ batchIndex --
235+ continue
236+ }
237+ break
238+ }
239+
240+ if len (batch .CommitTxHash ) == 0 {
241+ log .Debug ("got batch not committed for blob uploading" , "batch_index" , batchIndex , "platform" , platform .String ())
242+ return nil , nil
243+ }
244+
245+ return batch , nil
246+ }
0 commit comments