Skip to content

Commit

Permalink
iterate interface, add filter
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
  • Loading branch information
butonic committed Nov 30, 2023
1 parent 479dd9e commit 0ceef88
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 74 deletions.
45 changes: 0 additions & 45 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,11 @@ import (
"context"
"io"
"net/url"
"time"

userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
registry "github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1"
)

// UploadFinishedFunc is a callback function used in storage drivers to indicate that an upload has finished
type UploadFinishedFunc func(spaceOwner, executant *userpb.UserId, ref *provider.Reference)

// FS is the interface to implement access to the storage.
type FS interface {
GetHome(ctx context.Context) (string, error)
Expand Down Expand Up @@ -76,40 +71,6 @@ type FS interface {
DeleteStorageSpace(ctx context.Context, req *provider.DeleteStorageSpaceRequest) error
}

// UploadsManager defines the interface for FS implementations that allow for managing uploads
type UploadsManager interface {
// ListUploads returns a list of all currently known uploads
// TODO and their processing state
ListUploads() ([]UploadProgress, error)
// PurgeExpiredUploads purges expired uploads
// TODO skip uploads in progress
PurgeExpiredUploads(chan<- UploadProgress) error
// GetUploadProgress returns the upload progress
GetUploadProgress(ctx context.Context, uploadID string) (UploadProgress, error)
}

type UploadProgress interface {
// ID returns the upload id
ID() string
// Filename returns the filename of the file
Filename() string
// Size returns the size of the upload
Size() int64
// Offset returns the current offset
Offset() int64
// Reference returns a reference for the file being uploaded. May be absolute id based or relative to e.g. a space root
Reference() provider.Reference
// Executant returns the userid of the user that created the upload
Executant() userpb.UserId
// SpaceOwner returns the owner of a space if set. optional
SpaceOwner() *userpb.UserId
// Expires returns the time when the upload cen no longer be used
Expires() time.Time

// Purge allows completely removing an upload. Should emit a PostprocessingFinished event with a Delete outcome
Purge() error
}

// Registry is the interface that storage registries implement
// for discovering storage providers
type Registry interface {
Expand All @@ -125,9 +86,3 @@ type PathWrapper interface {
Unwrap(ctx context.Context, rp string) (string, error)
Wrap(ctx context.Context, rp string) (string, error)
}

type UploadRequest struct {
Ref *provider.Reference
Body io.ReadCloser
Length int64
}
79 changes: 79 additions & 0 deletions pkg/storage/uploads.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package storage

import (
"context"
"io"
"time"

userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
)

// UploadFinishedFunc is a callback function used in storage drivers to indicate that an upload has finished
type UploadFinishedFunc func(spaceOwner, executant *userpb.UserId, ref *provider.Reference)

type UploadRequest struct {
Ref *provider.Reference
Body io.ReadCloser
Length int64
}

// UploadsManager defines the interface for FS implementations that allow for managing uploads
type UploadsManager interface {
// GetUploadProgress returns the upload progress
ListUploadSessions(ctx context.Context, filter UploadSessionFilter) (UploadSession, error)
}

type UploadSession interface {
// ID returns the upload id
ID() string
// Filename returns the filename of the file
Filename() string
// Size returns the size of the upload
Size() int64
// Offset returns the current offset
Offset() int64
// Reference returns a reference for the file being uploaded. May be absolute id based or relative to e.g. a space root
Reference() provider.Reference
// Executant returns the userid of the user that created the upload
Executant() userpb.UserId
// SpaceOwner returns the owner of a space if set. optional
SpaceOwner() *userpb.UserId
// Expires returns the time when the upload can no longer be used
Expires() time.Time

// IsProcessing returns true if postprocessing has not finished, yet
// The actual postprocessing state is tracked in the postprocessing service.
IsProcessing() bool
// MalwareDescription returns the scan result returned by the scanner
MalwareDescription() string
// MalwareScanTime returns the timestamp the upload was scanned. Default time means the item has not been scanned
MalwareScanTime() time.Time

// Purge allows completely removing an upload. Should emit a PostprocessingFinished event with a Delete outcome
Purge() error
}

type UploadSessionFilter struct {
Id *string

Check failure on line 76 in pkg/storage/uploads.go

View workflow job for this annotation

GitHub Actions / lint

var-naming: struct field Id should be ID (revive)
Processing *bool
Expired *bool
}
71 changes: 44 additions & 27 deletions pkg/storage/utils/decomposedfs/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,32 +243,42 @@ func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload,
}

// GetUploadProgress returns the metadata for the given upload id
func (fs *Decomposedfs) GetUploadProgress(ctx context.Context, uploadID string) (storage.UploadProgress, error) {
return fs.getUploadProgress(ctx, filepath.Join(fs.o.Root, "uploads", uploadID+".info"))
}

// ListUploads returns a list of all incomplete uploads
func (fs *Decomposedfs) ListUploads() ([]storage.UploadProgress, error) {
return fs.uploadInfos(context.Background())
}

// PurgeExpiredUploads scans the fs for expired downloads and removes any leftovers
func (fs *Decomposedfs) PurgeExpiredUploads(purgedChan chan<- storage.UploadProgress) error {
uploads, err := fs.uploadInfos(context.Background())
if err != nil {
return err
func (fs *Decomposedfs) ListUploadSessions(ctx context.Context, filter storage.UploadSessionFilter) ([]storage.UploadSession, error) {
var sessions []storage.UploadSession
if filter.Id != nil && *filter.Id != "" {
session, err := fs.getUploadProgress(ctx, filepath.Join(fs.o.Root, "uploads", *filter.Id+".info"))
if err != nil {
return nil, err
}
sessions = []storage.UploadSession{session}
} else {
var err error
sessions, err = fs.uploadSessions(ctx)
if err != nil {
return nil, err
}
}
filteredSessions := []storage.UploadSession{}
now := time.Now()
for _, session := range sessions {
if filter.Processing != nil && *filter.Processing != session.IsProcessing() {
continue
}
if filter.Expired != nil {
if *filter.Expired {
if now.Before(session.Expires()) {
continue
}
} else {
if now.After(session.Expires()) {
continue
}

for _, upload := range uploads {
if time.Now().After(upload.Expires()) {
// TODO check postprocessing state
purgedChan <- upload

_ = upload.Purge()
// TODO use a channel to return errors
}
}
filteredSessions = append(filteredSessions, session)
}
return nil
return filteredSessions, nil
}

// AsTerminatableUpload returns a TerminatableUpload
Expand All @@ -292,8 +302,8 @@ func (fs *Decomposedfs) AsConcatableUpload(up tusd.Upload) tusd.ConcatableUpload
return up.(*upload.Upload)
}

func (fs *Decomposedfs) uploadInfos(ctx context.Context) ([]storage.UploadProgress, error) {
uploads := []storage.UploadProgress{}
func (fs *Decomposedfs) uploadSessions(ctx context.Context) ([]storage.UploadSession, error) {
uploads := []storage.UploadSession{}
infoFiles, err := filepath.Glob(filepath.Join(fs.o.Root, "uploads", "*.info"))
if err != nil {
return nil, err
Expand All @@ -311,7 +321,7 @@ func (fs *Decomposedfs) uploadInfos(ctx context.Context) ([]storage.UploadProgre
return uploads, nil
}

func (fs *Decomposedfs) getUploadProgress(ctx context.Context, path string) (storage.UploadProgress, error) {
func (fs *Decomposedfs) getUploadProgress(ctx context.Context, path string) (storage.UploadSession, error) {
match := _idRegexp.FindStringSubmatch(path)
if match == nil || len(match) < 2 {
return nil, fmt.Errorf("invalid upload path")
Expand All @@ -324,9 +334,16 @@ func (fs *Decomposedfs) getUploadProgress(ctx context.Context, path string) (sto
if err != nil {
return nil, err
}
// upload processing state is stored in the node, for decomposedfs the NodeId is always set by InitiateUpload
n, err := node.ReadNode(ctx, fs.lu, info.Storage["SpaceRoot"], info.Storage["NodeId"], true, nil, true)
if err != nil {
return nil, err
}
progress := upload.Progress{
Path: path,
Info: info,
Path: path,
Info: info,
Processing: n.IsProcessing(ctx),
}
_, progress.ScanStatus, progress.ScanTime = n.ScanData(ctx)
return progress, nil
}
17 changes: 15 additions & 2 deletions pkg/storage/utils/decomposedfs/upload/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,8 +499,11 @@ func lookupNode(ctx context.Context, spaceRoot *node.Node, path string, lu *look
}

type Progress struct {
Path string
Info tusd.FileInfo
Path string
Info tusd.FileInfo
Processing bool
ScanStatus string
ScanTime time.Time
}

func (p Progress) ID() string {
Expand Down Expand Up @@ -542,6 +545,16 @@ func (p Progress) Expires() time.Time {
return mt
}

func (p Progress) IsProcessing() bool {
return p.Processing
}
func (p Progress) MalwareDescription() string {
return p.ScanStatus
}
func (p Progress) MalwareScanTime() time.Time {
return p.ScanTime
}

func (p Progress) Purge() error {
// TODO we should use the upload id to look up the tus upload and Terminate() that
err := os.Remove(p.Info.Storage["BinPath"])
Expand Down

0 comments on commit 0ceef88

Please sign in to comment.