Feature/ingest cmd #1864

Merged · 10 commits · May 4, 2021
4 changes: 4 additions & 0 deletions api/swagger.yml
@@ -236,6 +236,10 @@ components:
size_bytes:
type: integer
format: int64
mtime:
type: integer
format: int64
description: Unix Epoch in seconds
metadata:
type: object
additionalProperties:
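For reference, a minimal sketch (mine, not part of the PR) of populating the new field through the generated Go client's `api.ObjectStageCreation` model, whose field names appear later in this diff; the location, checksum, and size are illustrative:

```go
package main

import (
	"time"

	"github.com/treeverse/lakefs/pkg/api"
)

func exampleStageBody() api.ObjectStageCreation {
	// mtime is optional: when left nil, the server falls back to the
	// current time, matching the new --mtime flag's documented default
	mtime := time.Now().Unix() // Unix Epoch in seconds, per the spec
	return api.ObjectStageCreation{
		PhysicalAddress: "s3://bucket/path/to/object", // illustrative
		Checksum:        "d41d8cd98f00b204e9800998ecf8427e",
		SizeBytes:       1024,
		Mtime:           &mtime,
	}
}
```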
5 changes: 5 additions & 0 deletions clients/java/api/openapi.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions clients/java/docs/ObjectStageCreation.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.


1 change: 1 addition & 0 deletions clients/python/docs/ObjectStageCreation.md
@@ -7,6 +7,7 @@ Name | Type | Description | Notes
**physical_address** | **str** | |
**checksum** | **str** | |
**size_bytes** | **int** | |
**mtime** | **int** | Unix Epoch in seconds | [optional]
**metadata** | **{str: (str,)}** | | [optional]

[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md)
1 change: 1 addition & 0 deletions clients/python/docs/ObjectsApi.md
@@ -463,6 +463,7 @@ with lakefs_client.ApiClient(configuration) as api_client:
physical_address="physical_address_example",
checksum="checksum_example",
size_bytes=1,
mtime=1,
metadata={
"key": "key_example",
},
3 changes: 3 additions & 0 deletions clients/python/lakefs_client/model/object_stage_creation.py
@@ -76,6 +76,7 @@ def openapi_types():
'physical_address': (str,), # noqa: E501
'checksum': (str,), # noqa: E501
'size_bytes': (int,), # noqa: E501
'mtime': (int,), # noqa: E501
'metadata': ({str: (str,)},), # noqa: E501
}

@@ -88,6 +89,7 @@ def discriminator():
'physical_address': 'physical_address', # noqa: E501
'checksum': 'checksum', # noqa: E501
'size_bytes': 'size_bytes', # noqa: E501
'mtime': 'mtime', # noqa: E501
'metadata': 'metadata', # noqa: E501
}

@@ -142,6 +144,7 @@ def __init__(self, physical_address, checksum, size_bytes, *args, **kwargs):  # noqa: E501
Animal class but this time we won't travel
through its discriminator because we passed in
_visited_composed_classes = (Animal,)
mtime (int): Unix Epoch in seconds. [optional] # noqa: E501
metadata ({str: (str,)}): [optional] # noqa: E501
"""

8 changes: 8 additions & 0 deletions cmd/lakectl/cmd/fs.go
@@ -238,12 +238,19 @@ var fsStageCmd = &cobra.Command{
client := getClient()
pathURI := MustParsePathURI("path", args[0])
size, _ := cmd.Flags().GetInt64("size")
mtimeSeconds, _ := cmd.Flags().GetInt64("mtime")
location, _ := cmd.Flags().GetString("location")
checksum, _ := cmd.Flags().GetString("checksum")
meta, metaErr := getKV(cmd, "meta")

var mtime *int64
if mtimeSeconds != 0 {
mtime = &mtimeSeconds
}

obj := api.ObjectStageCreation{
Checksum: checksum,
Mtime: mtime,
PhysicalAddress: location,
SizeBytes: size,
}
@@ -304,6 +311,7 @@ func init() {
fsStageCmd.Flags().String("location", "", "fully qualified storage location (i.e. \"s3://bucket/path/to/object\")")
fsStageCmd.Flags().Int64("size", 0, "Object size in bytes")
fsStageCmd.Flags().String("checksum", "", "Object MD5 checksum as a hexadecimal string")
fsStageCmd.Flags().Int64("mtime", 0, "Object modified time (Unix Epoch in seconds). Defaults to current time.")
fsStageCmd.Flags().StringSlice("meta", []string{}, "key value pairs in the form of key=value")
_ = fsStageCmd.MarkFlagRequired("location")
_ = fsStageCmd.MarkFlagRequired("size")
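Since `--mtime` takes epoch seconds, a caller scripting around `lakectl fs stage` might derive the value from a local file's modification time. A hedged sketch (the file name is hypothetical):

```go
package main

import (
	"fmt"
	"log"
	"os"
)

func main() {
	info, err := os.Stat("data.parquet") // hypothetical local file
	if err != nil {
		log.Fatal(err)
	}
	// print the value to pass as --mtime (Unix Epoch in seconds)
	fmt.Println(info.ModTime().Unix())
}
```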
40 changes: 16 additions & 24 deletions cmd/lakectl/cmd/ingest.go
@@ -4,7 +4,6 @@ import (
"strings"

"github.com/spf13/cobra"

"github.com/treeverse/lakefs/cmd/lakectl/cmd/store"
"github.com/treeverse/lakefs/pkg/api"
)
@@ -14,9 +13,8 @@ Staged {{ .Objects | yellow }} external objects (total of {{ .Bytes | human_bytes }})
`

var ingestCmd = &cobra.Command{
Use: "ingest --from <object store URI> --to <lakeFS path URI> [--dry-run]",
Short: "Ingest objects from an external source into a lakeFS branch (without actually copying them)",
Hidden: false,
Use: "ingest --from <object store URI> --to <lakeFS path URI> [--dry-run]",
Short: "Ingest objects from an external source into a lakeFS branch (without actually copying them)",
Run: func(cmd *cobra.Command, args []string) {
ctx := cmd.Context()
verbose := MustBool(cmd.Flags().GetBool("verbose"))
@@ -25,8 +23,10 @@ var ingestCmd = &cobra.Command{
to := MustString(cmd.Flags().GetString("to"))
lakefsURI := MustParsePathURI("to", to)

var staged int64
var stagedBytes int64
summary := struct {
Objects int64
Bytes int64
}{}
client := getClient()
err := store.Walk(ctx, from, func(e store.ObjectStoreEntry) error {
Contributor:

A simple progress bar will help the user understand that things are progressing and at what rate. See import progress bars as an example.

Collaborator (Author):

Currently possible with --verbose, which will print each individual object being staged. Progress bars are nice for things that are done interactively; I feel the use case here is more "do this before running my job in Airflow", in which case the output will appear in a log file, rendering the progress bar unreadable. Happy to hear objections or suggestions, though.

if dryRun {
@@ -35,19 +35,23 @@
}
key := e.RelativeKey
if lakefsURI.Path != nil && *lakefsURI.Path != "" {
path := *lakefsURI.Path
if strings.HasSuffix(*lakefsURI.Path, "/") {
key = strings.Join([]string{api.StringValue(lakefsURI.Path), key}, "")
key = path + key
} else {
key = strings.Join([]string{api.StringValue(lakefsURI.Path), key}, "/")
key = path + "/" + key
}
}
mtime := e.Mtime.Unix()
resp, err := client.StageObjectWithResponse(ctx,
lakefsURI.Repository,
lakefsURI.Ref, &api.StageObjectParams{
lakefsURI.Ref,
&api.StageObjectParams{
Path: key,
},
api.StageObjectJSONRequestBody{
Checksum: e.ETag,
Mtime: &mtime,
PhysicalAddress: e.Address,
SizeBytes: e.Size,
},
@@ -56,28 +60,16 @@
if verbose {
Write("Staged "+fsStatTemplate+"\n", resp.JSON201)
}
staged = staged + 1
stagedBytes = stagedBytes + api.Int64Value(resp.JSON201.SizeBytes)
summary.Objects += 1
summary.Bytes += api.Int64Value(resp.JSON201.SizeBytes)
return nil
})
if err != nil {
DieFmt("error walking object store: %v", err)
}

if err != nil {
DieErr(err)
}

// print summary
if staged > 0 {
Write(ingestSummaryTemplate, struct {
Objects int64
Bytes int64
}{
Objects: staged,
Bytes: stagedBytes,
})
}
Write(ingestSummaryTemplate, summary)
},
}

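To make the destination-path handling in the hunk above concrete, here is a small sketch of the joining rule; the helper name is mine, not part of the PR, and it assumes the `strings` package is imported:

```go
// joinDest mirrors the logic above: a destination path ending in "/" is
// used as-is as a prefix, otherwise a "/" separator is inserted between
// the path and the object's relative key.
func joinDest(destPath, relativeKey string) string {
	if destPath == "" {
		return relativeKey
	}
	if strings.HasSuffix(destPath, "/") {
		return destPath + relativeKey
	}
	return destPath + "/" + relativeKey
}

// joinDest("raw/", "2021/04/part-0.parquet") => "raw/2021/04/part-0.parquet"
// joinDest("raw", "2021/04/part-0.parquet")  => "raw/2021/04/part-0.parquet"
```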
14 changes: 7 additions & 7 deletions cmd/lakectl/cmd/store/azure.go
@@ -13,20 +13,21 @@ import (
)

var (
ErrAzureBlobMisconfigured = errors.New("Either the AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY environment variable is not set")
ErrAzureInvalidURL = errors.New("invalid Azure storage URL")
ErrAzureCredentials = errors.New("azure credentials error")
)

func GetAzureClient() (pipeline.Pipeline, error) {
// From the Azure portal, get your storage account name and key and set environment variables.
accountName, accountKey := os.Getenv("AZURE_STORAGE_ACCOUNT"), os.Getenv("AZURE_STORAGE_ACCESS_KEY")
if len(accountName) == 0 || len(accountKey) == 0 {
return nil, ErrAzureBlobMisconfigured
return nil, fmt.Errorf("%w: either the AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY environment variable is not set", ErrAzureCredentials)
}

// Create a default request pipeline using your storage account name and account key.
credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
if err != nil {
return nil, fmt.Errorf("Invalid credentials with error: %s", err.Error())
return nil, fmt.Errorf("invalid credentials with error: %w", err)
}
return azblob.NewPipeline(credential, azblob.PipelineOptions{}), nil
}
@@ -35,12 +36,12 @@ type AzureBlobWalker struct {
client pipeline.Pipeline
}

// extractAzurePrefix takes a URL that looks like this: https://storageaccount.blob.core.windows.net/container/prefix
// and return the URL for the container and a prefix, if one exists
func extractAzurePrefix(storageURI *url.URL) (*url.URL, string, error) {
// take a URL that looks like this: https://storageaccount.blob.core.windows.net/container/prefix
// and return the URL for the container, and the URL for the prefix if any
path := strings.TrimLeft(storageURI.Path, "/")
if len(path) == 0 {
return nil, "", fmt.Errorf("invalid storage URI: could not parse container: %s", storageURI)
return nil, "", fmt.Errorf("%w: could not parse container URL: %s", ErrAzureInvalidURL, storageURI)
}
parts := strings.SplitN(path, "/", 2)
if len(parts) == 1 {
@@ -50,7 +51,6 @@ func extractAzurePrefix(storageURI *url.URL) (*url.URL, string, error) {
// we have both prefix and storage container, rebuild URL
relativePath := url.URL{Path: "/" + parts[0]}
return storageURI.ResolveReference(&relativePath), parts[1], nil

}

func getAzureBlobURL(containerURL *url.URL, blobName string) *url.URL {
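One payoff of the switch to sentinel errors wrapped with `%w` is that callers can classify failures instead of matching error strings. A minimal sketch, assuming the caller imports `errors` and this `store` package:

```go
// distinguish a credentials problem from other pipeline-construction errors
pipe, err := store.GetAzureClient()
if errors.Is(err, store.ErrAzureCredentials) {
	// missing AZURE_STORAGE_ACCOUNT / AZURE_STORAGE_ACCESS_KEY:
	// tell the user which environment variables to set
}
_ = pipe
```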
29 changes: 21 additions & 8 deletions cmd/lakectl/cmd/store/factory.go
@@ -2,33 +2,46 @@ package store

import (
"context"
"errors"
"fmt"
"net/url"
"time"
)

var (
ErrNoStorageAdapter = errors.New("no storage adapter found")
)

type ObjectStoreEntry struct {
FullKey string
// FullKey represents the fully qualified path in the object store namespace for the given entry
FullKey string
// RelativeKey represents a path relative to prefix (or directory). If none specified, will be identical to FullKey
RelativeKey string
Address string
ETag string
Mtime time.Time
Size int64
// Address is a full URI for the entry, including the storage namespace (i.e. s3://bucket/path/to/key)
Address string
// ETag represents a hash of the entry's content. Generally as hex encoded MD5,
// but depends on the underlying object store
ETag string
// Mtime is the last-modified datetime of the entry
Mtime time.Time
// Size in bytes
Size int64
}

type Walker interface {
Walk(ctx context.Context, storageURI *url.URL, walkFn func(e ObjectStoreEntry) error) error
}

func (e ObjectStoreEntry) String() string {
return fmt.Sprintf("%s\t%s\t%s\t%d\t%s\n", e.Address, e.RelativeKey, e.ETag, e.Size, e.Mtime)
return fmt.Sprintf("ObjectStoreEntry: {Address:%s, RelativeKey:%s, ETag:%s, Size:%d, Mtime:%s}",
e.Address, e.RelativeKey, e.ETag, e.Size, e.Mtime)
}

func Walk(ctx context.Context, storageURI string, walkFn func(e ObjectStoreEntry) error) error {
var walker Walker
uri, err := url.Parse(storageURI)
if err != nil {
return fmt.Errorf("could not parse storage URI %s: %v", uri, err)
return fmt.Errorf("could not parse storage URI %s: %w", uri, err)
}
switch uri.Scheme {
case "s3":
@@ -50,7 +63,7 @@ func Walk(ctx context.Context, storageURI string, walkFn func(e ObjectStoreEntry) error) error {
}
walker = &AzureBlobWalker{client: svc}
default:
return fmt.Errorf("no matching object store adapter for scheme: %s", uri.Scheme)
return fmt.Errorf("%w: for scheme: %s", ErrNoStorageAdapter, uri.Scheme)
}
return walker.Walk(ctx, uri, walkFn)
}
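A hedged usage sketch of the exported `Walk` entry point, matching the signature above (the bucket and prefix are illustrative):

```go
err := store.Walk(ctx, "s3://my-bucket/raw/", func(e store.ObjectStoreEntry) error {
	fmt.Println(e) // uses the ObjectStoreEntry.String() format shown above
	return nil
})
if errors.Is(err, store.ErrNoStorageAdapter) {
	// the URI scheme has no registered walker
}
```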
3 changes: 1 addition & 2 deletions cmd/lakectl/cmd/store/gcs.go
@@ -8,9 +8,8 @@ import (
"net/url"
"strings"

"google.golang.org/api/iterator"

"cloud.google.com/go/storage"
"google.golang.org/api/iterator"
)

func GetGCSClient(ctx context.Context) (*storage.Client, error) {
1 change: 1 addition & 0 deletions docs/reference/commands.md
@@ -1468,6 +1468,7 @@ lakectl fs stage <path uri> [flags]
-h, --help help for stage
--location string fully qualified storage location (i.e. "s3://bucket/path/to/object")
--meta strings key value pairs in the form of key=value
--mtime int Object modified time (Unix Epoch in seconds). Defaults to current time.
--size int Object size in bytes
```
