Skip to content

Commit

Permalink
allow to restart postprocessing
Browse files Browse the repository at this point in the history
Signed-off-by: jkoberg <jkoberg@owncloud.com>
  • Loading branch information
kobergj committed Jul 5, 2023
1 parent 0ab7209 commit 8810b78
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 0 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/restart-postprocessing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: Restart Postprocessing

Resend the `BytesReady` event if instructed.

https://github.com/cs3org/reva/pull/4039
13 changes: 13 additions & 0 deletions pkg/events/postprocessing.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,16 @@ func (ResumePostprocessing) Unmarshal(v []byte) (interface{}, error) {
err := json.Unmarshal(v, &e)
return e, err
}

// RestartPostprocessing will be emitted by postprocessing service if it doesn't know about an upload
type RestartPostprocessing struct {
UploadID string
Timestamp *types.Timestamp
}

// Unmarshal to fulfill umarshaller interface
func (RestartPostprocessing) Unmarshal(v []byte) (interface{}, error) {
e := RestartPostprocessing{}
err := json.Unmarshal(v, &e)
return e, err
}
29 changes: 29 additions & 0 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"strings"
"time"

user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
rpcv1beta1 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
Expand Down Expand Up @@ -304,6 +305,34 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
); err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish UploadReady event")
}
case events.RestartPostprocessing:
up, err := upload.Get(ctx, ev.UploadID, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens)
if err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload")
continue
}
n, err := node.ReadNode(ctx, fs.lu, up.Info.Storage["SpaceRoot"], up.Info.Storage["NodeId"], false, nil, true)
if err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read node")
continue
}
s, err := up.URL(up.Ctx)
if err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not create url")
continue
}
// restart postprocessing
if err := events.Publish(fs.stream, events.BytesReceived{
UploadID: up.Info.ID,
URL: s,
SpaceOwner: n.SpaceOwnerOrManager(up.Ctx),
ExecutingUser: &user.User{Id: &user.UserId{OpaqueId: "postprocessing-restart"}}, // send nil instead?
ResourceID: &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID},
Filename: up.Info.Storage["NodeName"],
Filesize: uint64(up.Info.Size),
}); err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish BytesReceived event")
}

case events.PostprocessingStepFinished:
if ev.FinishedStep != events.PPStepAntivirus {
Expand Down

0 comments on commit 8810b78

Please sign in to comment.