Skip to content

Commit

Permalink
use a worker pool to ingest entries faster (#1964)
Browse files Browse the repository at this point in the history
  • Loading branch information
ozkatz authored May 19, 2021
1 parent 035aca2 commit 4bfead3
Showing 1 changed file with 87 additions and 35 deletions.
122 changes: 87 additions & 35 deletions cmd/lakectl/cmd/ingest.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package cmd

import (
"context"
"strings"
"sync"
"time"

"github.com/spf13/cobra"
"github.com/treeverse/lakefs/cmd/lakectl/cmd/store"
Expand All @@ -12,6 +15,23 @@ const ingestSummaryTemplate = `
Staged {{ .Objects | yellow }} external objects (total of {{ .Bytes | human_bytes | yellow }})
`

type stageRequest struct {
repository string
branch string
params *api.StageObjectParams
body api.StageObjectJSONRequestBody
}

func stageWorker(ctx context.Context, client api.ClientWithResponsesInterface, wg *sync.WaitGroup, requests <-chan *stageRequest, responses chan<- *api.StageObjectResponse) {
defer wg.Done()
for req := range requests {
resp, err := client.StageObjectWithResponse(
ctx, req.repository, req.branch, req.params, req.body)
DieOnResponseError(resp, err)
responses <- resp
}
}

var ingestCmd = &cobra.Command{
Use: "ingest --from <object store URI> --to <lakeFS path URI> [--dry-run]",
Short: "Ingest objects from an external source into a lakeFS branch (without actually copying them)",
Expand All @@ -21,51 +41,82 @@ var ingestCmd = &cobra.Command{
dryRun := MustBool(cmd.Flags().GetBool("dry-run"))
from := MustString(cmd.Flags().GetString("from"))
to := MustString(cmd.Flags().GetString("to"))
concurrency := MustInt(cmd.Flags().GetInt("concurrency"))
lakefsURI := MustParsePathURI("to", to)

// initialize worker pool
client := getClient()
var wg sync.WaitGroup
wg.Add(concurrency)
requests := make(chan *stageRequest)
responses := make(chan *api.StageObjectResponse)
for w := 0; w < concurrency; w++ {
go stageWorker(ctx, client, &wg, requests, responses)
}

summary := struct {
Objects int64
Bytes int64
}{}
client := getClient()
err := store.Walk(ctx, from, func(e store.ObjectStoreEntry) error {
if dryRun {
Fmt("%s\n", e)
return nil
}
key := e.RelativeKey
if lakefsURI.Path != nil && *lakefsURI.Path != "" {
path := *lakefsURI.Path
if strings.HasSuffix(*lakefsURI.Path, "/") {
key = path + key
} else {
key = path + "/" + key

var path string
if lakefsURI.Path != nil {
path = *lakefsURI.Path
}
if !strings.HasSuffix(path, "/") {
path = path + "/" // append a slash if not passed by the user
}
go func() {
err := store.Walk(ctx, from, func(e store.ObjectStoreEntry) error {
if dryRun {
Fmt("%s\n", e)
return nil
}
// iterate entries and feed our pool
key := e.RelativeKey
mtime := e.Mtime.Unix()
requests <- &stageRequest{
repository: lakefsURI.Repository,
branch: lakefsURI.Ref,
params: &api.StageObjectParams{
Path: path + key,
},
body: api.StageObjectJSONRequestBody{
Checksum: e.ETag,
Mtime: &mtime,
PhysicalAddress: e.Address,
SizeBytes: e.Size,
},
}
return nil
})
if err != nil {
DieFmt("error walking object store: %v", err)
}
mtime := e.Mtime.Unix()
resp, err := client.StageObjectWithResponse(ctx,
lakefsURI.Repository,
lakefsURI.Ref,
&api.StageObjectParams{
Path: key,
},
api.StageObjectJSONRequestBody{
Checksum: e.ETag,
Mtime: &mtime,
PhysicalAddress: e.Address,
SizeBytes: e.Size,
},
)
DieOnResponseError(resp, err)
close(requests) // we're done feeding work!
wg.Wait() // until all responses have been written
close(responses) // so we're also done with responses
}()

elapsed := time.Now()
for response := range responses {
summary.Objects += 1
summary.Bytes += api.Int64Value(response.JSON201.SizeBytes)

if verbose {
Write("Staged "+fsStatTemplate+"\n", resp.JSON201)
Write("Staged "+fsStatTemplate+"\n", response.JSON201)
continue
}
summary.Objects += 1
summary.Bytes += api.Int64Value(resp.JSON201.SizeBytes)
return nil
})
if err != nil {
DieFmt("error walking object store: %v", err)

// If not verbose, at least update no more than once a second
if time.Since(elapsed) > time.Second {
Write("Staged {{ .Objects | green }} objects so far...\r", summary)
elapsed = time.Now()
}

}
if !verbose {
Fmt("\n")
}

// print summary
Expand All @@ -81,5 +132,6 @@ func init() {
_ = ingestCmd.MarkFlagRequired("to")
ingestCmd.Flags().Bool("dry-run", false, "only print the paths to be ingested")
ingestCmd.Flags().BoolP("verbose", "v", false, "print stats for each individual object staged")
ingestCmd.Flags().IntP("concurrency", "C", 64, "max concurrent API calls to make to the lakeFS server")
rootCmd.AddCommand(ingestCmd)
}

0 comments on commit 4bfead3

Please sign in to comment.