implement checksums in the owncloud storage driver #1629

Merged: 1 commit, Apr 14, 2021
5 changes: 5 additions & 0 deletions changelog/unreleased/owncloud-storage-checksums.md
@@ -0,0 +1,5 @@
Enhancement: Implement checksums in the owncloud storage

Implemented checksums in the owncloud storage driver.

https://github.com/cs3org/reva/pull/1629
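
A rough sketch of what this change stores on disk: the driver keeps the raw hash bytes in extended attributes next to the file. The data path and the full attribute name below are assumptions pieced together from the constants and xattr calls in the diff, not something this PR guarantees.

package main

import (
	"encoding/hex"
	"fmt"

	"github.com/pkg/xattr"
)

func main() {
	// assumed location of a file in the owncloud storage driver layout
	const path = "/var/tmp/reva/data/einstein/files/photo.jpg"
	// checksumPrefix ("user.oc.cs.", assuming ocPrefix is "user.oc.") + algorithm name
	v, err := xattr.Get(path, "user.oc.cs.sha1")
	if err != nil {
		fmt.Println("no sha1 checksum stored:", err)
		return
	}
	// stored as raw bytes, hex-encoded when reported in the ResourceInfo
	fmt.Println("sha1:", hex.EncodeToString(v))
}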
67 changes: 64 additions & 3 deletions pkg/storage/fs/owncloud/owncloud.go
@@ -20,6 +20,7 @@ package owncloud

import (
"context"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
@@ -35,6 +36,7 @@ import (
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/cs3org/reva/internal/grpc/services/storageprovider"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/logger"
@@ -72,8 +74,9 @@ const (
mdPrefix string = ocPrefix + "md." // arbitrary metadata
favPrefix string = ocPrefix + "fav." // favorite flag, per user
etagPrefix string = ocPrefix + "etag." // allow overriding a calculated etag with one from the extended attributes
// checksumPrefix string = ocPrefix + "cs." // TODO add checksum support

checksumPrefix string = ocPrefix + "cs."
checksumsKey string = "http://owncloud.org/ns/checksums"
favoriteKey string = "http://owncloud.org/ns/favorite"
)

var defaultPermissions *provider.ResourcePermissions = &provider.ResourcePermissions{
@@ -612,7 +615,6 @@ func (fs *ocfs) convertToResourceInfo(ctx context.Context, fi os.FileInfo, ip st

metadata := map[string]string{}

favoriteKey := "http://owncloud.org/ns/favorite"
if _, ok := mdKeysMap[favoriteKey]; returnAllKeys || ok {
favorite := ""
if u, ok := user.ContextGetUser(ctx); ok {
@@ -682,6 +684,16 @@ func (fs *ocfs) convertToResourceInfo(ctx context.Context, fi os.FileInfo, ip st

ri.PermissionSet = fs.permissionSet(ctx, ri.Owner)

// checksums
if !fi.IsDir() {
if _, checksumRequested := mdKeysMap[checksumsKey]; returnAllKeys || checksumRequested {
// TODO which checksum was requested? sha1 adler32 or md5? for now hardcode sha1?
readChecksumIntoResourceChecksum(ctx, ip, storageprovider.XSSHA1, ri)
readChecksumIntoOpaque(ctx, ip, storageprovider.XSMD5, ri)
readChecksumIntoOpaque(ctx, ip, storageprovider.XSAdler32, ri)
}
}
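// For illustration (assumption, not in this PR): after this block the caller sees
// ri.Checksum carrying the sha1 (type + hex sum), while the md5 and adler32 sums
// are exposed as hex strings in ri.Opaque.Map["md5"] and ri.Opaque.Map["adler32"].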

return ri
}
func getResourceType(isDir bool) provider.ResourceType {
@@ -2228,5 +2240,54 @@ func (fs *ocfs) propagate(ctx context.Context, leafPath string) error {
return nil
}

func readChecksumIntoResourceChecksum(ctx context.Context, nodePath, algo string, ri *provider.ResourceInfo) {
v, err := xattr.Get(nodePath, checksumPrefix+algo)
log := appctx.GetLogger(ctx).
Debug().
Err(err).
Str("nodepath", nodePath).
Str("algorithm", algo)
switch {
case err == nil:
ri.Checksum = &provider.ResourceChecksum{
Type: storageprovider.PKG2GRPCXS(algo),
Sum: hex.EncodeToString(v),
}
case isNoData(err):
log.Msg("checksum not set")
case isNotFound(err):
log.Msg("file not found")
default:
log.Msg("could not read checksum")
}
Contributor commented: How nice can it be :-)

}

func readChecksumIntoOpaque(ctx context.Context, nodePath, algo string, ri *provider.ResourceInfo) {
v, err := xattr.Get(nodePath, checksumPrefix+algo)
log := appctx.GetLogger(ctx).
Debug().
Err(err).
Str("nodepath", nodePath).
Str("algorithm", algo)
switch {
case err == nil:
if ri.Opaque == nil {
ri.Opaque = &types.Opaque{
Map: map[string]*types.OpaqueEntry{},
}
}
ri.Opaque.Map[algo] = &types.OpaqueEntry{
Decoder: "plain",
Value: []byte(hex.EncodeToString(v)),
}
case isNoData(err):
log.Msg("checksum not set")
case isNotFound(err):
log.Msg("file not found")
default:
log.Msg("could not read checksum")
}
}

// TODO propagate etag and mtime or append event to history? propagate on disk ...
// - but propagation is a separate task. only if upload was successful ...
100 changes: 90 additions & 10 deletions pkg/storage/fs/owncloud/upload.go
@@ -20,11 +20,17 @@ package owncloud

import (
"context"
"crypto/md5"
"crypto/sha1"
"encoding/hex"
"encoding/json"
"fmt"
"hash/adler32"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"

userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
@@ -36,6 +42,8 @@ import (
"github.com/cs3org/reva/pkg/user"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/pkg/xattr"
"github.com/rs/zerolog"
tusd "github.com/tus/tusd/pkg/handler"
)

@@ -363,18 +371,54 @@ func (upload *fileUpload) writeInfo() error {

// FinishUpload finishes an upload and moves the file to the internal destination
func (upload *fileUpload) FinishUpload(ctx context.Context) error {
log := appctx.GetLogger(upload.ctx)

sha1Sum := make([]byte, 0, 32)
md5Sum := make([]byte, 0, 32)
adler32Sum := make([]byte, 0, 32)
{
sha1h := sha1.New()
md5h := md5.New()
adler32h := adler32.New()
f, err := os.Open(upload.binPath)
if err != nil {
log.Err(err).Msg("Decomposedfs: could not open file for checksumming")
// we can continue if no oc checksum header is set
}
defer f.Close()

r1 := io.TeeReader(f, sha1h)
r2 := io.TeeReader(r1, md5h)

if _, err := io.Copy(adler32h, r2); err != nil {
log.Err(err).Msg("Decomposedfs: could not copy bytes for checksumming")
}

sha1Sum = sha1h.Sum(sha1Sum)
md5Sum = md5h.Sum(md5Sum)
adler32Sum = adler32h.Sum(adler32Sum)
}

if upload.info.MetaData["checksum"] != "" {
parts := strings.SplitN(upload.info.MetaData["checksum"], " ", 2)
if len(parts) != 2 {
return errtypes.BadRequest("invalid checksum format. must be '[algorithm] [checksum]'")
}
var err error
switch parts[0] {
case "sha1":
err = upload.checkHash(parts[1], sha1Sum)
case "md5":
err = upload.checkHash(parts[1], md5Sum)
case "adler32":
err = upload.checkHash(parts[1], adler32Sum)
default:
err = errtypes.BadRequest("unsupported checksum algorithm: " + parts[0])
}
if err != nil {
return err
}
}
Contributor: This means that we do not require checksums, right? But if the "checksums" feature is activated, shouldn't it be an error or warning if the upload comes completely without a checksum? Or is that checked elsewhere?

Contributor (author): Yes, as it is right now checksums are optional. I don't know if we even have a flag to "activate" checksums. I'm also not sure how much sense it makes to spend too much effort on the owncloud storage driver, since we only use it for the webUI acceptance tests, and even those will need to switch to the ocis storage driver in the near future.

This PR is just to make the webUI tests for the Thumbnails PR pass. owncloud/ocis#1898
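
For reference, a rough client-side sketch (not part of this PR; names are illustrative) of how the optional "checksum" upload metadata would be built so that the verification in FinishUpload kicks in. Only the "<algorithm> <hex digest>" layout and the supported algorithm names are taken from the parsing code above.

package main

import (
	"crypto/sha1"
	"encoding/hex"
	"fmt"
)

// buildChecksumMetadata builds the TUS upload metadata value that FinishUpload
// parses: "<algorithm> <hex digest>", e.g. "sha1 2aae6c35...".
func buildChecksumMetadata(content []byte) map[string]string {
	sum := sha1.Sum(content)
	return map[string]string{"checksum": "sha1 " + hex.EncodeToString(sum[:])}
}

func main() {
	fmt.Println(buildChecksumMetadata([]byte("hello reva"))["checksum"])
}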


ip := upload.info.Storage["InternalDestination"]

@@ -391,7 +435,6 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) error {
}
}

log := appctx.GetLogger(upload.ctx)
err := os.Rename(upload.binPath, ip)
if err != nil {
log.Err(err).Interface("info", upload.info).
@@ -417,6 +460,11 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) error {
}
}

// now try to write all checksums
tryWritingChecksum(log, ip, "sha1", sha1Sum)
tryWritingChecksum(log, ip, "md5", md5Sum)
tryWritingChecksum(log, ip, "adler32", adler32Sum)

return upload.fs.propagate(upload.ctx, ip)
}

@@ -492,3 +540,35 @@ func (upload *fileUpload) ConcatUploads(ctx context.Context, uploads []tusd.Uplo

return
}

func (upload *fileUpload) checkHash(expected string, h []byte) error {
if expected != hex.EncodeToString(h) {
upload.discardChunk()
return errtypes.ChecksumMismatch(fmt.Sprintf("invalid checksum: expected %s got %x", upload.info.MetaData["checksum"], h))
}
return nil
}

func (upload *fileUpload) discardChunk() {
if err := os.Remove(upload.binPath); err != nil {
if !os.IsNotExist(err) {
appctx.GetLogger(upload.ctx).Err(err).Interface("info", upload.info).Str("binPath", upload.binPath).Msg("Decomposedfs: could not discard chunk")
return
}
}
if err := os.Remove(upload.infoPath); err != nil {
if !os.IsNotExist(err) {
appctx.GetLogger(upload.ctx).Err(err).Interface("info", upload.info).Str("infoPath", upload.infoPath).Msg("Decomposedfs: could not discard chunk info")
return
}
}
}

func tryWritingChecksum(log *zerolog.Logger, path, algo string, h []byte) {
if err := xattr.Set(path, checksumPrefix+algo, h); err != nil {
log.Err(err).
Str("csType", algo).
Bytes("hash", h).
Msg("ocfs: could not write checksum")
}
}