Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add webstreaming to multiupload #1190

Merged
merged 5 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions mobilesdk/zbox/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ type MultiOperationOption struct {
}

type MultiUploadOption struct {
FilePath string `json:"filePath,omitempty"`
FileName string `json:"fileName,omitempty"`
RemotePath string `json:"remotePath,omitempty"`
ThumbnailPath string `json:"thumbnailPath,omitempty"`
Encrypt bool `json:"encrypt,omitempty"`
ChunkNumber int `json:"chunkNumber,omitempty"`
IsUpdate bool `json:"isUpdate,omitempty"`
FilePath string `json:"filePath,omitempty"`
FileName string `json:"fileName,omitempty"`
RemotePath string `json:"remotePath,omitempty"`
ThumbnailPath string `json:"thumbnailPath,omitempty"`
Encrypt bool `json:"encrypt,omitempty"`
ChunkNumber int `json:"chunkNumber,omitempty"`
IsUpdate bool `json:"isUpdate,omitempty"`
IsWebstreaming bool `json:"isWebstreaming,omitempty"`
}

type MultiDownloadOption struct {
Expand Down Expand Up @@ -336,6 +337,7 @@ func MultiUpload(allocationID string, workdir string, jsonMultiUploadOptions str
encrypts := make([]bool, totalUploads)
chunkNumbers := make([]int, totalUploads)
isUpdates := make([]bool, totalUploads)
isWebstreaming := make([]bool, totalUploads)
for idx, option := range options {
filePaths[idx] = option.FilePath
fileNames[idx] = option.FileName
Expand All @@ -344,13 +346,14 @@ func MultiUpload(allocationID string, workdir string, jsonMultiUploadOptions str
chunkNumbers[idx] = option.ChunkNumber
encrypts[idx] = option.Encrypt
isUpdates[idx] = false
isWebstreaming[idx] = option.IsWebstreaming
}

a, err := getAllocation(allocationID)
if err != nil {
return err
}
return a.StartMultiUpload(workdir, filePaths, fileNames, thumbnailPaths, encrypts, chunkNumbers, remotePaths, isUpdates, &StatusCallbackWrapped{Callback: statusCb})
return a.StartMultiUpload(workdir, filePaths, fileNames, thumbnailPaths, encrypts, chunkNumbers, remotePaths, isUpdates, isWebstreaming, &StatusCallbackWrapped{Callback: statusCb})

}

Expand All @@ -373,6 +376,7 @@ func MultiUpdate(allocationID string, workdir string, jsonMultiUploadOptions str
encrypts := make([]bool, totalUploads)
chunkNumbers := make([]int, totalUploads)
isUpdates := make([]bool, totalUploads)
isWebstreaming := make([]bool, totalUploads)
for idx, option := range options {
filePaths[idx] = option.FilePath
fileNames[idx] = option.FileName
Expand All @@ -381,6 +385,7 @@ func MultiUpdate(allocationID string, workdir string, jsonMultiUploadOptions str
chunkNumbers[idx] = option.ChunkNumber
encrypts[idx] = option.Encrypt
isUpdates[idx] = true
isWebstreaming[idx] = option.IsWebstreaming
}
if err != nil {
return err
Expand All @@ -390,7 +395,7 @@ func MultiUpdate(allocationID string, workdir string, jsonMultiUploadOptions str
if err != nil {
return err
}
return a.StartMultiUpload(workdir, filePaths, fileNames, thumbnailPaths, encrypts, chunkNumbers, remotePaths, isUpdates, &StatusCallbackWrapped{Callback: statusCb})
return a.StartMultiUpload(workdir, filePaths, fileNames, thumbnailPaths, encrypts, chunkNumbers, remotePaths, isUpdates, isWebstreaming, &StatusCallbackWrapped{Callback: statusCb})

}

Expand Down
12 changes: 7 additions & 5 deletions wasmsdk/blobber.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ type BulkUploadOption struct {
Webstreaming bool `json:"webstreaming,omitempty"`
IsUpdate bool `json:"isUpdate,omitempty"`
IsRepair bool `json:"isRepair,omitempty"`
IsWebstreaming bool `json:"isWebstreaming,omitempty"`

NumBlocks int `json:"numBlocks,omitempty"`
FileSize int64 `json:"fileSize,omitempty"`
Expand Down Expand Up @@ -671,11 +672,12 @@ func multiUpload(jsonBulkUploadOptions string) (MultiUploadResult, error) {
sdk.WithChunkNumber(numBlocks),
}
operationRequests[idx] = sdk.OperationRequest{
FileMeta: fileMeta,
FileReader: fileReader,
OperationType: FileOperationInsert,
Opts: options,
Workdir: "/",
FileMeta: fileMeta,
FileReader: fileReader,
OperationType: FileOperationInsert,
Opts: options,
Workdir: "/",
IsWebstreaming: option.IsWebstreaming,
}

}
Expand Down
7 changes: 4 additions & 3 deletions winsdk/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ type UploadFile struct {
Path string
ThumbnailPath string

RemotePath string
Encrypt bool
IsUpdate bool
RemotePath string
Encrypt bool
IsUpdate bool
IsWebstreaming bool

ChunkNumber int
}
19 changes: 11 additions & 8 deletions winsdk/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,14 @@ type MultiOperationOption struct {
}

type MultiUploadOption struct {
FilePath string `json:"filePath,omitempty"`
FileName string `json:"fileName,omitempty"`
RemotePath string `json:"remotePath,omitempty"`
ThumbnailPath string `json:"thumbnailPath,omitempty"`
Encrypt bool `json:"encrypt,omitempty"`
ChunkNumber int `json:"chunkNumber,omitempty"`
IsUpdate bool `json:"isUpdate,omitempty"`
FilePath string `json:"filePath,omitempty"`
FileName string `json:"fileName,omitempty"`
RemotePath string `json:"remotePath,omitempty"`
ThumbnailPath string `json:"thumbnailPath,omitempty"`
Encrypt bool `json:"encrypt,omitempty"`
ChunkNumber int `json:"chunkNumber,omitempty"`
IsUpdate bool `json:"isUpdate,omitempty"`
IsWebstreaming bool `json:"isWebstreaming,omitempty"`
}

// MultiOperation - do copy, move, delete and createdir operation together
Expand Down Expand Up @@ -323,6 +324,7 @@ func BulkUpload(uploadID, allocationID, files *C.char) *C.char {
chunkNumbers := make([]int, totalUploads)
encrypts := make([]bool, totalUploads)
isUpdates := make([]bool, totalUploads)
isWebstreaming := make([]bool, totalUploads)

statusBar := &StatusCallback{
status: make(map[string]*Status),
Expand All @@ -335,6 +337,7 @@ func BulkUpload(uploadID, allocationID, files *C.char) *C.char {
remotePaths[idx] = option.RemotePath
chunkNumbers[idx] = option.ChunkNumber
isUpdates[idx] = option.IsUpdate
isWebstreaming[idx] = option.IsWebstreaming
encrypts[idx] = option.Encrypt
statusBar.status[option.RemotePath+option.Name] = &Status{}
}
Expand All @@ -346,7 +349,7 @@ func BulkUpload(uploadID, allocationID, files *C.char) *C.char {

statusCaches.Add(C.GoString(uploadID), statusBar)

err = a.StartMultiUpload(workdir, filePaths, fileNames, thumbnailPaths, encrypts, chunkNumbers, remotePaths, isUpdates, statusBar)
err = a.StartMultiUpload(workdir, filePaths, fileNames, thumbnailPaths, encrypts, chunkNumbers, remotePaths, isUpdates, isWebstreaming, statusBar)
if err != nil {
return WithJSON(nil, err)
}
Expand Down
22 changes: 13 additions & 9 deletions zboxcore/sdk/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,13 @@ type Allocation struct {
}

type OperationRequest struct {
OperationType string
LocalPath string
RemotePath string
DestName string // Required only for rename operation
DestPath string // Required for copy and move operation
IsUpdate bool
OperationType string
LocalPath string
RemotePath string
DestName string // Required only for rename operation
DestPath string // Required for copy and move operation
IsUpdate bool
IsWebstreaming bool

// Required for uploads
Workdir string
Expand Down Expand Up @@ -495,7 +496,7 @@ func (a *Allocation) EncryptAndUploadFileWithThumbnail(
)
}

func (a *Allocation) StartMultiUpload(workdir string, localPaths []string, fileNames []string, thumbnailPaths []string, encrypts []bool, chunkNumbers []int, remotePaths []string, isUpdate []bool, status StatusCallback) error {
func (a *Allocation) StartMultiUpload(workdir string, localPaths []string, fileNames []string, thumbnailPaths []string, encrypts []bool, chunkNumbers []int, remotePaths []string, isUpdate []bool, isWebstreaming []bool, status StatusCallback) error {
if len(localPaths) != len(thumbnailPaths) {
return errors.New("invalid_value", "length of localpaths and thumbnailpaths must be equal")
}
Expand Down Expand Up @@ -588,6 +589,9 @@ func (a *Allocation) StartMultiUpload(workdir string, localPaths []string, fileN
if isUpdate[idx] {
operationRequests[idx].OperationType = constants.FileOperationUpdate
}
if isWebstreaming[idx] {
operationRequests[idx].IsWebstreaming = true
}

}
err := a.DoMultiOperation(operationRequests)
Expand Down Expand Up @@ -873,13 +877,13 @@ func (a *Allocation) DoMultiOperation(operations []OperationRequest) error {
operation = NewMoveOperation(op.RemotePath, op.DestPath, mo.operationMask, mo.maskMU, mo.consensusThresh, mo.fullconsensus, mo.ctx)

case constants.FileOperationInsert:
operation = NewUploadOperation(op.Workdir, op.FileMeta, op.FileReader, false, op.Opts...)
operation = NewUploadOperation(op.Workdir, op.FileMeta, op.FileReader, false, op.IsWebstreaming, op.Opts...)

case constants.FileOperationDelete:
operation = NewDeleteOperation(op.RemotePath, mo.operationMask, mo.maskMU, mo.consensusThresh, mo.fullconsensus, mo.ctx)

case constants.FileOperationUpdate:
operation = NewUploadOperation(op.Workdir, op.FileMeta, op.FileReader, true, op.Opts...)
operation = NewUploadOperation(op.Workdir, op.FileMeta, op.FileReader, true, op.IsWebstreaming, op.Opts...)

case constants.FileOperationCreateDir:
operation = NewDirOperation(op.RemotePath, mo.operationMask, mo.maskMU, mo.consensusThresh, mo.fullconsensus, mo.ctx)
Expand Down
6 changes: 4 additions & 2 deletions zboxcore/sdk/upload_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ type UploadOperation struct {
opts []ChunkedUploadOption
refs []*fileref.FileRef
isUpdate bool
isWebstreaming bool
statusCallback StatusCallback
opCode int
}

func (uo *UploadOperation) Process(allocObj *Allocation, connectionID string) ([]fileref.RefEntity, zboxutil.Uint128, error) {
cu, err := CreateChunkedUpload(uo.workdir, allocObj, uo.fileMeta, uo.fileReader, uo.isUpdate, false, false, connectionID, uo.opts...)
cu, err := CreateChunkedUpload(uo.workdir, allocObj, uo.fileMeta, uo.fileReader, uo.isUpdate, false, uo.isWebstreaming, connectionID, uo.opts...)
if err != nil {
uploadMask := zboxutil.NewUint128(1).Lsh(uint64(len(allocObj.Blobbers))).Sub64(1)
return nil, uploadMask, err
Expand Down Expand Up @@ -123,12 +124,13 @@ func (uo *UploadOperation) Error(allocObj *Allocation, consensus int, err error)
}
}

func NewUploadOperation(workdir string, fileMeta FileMeta, fileReader io.Reader, isUpdate bool, opts ...ChunkedUploadOption) *UploadOperation {
func NewUploadOperation(workdir string, fileMeta FileMeta, fileReader io.Reader, isUpdate, isWebstreaming bool, opts ...ChunkedUploadOption) *UploadOperation {
uo := &UploadOperation{}
uo.workdir = workdir
uo.fileMeta = fileMeta
uo.fileReader = fileReader
uo.opts = opts
uo.isUpdate = isUpdate
uo.isWebstreaming = isWebstreaming
return uo
}