Skip to content

Commit

Permalink
lakefs import with merge support (#726)
Browse files Browse the repository at this point in the history
* import with merge

* import with merge

* Update cmd/lakefs/cmd/import.go

Co-authored-by: johnnyaug <yoni.augarten@treeverse.io>

* code review changes

Co-authored-by: johnnyaug <yoni.augarten@treeverse.io>
  • Loading branch information
nopcoder and johnnyaug authored Oct 1, 2020
1 parent 8d9257e commit 995d5ba
Showing 1 changed file with 30 additions and 7 deletions.
37 changes: 30 additions & 7 deletions cmd/lakefs/cmd/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (

const (
DryRunFlagName = "dry-run"
WithMergeFlagName = "with-merge"
ManifestURLFlagName = "manifest"
ManifestURLFormat = "s3://example-bucket/inventory/YYYY-MM-DDT00-00Z/manifest.json"
ImportCmdNumArgs = 1
CommitterName = "lakefs"
)

var importCmd = &cobra.Command{
Expand All @@ -39,6 +41,8 @@ var importCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
dryRun, _ := cmd.Flags().GetBool(DryRunFlagName)
manifestURL, _ := cmd.Flags().GetString(ManifestURLFlagName)
withMerge, _ := cmd.Flags().GetBool(WithMergeFlagName)

ctx := context.Background()
conf := config.NewConfig()
err := db.ValidateSchemaUpToDate(conf.GetDatabaseParams())
Expand All @@ -48,7 +52,11 @@ var importCmd = &cobra.Command{
}
logger := logging.FromContext(ctx)
dbPool := db.BuildDatabaseConnection(cfg.GetDatabaseParams())
defer func() { _ = dbPool.Close() }()

cataloger := catalog.NewCataloger(dbPool, catalog.WithParams(conf.GetCatalogerCatalogParams()))
defer func() { _ = cataloger.Close() }()

u := uri.Must(uri.Parse(args[0]))
blockStore, err := factory.BuildBlockAdapter(cfg)
if err != nil {
Expand Down Expand Up @@ -80,7 +88,7 @@ var importCmd = &cobra.Command{
fmt.Print("Starting import dry run. Will not perform any changes.\n\n")
}
importConfig := &onboard.Config{
CommitUsername: "lakefs",
CommitUsername: CommitterName,
InventoryURL: manifestURL,
Repository: repoName,
InventoryGenerator: blockStore,
Expand Down Expand Up @@ -109,14 +117,28 @@ var importCmd = &cobra.Command{
fmt.Println(text.FgYellow.Sprint("Previous import date:"), stats.PreviousImportDate)
fmt.Println()
}
if !dryRun {
fmt.Print(text.FgYellow.Sprint("Commit ref:"), stats.CommitRef)
fmt.Println()
fmt.Printf("Import to branch %s finished successfully.\n", onboard.DefaultBranchName)
if dryRun {
fmt.Println("Dry run successful. No changes were made.")
return
}

fmt.Print(text.FgYellow.Sprint("Commit ref:"), stats.CommitRef)
fmt.Println()
fmt.Printf("Import to branch %s finished successfully.\n", onboard.DefaultBranchName)
fmt.Println()
if withMerge {
fmt.Printf("Merging import changes into lakefs://%s@%s/\n", repoName, repo.DefaultBranch)
msg := fmt.Sprintf(onboard.CommitMsgTemplate, stats.CommitRef)
commitLog, err := cataloger.Merge(ctx, repoName, onboard.DefaultBranchName, repo.DefaultBranch, CommitterName, msg, nil)
if err != nil {
fmt.Printf("Merge failed: %s\n", err)
os.Exit(1)
}
fmt.Println("Merge was completed successfully.")
fmt.Printf("To list imported objects, run:\n\t$ lakectl fs ls lakefs://%s@%s/\n", repoName, commitLog.Reference)
} else {
fmt.Printf("To list imported objects, run:\n\t$ lakectl fs ls lakefs://%s@%s/\n", repoName, stats.CommitRef)
fmt.Printf("To merge the changes to your main branch, run:\n\t$ lakectl merge lakefs://%s@%s lakefs://%s@%s\n", repoName, onboard.DefaultBranchName, repoName, repo.DefaultBranch)
} else {
fmt.Println("Dry run successful. No changes were made.")
}
},
}
Expand Down Expand Up @@ -145,4 +167,5 @@ func init() {
importCmd.Flags().Bool(DryRunFlagName, false, "Only read inventory and print stats, without making any changes")
importCmd.Flags().StringP(ManifestURLFlagName, "m", "", fmt.Sprintf("S3 uri to the manifest.json to use for the import. Format: %s", ManifestURLFormat))
_ = importCmd.MarkFlagRequired(ManifestURLFlagName)
importCmd.Flags().Bool(WithMergeFlagName, false, "Merge imported data to the repository's main branch")
}

0 comments on commit 995d5ba

Please sign in to comment.