Skip to content

Commit

Permalink
Merge pull request #1392 from 0chain/fix/upload-resume
Browse files Browse the repository at this point in the history
Fix resume upload
  • Loading branch information
dabasov authored Feb 26, 2024
2 parents 5b5fb10 + 264f6c4 commit ebbb3d9
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 13 deletions.
2 changes: 1 addition & 1 deletion code/go/0chain.net/blobbercore/allocation/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ func SaveFileChange(connectionID, pathHash, fileName string, cmd FileCommand, is
hasher := filestore.GetNewCommitHasher(contentSize)
change.hasher = hasher
change.seqPQ = seqpriorityqueue.NewSeqPriorityQueue(contentSize)
hasher.WG.Add(1)
go hasher.Start(connectionObj.ctx, connectionID, connectionObj.AllocationID, fileName, pathHash, change.seqPQ)
saveChange = true
}
Expand All @@ -203,6 +202,7 @@ func SaveFileChange(connectionID, pathHash, fileName string, cmd FileCommand, is
if change.isFinalized {
return false, nil
}

if isFinal {
change.isFinalized = true
change.seqPQ.Done(seqpriorityqueue.UploadData{
Expand Down
4 changes: 3 additions & 1 deletion code/go/0chain.net/blobbercore/filestore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,9 @@ func (fs *FileStore) CommitWrite(allocID, conID string, fileData *FileInputData)
nodeSie := getNodesSize(fileData.Size, util.MaxMerkleLeavesSize)
fileSize := rStat.Size() - nodeSie - FMTSize
now := time.Now()
err = fileData.Hasher.Wait(conID, allocID, fileData.Name, filePathHash)
ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
defer cancel()
err = fileData.Hasher.Wait(ctx, conID, allocID, fileData.Name, filePathHash)
if err != nil {
return false, common.NewError("hasher_wait_error", err.Error())
}
Expand Down
4 changes: 0 additions & 4 deletions code/go/0chain.net/blobbercore/filestore/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,6 @@ func TestStoreStorageWriteAndCommit(t *testing.T) {
tF, err := os.Stat(tempFilePath)
require.Nil(t, err)
seqPQ := seqpriorityqueue.NewSeqPriorityQueue(int64(size))
hasher.WG.Add(1)
go hasher.Start(context.TODO(), test.connID, test.allocID, test.fileName, pathHash, seqPQ)
seqPQ.Done(seqpriorityqueue.UploadData{
Offset: 0,
Expand Down Expand Up @@ -365,7 +364,6 @@ func TestDeletePreCommitDir(t *testing.T) {
nodeSize := getNodesSize(int64(size), util.MaxMerkleLeavesSize)
require.Equal(t, int64(size), tF.Size()-nodeSize-FMTSize)
seqPQ := seqpriorityqueue.NewSeqPriorityQueue(int64(size))
hasher.WG.Add(1)
go hasher.Start(context.TODO(), connID, allocID, fileName, pathHash, seqPQ)
seqPQ.Done(seqpriorityqueue.UploadData{
Offset: 0,
Expand Down Expand Up @@ -401,7 +399,6 @@ func TestDeletePreCommitDir(t *testing.T) {
_, err = os.Stat(tempFilePath)
require.Nil(t, err)
seqPQ = seqpriorityqueue.NewSeqPriorityQueue(int64(size))
hasher.WG.Add(1)
go hasher.Start(context.TODO(), connID, allocID, fileName, pathHash, seqPQ)
seqPQ.Done(seqpriorityqueue.UploadData{
Offset: 0,
Expand Down Expand Up @@ -476,7 +473,6 @@ func TestStorageUploadUpdate(t *testing.T) {
nodeSize := getNodesSize(int64(size), util.MaxMerkleLeavesSize)
require.Equal(t, int64(size), tF.Size()-nodeSize-FMTSize)
seqPQ := seqpriorityqueue.NewSeqPriorityQueue(int64(size))
hasher.WG.Add(1)
go hasher.Start(context.TODO(), connID, allocID, fileName, pathHash, seqPQ)
seqPQ.Done(seqpriorityqueue.UploadData{
Offset: 0,
Expand Down
17 changes: 12 additions & 5 deletions code/go/0chain.net/blobbercore/filestore/tree_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ type CommitHasher struct {
fmt *fixedMerkleTree
vt *validationTree
isInitialized bool
WG sync.WaitGroup
doneChan chan struct{}
hashErr error
dataSize int64
}
Expand All @@ -419,12 +419,13 @@ func GetNewCommitHasher(dataSize int64) *CommitHasher {
c.fmt = getNewFixedMerkleTree()
c.vt = getNewValidationTree(dataSize)
c.isInitialized = true
c.doneChan = make(chan struct{})
c.dataSize = dataSize
return c
}

func (c *CommitHasher) Start(ctx context.Context, connID, allocID, fileName, filePathHash string, seqPQ *seqpriorityqueue.SeqPriorityQueue) {
defer c.WG.Done()
defer close(c.doneChan)
tempFilePath := GetFileStore().GetTempFilePath(allocID, connID, fileName, filePathHash)
f, err := os.Open(tempFilePath)
if err != nil {
Expand Down Expand Up @@ -453,6 +454,8 @@ func (c *CommitHasher) Start(ctx context.Context, connID, allocID, fileName, fil
default:
}
toFinalize = true
} else if pq.DataBytes == 0 {
continue
}
logging.Logger.Info("hasher_pop", zap.Int64("offset", pq.Offset), zap.Int64("dataBytes", pq.DataBytes), zap.Any("toFinalize", toFinalize), zap.Int64("dataSize", c.dataSize), zap.String("filename", fileName), zap.Int64("totalWritten", totalWritten))
bufSize := 2 * BufferSize
Expand Down Expand Up @@ -487,9 +490,13 @@ func (c *CommitHasher) Start(ctx context.Context, connID, allocID, fileName, fil
}
}

func (c *CommitHasher) Wait(connID, allocID, fileName, filePathHash string) error {
c.WG.Wait()
return c.hashErr
// Wait blocks until the hashing goroutine launched by Start has finished
// (signalled by the close of c.doneChan) and returns whatever error that
// goroutine recorded in c.hashErr, or returns early with ctx.Err() if the
// supplied context is cancelled or times out first.
//
// The connID, allocID, fileName and filePathHash arguments are not used by
// this method's body; they appear to be retained for signature stability
// with the callers — TODO(review): confirm they can be dropped.
func (c *CommitHasher) Wait(ctx context.Context, connID, allocID, fileName, filePathHash string) error {
	select {
	case <-ctx.Done():
		// Caller's deadline/cancellation fired before hashing completed.
		return ctx.Err()
	case <-c.doneChan:
		// Start has returned (doneChan closed via defer), so reading
		// c.hashErr here is race-free.
		return c.hashErr
	}
}

func (c *CommitHasher) Write(b []byte) (int, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (pq *SeqPriorityQueue) Done(v UploadData) {

func (pq *SeqPriorityQueue) Popup() UploadData {
pq.lock.Lock()
for pq.queue.Len() == 0 && !pq.done || (pq.queue.Len() > 0 && pq.queue[0].Offset != pq.next) {
for pq.queue.Len() == 0 || (!pq.done && pq.queue[0].Offset > pq.next) {
pq.cv.Wait()
}
if pq.done {
Expand All @@ -91,8 +91,11 @@ func (pq *SeqPriorityQueue) Popup() UploadData {
retItem := UploadData{
Offset: pq.next,
}
for pq.queue.Len() > 0 && pq.queue[0].Offset == pq.next {
for pq.queue.Len() > 0 && pq.queue[0].Offset <= pq.next {
item := heap.Pop(&pq.queue).(UploadData)
if item.Offset < pq.next {
continue
}
pq.next += item.DataBytes
}
retItem.DataBytes = pq.next - retItem.Offset
Expand Down

0 comments on commit ebbb3d9

Please sign in to comment.