Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/ingest cmd #1864

Merged
merged 10 commits into from
May 4, 2021
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,10 @@ components:
size_bytes:
type: integer
format: int64
mtime:
type: integer
format: int64
description: Unix Epoch in seconds
metadata:
type: object
additionalProperties:
Expand Down
5 changes: 5 additions & 0 deletions clients/java/api/openapi.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions clients/java/docs/ObjectStageCreation.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions clients/python/docs/ObjectStageCreation.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Name | Type | Description | Notes
**physical_address** | **str** | |
**checksum** | **str** | |
**size_bytes** | **int** | |
**mtime** | **int** | Unix Epoch in seconds | [optional]
**metadata** | **{str: (str,)}** | | [optional]

[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md)
Expand Down
1 change: 1 addition & 0 deletions clients/python/docs/ObjectsApi.md
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ with lakefs_client.ApiClient(configuration) as api_client:
physical_address="physical_address_example",
checksum="checksum_example",
size_bytes=1,
mtime=1,
metadata={
"key": "key_example",
},
Expand Down
3 changes: 3 additions & 0 deletions clients/python/lakefs_client/model/object_stage_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def openapi_types():
'physical_address': (str,), # noqa: E501
'checksum': (str,), # noqa: E501
'size_bytes': (int,), # noqa: E501
'mtime': (int,), # noqa: E501
'metadata': ({str: (str,)},), # noqa: E501
}

Expand All @@ -88,6 +89,7 @@ def discriminator():
'physical_address': 'physical_address', # noqa: E501
'checksum': 'checksum', # noqa: E501
'size_bytes': 'size_bytes', # noqa: E501
'mtime': 'mtime', # noqa: E501
'metadata': 'metadata', # noqa: E501
}

