diff --git a/cmd/lakectl/cmd/fs.go b/cmd/lakectl/cmd/fs.go index 56e5b7c63ef..84964175d36 100644 --- a/cmd/lakectl/cmd/fs.go +++ b/cmd/lakectl/cmd/fs.go @@ -113,7 +113,9 @@ var fsCatCmd = &cobra.Command{ func upload(ctx context.Context, client api.Client, sourcePathname string, destURI *uri.URI, direct bool) (*models.ObjectStats, error) { fp := OpenByPath(sourcePathname) defer func() { - _ = fp.Close() + if err := fp.Close(); err != nil { + DieErr(fmt.Errorf("close: %w", err)) + } }() if direct { diff --git a/cmd/lakectl/cmd/input.go b/cmd/lakectl/cmd/input.go index 63422f0195b..0174e6d5687 100644 --- a/cmd/lakectl/cmd/input.go +++ b/cmd/lakectl/cmd/input.go @@ -1,6 +1,7 @@ package cmd import ( + "fmt" "io" "os" @@ -47,10 +48,42 @@ func (nc *nopCloser) Close() error { return nil } -// OpenByPath returns a reader from the given path. If path is "-", it'll return Stdin +// deleteOnClose wraps a File to be a ReadSeekCloser that deletes itself when closed. +type deleteOnClose struct { + *os.File +} + +func (d *deleteOnClose) Close() error { + if err := os.Remove(d.Name()); err != nil { + d.File.Close() // "Only" file descriptor leak if close fails (but data might stay). + return fmt.Errorf("delete on close: %w", err) + } + return d.File.Close() +} + +// OpenByPath returns a reader from the given path. If path is "-", it consumes Stdin and +// opens a readable copy that is either deleted (POSIX) or will delete itself on close +// (non-POSIX, notably WINs). func OpenByPath(path string) io.ReadSeekCloser { if path == StdinFileName { - // read from stdin + if !isSeekable(os.Stdin) { + temp, err := os.CreateTemp("", "lakectl-stdin") + if err != nil { + DieErr(fmt.Errorf("create temporary file to buffer stdin: %w", err)) + } + if _, err = io.Copy(temp, os.Stdin); err != nil { + DieErr(fmt.Errorf("copy stdin to temporary file: %w", err)) + } + if _, err = temp.Seek(0, io.SeekStart); err != nil { + DieErr(fmt.Errorf("rewind temporary copied file: %w", err)) + } + // Try to delete the file. This will fail on Windows, we shall try to + // delete on close anyway. + if os.Remove(temp.Name()) != nil { + return &deleteOnClose{temp} + } + return temp + } return &nopCloser{os.Stdin} } fp, err := os.Open(path) diff --git a/cmd/lakectl/cmd/root.go b/cmd/lakectl/cmd/root.go index 99415bf54a1..b9dc565cfb6 100644 --- a/cmd/lakectl/cmd/root.go +++ b/cmd/lakectl/cmd/root.go @@ -80,6 +80,12 @@ func getClient() api.Client { return client } +// isSeekable returns true if f.Seek appears to work. +func isSeekable(f io.Seeker) bool { + _, err := f.Seek(0, io.SeekCurrent) + return err == nil // a little naive, but probably good enough for its purpose +} + // Execute adds all child commands to the root command and sets flags appropriately. // This is called by main.main(). It only needs to happen once to the rootCmd. func Execute() { diff --git a/cmd/lakectl/cmd/sst.go b/cmd/lakectl/cmd/sst.go index 2935c871094..46d349d8f48 100644 --- a/cmd/lakectl/cmd/sst.go +++ b/cmd/lakectl/cmd/sst.go @@ -17,11 +17,6 @@ import ( "google.golang.org/protobuf/proto" ) -func isSeekable(f io.Seeker) bool { - _, err := f.Seek(0, io.SeekCurrent) - return err == nil // a little naive, but probably good enough for its purpose -} - func readStdin() (pebblesst.ReadableFile, error) { // test if stdin is seekable if isSeekable(os.Stdin) { diff --git a/go.mod b/go.mod index a247424b478..17a79c140b3 100644 --- a/go.mod +++ b/go.mod @@ -59,7 +59,7 @@ require ( github.com/klauspost/compress v1.11.12 // indirect github.com/lunixbochs/vtclean v1.0.0 // indirect github.com/manifoldco/promptui v0.8.0 - github.com/matoous/go-nanoid/v2 v2.0.0 + github.com/matoous/go-nanoid/v2 v2.0.0 // indirect github.com/mitchellh/go-homedir v1.1.0 github.com/ory/dockertest/v3 v3.6.3 github.com/prometheus/client_golang v1.9.0 diff --git a/pkg/api/client.go b/pkg/api/client.go index 4899c2b6a04..b5d65924585 100644 --- a/pkg/api/client.go +++ b/pkg/api/client.go @@ -742,6 +742,20 @@ func (c *client) StageObject(ctx context.Context, repoID, branchID, path string, return resp.GetPayload(), nil } +// readSize returns the size of r. +func readSize(r io.Seeker) (int64, error) { + cur, err := r.Seek(0, io.SeekCurrent) + if err != nil { + return 0, fmt.Errorf("tell: %w", err) + } + end, err := r.Seek(0, io.SeekEnd) + if err != nil { + return 0, fmt.Errorf("seek to end: %w", err) + } + _, err = r.Seek(cur, io.SeekStart) + return end, err +} + func (c *client) ClientUpload(ctx context.Context, repoID, branchID, path string, metadata map[string]string, contents io.ReadSeeker) (*models.ObjectStats, error) { resp, err := c.remote.Staging.GetPhysicalAddress(&staging.GetPhysicalAddressParams{ Repository: repoID, @@ -754,6 +768,11 @@ func (c *client) ClientUpload(ctx context.Context, repoID, branchID, path string } stagingLocation := resp.GetPayload() + size, err := readSize(contents) + if err != nil { + return nil, fmt.Errorf("readSize: %w", err) + } + for { // Return from inside loop physicalAddress, err := url.Parse(stagingLocation.PhysicalAddress) @@ -786,15 +805,6 @@ func (c *client) ClientUpload(ctx context.Context, repoID, branchID, path string return nil, fmt.Errorf("upload to backing store %v: %w", physicalAddress, err) } - size, err := contents.Seek(0, io.SeekEnd) - if err != nil { - return nil, fmt.Errorf("read size: %w", err) - } - _, err = contents.Seek(0, io.SeekStart) - if err != nil { - return nil, fmt.Errorf("rewind: %w", err) - } - _, err = c.remote.Staging.LinkPhysicalAddress(&staging.LinkPhysicalAddressParams{ Repository: repoID, Branch: branchID,