Feature/ingest cmd (#1864)
* added ingest command to lakectl to do client-side mapping of objects
ozkatz authored May 4, 2021
1 parent 7c22cd5 commit 2e2e005
Showing 17 changed files with 620 additions and 40 deletions.
4 changes: 4 additions & 0 deletions api/swagger.yml
@@ -236,6 +236,10 @@ components:
        size_bytes:
          type: integer
          format: int64
        mtime:
          type: integer
          format: int64
          description: Unix Epoch in seconds
        metadata:
          type: object
          additionalProperties:
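The new optional field carries the object's last-modified time as seconds since the Unix epoch. For illustration only, a client producing the value from a native Go timestamp would convert along these lines (hypothetical timestamp; this mirrors what the ingest command below does):

package main

import (
	"fmt"
	"time"
)

func main() {
	// The new mtime field expects Unix epoch seconds.
	lastModified := time.Date(2021, time.May, 4, 0, 0, 0, 0, time.UTC) // hypothetical: e.g. from object-store metadata
	fmt.Println(lastModified.Unix())                                   // 1620086400
}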
5 changes: 5 additions & 0 deletions clients/java/api/openapi.yaml

Generated file; diff not rendered.

1 change: 1 addition & 0 deletions clients/java/docs/ObjectStageCreation.md

Generated file; diff not rendered.


1 change: 1 addition & 0 deletions clients/python/docs/ObjectStageCreation.md
@@ -7,6 +7,7 @@ Name | Type | Description | Notes
**physical_address** | **str** | |
**checksum** | **str** | |
**size_bytes** | **int** | |
**mtime** | **int** | Unix Epoch in seconds | [optional]
**metadata** | **{str: (str,)}** | | [optional]

[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md)
1 change: 1 addition & 0 deletions clients/python/docs/ObjectsApi.md
@@ -463,6 +463,7 @@ with lakefs_client.ApiClient(configuration) as api_client:
        physical_address="physical_address_example",
        checksum="checksum_example",
        size_bytes=1,
        mtime=1,
        metadata={
            "key": "key_example",
        },
3 changes: 3 additions & 0 deletions clients/python/lakefs_client/model/object_stage_creation.py
@@ -76,6 +76,7 @@ def openapi_types():
            'physical_address': (str,),  # noqa: E501
            'checksum': (str,),  # noqa: E501
            'size_bytes': (int,),  # noqa: E501
            'mtime': (int,),  # noqa: E501
            'metadata': ({str: (str,)},),  # noqa: E501
        }

@@ -88,6 +89,7 @@ def discriminator():
            'physical_address': 'physical_address',  # noqa: E501
            'checksum': 'checksum',  # noqa: E501
            'size_bytes': 'size_bytes',  # noqa: E501
            'mtime': 'mtime',  # noqa: E501
            'metadata': 'metadata',  # noqa: E501
        }

@@ -142,6 +144,7 @@ def __init__(self, physical_address, checksum, size_bytes, *args, **kwargs): #
                Animal class but this time we won't travel
                through its discriminator because we passed in
                _visited_composed_classes = (Animal,)
            mtime (int): Unix Epoch in seconds. [optional]  # noqa: E501
            metadata ({str: (str,)}): [optional]  # noqa: E501
        """

8 changes: 8 additions & 0 deletions cmd/lakectl/cmd/fs.go
@@ -238,12 +238,19 @@ var fsStageCmd = &cobra.Command{
		client := getClient()
		pathURI := MustParsePathURI("path", args[0])
		size, _ := cmd.Flags().GetInt64("size")
		mtimeSeconds, _ := cmd.Flags().GetInt64("mtime")
		location, _ := cmd.Flags().GetString("location")
		checksum, _ := cmd.Flags().GetString("checksum")
		meta, metaErr := getKV(cmd, "meta")

		var mtime *int64
		if mtimeSeconds != 0 {
			mtime = &mtimeSeconds
		}

		obj := api.ObjectStageCreation{
			Checksum:        checksum,
			Mtime:           mtime,
			PhysicalAddress: location,
			SizeBytes:       size,
		}
@@ -304,6 +311,7 @@ func init() {
	fsStageCmd.Flags().String("location", "", "fully qualified storage location (i.e. \"s3://bucket/path/to/object\")")
	fsStageCmd.Flags().Int64("size", 0, "Object size in bytes")
	fsStageCmd.Flags().String("checksum", "", "Object MD5 checksum as a hexadecimal string")
	fsStageCmd.Flags().Int64("mtime", 0, "Object modified time (Unix Epoch in seconds). Defaults to current time")
	fsStageCmd.Flags().StringSlice("meta", []string{}, "key value pairs in the form of key=value")
	_ = fsStageCmd.MarkFlagRequired("location")
	_ = fsStageCmd.MarkFlagRequired("size")
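Note the pointer plumbing: a zero --mtime is treated as "not provided", so the request carries a nil Mtime and the server falls back to the current time. The same idiom as a standalone sketch (hypothetical helper, not part of this change):

// optionalInt64 maps the zero value to nil so an optional API field can
// distinguish "unset" from an explicit 0 — the pattern fsStageCmd uses above.
// Hypothetical helper for illustration only.
func optionalInt64(v int64) *int64 {
	if v == 0 {
		return nil
	}
	return &v
}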
85 changes: 85 additions & 0 deletions cmd/lakectl/cmd/ingest.go
@@ -0,0 +1,85 @@
package cmd

import (
	"strings"

	"github.com/spf13/cobra"
	"github.com/treeverse/lakefs/cmd/lakectl/cmd/store"
	"github.com/treeverse/lakefs/pkg/api"
)

const ingestSummaryTemplate = `
Staged {{ .Objects | yellow }} external objects (total of {{ .Bytes | human_bytes | yellow }})
`

var ingestCmd = &cobra.Command{
	Use:   "ingest --from <object store URI> --to <lakeFS path URI> [--dry-run]",
	Short: "Ingest objects from an external source into a lakeFS branch (without actually copying them)",
	Run: func(cmd *cobra.Command, args []string) {
		ctx := cmd.Context()
		verbose := MustBool(cmd.Flags().GetBool("verbose"))
		dryRun := MustBool(cmd.Flags().GetBool("dry-run"))
		from := MustString(cmd.Flags().GetString("from"))
		to := MustString(cmd.Flags().GetString("to"))
		lakefsURI := MustParsePathURI("to", to)

		summary := struct {
			Objects int64
			Bytes   int64
		}{}
		client := getClient()
		err := store.Walk(ctx, from, func(e store.ObjectStoreEntry) error {
			if dryRun {
				Fmt("%s\n", e)
				return nil
			}
			key := e.RelativeKey
			if lakefsURI.Path != nil && *lakefsURI.Path != "" {
				path := *lakefsURI.Path
				if strings.HasSuffix(*lakefsURI.Path, "/") {
					key = path + key
				} else {
					key = path + "/" + key
				}
			}
			mtime := e.Mtime.Unix()
			resp, err := client.StageObjectWithResponse(ctx,
				lakefsURI.Repository,
				lakefsURI.Ref,
				&api.StageObjectParams{
					Path: key,
				},
				api.StageObjectJSONRequestBody{
					Checksum:        e.ETag,
					Mtime:           &mtime,
					PhysicalAddress: e.Address,
					SizeBytes:       e.Size,
				},
			)
			DieOnResponseError(resp, err)
			if verbose {
				Write("Staged "+fsStatTemplate+"\n", resp.JSON201)
			}
			summary.Objects += 1
			summary.Bytes += api.Int64Value(resp.JSON201.SizeBytes)
			return nil
		})
		if err != nil {
			DieFmt("error walking object store: %v", err)
		}

		// print summary
		Write(ingestSummaryTemplate, summary)
	},
}

//nolint:gochecknoinits
func init() {
	ingestCmd.Flags().String("from", "", "prefix to read from (e.g. \"s3://bucket/sub/path/\")")
	_ = ingestCmd.MarkFlagRequired("from")
	ingestCmd.Flags().String("to", "", "lakeFS path to load objects into (e.g. \"lakefs://repo/branch/sub/path/\")")
	_ = ingestCmd.MarkFlagRequired("to")
	ingestCmd.Flags().Bool("dry-run", false, "only print the paths to be ingested")
	ingestCmd.Flags().BoolP("verbose", "v", false, "print stats for each individual object staged")
	rootCmd.AddCommand(ingestCmd)
}
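The command leans on the new store package: store.Walk resolves the --from URI to a store-specific walker and invokes the callback once per object. The entry type itself isn't shown in this diff; judging from the fields consumed above and populated by the Azure walker below, it would look roughly like this (a sketch inferred from usage, not the package's actual declaration):

package store

import "time"

// ObjectStoreEntry describes one object found while walking a prefix.
// Field set inferred from ingest.go and the walkers in this PR.
type ObjectStoreEntry struct {
	FullKey     string    // the object's full key, including the walked prefix
	RelativeKey string    // key with the prefix stripped; appended to the target lakeFS path
	Address     string    // fully qualified physical address, staged as-is
	ETag        string    // object checksum, passed to StageObject verbatim
	Mtime       time.Time // last-modified time, sent as Unix seconds
	Size        int64     // object size in bytes
}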
88 changes: 88 additions & 0 deletions cmd/lakectl/cmd/store/azure.go
@@ -0,0 +1,88 @@
package store

import (
	"context"
	"errors"
	"fmt"
	"net/url"
	"os"
	"strings"

	"github.com/Azure/azure-pipeline-go/pipeline"
	"github.com/Azure/azure-storage-blob-go/azblob"
)

var (
	ErrAzureInvalidURL  = errors.New("invalid Azure storage URL")
	ErrAzureCredentials = errors.New("azure credentials error")
)

func GetAzureClient() (pipeline.Pipeline, error) {
	// From the Azure portal, get your storage account name and key and set environment variables.
	accountName, accountKey := os.Getenv("AZURE_STORAGE_ACCOUNT"), os.Getenv("AZURE_STORAGE_ACCESS_KEY")
	if len(accountName) == 0 || len(accountKey) == 0 {
		return nil, fmt.Errorf("%w: either the AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY environment variable is not set", ErrAzureCredentials)
	}

	// Create a default request pipeline using your storage account name and account key.
	credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
	if err != nil {
		return nil, fmt.Errorf("invalid credentials with error: %w", err)
	}
	return azblob.NewPipeline(credential, azblob.PipelineOptions{}), nil
}

type AzureBlobWalker struct {
	client pipeline.Pipeline
}

// extractAzurePrefix takes a URL that looks like this: https://storageaccount.blob.core.windows.net/container/prefix
// and returns the URL for the container and a prefix, if one exists
func extractAzurePrefix(storageURI *url.URL) (*url.URL, string, error) {
	path := strings.TrimLeft(storageURI.Path, "/")
	if len(path) == 0 {
		return nil, "", fmt.Errorf("%w: could not parse container URL: %s", ErrAzureInvalidURL, storageURI)
	}
	parts := strings.SplitN(path, "/", 2)
	if len(parts) == 1 {
		// we only have a container
		return storageURI, "", nil
	}
	// we have both prefix and storage container, rebuild URL
	relativePath := url.URL{Path: "/" + parts[0]}
	return storageURI.ResolveReference(&relativePath), parts[1], nil
}

func getAzureBlobURL(containerURL *url.URL, blobName string) *url.URL {
	relativePath := url.URL{Path: containerURL.Path + "/" + blobName}
	return containerURL.ResolveReference(&relativePath)
}

func (a *AzureBlobWalker) Walk(ctx context.Context, storageURI *url.URL, walkFn func(e ObjectStoreEntry) error) error {
	// we use bucket as container and prefix as path
	containerURL, prefix, err := extractAzurePrefix(storageURI)
	if err != nil {
		return err
	}
	container := azblob.NewContainerURL(*containerURL, a.client)
	for marker := (azblob.Marker{}); marker.NotDone(); {
		listBlob, err := container.ListBlobsFlatSegment(ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: prefix})
		if err != nil {
			return err
		}
		marker = listBlob.NextMarker
		for _, blobInfo := range listBlob.Segment.BlobItems {
			if err := walkFn(ObjectStoreEntry{
				FullKey:     blobInfo.Name,
				RelativeKey: strings.TrimPrefix(blobInfo.Name, prefix),
				Address:     getAzureBlobURL(containerURL, blobInfo.Name).String(),
				ETag:        string(blobInfo.Properties.Etag),
				Mtime:       blobInfo.Properties.LastModified,
				Size:        *blobInfo.Properties.ContentLength,
			}); err != nil {
				return err
			}
		}
	}
	return nil
}
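For illustration, splitting a container URL with a nested prefix under the definitions above — a sketch with hypothetical account and container names:

// Hypothetical example of extractAzurePrefix splitting container and prefix.
u, err := url.Parse("https://account.blob.core.windows.net/container/sub/path/")
if err != nil {
	panic(err)
}
containerURL, prefix, err := extractAzurePrefix(u)
if err != nil {
	panic(err)
}
fmt.Println(containerURL) // https://account.blob.core.windows.net/container
fmt.Println(prefix)       // sub/path/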