only propagate sizediff if no other upload was started
Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
butonic committed Apr 9, 2024
1 parent febc159 commit 515f81b
Showing 4 changed files with 45 additions and 13 deletions.
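
Context for the change: when asynchronous postprocessing of an upload fails, decomposedfs reverts the upload's size diff on the tree. If another upload session has started on the same node in the meantime, or processing has already finished, that revert would be wrong, so the commit first reads the node's recorded processing session and only propagates the reverted size diff when it still matches the failed session. A minimal, self-contained Go sketch of that rule (the types and names are illustrative, not the reva API):

package main

import "fmt"

// node and session model just enough state for the guard: the node
// remembers which upload session is currently being processed.
type node struct {
	processingID string // session id stored in the node's status attribute
}

type session struct {
	id       string
	sizeDiff int64
}

// revertOnFailure applies the commit's rule: the size diff of a failed
// upload is only reverted while the node still points at this session.
func revertOnFailure(n node, s session, propagate func(int64)) {
	if n.processingID == s.id {
		propagate(-s.sizeDiff)
	}
}

func main() {
	var treeSize int64 = 100
	propagate := func(diff int64) { treeSize += diff }

	// A second upload ("b") started before "a" failed, so the node's
	// processing id no longer matches "a": the revert is skipped.
	revertOnFailure(node{processingID: "b"}, session{id: "a", sizeDiff: 40}, propagate)
	fmt.Println(treeSize) // 100

	// No competing upload: the failed session still owns the node.
	revertOnFailure(node{processingID: "a"}, session{id: "a", sizeDiff: 40}, propagate)
	fmt.Println(treeSize) // 60
}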
13 changes: 10 additions & 3 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
@@ -309,9 +309,16 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
 
 	now := time.Now()
 	if failed {
-		// propagate sizeDiff after failed postprocessing
-		if err := fs.tp.Propagate(ctx, n, -session.SizeDiff()); err != nil {
-			log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not propagate tree size change")
+		// if no other upload session is in progress (processing id != session id) or has finished (processing id == "")
+		latestSession, err := n.ProcessingID(ctx)
+		if err != nil {
+			log.Error().Err(err).Str("node", n.ID).Str("uploadID", ev.UploadID).Msg("reading node for session failed")
+		}
+		if latestSession == session.ID() {
+			// propagate reverted sizeDiff after failed postprocessing
+			if err := fs.tp.Propagate(ctx, n, -session.SizeDiff()); err != nil {
+				log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not propagate tree size change")
+			}
 		}
 	} else if p := getParent(); p != nil {
 		// update parent tmtime to propagate etag change after successful postprocessing
6 changes: 6 additions & 0 deletions pkg/storage/utils/decomposedfs/node/node.go
@@ -1276,6 +1276,12 @@ func (n *Node) IsProcessing(ctx context.Context) bool {
 	return err == nil && strings.HasPrefix(v, ProcessingStatus)
 }
 
+// ProcessingID returns the latest upload session id
+func (n *Node) ProcessingID(ctx context.Context) (string, error) {
+	v, err := n.XattrString(ctx, prefixes.StatusPrefix)
+	return strings.TrimPrefix(v, ProcessingStatus), err
+}
+
 // IsSpaceRoot checks if the node is a space root
 func (n *Node) IsSpaceRoot(ctx context.Context) bool {
 	_, err := n.Xattr(ctx, prefixes.SpaceNameAttr)
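
The new ProcessingID helper relies on how the status attribute is written: while an upload is in postprocessing the xattr holds the ProcessingStatus prefix followed by the session id, and it is cleared once processing ends, so an empty result never matches a live session. A small sketch of that encoding, assuming an illustrative prefix value:

package main

import (
	"fmt"
	"strings"
)

// ProcessingStatus mirrors the marker the driver writes into the status
// xattr; the concrete string used here is an assumption for illustration.
const ProcessingStatus = "processing:"

func main() {
	// While a session is in postprocessing, the attribute holds the
	// prefix plus the session id; TrimPrefix recovers the id.
	v := ProcessingStatus + "session-123"
	fmt.Println(strings.TrimPrefix(v, ProcessingStatus)) // session-123

	// After postprocessing the attribute is empty, so ProcessingID
	// returns "", which never equals a live session id.
	fmt.Println(strings.TrimPrefix("", ProcessingStatus)) // ""
}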
18 changes: 13 additions & 5 deletions pkg/storage/utils/decomposedfs/upload/upload.go
@@ -314,12 +314,12 @@ func (session *OcisSession) Cleanup(revertNodeMetadata, cleanBin, cleanInfo bool
 	ctx := session.Context(context.Background())
 
 	if revertNodeMetadata {
+		n, err := session.Node(ctx)
+		if err != nil {
+			appctx.GetLogger(ctx).Error().Err(err).Str("node", n.ID).Str("sessionid", session.ID()).Msg("reading node for session failed")
+		}
 		if session.NodeExists() {
 			p := session.info.MetaData["versionsPath"]
-			n, err := session.Node(ctx)
-			if err != nil {
-				appctx.GetLogger(ctx).Error().Err(err).Str("sessionid", session.ID()).Msg("reading node for session failed")
-			}
 			if err := session.store.lu.CopyMetadata(ctx, p, n.InternalPath(), func(attributeName string, value []byte) (newValue []byte, copy bool) {
 				return value, strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) ||
 					attributeName == prefixes.TypeAttr ||
@@ -335,7 +335,15 @@
 			}
 
 		} else {
-			session.removeNode(ctx)
+			// if no other upload session is in progress (processing id != session id) or has finished (processing id == "")
+			latestSession, err := n.ProcessingID(ctx)
+			if err != nil {
+				appctx.GetLogger(ctx).Error().Err(err).Str("node", n.ID).Str("sessionid", session.ID()).Msg("reading processingid for session failed")
+			}
+			if latestSession == session.ID() {
+				// actually delete the node
+				session.removeNode(ctx)
+			}
 		}
 	}
 
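
The Cleanup change applies the same guard on the deletion path: the node is now resolved once before the branch, and when no node exists on disk the session only removes it if the node's processing id still names this session. A hedged sketch of the branch logic, with hypothetical callbacks standing in for the real metadata copy and node removal:

package main

import "fmt"

// cleanupOnFailure models the revertNodeMetadata branch of Cleanup.
// restore and remove are hypothetical stand-ins for the real metadata
// copy and node removal; latestSession comes from ProcessingID.
func cleanupOnFailure(nodeExists bool, latestSession, sessionID string, restore, remove func()) {
	if nodeExists {
		restore() // an older version is on disk: copy its metadata back
		return
	}
	if latestSession == sessionID {
		remove() // no newer upload took over: the node can really be deleted
	}
}

func main() {
	// Session "a" failed, but session "b" has since started on the same
	// node, so neither restore nor remove runs and "b" keeps its node.
	cleanupOnFailure(false, "b", "a",
		func() { fmt.Println("restore metadata") },
		func() { fmt.Println("remove node") })
}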
21 changes: 16 additions & 5 deletions pkg/storage/utils/decomposedfs/upload_async_test.go
@@ -85,8 +85,10 @@ var _ = Describe("Async file uploads", Ordered, func() {
 		Expect(err).ToNot(HaveOccurred())
 
 		o, err = options.New(map[string]interface{}{
-			"root":             tmpRoot,
-			"asyncfileuploads": true,
+			"root":                tmpRoot,
+			"asyncfileuploads":    true,
+			"treetime_accounting": true,
+			"treesize_accounting": true,
 		})
 		Expect(err).ToNot(HaveOccurred())
 
@@ -477,7 +479,7 @@ var _ = Describe("Async file uploads", Ordered, func() {
 			Expect(utils.ReadPlainFromOpaque(item.Opaque, "status")).To(Equal(""))
 		})
 
-		FIt("correctly calculates the size when the second upload finishes, even if the first is deleted", func() {
+		It("correctly calculates the size when the second upload finishes, even if the first is deleted", func() {
 			// finish postprocessing of second upload
 			con <- events.PostprocessingFinished{
 				UploadID: secondUploadID,
@@ -500,11 +502,16 @@
 			// size should match the second upload
 			Expect(item.Size).To(Equal(uint64(len(file2Content))))
 
+			// parent size should match second upload as well
+			parentInfo, err := fs.GetMD(ctx, rootRef, []string{}, []string{})
+			Expect(err).ToNot(HaveOccurred())
+			Expect(parentInfo.Size).To(Equal(uint64(len(file2Content))))
+
 			// finish postprocessing of first upload
 			con <- events.PostprocessingFinished{
 				UploadID: uploadID,
-				// Outcome: events.PPOutcomeDelete, // This will completely delete the file
-				Outcome: events.PPOutcomeAbort, // This as well ... fck
+				Outcome: events.PPOutcomeDelete,
+				// Outcome: events.PPOutcomeAbort, // This as well ... fck
 			}
 			// wait for upload to be ready
 			ev, ok = (<-pub).(events.UploadReady)
@@ -519,6 +526,10 @@
 			// size should still match the second upload
 			Expect(item.Size).To(Equal(uint64(len(file2Content))))
 
+			// parent size should still match second upload as well
+			parentInfo, err = fs.GetMD(ctx, rootRef, []string{}, []string{})
+			Expect(err).ToNot(HaveOccurred())
+			Expect(parentInfo.Size).To(Equal(uint64(len(file2Content))))
 		})
 	})
 })
