From c39b4714361ff869d75f87994c73732e39a327e7 Mon Sep 17 00:00:00 2001 From: "Ariel Shaqed (Scolnicov)" Date: Thu, 18 Mar 2021 10:31:49 +0200 Subject: [PATCH 1/4] Initial server implementation of split-uploads for thick client --- go.mod | 2 +- pkg/api/controller.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/go.mod b/go.mod index a247424b478..17a79c140b3 100644 --- a/go.mod +++ b/go.mod @@ -59,7 +59,7 @@ require ( github.com/klauspost/compress v1.11.12 // indirect github.com/lunixbochs/vtclean v1.0.0 // indirect github.com/manifoldco/promptui v0.8.0 - github.com/matoous/go-nanoid/v2 v2.0.0 + github.com/matoous/go-nanoid/v2 v2.0.0 // indirect github.com/mitchellh/go-homedir v1.1.0 github.com/ory/dockertest/v3 v3.6.3 github.com/prometheus/client_golang v1.9.0 diff --git a/pkg/api/controller.go b/pkg/api/controller.go index 1aec70e73a1..60385acda23 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -1506,7 +1506,6 @@ func (c *Controller) StagingLinkPhysicalAddressHandler() staging.LinkPhysicalAdd repo, err := c.Catalog.GetRepository(ctx, params.Repository) if errors.Is(err, catalog.ErrNotFound) { return staging.NewLinkPhysicalAddressNotFound().WithPayload(responseErrorFrom(err)) - } if err != nil { return staging.NewLinkPhysicalAddressDefault(http.StatusInternalServerError).WithPayload(responseErrorFrom(err)) } From a12c1e38f0a27a468980ddd355404e3434524550 Mon Sep 17 00:00:00 2001 From: "Ariel Shaqed (Scolnicov)" Date: Wed, 24 Mar 2021 11:35:38 +0200 Subject: [PATCH 2/4] Support non-seekable stdin (`-` arg) in "fs upload" command Parity with `aws s3 cp - ...` on this. --- cmd/lakectl/cmd/fs.go | 4 +++- cmd/lakectl/cmd/input.go | 45 ++++++++++++++++++++++++++++++++++++++-- cmd/lakectl/cmd/root.go | 6 ++++++ cmd/lakectl/cmd/sst.go | 5 ----- pkg/api/client.go | 28 +++++++++++++++++-------- 5 files changed, 71 insertions(+), 17 deletions(-) diff --git a/cmd/lakectl/cmd/fs.go b/cmd/lakectl/cmd/fs.go index 56e5b7c63ef..84964175d36 100644 --- a/cmd/lakectl/cmd/fs.go +++ b/cmd/lakectl/cmd/fs.go @@ -113,7 +113,9 @@ var fsCatCmd = &cobra.Command{ func upload(ctx context.Context, client api.Client, sourcePathname string, destURI *uri.URI, direct bool) (*models.ObjectStats, error) { fp := OpenByPath(sourcePathname) defer func() { - _ = fp.Close() + if err := fp.Close(); err != nil { + DieErr(fmt.Errorf("close: %w", err)) + } }() if direct { diff --git a/cmd/lakectl/cmd/input.go b/cmd/lakectl/cmd/input.go index 63422f0195b..75d3fbd11ff 100644 --- a/cmd/lakectl/cmd/input.go +++ b/cmd/lakectl/cmd/input.go @@ -1,6 +1,7 @@ package cmd import ( + "fmt" "io" "os" @@ -47,10 +48,50 @@ func (nc *nopCloser) Close() error { return nil } -// OpenByPath returns a reader from the given path. If path is "-", it'll return Stdin +// deleteOnClose wraps a File to be a ReadSeekCloser that deletes itself when closed. +type deleteOnClose struct { + *os.File +} + +func (d *deleteOnClose) Read(p []byte) (n int, err error) { + return d.File.Read(p) +} + +func (d *deleteOnClose) Seek(offset int64, whence int) (int64, error) { + return d.File.Seek(offset, whence) +} + +func (d *deleteOnClose) Close() error { + if err := os.Remove(d.Name()); err != nil { + d.File.Close() // Close failure is unimportant on read, but data definitely stays! + return fmt.Errorf("delete on close: %w", err) + } + return d.File.Close() +} + +// OpenByPath returns a reader from the given path. If path is "-", it consumes Stdin and +// opens a readable copy that is either deleted (POSIX) or will delete itself on close +// (non-POSIX, notably WINs). func OpenByPath(path string) io.ReadSeekCloser { if path == StdinFileName { - // read from stdin + if !isSeekable(os.Stdin) { + temp, err := os.CreateTemp("", "lakectl-stdin") + if err != nil { + DieErr(fmt.Errorf("create temporary file to buffer stdin: %w", err)) + } + if _, err = io.Copy(temp, os.Stdin); err != nil { + DieErr(fmt.Errorf("copy stdin to temporary file: %w", err)) + } + if _, err = temp.Seek(0, io.SeekStart); err != nil { + DieErr(fmt.Errorf("rewind temporary copied file: %w", err)) + } + // Try to delete the file. This will fail on Windows, we shall try to + // delete on close anyway. + if os.Remove(temp.Name()) != nil { + return &deleteOnClose{temp} + } + return temp + } return &nopCloser{os.Stdin} } fp, err := os.Open(path) diff --git a/cmd/lakectl/cmd/root.go b/cmd/lakectl/cmd/root.go index 99415bf54a1..b9dc565cfb6 100644 --- a/cmd/lakectl/cmd/root.go +++ b/cmd/lakectl/cmd/root.go @@ -80,6 +80,12 @@ func getClient() api.Client { return client } +// isSeekable returns true if f.Seek appears to work. +func isSeekable(f io.Seeker) bool { + _, err := f.Seek(0, io.SeekCurrent) + return err == nil // a little naive, but probably good enough for its purpose +} + // Execute adds all child commands to the root command and sets flags appropriately. // This is called by main.main(). It only needs to happen once to the rootCmd. func Execute() { diff --git a/cmd/lakectl/cmd/sst.go b/cmd/lakectl/cmd/sst.go index 2935c871094..46d349d8f48 100644 --- a/cmd/lakectl/cmd/sst.go +++ b/cmd/lakectl/cmd/sst.go @@ -17,11 +17,6 @@ import ( "google.golang.org/protobuf/proto" ) -func isSeekable(f io.Seeker) bool { - _, err := f.Seek(0, io.SeekCurrent) - return err == nil // a little naive, but probably good enough for its purpose -} - func readStdin() (pebblesst.ReadableFile, error) { // test if stdin is seekable if isSeekable(os.Stdin) { diff --git a/pkg/api/client.go b/pkg/api/client.go index 4899c2b6a04..b5d65924585 100644 --- a/pkg/api/client.go +++ b/pkg/api/client.go @@ -742,6 +742,20 @@ func (c *client) StageObject(ctx context.Context, repoID, branchID, path string, return resp.GetPayload(), nil } +// readSize returns the size of r. +func readSize(r io.Seeker) (int64, error) { + cur, err := r.Seek(0, io.SeekCurrent) + if err != nil { + return 0, fmt.Errorf("tell: %w", err) + } + end, err := r.Seek(0, io.SeekEnd) + if err != nil { + return 0, fmt.Errorf("seek to end: %w", err) + } + _, err = r.Seek(cur, io.SeekStart) + return end, err +} + func (c *client) ClientUpload(ctx context.Context, repoID, branchID, path string, metadata map[string]string, contents io.ReadSeeker) (*models.ObjectStats, error) { resp, err := c.remote.Staging.GetPhysicalAddress(&staging.GetPhysicalAddressParams{ Repository: repoID, @@ -754,6 +768,11 @@ func (c *client) ClientUpload(ctx context.Context, repoID, branchID, path string } stagingLocation := resp.GetPayload() + size, err := readSize(contents) + if err != nil { + return nil, fmt.Errorf("readSize: %w", err) + } + for { // Return from inside loop physicalAddress, err := url.Parse(stagingLocation.PhysicalAddress) @@ -786,15 +805,6 @@ func (c *client) ClientUpload(ctx context.Context, repoID, branchID, path string return nil, fmt.Errorf("upload to backing store %v: %w", physicalAddress, err) } - size, err := contents.Seek(0, io.SeekEnd) - if err != nil { - return nil, fmt.Errorf("read size: %w", err) - } - _, err = contents.Seek(0, io.SeekStart) - if err != nil { - return nil, fmt.Errorf("rewind: %w", err) - } - _, err = c.remote.Staging.LinkPhysicalAddress(&staging.LinkPhysicalAddressParams{ Repository: repoID, Branch: branchID, From 063ed0049c05bd0537e81ef5c3d731c85120b8b4 Mon Sep 17 00:00:00 2001 From: "Ariel Shaqed (Scolnicov)" Date: Wed, 24 Mar 2021 15:36:59 +0200 Subject: [PATCH 3/4] go fmt --- cmd/lakectl/cmd/input.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/lakectl/cmd/input.go b/cmd/lakectl/cmd/input.go index 75d3fbd11ff..b090884f414 100644 --- a/cmd/lakectl/cmd/input.go +++ b/cmd/lakectl/cmd/input.go @@ -63,7 +63,7 @@ func (d *deleteOnClose) Seek(offset int64, whence int) (int64, error) { func (d *deleteOnClose) Close() error { if err := os.Remove(d.Name()); err != nil { - d.File.Close() // Close failure is unimportant on read, but data definitely stays! + d.File.Close() // Close failure is unimportant on read, but data definitely stays! return fmt.Errorf("delete on close: %w", err) } return d.File.Close() @@ -88,7 +88,7 @@ func OpenByPath(path string) io.ReadSeekCloser { // Try to delete the file. This will fail on Windows, we shall try to // delete on close anyway. if os.Remove(temp.Name()) != nil { - return &deleteOnClose{temp} + return &deleteOnClose{temp} } return temp } From c3494f0f0ea1bd762acdb923e85c31a9c95b0e4e Mon Sep 17 00:00:00 2001 From: "Ariel Shaqed (Scolnicov)" Date: Wed, 24 Mar 2021 15:40:07 +0200 Subject: [PATCH 4/4] [CR] Clean up deleteOnClose and clarify Close implementation comment --- cmd/lakectl/cmd/input.go | 10 +--------- pkg/api/controller.go | 1 + 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/cmd/lakectl/cmd/input.go b/cmd/lakectl/cmd/input.go index b090884f414..0174e6d5687 100644 --- a/cmd/lakectl/cmd/input.go +++ b/cmd/lakectl/cmd/input.go @@ -53,17 +53,9 @@ type deleteOnClose struct { *os.File } -func (d *deleteOnClose) Read(p []byte) (n int, err error) { - return d.File.Read(p) -} - -func (d *deleteOnClose) Seek(offset int64, whence int) (int64, error) { - return d.File.Seek(offset, whence) -} - func (d *deleteOnClose) Close() error { if err := os.Remove(d.Name()); err != nil { - d.File.Close() // Close failure is unimportant on read, but data definitely stays! + d.File.Close() // "Only" file descriptor leak if close fails (but data might stay). return fmt.Errorf("delete on close: %w", err) } return d.File.Close() diff --git a/pkg/api/controller.go b/pkg/api/controller.go index 60385acda23..1aec70e73a1 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -1506,6 +1506,7 @@ func (c *Controller) StagingLinkPhysicalAddressHandler() staging.LinkPhysicalAdd repo, err := c.Catalog.GetRepository(ctx, params.Repository) if errors.Is(err, catalog.ErrNotFound) { return staging.NewLinkPhysicalAddressNotFound().WithPayload(responseErrorFrom(err)) + } if err != nil { return staging.NewLinkPhysicalAddressDefault(http.StatusInternalServerError).WithPayload(responseErrorFrom(err)) }