Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support non-seekable stdin (- arg) in "fs upload" command #1672

Merged
merged 4 commits into from
Mar 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/lakectl/cmd/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ var fsCatCmd = &cobra.Command{
func upload(ctx context.Context, client api.Client, sourcePathname string, destURI *uri.URI, direct bool) (*models.ObjectStats, error) {
fp := OpenByPath(sourcePathname)
defer func() {
_ = fp.Close()
if err := fp.Close(); err != nil {
DieErr(fmt.Errorf("close: %w", err))
}
}()

if direct {
Expand Down
37 changes: 35 additions & 2 deletions cmd/lakectl/cmd/input.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"fmt"
"io"
"os"

Expand Down Expand Up @@ -47,10 +48,42 @@ func (nc *nopCloser) Close() error {
return nil
}

// OpenByPath returns a reader from the given path. If path is "-", it'll return Stdin
// deleteOnClose wraps a File to be a ReadSeekCloser that deletes itself when closed.
type deleteOnClose struct {
*os.File
}

func (d *deleteOnClose) Close() error {
if err := os.Remove(d.Name()); err != nil {
d.File.Close() // "Only" file descriptor leak if close fails (but data might stay).
return fmt.Errorf("delete on close: %w", err)
}
return d.File.Close()
}

// OpenByPath returns a reader from the given path. If path is "-", it consumes Stdin and
// opens a readable copy that is either deleted (POSIX) or will delete itself on close
// (non-POSIX, notably WINs).
func OpenByPath(path string) io.ReadSeekCloser {
if path == StdinFileName {
// read from stdin
if !isSeekable(os.Stdin) {
temp, err := os.CreateTemp("", "lakectl-stdin")
if err != nil {
DieErr(fmt.Errorf("create temporary file to buffer stdin: %w", err))
}
if _, err = io.Copy(temp, os.Stdin); err != nil {
DieErr(fmt.Errorf("copy stdin to temporary file: %w", err))
}
if _, err = temp.Seek(0, io.SeekStart); err != nil {
DieErr(fmt.Errorf("rewind temporary copied file: %w", err))
}
// Try to delete the file. This will fail on Windows, we shall try to
// delete on close anyway.
if os.Remove(temp.Name()) != nil {
return &deleteOnClose{temp}
}
return temp
}
return &nopCloser{os.Stdin}
}
fp, err := os.Open(path)
Expand Down
6 changes: 6 additions & 0 deletions cmd/lakectl/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ func getClient() api.Client {
return client
}

// isSeekable returns true if f.Seek appears to work.
func isSeekable(f io.Seeker) bool {
_, err := f.Seek(0, io.SeekCurrent)
return err == nil // a little naive, but probably good enough for its purpose
}

// Execute adds all child commands to the root command and sets flags appropriately.
// This is called by main.main(). It only needs to happen once to the rootCmd.
func Execute() {
Expand Down
5 changes: 0 additions & 5 deletions cmd/lakectl/cmd/sst.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@ import (
"google.golang.org/protobuf/proto"
)

func isSeekable(f io.Seeker) bool {
_, err := f.Seek(0, io.SeekCurrent)
return err == nil // a little naive, but probably good enough for its purpose
}

func readStdin() (pebblesst.ReadableFile, error) {
// test if stdin is seekable
if isSeekable(os.Stdin) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ require (
github.com/klauspost/compress v1.11.12 // indirect
github.com/lunixbochs/vtclean v1.0.0 // indirect
github.com/manifoldco/promptui v0.8.0
github.com/matoous/go-nanoid/v2 v2.0.0
github.com/matoous/go-nanoid/v2 v2.0.0 // indirect
github.com/mitchellh/go-homedir v1.1.0
github.com/ory/dockertest/v3 v3.6.3
github.com/prometheus/client_golang v1.9.0
Expand Down
28 changes: 19 additions & 9 deletions pkg/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,20 @@ func (c *client) StageObject(ctx context.Context, repoID, branchID, path string,
return resp.GetPayload(), nil
}

// readSize returns the size of r.
func readSize(r io.Seeker) (int64, error) {
cur, err := r.Seek(0, io.SeekCurrent)
if err != nil {
return 0, fmt.Errorf("tell: %w", err)
}
end, err := r.Seek(0, io.SeekEnd)
if err != nil {
return 0, fmt.Errorf("seek to end: %w", err)
}
_, err = r.Seek(cur, io.SeekStart)
return end, err
}

func (c *client) ClientUpload(ctx context.Context, repoID, branchID, path string, metadata map[string]string, contents io.ReadSeeker) (*models.ObjectStats, error) {
resp, err := c.remote.Staging.GetPhysicalAddress(&staging.GetPhysicalAddressParams{
Repository: repoID,
Expand All @@ -754,6 +768,11 @@ func (c *client) ClientUpload(ctx context.Context, repoID, branchID, path string
}
stagingLocation := resp.GetPayload()

size, err := readSize(contents)
if err != nil {
return nil, fmt.Errorf("readSize: %w", err)
}

for { // Return from inside loop
physicalAddress, err := url.Parse(stagingLocation.PhysicalAddress)

Expand Down Expand Up @@ -786,15 +805,6 @@ func (c *client) ClientUpload(ctx context.Context, repoID, branchID, path string
return nil, fmt.Errorf("upload to backing store %v: %w", physicalAddress, err)
}

size, err := contents.Seek(0, io.SeekEnd)
if err != nil {
return nil, fmt.Errorf("read size: %w", err)
}
_, err = contents.Seek(0, io.SeekStart)
if err != nil {
return nil, fmt.Errorf("rewind: %w", err)
}

_, err = c.remote.Staging.LinkPhysicalAddress(&staging.LinkPhysicalAddressParams{
Repository: repoID,
Branch: branchID,
Expand Down