Expand Down Expand Up @@ -142,6 +144,7 @@ def __init__(self, physical_address, checksum, size_bytes, *args, **kwargs): #
Animal class but this time we won't travel
through its discriminator because we passed in
_visited_composed_classes = (Animal,)
mtime (int): Unix Epoch in seconds. [optional] # noqa: E501
metadata ({str: (str,)}): [optional] # noqa: E501
"""

Expand Down
8 changes: 8 additions & 0 deletions cmd/lakectl/cmd/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,12 +238,19 @@ var fsStageCmd = &cobra.Command{
client := getClient()
pathURI := MustParsePathURI("path", args[0])
size, _ := cmd.Flags().GetInt64("size")
mtimeSeconds, _ := cmd.Flags().GetInt64("mtime")
location, _ := cmd.Flags().GetString("location")
checksum, _ := cmd.Flags().GetString("checksum")
meta, metaErr := getKV(cmd, "meta")

var mtime *int64
if mtimeSeconds != 0 {
mtime = &mtimeSeconds
}

obj := api.ObjectStageCreation{
Checksum: checksum,
Mtime: mtime,
PhysicalAddress: location,
SizeBytes: size,
}
Expand Down Expand Up @@ -304,6 +311,7 @@ func init() {
fsStageCmd.Flags().String("location", "", "fully qualified storage location (i.e. \"s3://bucket/path/to/object\")")
fsStageCmd.Flags().Int64("size", 0, "Object size in bytes")
fsStageCmd.Flags().String("checksum", "", "Object MD5 checksum as a hexadecimal string")
fsStageCmd.Flags().Int64("mtime", 0, "Object modified time (Unix Epoch in seconds). Defaults to current time.")
ozkatz marked this conversation as resolved.
Show resolved Hide resolved
fsStageCmd.Flags().StringSlice("meta", []string{}, "key value pairs in the form of key=value")
_ = fsStageCmd.MarkFlagRequired("location")
_ = fsStageCmd.MarkFlagRequired("size")
Expand Down
85 changes: 85 additions & 0 deletions cmd/lakectl/cmd/ingest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package cmd

import (
"strings"

"github.com/spf13/cobra"
"github.com/treeverse/lakefs/cmd/lakectl/cmd/store"
"github.com/treeverse/lakefs/pkg/api"
)

// ingestSummaryTemplate renders the end-of-run summary: the number of
// objects staged and their total size (humanized) in yellow.
const ingestSummaryTemplate = `
Staged {{ .Objects | yellow }} external objects (total of {{ .Bytes | human_bytes | yellow }})
`

var ingestCmd = &cobra.Command{
Use: "ingest --from <object store URI> --to <lakeFS path URI> [--dry-run]",
Short: "Ingest objects from an external source into a lakeFS branch (without actually copying them)",
Run: func(cmd *cobra.Command, args []string) {
ctx := cmd.Context()
verbose := MustBool(cmd.Flags().GetBool("verbose"))
dryRun := MustBool(cmd.Flags().GetBool("dry-run"))
from := MustString(cmd.Flags().GetString("from"))
to := MustString(cmd.Flags().GetString("to"))
lakefsURI := MustParsePathURI("to", to)

summary := struct {
Objects int64
Bytes int64
}{}
client := getClient()
err := store.Walk(ctx, from, func(e store.ObjectStoreEntry) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A simple progress bar will help the user understand that things are progressing and in which rate. See import progress bars as an example.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currently possible with --verbose which will print each individual object being staged. Progress bars are nice for things that are done interactively, I feel the use case here is more "do this before running my job in airflow" in which case the output will appear in a log file, rendering the progress bar unreadable. Happy to hear objections or suggestions though.

if dryRun {
Fmt("%s\n", e)
return nil
}
key := e.RelativeKey
if lakefsURI.Path != nil && *lakefsURI.Path != "" {
path := *lakefsURI.Path
if strings.HasSuffix(*lakefsURI.Path, "/") {
key = path + key
} else {
key = path + "/" + key
}
}
mtime := e.Mtime.Unix()
resp, err := client.StageObjectWithResponse(ctx,
lakefsURI.Repository,
lakefsURI.Ref,
&api.StageObjectParams{
Path: key,
},
api.StageObjectJSONRequestBody{
Checksum: e.ETag,
Mtime: &mtime,
PhysicalAddress: e.Address,
SizeBytes: e.Size,
},
)
DieOnResponseError(resp, err)
if verbose {
Write("Staged "+fsStatTemplate+"\n", resp.JSON201)
}
summary.Objects += 1
summary.Bytes += api.Int64Value(resp.JSON201.SizeBytes)
return nil
})
if err != nil {
DieFmt("error walking object store: %v", err)
}

// print summary
Write(ingestSummaryTemplate, summary)
},
}

//nolint:gochecknoinits
func init() {
	// --from and --to are mandatory; --dry-run and --verbose are optional toggles
	ingestCmd.Flags().String("from", "", "prefix to read from (e.g. \"s3://bucket/sub/path/\")")
	_ = ingestCmd.MarkFlagRequired("from")
	ingestCmd.Flags().String("to", "", "lakeFS path to load objects into (e.g. \"lakefs://repo/branch/sub/path/\")")
	_ = ingestCmd.MarkFlagRequired("to")
	ingestCmd.Flags().Bool("dry-run", false, "only print the paths to be ingested")
	ingestCmd.Flags().BoolP("verbose", "v", false, "print stats for each individual object staged")
	rootCmd.AddCommand(ingestCmd)
}
88 changes: 88 additions & 0 deletions cmd/lakectl/cmd/store/azure.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package store

import (
"context"
"errors"
"fmt"
"net/url"
"os"
"strings"

"github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-storage-blob-go/azblob"
)

var (
	// ErrAzureInvalidURL is returned when a storage URI carries no container path.
	ErrAzureInvalidURL = errors.New("invalid Azure storage URL")
	// ErrAzureCredentials is returned when the Azure credential environment variables are missing.
	ErrAzureCredentials = errors.New("azure credentials error")
)

// GetAzureClient builds an Azure Blob Storage request pipeline from the
// AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_ACCESS_KEY environment variables
// (both come from the Azure portal for the storage account).
// Returns ErrAzureCredentials when either variable is unset.
func GetAzureClient() (pipeline.Pipeline, error) {
	accountName := os.Getenv("AZURE_STORAGE_ACCOUNT")
	accountKey := os.Getenv("AZURE_STORAGE_ACCESS_KEY")
	if accountName == "" || accountKey == "" {
		return nil, fmt.Errorf("%w: either the AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY environment variable is not set", ErrAzureCredentials)
	}

	// build a shared-key credential and wrap it in a default pipeline
	credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
	if err != nil {
		return nil, fmt.Errorf("invalid credentials with error: %w", err)
	}
	return azblob.NewPipeline(credential, azblob.PipelineOptions{}), nil
}

// AzureBlobWalker lists the blobs of an Azure container through the
// supplied request pipeline.
type AzureBlobWalker struct {
	client pipeline.Pipeline
}

// extractAzurePrefix takes a URL that looks like this: https://storageaccount.blob.core.windows.net/container/prefix
// and returns the URL for the container plus the key prefix, if one exists.
// Returns ErrAzureInvalidURL when the URL has no container path at all.
func extractAzurePrefix(storageURI *url.URL) (*url.URL, string, error) {
	trimmed := strings.TrimLeft(storageURI.Path, "/")
	if trimmed == "" {
		return nil, "", fmt.Errorf("%w: could not parse container URL: %s", ErrAzureInvalidURL, storageURI)
	}
	idx := strings.Index(trimmed, "/")
	if idx < 0 {
		// the URL addresses a bare container with no prefix
		return storageURI, "", nil
	}
	// both prefix and container present: rebuild the container-only URL
	containerRef := url.URL{Path: "/" + trimmed[:idx]}
	return storageURI.ResolveReference(&containerRef), trimmed[idx+1:], nil
}

func getAzureBlobURL(containerURL *url.URL, blobName string) *url.URL {
relativePath := url.URL{Path: containerURL.Path + "/" + blobName}
return containerURL.ResolveReference(&relativePath)
}

// Walk lists every blob under storageURI (container URL plus optional key
// prefix) and calls walkFn once per blob, paging with Azure's continuation
// marker until the listing is exhausted. It stops at and returns the first
// error from either the listing request or walkFn.
func (a *AzureBlobWalker) Walk(ctx context.Context, storageURI *url.URL, walkFn func(e ObjectStoreEntry) error) error {
	// we use bucket as container and prefix as path
	containerURL, prefix, err := extractAzurePrefix(storageURI)
	if err != nil {
		return err
	}
	container := azblob.NewContainerURL(*containerURL, a.client)
	// an empty Marker requests the first page; NotDone() turns false once
	// the service reports there are no further pages
	for marker := (azblob.Marker{}); marker.NotDone(); {
		listBlob, err := container.ListBlobsFlatSegment(ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: prefix})
		if err != nil {
			return err
		}
		marker = listBlob.NextMarker
		for _, blobInfo := range listBlob.Segment.BlobItems {
			if err := walkFn(ObjectStoreEntry{
				FullKey: blobInfo.Name,
				// NOTE(review): TrimPrefix assumes the prefix ends at a path
				// boundary; if --from does not end with "/" the relative key
				// may start mid-segment — confirm callers pass directory-style prefixes
				RelativeKey: strings.TrimPrefix(blobInfo.Name, prefix),
				Address:     getAzureBlobURL(containerURL, blobInfo.Name).String(),
				// NOTE(review): Azure ETags may include surrounding double
				// quotes — confirm downstream checksum handling expects that
				ETag:  string(blobInfo.Properties.Etag),
				Mtime: blobInfo.Properties.LastModified,
				Size:  *blobInfo.Properties.ContentLength,
			}); err != nil {
				return err
			}
		}
	}
	return nil
}
Loading