Skip to content

Commit

Permalink
priority download (#1399)
Browse files Browse the repository at this point in the history
* add option to select all

* priority download

* fix download with auth ticket
  • Loading branch information
Hitenjain14 authored Feb 22, 2024
1 parent 8f44e17 commit 9430885
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 17 deletions.
5 changes: 5 additions & 0 deletions zboxcore/sdk/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,7 @@ func (a *Allocation) generateDownloadRequest(
// }
downloadReq.contentMode = contentMode
downloadReq.connectionID = connectionID
downloadReq.downloadQueue = make(downloadQueue, len(a.Blobbers))

return downloadReq, nil
}
Expand Down Expand Up @@ -1163,6 +1164,9 @@ func (a *Allocation) processReadMarker(drs []*DownloadRequest) {
for _, dr := range drs {
wg.Add(1)
go func(dr *DownloadRequest) {
if isReadFree {
dr.freeRead = true
}
defer wg.Done()
dr.processDownloadRequest()
var pos uint64
Expand Down Expand Up @@ -2211,6 +2215,7 @@ func (a *Allocation) downloadFromAuthTicket(fileHandler sys.File, authTicket str
downloadReq.shouldVerify = verifyDownload
downloadReq.fullconsensus = a.fullconsensus
downloadReq.consensusThresh = a.consensusThreshold
downloadReq.downloadQueue = make(downloadQueue, len(a.Blobbers))
downloadReq.connectionID = zboxutil.NewConnectionId()
downloadReq.completedCallback = func(remotepath string, remotepathHash string) {
a.mutex.Lock()
Expand Down
12 changes: 10 additions & 2 deletions zboxcore/sdk/blockdownloadworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type BlockDownloadRequest struct {
allocationTx string
allocOwnerID string
blobberIdx int
maskIdx int
remotefilepath string
remotefilepathhash string
chunkSize int
Expand All @@ -46,6 +47,7 @@ type BlockDownloadRequest struct {
shouldVerify bool
connectionID string
respBuf []byte
timeRequest bool
}

type downloadResponse struct {
Expand All @@ -59,7 +61,9 @@ type downloadBlock struct {
Success bool `json:"success"`
LatestRM *marker.ReadMarker `json:"latest_rm"`
idx int
maskIdx int
err error
timeTaken int64
}

var downloadBlockChan map[string]chan *BlockDownloadRequest
Expand Down Expand Up @@ -152,8 +156,10 @@ func (req *BlockDownloadRequest) downloadBlobberBlock(hostClient *fasthttp.HostC
header.ToFastHeader(httpreq)

err = func() error {
now := time.Now()
statuscode, respBuf, err := hostClient.GetWithRequestTimeout(httpreq, req.respBuf, 30*time.Second)
fasthttp.ReleaseRequest(httpreq)
timeTaken := time.Since(now).Milliseconds()
if err != nil {
zlogger.Logger.Error("Error downloading block: ", err)
return err
Expand Down Expand Up @@ -204,6 +210,8 @@ func (req *BlockDownloadRequest) downloadBlobberBlock(hostClient *fasthttp.HostC
}

rspData.idx = req.blobberIdx
rspData.maskIdx = req.maskIdx
rspData.timeTaken = timeTaken
rspData.Success = true

if req.encryptedKey != "" {
Expand Down Expand Up @@ -237,13 +245,13 @@ func (req *BlockDownloadRequest) downloadBlobberBlock(hostClient *fasthttp.HostC
retry++
continue
} else {
req.result <- &downloadBlock{Success: false, idx: req.blobberIdx, err: err}
req.result <- &downloadBlock{Success: false, idx: req.blobberIdx, err: err, maskIdx: req.maskIdx}
}
}
return
}

req.result <- &downloadBlock{Success: false, idx: req.blobberIdx, err: err}
req.result <- &downloadBlock{Success: false, idx: req.blobberIdx, err: err, maskIdx: req.maskIdx}

}

Expand Down
87 changes: 73 additions & 14 deletions zboxcore/sdk/downloadworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sdk

import (
"bytes"
"container/heap"
"context"
"crypto/md5"
"encoding/hex"
Expand Down Expand Up @@ -96,13 +97,44 @@ type DownloadRequest struct {
blocksPerShard int64
connectionID string
skip bool
freeRead bool
fRef *fileref.FileRef
chunksPerShard int64
size int64
offset int64
bufferMap map[int]zboxutil.DownloadBuffer
downloadStorer DownloadProgressStorer
workdir string
downloadQueue downloadQueue
}

type downloadPriority struct {
timeTaken int64
blobberIdx int
}

type downloadQueue []downloadPriority

func (pq downloadQueue) Len() int { return len(pq) }

func (pq downloadQueue) Less(i, j int) bool {
return pq[i].timeTaken < pq[j].timeTaken
}

func (pq downloadQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
}

func (pq *downloadQueue) Push(x interface{}) {
*pq = append(*pq, x.(downloadPriority))
}

func (pq *downloadQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
*pq = old[0 : n-1]
return item
}

type DownloadProgress struct {
Expand All @@ -121,7 +153,7 @@ func (req *DownloadRequest) removeFromMask(pos uint64) {
req.maskMu.Unlock()
}

func (req *DownloadRequest) getBlocksDataFromBlobbers(startBlock, totalBlock int64) ([][][]byte, error) {
func (req *DownloadRequest) getBlocksDataFromBlobbers(startBlock, totalBlock int64, timeRequest bool) ([][][]byte, error) {
shards := make([][][]byte, totalBlock)
for i := range shards {
shards[i] = make([][]byte, len(req.blobbers))
Expand All @@ -139,7 +171,7 @@ func (req *DownloadRequest) getBlocksDataFromBlobbers(startBlock, totalBlock int
curReqDownloads := requiredDownloads
for {
remainingMask, failed, downloadErrors, err = req.downloadBlock(
startBlock, totalBlock, mask, curReqDownloads, shards)
startBlock, totalBlock, mask, curReqDownloads, shards, timeRequest)
if err != nil {
return nil, err
}
Expand All @@ -162,9 +194,9 @@ func (req *DownloadRequest) getBlocksDataFromBlobbers(startBlock, totalBlock int

// getBlocksData will get data blocks for some interval from minimal blobers and aggregate them and
// return to the caller
func (req *DownloadRequest) getBlocksData(startBlock, totalBlock int64) ([][][]byte, error) {
func (req *DownloadRequest) getBlocksData(startBlock, totalBlock int64, timeRequest bool) ([][][]byte, error) {

shards, err := req.getBlocksDataFromBlobbers(startBlock, totalBlock)
shards, err := req.getBlocksDataFromBlobbers(startBlock, totalBlock, timeRequest)
if err != nil {
return nil, err
}
Expand All @@ -190,7 +222,7 @@ func (req *DownloadRequest) getBlocksData(startBlock, totalBlock int64) ([][][]b
func (req *DownloadRequest) downloadBlock(
startBlock, totalBlock int64,
mask zboxutil.Uint128, requiredDownloads int,
shards [][][]byte) (zboxutil.Uint128, int, []string, error) {
shards [][][]byte, timeRequest bool) (zboxutil.Uint128, int, []string, error) {

var remainingMask zboxutil.Uint128
activeBlobbers := mask.CountOnes()
Expand All @@ -199,6 +231,9 @@ func (req *DownloadRequest) downloadBlock(
fmt.Sprintf("Required downloads %d, remaining active blobber %d",
req.consensusThresh, activeBlobbers))
}
if timeRequest {
requiredDownloads = activeBlobbers
}
rspCh := make(chan *downloadBlock, requiredDownloads)

var (
Expand All @@ -207,15 +242,22 @@ func (req *DownloadRequest) downloadBlock(
skipDownload bool
)

for i := req.downloadMask; !i.Equals64(0); i = i.And(zboxutil.NewUint128(1).Lsh(pos).Not()) {
for i := mask; !i.Equals64(0); i = i.And(zboxutil.NewUint128(1).Lsh(pos).Not()) {
if c == requiredDownloads {
remainingMask = i
break
}

pos = uint64(i.TrailingZeros())
blobberIdx := req.downloadQueue[pos].blobberIdx
blockDownloadReq := &BlockDownloadRequest{
allocationID: req.allocationID,
allocationTx: req.allocationTx,
allocOwnerID: req.allocOwnerID,
authTicket: req.authTicket,
blobber: req.blobbers[pos],
blobberIdx: int(pos),
blobber: req.blobbers[blobberIdx],
blobberIdx: blobberIdx,
maskIdx: int(pos),
chunkSize: req.chunkSize,
blockNum: startBlock,
contentMode: req.contentMode,
Expand All @@ -227,6 +269,7 @@ func (req *DownloadRequest) downloadBlock(
encryptedKey: req.encryptedKey,
shouldVerify: req.shouldVerify,
connectionID: req.connectionID,
timeRequest: timeRequest,
}

if blockDownloadReq.blobber.IsSkip() {
Expand All @@ -248,10 +291,7 @@ func (req *DownloadRequest) downloadBlock(
}

c++
if c == requiredDownloads {
remainingMask = i
break
}

}

var failed int32
Expand All @@ -265,11 +305,13 @@ func (req *DownloadRequest) downloadBlock(
defer func() {
if err != nil {
atomic.AddInt32(&failed, 1)
req.removeFromMask(uint64(result.idx))
req.removeFromMask(uint64(result.maskIdx))
downloadErrors[i] = fmt.Sprintf("Error %s from %s",
err.Error(), req.blobbers[result.idx].Baseurl)
logger.Logger.Error(err)
req.bufferMap[result.idx].ReleaseChunk(int(req.startBlock / req.numBlocks))
} else {
req.downloadQueue[result.idx].timeTaken = result.timeTaken
}
wg.Done()
}()
Expand Down Expand Up @@ -634,16 +676,26 @@ func (req *DownloadRequest) processDownload(ctx context.Context) {
}

var progressLock sync.Mutex
firstReqWG := sync.WaitGroup{}
firstReqWG.Add(1)
eg, _ := errgroup.WithContext(ctx)
eg.SetLimit(downloadWorkerCount + EXTRA_COUNT)
for i := 0; i < n; i++ {
j := i
if i == 1 {
firstReqWG.Wait()
heap.Init(&req.downloadQueue)
}
eg.Go(func() error {

if j == 0 {
defer firstReqWG.Done()
}
blocksToDownload := numBlocks
if startBlock+int64(j)*numBlocks+numBlocks > endBlock {
blocksToDownload = endBlock - (startBlock + int64(j)*numBlocks)
}
data, err := req.getBlocksData(startBlock+int64(j)*numBlocks, blocksToDownload)
data, err := req.getBlocksData(startBlock+int64(j)*numBlocks, blocksToDownload, j == 0)
if req.isDownloadCanceled {
return errors.New("download_abort", "Download aborted by user")
}
Expand Down Expand Up @@ -1113,6 +1165,9 @@ func (req *DownloadRequest) getFileMetaConsensus(fMetaResp []*fileMetaResponse)
if countThreshold > req.fullconsensus {
countThreshold = req.consensusThresh
}
if req.freeRead {
countThreshold = req.fullconsensus
}
for i := 0; i < len(fMetaResp); i++ {
fmr := fMetaResp[i]
if fmr.err != nil || fmr.fileref == nil {
Expand Down Expand Up @@ -1146,6 +1201,10 @@ func (req *DownloadRequest) getFileMetaConsensus(fMetaResp []*fileMetaResponse)
}
shift := zboxutil.NewUint128(1).Lsh(uint64(fmr.blobberIdx))
foundMask = foundMask.Or(shift)
req.downloadQueue[fmr.blobberIdx] = downloadPriority{
blobberIdx: fmr.blobberIdx,
timeTaken: 60000,
}
blobberCount++
if blobberCount == countThreshold {
break
Expand Down
2 changes: 1 addition & 1 deletion zboxcore/sdk/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (sd *StreamDownload) Read(b []byte) (int, error) {
numBlocks = endInd - startInd
}

data, err := sd.getBlocksData(startInd, numBlocks)
data, err := sd.getBlocksData(startInd, numBlocks, true)
if err != nil {
return 0, err
}
Expand Down

0 comments on commit 9430885

Please sign in to comment.