diff --git a/api/swagger.yml b/api/swagger.yml index 395c4710de0..0bb9778aab6 100644 --- a/api/swagger.yml +++ b/api/swagger.yml @@ -236,6 +236,10 @@ components: size_bytes: type: integer format: int64 + mtime: + type: integer + format: int64 + description: Unix Epoch in seconds metadata: type: object additionalProperties: diff --git a/clients/java/api/openapi.yaml b/clients/java/api/openapi.yaml index aa56db82a1d..9b280597446 100644 --- a/clients/java/api/openapi.yaml +++ b/clients/java/api/openapi.yaml @@ -3976,6 +3976,7 @@ components: key: metadata size_bytes: 0 checksum: checksum + mtime: 6 properties: physical_address: type: string @@ -3984,6 +3985,10 @@ components: size_bytes: format: int64 type: integer + mtime: + description: Unix Epoch in seconds + format: int64 + type: integer metadata: additionalProperties: type: string diff --git a/clients/java/docs/ObjectStageCreation.md b/clients/java/docs/ObjectStageCreation.md index 80022611152..a811b585219 100644 --- a/clients/java/docs/ObjectStageCreation.md +++ b/clients/java/docs/ObjectStageCreation.md @@ -10,6 +10,7 @@ Name | Type | Description | Notes **physicalAddress** | **String** | | **checksum** | **String** | | **sizeBytes** | **Long** | | +**mtime** | **Long** | Unix Epoch in seconds | [optional] **metadata** | **Map<String, String>** | | [optional] diff --git a/clients/java/src/main/java/io/treeverse/lakefs/clients/api/model/ObjectStageCreation.java b/clients/java/src/main/java/io/treeverse/lakefs/clients/api/model/ObjectStageCreation.java index 5a5e304e55d..6203896546b 100644 --- a/clients/java/src/main/java/io/treeverse/lakefs/clients/api/model/ObjectStageCreation.java +++ b/clients/java/src/main/java/io/treeverse/lakefs/clients/api/model/ObjectStageCreation.java @@ -44,6 +44,10 @@ public class ObjectStageCreation { @SerializedName(SERIALIZED_NAME_SIZE_BYTES) private Long sizeBytes; + public static final String SERIALIZED_NAME_MTIME = "mtime"; + @SerializedName(SERIALIZED_NAME_MTIME) + private Long mtime; + public static final String SERIALIZED_NAME_METADATA = "metadata"; @SerializedName(SERIALIZED_NAME_METADATA) private Map metadata = null; @@ -115,6 +119,29 @@ public void setSizeBytes(Long sizeBytes) { } + public ObjectStageCreation mtime(Long mtime) { + + this.mtime = mtime; + return this; + } + + /** + * Unix Epoch in seconds + * @return mtime + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "Unix Epoch in seconds") + + public Long getMtime() { + return mtime; + } + + + public void setMtime(Long mtime) { + this.mtime = mtime; + } + + public ObjectStageCreation metadata(Map metadata) { this.metadata = metadata; @@ -158,12 +185,13 @@ public boolean equals(Object o) { return Objects.equals(this.physicalAddress, objectStageCreation.physicalAddress) && Objects.equals(this.checksum, objectStageCreation.checksum) && Objects.equals(this.sizeBytes, objectStageCreation.sizeBytes) && + Objects.equals(this.mtime, objectStageCreation.mtime) && Objects.equals(this.metadata, objectStageCreation.metadata); } @Override public int hashCode() { - return Objects.hash(physicalAddress, checksum, sizeBytes, metadata); + return Objects.hash(physicalAddress, checksum, sizeBytes, mtime, metadata); } @Override @@ -173,6 +201,7 @@ public String toString() { sb.append(" physicalAddress: ").append(toIndentedString(physicalAddress)).append("\n"); sb.append(" checksum: ").append(toIndentedString(checksum)).append("\n"); sb.append(" sizeBytes: ").append(toIndentedString(sizeBytes)).append("\n"); + sb.append(" mtime: 
").append(toIndentedString(mtime)).append("\n"); sb.append(" metadata: ").append(toIndentedString(metadata)).append("\n"); sb.append("}"); return sb.toString(); diff --git a/clients/python/docs/ObjectStageCreation.md b/clients/python/docs/ObjectStageCreation.md index 5dfaba7597f..3314776cf8e 100644 --- a/clients/python/docs/ObjectStageCreation.md +++ b/clients/python/docs/ObjectStageCreation.md @@ -7,6 +7,7 @@ Name | Type | Description | Notes **physical_address** | **str** | | **checksum** | **str** | | **size_bytes** | **int** | | +**mtime** | **int** | Unix Epoch in seconds | [optional] **metadata** | **{str: (str,)}** | | [optional] [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/clients/python/docs/ObjectsApi.md b/clients/python/docs/ObjectsApi.md index 61c4a1f614d..75360168864 100644 --- a/clients/python/docs/ObjectsApi.md +++ b/clients/python/docs/ObjectsApi.md @@ -463,6 +463,7 @@ with lakefs_client.ApiClient(configuration) as api_client: physical_address="physical_address_example", checksum="checksum_example", size_bytes=1, + mtime=1, metadata={ "key": "key_example", }, diff --git a/clients/python/lakefs_client/model/object_stage_creation.py b/clients/python/lakefs_client/model/object_stage_creation.py index b3835c84dff..bf18c1a0aac 100644 --- a/clients/python/lakefs_client/model/object_stage_creation.py +++ b/clients/python/lakefs_client/model/object_stage_creation.py @@ -76,6 +76,7 @@ def openapi_types(): 'physical_address': (str,), # noqa: E501 'checksum': (str,), # noqa: E501 'size_bytes': (int,), # noqa: E501 + 'mtime': (int,), # noqa: E501 'metadata': ({str: (str,)},), # noqa: E501 } @@ -88,6 +89,7 @@ def discriminator(): 'physical_address': 'physical_address', # noqa: E501 'checksum': 'checksum', # noqa: E501 'size_bytes': 'size_bytes', # noqa: E501 + 'mtime': 'mtime', # noqa: E501 'metadata': 'metadata', # noqa: E501 } @@ -142,6 +144,7 @@ def __init__(self, physical_address, checksum, size_bytes, *args, **kwargs): # Animal class but this time we won't travel through its discriminator because we passed in _visited_composed_classes = (Animal,) + mtime (int): Unix Epoch in seconds. [optional] # noqa: E501 metadata ({str: (str,)}): [optional] # noqa: E501 """ diff --git a/cmd/lakectl/cmd/fs.go b/cmd/lakectl/cmd/fs.go index 8bc843dc5b3..1e81ecd7af6 100644 --- a/cmd/lakectl/cmd/fs.go +++ b/cmd/lakectl/cmd/fs.go @@ -238,12 +238,19 @@ var fsStageCmd = &cobra.Command{ client := getClient() pathURI := MustParsePathURI("path", args[0]) size, _ := cmd.Flags().GetInt64("size") + mtimeSeconds, _ := cmd.Flags().GetInt64("mtime") location, _ := cmd.Flags().GetString("location") checksum, _ := cmd.Flags().GetString("checksum") meta, metaErr := getKV(cmd, "meta") + var mtime *int64 + if mtimeSeconds != 0 { + mtime = &mtimeSeconds + } + obj := api.ObjectStageCreation{ Checksum: checksum, + Mtime: mtime, PhysicalAddress: location, SizeBytes: size, } @@ -304,6 +311,7 @@ func init() { fsStageCmd.Flags().String("location", "", "fully qualified storage location (i.e. \"s3://bucket/path/to/object\")") fsStageCmd.Flags().Int64("size", 0, "Object size in bytes") fsStageCmd.Flags().String("checksum", "", "Object MD5 checksum as a hexadecimal string") + fsStageCmd.Flags().Int64("mtime", 0, "Object modified time (Unix Epoch in seconds). 
Defaults to current time") fsStageCmd.Flags().StringSlice("meta", []string{}, "key value pairs in the form of key=value") _ = fsStageCmd.MarkFlagRequired("location") _ = fsStageCmd.MarkFlagRequired("size") diff --git a/cmd/lakectl/cmd/ingest.go b/cmd/lakectl/cmd/ingest.go new file mode 100644 index 00000000000..6897e00a6d2 --- /dev/null +++ b/cmd/lakectl/cmd/ingest.go @@ -0,0 +1,85 @@ +package cmd + +import ( + "strings" + + "github.com/spf13/cobra" + "github.com/treeverse/lakefs/cmd/lakectl/cmd/store" + "github.com/treeverse/lakefs/pkg/api" +) + +const ingestSummaryTemplate = ` +Staged {{ .Objects | yellow }} external objects (total of {{ .Bytes | human_bytes | yellow }}) +` + +var ingestCmd = &cobra.Command{ + Use: "ingest --from --to [--dry-run]", + Short: "Ingest objects from an external source into a lakeFS branch (without actually copying them)", + Run: func(cmd *cobra.Command, args []string) { + ctx := cmd.Context() + verbose := MustBool(cmd.Flags().GetBool("verbose")) + dryRun := MustBool(cmd.Flags().GetBool("dry-run")) + from := MustString(cmd.Flags().GetString("from")) + to := MustString(cmd.Flags().GetString("to")) + lakefsURI := MustParsePathURI("to", to) + + summary := struct { + Objects int64 + Bytes int64 + }{} + client := getClient() + err := store.Walk(ctx, from, func(e store.ObjectStoreEntry) error { + if dryRun { + Fmt("%s\n", e) + return nil + } + key := e.RelativeKey + if lakefsURI.Path != nil && *lakefsURI.Path != "" { + path := *lakefsURI.Path + if strings.HasSuffix(*lakefsURI.Path, "/") { + key = path + key + } else { + key = path + "/" + key + } + } + mtime := e.Mtime.Unix() + resp, err := client.StageObjectWithResponse(ctx, + lakefsURI.Repository, + lakefsURI.Ref, + &api.StageObjectParams{ + Path: key, + }, + api.StageObjectJSONRequestBody{ + Checksum: e.ETag, + Mtime: &mtime, + PhysicalAddress: e.Address, + SizeBytes: e.Size, + }, + ) + DieOnResponseError(resp, err) + if verbose { + Write("Staged "+fsStatTemplate+"\n", resp.JSON201) + } + summary.Objects += 1 + summary.Bytes += api.Int64Value(resp.JSON201.SizeBytes) + return nil + }) + if err != nil { + DieFmt("error walking object store: %v", err) + } + + // print summary + Write(ingestSummaryTemplate, summary) + }, +} + +//nolint:gochecknoinits +func init() { + ingestCmd.Flags().String("from", "", "prefix to read from (e.g. \"s3://bucket/sub/path/\")") + _ = ingestCmd.MarkFlagRequired("from") + ingestCmd.Flags().String("to", "", "lakeFS path to load objects into (e.g. \"lakefs://repo/branch/sub/path/\")") + _ = ingestCmd.MarkFlagRequired("to") + ingestCmd.Flags().Bool("dry-run", false, "only print the paths to be ingested") + ingestCmd.Flags().BoolP("verbose", "v", false, "print stats for each individual object staged") + rootCmd.AddCommand(ingestCmd) +} diff --git a/cmd/lakectl/cmd/store/azure.go b/cmd/lakectl/cmd/store/azure.go new file mode 100644 index 00000000000..afb29982101 --- /dev/null +++ b/cmd/lakectl/cmd/store/azure.go @@ -0,0 +1,88 @@ +package store + +import ( + "context" + "errors" + "fmt" + "net/url" + "os" + "strings" + + "github.com/Azure/azure-pipeline-go/pipeline" + "github.com/Azure/azure-storage-blob-go/azblob" +) + +var ( + ErrAzureInvalidURL = errors.New("invalid Azure storage URL") + ErrAzureCredentials = errors.New("azure credentials error") +) + +func GetAzureClient() (pipeline.Pipeline, error) { + // From the Azure portal, get your storage account name and key and set environment variables. 
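+	// Note: only shared-key authentication is handled here; both AZURE_STORAGE_ACCOUNT and
+	// AZURE_STORAGE_ACCESS_KEY must be set (SAS tokens and Azure AD credentials are not supported).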
+ accountName, accountKey := os.Getenv("AZURE_STORAGE_ACCOUNT"), os.Getenv("AZURE_STORAGE_ACCESS_KEY") + if len(accountName) == 0 || len(accountKey) == 0 { + return nil, fmt.Errorf("%w: either the AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY environment variable is not set", ErrAzureCredentials) + } + + // Create a default request pipeline using your storage account name and account key. + credential, err := azblob.NewSharedKeyCredential(accountName, accountKey) + if err != nil { + return nil, fmt.Errorf("invalid credentials with error: %w", err) + } + return azblob.NewPipeline(credential, azblob.PipelineOptions{}), nil +} + +type AzureBlobWalker struct { + client pipeline.Pipeline +} + +// extractAzurePrefix takes a URL that looks like this: https://storageaccount.blob.core.windows.net/container/prefix +// and return the URL for the container and a prefix, if one exists +func extractAzurePrefix(storageURI *url.URL) (*url.URL, string, error) { + path := strings.TrimLeft(storageURI.Path, "/") + if len(path) == 0 { + return nil, "", fmt.Errorf("%w: could not parse container URL: %s", ErrAzureInvalidURL, storageURI) + } + parts := strings.SplitN(path, "/", 2) + if len(parts) == 1 { + // we only have a container + return storageURI, "", nil + } + // we have both prefix and storage container, rebuild URL + relativePath := url.URL{Path: "/" + parts[0]} + return storageURI.ResolveReference(&relativePath), parts[1], nil +} + +func getAzureBlobURL(containerURL *url.URL, blobName string) *url.URL { + relativePath := url.URL{Path: containerURL.Path + "/" + blobName} + return containerURL.ResolveReference(&relativePath) +} + +func (a *AzureBlobWalker) Walk(ctx context.Context, storageURI *url.URL, walkFn func(e ObjectStoreEntry) error) error { + // we use bucket as container and prefix as path + containerURL, prefix, err := extractAzurePrefix(storageURI) + if err != nil { + return err + } + container := azblob.NewContainerURL(*containerURL, a.client) + for marker := (azblob.Marker{}); marker.NotDone(); { + listBlob, err := container.ListBlobsFlatSegment(ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: prefix}) + if err != nil { + return err + } + marker = listBlob.NextMarker + for _, blobInfo := range listBlob.Segment.BlobItems { + if err := walkFn(ObjectStoreEntry{ + FullKey: blobInfo.Name, + RelativeKey: strings.TrimPrefix(blobInfo.Name, prefix), + Address: getAzureBlobURL(containerURL, blobInfo.Name).String(), + ETag: string(blobInfo.Properties.Etag), + Mtime: blobInfo.Properties.LastModified, + Size: *blobInfo.Properties.ContentLength, + }); err != nil { + return err + } + } + } + return nil +} diff --git a/cmd/lakectl/cmd/store/factory.go b/cmd/lakectl/cmd/store/factory.go new file mode 100644 index 00000000000..66c02de4fa3 --- /dev/null +++ b/cmd/lakectl/cmd/store/factory.go @@ -0,0 +1,69 @@ +package store + +import ( + "context" + "errors" + "fmt" + "net/url" + "time" +) + +var ( + ErrNoStorageAdapter = errors.New("no storage adapter found") +) + +type ObjectStoreEntry struct { + // FullKey represents the fully qualified path in the object store namespace for the given entry + FullKey string + // RelativeKey represents a path relative to prefix (or directory). If none specified, will be identical to FullKey + RelativeKey string + // Address is a full URI for the entry, including the storage namespace (i.e. s3://bucket/path/to/key) + Address string + // ETag represents a hash of the entry's content. 
Generally as hex encoded MD5, + // but depends on the underlying object store + ETag string + // Mtime is the last-modified datetime of the entry + Mtime time.Time + // Size in bytes + Size int64 +} + +type Walker interface { + Walk(ctx context.Context, storageURI *url.URL, walkFn func(e ObjectStoreEntry) error) error +} + +func (e ObjectStoreEntry) String() string { + return fmt.Sprintf("ObjectStoreEntry: {Address:%s, RelativeKey:%s, ETag:%s, Size:%d, Mtime:%s}", + e.Address, e.RelativeKey, e.ETag, e.Size, e.Mtime) +} + +func Walk(ctx context.Context, storageURI string, walkFn func(e ObjectStoreEntry) error) error { + var walker Walker + uri, err := url.Parse(storageURI) + if err != nil { + return fmt.Errorf("could not parse storage URI %s: %w", uri, err) + } + switch uri.Scheme { + case "s3": + svc, err := GetS3Client() + if err != nil { + return err + } + walker = &S3Walker{s3: svc} + case "gs": + svc, err := GetGCSClient(ctx) + if err != nil { + return err + } + walker = &GCSWalker{client: svc} + case "http", "https": + svc, err := GetAzureClient() + if err != nil { + return err + } + walker = &AzureBlobWalker{client: svc} + default: + return fmt.Errorf("%w: for scheme: %s", ErrNoStorageAdapter, uri.Scheme) + } + return walker.Walk(ctx, uri, walkFn) +} diff --git a/cmd/lakectl/cmd/store/gcs.go b/cmd/lakectl/cmd/store/gcs.go new file mode 100644 index 00000000000..59b659672de --- /dev/null +++ b/cmd/lakectl/cmd/store/gcs.go @@ -0,0 +1,52 @@ +package store + +import ( + "context" + "encoding/hex" + "errors" + "fmt" + "net/url" + "strings" + + "cloud.google.com/go/storage" + "google.golang.org/api/iterator" +) + +func GetGCSClient(ctx context.Context) (*storage.Client, error) { + return storage.NewClient(ctx) +} + +type GCSWalker struct { + client *storage.Client +} + +func (w *GCSWalker) Walk(ctx context.Context, storageURI *url.URL, walkFn func(e ObjectStoreEntry) error) error { + prefix := strings.TrimLeft(storageURI.Path, "/") + iter := w.client. + Bucket(storageURI.Host). 
+ Objects(ctx, &storage.Query{Prefix: prefix}) + + for { + attrs, err := iter.Next() + + if errors.Is(err, iterator.Done) { + break + } + if err != nil { + return fmt.Errorf("error listing objects at storage uri %s: %w", storageURI, err) + } + + if err := walkFn(ObjectStoreEntry{ + FullKey: attrs.Name, + RelativeKey: strings.TrimPrefix(attrs.Name, prefix), + Address: fmt.Sprintf("gs://%s/%s", attrs.Bucket, attrs.Name), + ETag: hex.EncodeToString(attrs.MD5), + Mtime: attrs.Updated, + Size: attrs.Size, + }); err != nil { + return err + } + } + + return nil +} diff --git a/cmd/lakectl/cmd/store/s3.go b/cmd/lakectl/cmd/store/s3.go new file mode 100644 index 00000000000..458426dbb9e --- /dev/null +++ b/cmd/lakectl/cmd/store/s3.go @@ -0,0 +1,67 @@ +package store + +import ( + "context" + "fmt" + "net/url" + "strings" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3iface" +) + +func GetS3Client() (*s3.S3, error) { + sess, err := session.NewSessionWithOptions(session.Options{ + SharedConfigState: session.SharedConfigEnable, + }) + if err != nil { + return nil, err + } + svc := s3.New(sess) + return svc, nil +} + +type S3Walker struct { + s3 s3iface.S3API +} + +func (s *S3Walker) Walk(ctx context.Context, storageURI *url.URL, walkFn func(e ObjectStoreEntry) error) error { + var continuation *string + const maxKeys = 1000 + prefix := strings.TrimLeft(storageURI.Path, "/") + bucket := storageURI.Host + for { + result, err := s.s3.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{ + Bucket: aws.String(bucket), + ContinuationToken: continuation, + MaxKeys: aws.Int64(maxKeys), + Prefix: aws.String(prefix), + }) + if err != nil { + return err + } + for _, record := range result.Contents { + key := aws.StringValue(record.Key) + addr := fmt.Sprintf("s3://%s/%s", bucket, key) + ent := ObjectStoreEntry{ + FullKey: key, + RelativeKey: strings.TrimPrefix(key, prefix), + Address: addr, + ETag: strings.Trim(aws.StringValue(record.ETag), "\""), + Mtime: aws.TimeValue(record.LastModified), + Size: aws.Int64Value(record.Size), + } + err := walkFn(ent) + if err != nil { + return err + } + } + if !aws.BoolValue(result.IsTruncated) { + break + } + continuation = result.ContinuationToken + } + return nil +} diff --git a/docs/reference/commands.md b/docs/reference/commands.md index 29bda25ef9d..9ee1c99258b 100644 --- a/docs/reference/commands.md +++ b/docs/reference/commands.md @@ -53,9 +53,10 @@ lakectl is a CLI tool allowing exploration and manipulation of a lakeFS environm #### Options ``` - -c, --config string config file (default is $HOME/.lakectl.yaml) - -h, --help help for lakectl - --no-color don't use fancy output colors (default when not attached to an interactive terminal) + --base-uri string base URI used for lakeFS address parse + -c, --config string config file (default is $HOME/.lakectl.yaml) + -h, --help help for lakectl + --no-color don't use fancy output colors (default when not attached to an interactive terminal) ``` @@ -135,6 +136,25 @@ lakectl abuse random-read [flags] +### lakectl abuse random-write + +Generate random writes to the source branch + +``` +lakectl abuse random-write [flags] +``` + +#### Options + +``` + --amount int amount of writes to do (default 1000000) + -h, --help help for random-write + --parallelism int amount of writes to do in parallel (default 100) + --prefix string prefix to create paths under (default "abuse/") +``` + + + ### lakectl actions Manage Actions 
commands @@ -202,7 +222,7 @@ lakectl actions runs describe lakefs:// ``` --after string show results after this value (used for pagination) - --amount int how many results to return, or '-1' for default (used for pagination) (default -1) + --amount int number of results to return. By default, all results are returned. -h, --help help for describe ``` @@ -251,7 +271,7 @@ lakectl actions runs list lakefs:// [--branch ] [--commit [--branch ] [--commit +``` + +#### Options + +``` + -h, --help help for validate +``` + + + ### lakectl auth manage authentication and authorization @@ -1062,7 +1108,7 @@ lakectl branch list lakefs:// ``` --after string show results after this value (used for pagination) - --amount int how many results to return, or '-1' for default (used for pagination) (default -1) + --amount int number of results to return (default 100) -h, --help help for list ``` @@ -1320,6 +1366,9 @@ lakectl docs [outfile] [flags] ### lakectl fs +**note:** This command is a lakeFS plumbing command. Don't use it unless you're really sure you know what you're doing. +{: .note .note-warning } + view and manipulate objects #### Options @@ -1341,7 +1390,8 @@ lakectl fs cat [flags] #### Options ``` - -h, --help help for cat + -d, --direct read directly from backing store (faster but requires more credentials) + -h, --help help for cat ``` @@ -1378,7 +1428,8 @@ lakectl fs ls [flags] #### Options ``` - -h, --help help for ls + -h, --help help for ls + --recursive list all objects under the specified prefix ``` @@ -1416,6 +1467,8 @@ lakectl fs stage [flags] --checksum string Object MD5 checksum as a hexadecimal string -h, --help help for stage --location string fully qualified storage location (i.e. "s3://bucket/path/to/object") + --meta strings key value pairs in the form of key=value + --mtime int Object modified time (Unix Epoch in seconds). Defaults to current time. --size int Object size in bytes ``` @@ -1448,6 +1501,7 @@ lakectl fs upload [flags] #### Options ``` + -d, --direct write directly to backing store (faster but requires more credentials) -h, --help help for upload -r, --recursive recursively copy all files under local source -s, --source string local file to upload, or "-" for stdin @@ -1476,6 +1530,26 @@ lakectl help [command] [flags] +### lakectl ingest + +Ingest objects from an external source into a lakeFS branch (without actually copying them) + +``` +lakectl ingest --from --to [--dry-run] [flags] +``` + +#### Options + +``` + --dry-run only print the paths to be ingested + --from string prefix to read from (e.g. "s3://bucket/sub/path/") + -h, --help help for ingest + --to string lakeFS path to load objects into (e.g. "lakefs://repo/branch/sub/path/") + -v, --verbose print stats for each individual object staged +``` + + + ### lakectl log show log of commits for the given branch @@ -1488,7 +1562,7 @@ lakectl log [flags] ``` --after string show results after this value (used for pagination) - --amount int how many results to return, or '-1' for default (used for pagination) (default -1) + --amount int number of results to return. By default, all results are returned. 
-h, --help help for log --show-meta-range-id also show meta range ID ``` @@ -1542,17 +1616,46 @@ lakectl metastore copy [flags] #### Options ``` - --catalog-id string Glue catalog ID - --from-schema string source schema name - --from-table string source table name - -h, --help help for copy - --metastore-uri string Hive metastore URI - -p, --partition strings partition to copy - --serde string serde to set copy to [default is to-table] - --to-branch string lakeFS branch name - --to-schema string destination schema name [default is from-branch] - --to-table string destination table name [default is from-table] - --type string metastore type [hive, glue] + --catalog-id string Glue catalog ID + --from-client-type string metastore type [hive, glue] + --from-schema string source schema name + --from-table string source table name + -h, --help help for copy + --metastore-uri string Hive metastore URI + -p, --partition strings partition to copy + --serde string serde to set copy to [default is to-table] + --to-branch string lakeFS branch name + --to-client-type string metastore type [hive, glue] + --to-schema string destination schema name [default is from-branch] + --to-table string destination table name [default is from-table] +``` + + + +### lakectl metastore copy-all + +copy from one metastore to another + +#### Synopsis + +copy or merge requested tables between hive metastores. the destination tables will point to the selected branch + +``` +lakectl metastore copy-all [flags] +``` + +#### Options + +``` + --branch string lakeFS branch name + --continue-on-error string prevent copy-all from failing when a single table fails + --from-address string source metastore address + --from-client-type string metastore type [hive, glue] + -h, --help help for copy-all + --schema-filter string filter for schemas to copy in metastore pattern (default ".*") + --table-filter string filter for tables to copy in metastore pattern (default ".*") + --to-address string destination metastore address + --to-client-type string metastore type [hive, glue] ``` @@ -1596,14 +1699,17 @@ lakectl metastore diff [flags] #### Options ``` - --catalog-id string Glue catalog ID - --from-schema string source schema name - --from-table string source table name - -h, --help help for diff - --metastore-uri string Hive metastore URI - --to-schema string destination schema name - --to-table string destination table name [default is from-table] - --type string metastore type [hive, glue] + --catalog-id string Glue catalog ID + --from-address string source metastore address + --from-client-type string metastore type [hive, glue] + --from-schema string source schema name + --from-table string source table name + -h, --help help for diff + --metastore-uri string Hive metastore URI + --to-address string destination metastore address + --to-client-type string metastore type [hive, glue] + --to-schema string destination schema name + --to-table string destination table name [default is from-table] ``` @@ -1780,7 +1886,7 @@ lakectl repo list [flags] ``` --after string show results after this value (used for pagination) - --amount int how many results to return, or '-1' for default (used for pagination) (default -1) + --amount int number of results to return (default 100) -h, --help help for list ``` @@ -1891,7 +1997,7 @@ lakectl tag list lakefs:// ``` --after string show results after this value (used for pagination) - --amount int how many results to return, or '-1' for default (used for pagination) (default -1) + --amount int number of 
results to return (default 100) -h, --help help for list ``` diff --git a/docs/reference/import-mvcc.md b/docs/reference/import-mvcc.md index fed9b4ec040..7db791d4f08 100644 --- a/docs/reference/import-mvcc.md +++ b/docs/reference/import-mvcc.md @@ -4,6 +4,7 @@ title: Importing data from S3 (MVCC) description: In order to import existing data to lakeFS, you may choose to copy it using S3 CLI or using tools like Apache DistCp. parent: Reference nav_exclude: true +search_exclude: true has_children: false --- diff --git a/docs/reference/import.md b/docs/reference/import.md index e0811c90fbd..a9602de8b69 100644 --- a/docs/reference/import.md +++ b/docs/reference/import.md @@ -1,14 +1,15 @@ --- layout: default -title: Importing data from S3 +title: Importing data from existing Object Store description: In order to import existing data to lakeFS, you may choose to copy it using S3 CLI or using tools like Apache DistCp. parent: Reference nav_order: 8 has_children: false --- This page describes importing from versions >= v0.24.0. For ealier versions, see [mvcc import](import-mvcc.md) +{: .note .pb-3 } -# Importing data from S3 +# Importing data from existing Object Store {: .no_toc } ## Table of contents @@ -28,8 +29,61 @@ Unfortunately, copying data is not always feasible for the following reasons: 2. It requires you to stop making changes to the data before starting to copy. 3. It requires you to switch to using the lakeFS endpoint in all places at once. -## Using lakeFS import tool -To solve this we offer an import tool that does not copy any data, allowing for a more gradual onboarding process. +## Importing data from an object store without actually copying it + +The `lakectl` command supports ingesting objects from a source object store without actually copying the data itself. +This is done by listing the source bucket (and optional prefix), and creating pointers to the returned objects in lakeFS. + +By doing this, it's possible to take even large sets of objects, and have them appear as objects in a lakeFS branch, as if they were written directly to it. + +For this to work, we'd need to ensure 2 things first: + +1. The user calling `lakectl ingest` must have permissions to list the object at the source object store +1. The lakeFS installation must have read permissions to the objects being ingested + +### Running lakectl ingest with S3 as the source + +```shell +lakectl ingest \ + --from s3://bucket/optional/prefix/ \ + --to lakefs://my-repo/ingest-branch/optional/path/ +``` + +The `lakectl ingest` command will attempt to use the current user's existing credentials and will respect instance profiles, +environment variables and credential files [in the same way that the AWS cli does](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-quickstart.html){: target="_blank" } + +### Running lakectl ingest with Azure Blob storage as the source + +```shell +export AZURE_STORAGE_ACCOUNT="storageAccountName" +export AZURE_STORAGE_ACCESS_KEY="EXAMPLEroozoo2gaec9fooTieWah6Oshai5Sheofievohthapob0aidee5Shaekahw7loo1aishoonuuquahr3==" +lakectl ingest \ + --from https://storageAccountName.blob.core.windows.net/container/optional/prefix/ \ + --to lakefs://my-repo/ingest-branch/optional/path/ +``` + +The `lakectl ingest` command currently supports storage accounts configured through environment variables as shown above. + +**Note:** Currently `lakectl import` supports the `http://` and `https://` schemes for Azure storage URIs. `wasb`, `abfs` or `adls` are currently not supported. 
+{: .note } + +### Running lakectl ingest with Google Cloud storage as the source + +```shell +export GOOGLE_APPLICATION_CREDENTIALS="$HOME/.gcs_credentials.json" # Optional, will fallback to the default configured credentials +lakectl ingest \ + --from gs://bucket/optional/prefix/ \ + --to lakefs://my-repo/ingest-branch/optional/path/ +``` + +The `lakectl ingest` command currently supports the standard `GOOGLE_APPLICATION_CREDENTIALS` environment variable [as described in Google Cloud's documentation](https://cloud.google.com/docs/authentication/getting-started). + +## Very large buckets: Using lakeFS S3 inventory import tool + +Importing a very large amount of objects (> ~250M) might take some time using `lakectl ingest` as described above, +since it has to paginate through all the objects in the source using API calls. + +For S3, we provide a utility as part of the `lakefs` binary, called `lakefs import`. The lakeFS import tool will use the [S3 Inventory](https://docs.aws.amazon.com/AmazonS3/latest/dev/storage-inventory.html) feature to create lakeFS metadata. In case the repository is empty, the imported metadata will be committed directly to the main branch. In all other cases, it will be committed to a special branch, called `import-from-inventory`. diff --git a/pkg/api/controller.go b/pkg/api/controller.go index 01ea6c0a737..6735c8ed40d 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -11,6 +11,7 @@ import ( "net/http" "path/filepath" "reflect" + "regexp" "strings" "time" @@ -1791,16 +1792,21 @@ func (c *Controller) StageObject(w http.ResponseWriter, r *http.Request, body St return } - blockStoreType := c.BlockAdapter.BlockstoreType() - if qk.StorageType.String() != blockStoreType { - writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid storage type: %s: current block adapter is %s", - qk.StorageType.String(), - blockStoreType, + // see what storage type this is and whether it fits our configuration + uriRegex := c.BlockAdapter.GetStorageNamespaceInfo().ValidityRegex + if match, err := regexp.MatchString(uriRegex, body.PhysicalAddress); err != nil || !match { + writeError(w, http.StatusBadRequest, fmt.Sprintf("physical address is not valid for block adapter: %s", + c.BlockAdapter.BlockstoreType(), )) return } + // take mtime from request, if any writeTime := time.Now() + if body.Mtime != nil { + writeTime = time.Unix(*body.Mtime, 0) + } + entry := catalog.DBEntry{ CommonLevel: false, Path: params.Path,
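
A minimal usage sketch of how the pieces added in this change compose: walking an external object store with `store.Walk` and staging each entry, including its `mtime`, through the generated `StageObject` client call, mirroring `cmd/lakectl/cmd/ingest.go`. The `*api.ClientWithResponses` type name is assumed from the usual oapi-codegen output; the request and response shapes are taken from this diff.

```go
package example

import (
	"context"
	"fmt"

	"github.com/treeverse/lakefs/cmd/lakectl/cmd/store"
	"github.com/treeverse/lakefs/pkg/api"
)

// stagePrefix stages every object under `from` (e.g. "s3://bucket/prefix/") into the given
// repository/branch without copying data. pathPrefix is assumed to end with "/" or be empty.
func stagePrefix(ctx context.Context, client *api.ClientWithResponses, from, repo, branch, pathPrefix string) error {
	return store.Walk(ctx, from, func(e store.ObjectStoreEntry) error {
		mtime := e.Mtime.Unix() // Unix Epoch in seconds, as described in swagger.yml
		resp, err := client.StageObjectWithResponse(ctx, repo, branch,
			&api.StageObjectParams{Path: pathPrefix + e.RelativeKey},
			api.StageObjectJSONRequestBody{
				Checksum:        e.ETag,
				Mtime:           &mtime, // leave nil to let the server default to time.Now()
				PhysicalAddress: e.Address,
				SizeBytes:       e.Size,
			})
		if err != nil {
			return err
		}
		if resp.JSON201 == nil {
			return fmt.Errorf("staging %s: unexpected status code %d", e.RelativeKey, resp.StatusCode())
		}
		return nil
	})
}
```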