
write blob based on session id #4615

Merged
merged 6 commits on Apr 10, 2024
5 changes: 5 additions & 0 deletions changelog/unreleased/fix-blobstore.md
@@ -0,0 +1,5 @@
Bugfix: write blob based on session id

Decomposedfs now uses the session id and size when moving an upload to the blobstore. This fixes a corner case where an upload session could not be finished correctly because another upload session for the same file had been started and had already finished.

https://github.com/cs3org/reva/pull/4615
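At its core, the fix makes Finalize write the blob for a transient revision node keyed by the upload session id and size instead of looking up the target node first; a condensed sketch of the Finalize change from the upload.go diff further down:

// Condensed from the Finalize() change below: the blob is written for a
// revision node carrying the session id and size, so a concurrent upload that
// already finished cannot change where this session's bytes end up.
revisionNode := &node.Node{
	SpaceID:  session.SpaceID(),
	BlobID:   session.ID(),   // the upload session id, not the node's blob id
	Blobsize: session.Size(), // the size of this upload session
}
if err := session.store.tp.WriteBlob(revisionNode, session.binPath()); err != nil {
	return errors.Wrap(err, "failed to upload file to blobstore")
}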
13 changes: 10 additions & 3 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
@@ -309,9 +309,16 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {

now := time.Now()
if failed {
// propagate sizeDiff after failed postprocessing
if err := fs.tp.Propagate(ctx, n, -session.SizeDiff()); err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not propagate tree size change")
// if no other upload session is in progress (processing id != session id) or has finished (processing id == "")
latestSession, err := n.ProcessingID(ctx)
if err != nil {
log.Error().Err(err).Str("node", n.ID).Str("uploadID", ev.UploadID).Msg("reading node for session failed")
}
if latestSession == session.ID() {
Contributor:
Hm, why do we only revert the size diff if the failed session is the latest session? At first glance we seem to unconditionally apply the size diff for all uploads, no?

Contributor:
oh, I see. The size diff that is propagated before the postprocessing starts is calculated against the current size of the node.

hm... which could be either the original size or the size of the currently being processed upload, depending on the timing. Ugh...

Contributor (Author):
When initiating an upload we always update the file size. If a new upload is initiated after postprocessing of the first one started but before it ended, we must not change the size back: the new upload session's size diff is based on the current size of the node, so reverting would make the node size wrong.

// propagate reverted sizeDiff after failed postprocessing
if err := fs.tp.Propagate(ctx, n, -session.SizeDiff()); err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not propagate tree size change")
}
}
} else if p := getParent(); p != nil {
// update parent tmtime to propagate etag change after successful postprocessing
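To make the scenario from the review thread above concrete, here is a small self-contained sketch with made-up numbers (it is not part of the patch):

package main

import "fmt"

// Made-up numbers illustrating why the size diff of a failed upload must not
// be reverted once a newer upload session has been initiated on the same node.
func main() {
	nodeSize := int64(10) // original file size

	// upload A is initiated: the node size is bumped, A remembers its diff
	diffA := int64(15) - nodeSize // +5
	nodeSize = 15

	// upload B is initiated before A finishes postprocessing:
	// its diff is computed against A's size, not against the original size
	diffB := int64(20) - nodeSize // +5
	nodeSize += diffB             // 20

	// postprocessing of A fails. B is now the latest session and the tree
	// already accounts for diffA + diffB; reverting diffA would leave 15,
	// which is neither the original size nor B's size.
	fmt.Println("if A's diff were reverted:", nodeSize-diffA) // 15 (wrong)
	fmt.Println("keeping B's view:         ", nodeSize)       // 20 (correct)
}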
6 changes: 6 additions & 0 deletions pkg/storage/utils/decomposedfs/node/node.go
@@ -1276,6 +1276,12 @@ func (n *Node) IsProcessing(ctx context.Context) bool {
return err == nil && strings.HasPrefix(v, ProcessingStatus)
}

// ProcessingID returns the latest upload session id
func (n *Node) ProcessingID(ctx context.Context) (string, error) {
v, err := n.XattrString(ctx, prefixes.StatusPrefix)
return strings.TrimPrefix(v, ProcessingStatus), err
}

// IsSpaceRoot checks if the node is a space root
func (n *Node) IsSpaceRoot(ctx context.Context) bool {
_, err := n.Xattr(ctx, prefixes.SpaceNameAttr)
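ProcessingID assumes the status xattr holds the ProcessingStatus prefix followed by the id of the session that last started processing the node; a minimal illustration (the session id below is made up):

package main

import (
	"fmt"
	"strings"

	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
)

// Minimal illustration of how the latest session id is recovered from the
// node's status attribute; the session id is made up.
func main() {
	status := node.ProcessingStatus + "6b1c0f2e-0000-4000-8000-000000000000"

	// mirrors what ProcessingID does after reading the xattr
	sessionID := strings.TrimPrefix(status, node.ProcessingStatus)
	fmt.Println(sessionID)
}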
10 changes: 9 additions & 1 deletion pkg/storage/utils/decomposedfs/upload/store.go
@@ -205,7 +205,15 @@ func (store OcisStore) CreateNodeForUpload(session *OcisSession, initAttrs node.
}

var f *lockedfile.File
if session.NodeExists() {
if session.NodeExists() { // TODO this is wrong. The node should be created when the upload starts, the revisions should be created independently of the node
// we do not need to propagate a change when a node is created, only when the upload is ready.
// that still creates problems for desktop clients because if another change causes propagation it will detect an empty file
// so the first upload has to point to the first revision with the expected size. The file cannot be downloaded, but it can be overwritten (which will create a new revision and make the node reflect the latest revision)
// any finished postprocessing will not affect the node metadata.
// *thinking* but then initializing an upload will lock the file until the upload has finished. That sucks.
// so we have to check if the node has been created meanwhile (well, only in case the upload does not know the nodeid ... or the NodeExists array that is checked by session.NodeExists())
// FIXME look at the disk again to see if the file has been created in between, or just try initializing a new node and do the update existing node as a fallback. <- the latter!

f, err = store.updateExistingNode(ctx, session, n, session.SpaceID(), uint64(session.Size()))
if f != nil {
appctx.GetLogger(ctx).Info().Str("lockfile", f.Name()).Interface("err", err).Msg("got lock file from updateExistingNode")
27 changes: 17 additions & 10 deletions pkg/storage/utils/decomposedfs/upload/upload.go
@@ -278,14 +278,12 @@ func (session *OcisSession) ConcatUploads(_ context.Context, uploads []tusd.Uplo
func (session *OcisSession) Finalize() (err error) {
ctx, span := tracer.Start(session.Context(context.Background()), "Finalize")
defer span.End()
n, err := session.Node(ctx)
if err != nil {
return err
}

revisionNode := &node.Node{SpaceID: session.SpaceID(), BlobID: session.ID(), Blobsize: session.Size()}

// upload the data to the blobstore
_, subspan := tracer.Start(ctx, "WriteBlob")
err = session.store.tp.WriteBlob(n, session.binPath())
err = session.store.tp.WriteBlob(revisionNode, session.binPath())
subspan.End()
if err != nil {
return errors.Wrap(err, "failed to upload file to blobstore")
@@ -318,12 +316,12 @@ func (session *OcisSession) Cleanup(revertNodeMetadata, cleanBin, cleanInfo bool
ctx := session.Context(context.Background())

if revertNodeMetadata {
n, err := session.Node(ctx)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("node", n.ID).Str("sessionid", session.ID()).Msg("reading node for session failed")
}
if session.NodeExists() {
p := session.info.MetaData["versionsPath"]
n, err := session.Node(ctx)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("sessionid", session.ID()).Msg("reading node for session failed")
}
if err := session.store.lu.CopyMetadata(ctx, p, n.InternalPath(), func(attributeName string, value []byte) (newValue []byte, copy bool) {
return value, strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) ||
attributeName == prefixes.TypeAttr ||
@@ -339,7 +337,16 @@
}

} else {
session.removeNode(ctx)
// if no other upload session is in progress (processing id != session id) or has finished (processing id == "")
latestSession, err := n.ProcessingID(ctx)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("node", n.ID).Str("sessionid", session.ID()).Msg("reading processingid for session failed")
}
if latestSession == session.ID() {
// actually delete the node
session.removeNode(ctx)
}
// FIXME else if the upload has become a revision, delete the revision, or if it is the last one, delete the node
}
}

75 changes: 66 additions & 9 deletions pkg/storage/utils/decomposedfs/upload_async_test.go
@@ -19,6 +19,7 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/aspects"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/permissions"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/permissions/mocks"
@@ -61,7 +62,8 @@ var _ = Describe("Async file uploads", Ordered, func() {
Username: "username",
}

fileContent = []byte("0123456789")
fileContent = []byte("0123456789")
file2Content = []byte("01234567890123456789")

ctx = ruser.ContextSetUser(context.Background(), user)

@@ -84,8 +86,10 @@
Expect(err).ToNot(HaveOccurred())

o, err = options.New(map[string]interface{}{
"root": tmpRoot,
"asyncfileuploads": true,
"root": tmpRoot,
"asyncfileuploads": true,
"treetime_accounting": true,
"treesize_accounting": true,
})
Expect(err).ToNot(HaveOccurred())

Expand Down Expand Up @@ -141,10 +145,10 @@ var _ = Describe("Async file uploads", Ordered, func() {
bs.On("Upload", mock.AnythingOfType("*node.Node"), mock.AnythingOfType("string"), mock.Anything).
Return(nil).
Run(func(args mock.Arguments) {
n := args.Get(0).(*node.Node)
data, err := os.ReadFile(args.Get(1).(string))

Expect(err).ToNot(HaveOccurred())
Expect(data).To(Equal(fileContent))
Expect(len(data)).To(Equal(int(n.Blobsize)))
})

// start upload of a file
@@ -412,7 +416,7 @@

JustBeforeEach(func() {
// upload again
uploadIds, err := fs.InitiateUpload(ctx, ref, 10, map[string]string{})
uploadIds, err := fs.InitiateUpload(ctx, ref, 20, map[string]string{})
Expect(err).ToNot(HaveOccurred())
Expect(len(uploadIds)).To(Equal(2))
Expect(uploadIds["simple"]).ToNot(BeEmpty())
@@ -422,8 +426,8 @@

_, err = fs.Upload(ctx, storage.UploadRequest{
Ref: uploadRef,
Body: io.NopCloser(bytes.NewReader(fileContent)),
Length: int64(len(fileContent)),
Body: io.NopCloser(bytes.NewReader(file2Content)),
Length: int64(len(file2Content)),
}, nil)
Expect(err).ToNot(HaveOccurred())

@@ -456,7 +460,7 @@
})

It("removes processing status when second upload is finished, even if first isn't", func() {
// finish postprocessing
// finish postprocessing of second upload
con <- events.PostprocessingFinished{
UploadID: secondUploadID,
Outcome: events.PPOutcomeContinue,
@@ -475,5 +479,58 @@
Expect(item.Path).To(Equal(ref.Path))
Expect(utils.ReadPlainFromOpaque(item.Opaque, "status")).To(Equal(""))
})

It("correctly calculates the size when the second upload is finishes, even if first is deleted", func() {
// finish postprocessing of second upload
con <- events.PostprocessingFinished{
UploadID: secondUploadID,
Outcome: events.PPOutcomeContinue,
}
// wait for upload to be ready
ev, ok := (<-pub).(events.UploadReady)
Expect(ok).To(BeTrue())
Expect(ev.Failed).To(BeFalse())

// check processing status
resources, err := fs.ListFolder(ctx, rootRef, []string{}, []string{})
Expect(err).ToNot(HaveOccurred())
Expect(len(resources)).To(Equal(1))

item := resources[0]
Expect(item.Path).To(Equal(ref.Path))
Expect(utils.ReadPlainFromOpaque(item.Opaque, "status")).To(Equal(""))

// size should match the second upload
Expect(item.Size).To(Equal(uint64(len(file2Content))))

// parent size should match second upload as well
parentInfo, err := fs.GetMD(ctx, rootRef, []string{}, []string{})
Expect(err).ToNot(HaveOccurred())
Expect(parentInfo.Size).To(Equal(uint64(len(file2Content))))

// finish postprocessing of first upload
con <- events.PostprocessingFinished{
UploadID: uploadID,
Outcome: events.PPOutcomeDelete,
// Outcome: events.PPOutcomeAbort, // This as well ... fck
}
// wait for upload to be ready
ev, ok = (<-pub).(events.UploadReady)
Expect(ok).To(BeTrue())
Expect(ev.Failed).To(BeTrue())

// check processing status
resources, err = fs.ListFolder(ctx, rootRef, []string{}, []string{})
Expect(err).ToNot(HaveOccurred())
Expect(len(resources)).To(Equal(1))

item = resources[0]
// size should still match the second upload
Expect(item.Size).To(Equal(uint64(len(file2Content))))

// parent size should still match second upload as well
parentInfo, err = fs.GetMD(ctx, rootRef, []string{}, []string{})
Expect(err).ToNot(HaveOccurred())
Expect(parentInfo.Size).To(Equal(uint64(len(file2Content))))
})
})
})