Skip to content

Commit

Permalink
lakectl log copy and update metastore operation information (#1986)
Browse files Browse the repository at this point in the history
  • Loading branch information
nopcoder authored May 26, 2021
1 parent 4589933 commit 931be4a
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 2 deletions.
12 changes: 12 additions & 0 deletions cmd/lakectl/cmd/metastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/treeverse/lakefs/pkg/api"
"github.com/treeverse/lakefs/pkg/logging"
"github.com/treeverse/lakefs/pkg/metastore"
"github.com/treeverse/lakefs/pkg/metastore/glue"
"github.com/treeverse/lakefs/pkg/metastore/hive"
Expand Down Expand Up @@ -44,6 +45,17 @@ var metastoreCopyCmd = &cobra.Command{
if len(serde) == 0 {
serde = toTable
}
logging.Default().WithFields(logging.Fields{
"form_client_type": fromClientType,
"from_schema": fromDB,
"from_table": fromTable,
"to_client_type": toClientType,
"to_schema": toDB,
"to_table": toTable,
"to_branch": toBranch,
"serde": serde,
"partition": partition,
}).Info("Metadata copy or merge table")
fmt.Printf("copy %s.%s -> %s.%s\n", fromDB, fromTable, toDB, toTable)
err := metastore.CopyOrMerge(cmd.Context(), fromClient, toClient, fromDB, fromTable, toDB, toTable, toBranch, serde, partition)
if err != nil {
Expand Down
26 changes: 24 additions & 2 deletions pkg/metastore/metastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,16 @@ import (
"time"

"github.com/aws/aws-sdk-go/service/glue"
"github.com/davecgh/go-spew/spew"
"github.com/treeverse/lakefs/pkg/logging"
)

func (m *Table) Update(db, table, serde string, transformLocation func(location string) (string, error)) error {
log := logging.Default().WithFields(logging.Fields{
"db": db,
"table": table,
"serde": serde,
})
if m.Sd == nil {
m.Sd = &StorageDescriptor{}
}
Expand All @@ -21,10 +28,20 @@ func (m *Table) Update(db, table, serde string, transformLocation func(location
if m.Sd.Location != "" {
m.Sd.Location, err = transformLocation(m.Sd.Location)
}
return err
if err != nil {
log.WithError(err).WithField("table", spew.Sdump(*m)).Error("Update table")
return err
}
log.WithField("table", spew.Sdump(*m)).Debug("Update table")
return nil
}

func (m *Partition) Update(db, table, serde string, transformLocation func(location string) (string, error)) error {
log := logging.Default().WithFields(logging.Fields{
"db": db,
"table": table,
"serde": serde,
})
if m.Sd == nil {
m.Sd = &StorageDescriptor{}
}
Expand All @@ -38,7 +55,12 @@ func (m *Partition) Update(db, table, serde string, transformLocation func(locat
if m.Sd.Location != "" {
m.Sd.Location, err = transformLocation(m.Sd.Location)
}
return err
if err != nil {
log.WithError(err).WithField("partition", spew.Sdump(*m)).Error("Update partition")
return err
}
log.WithField("partition", spew.Sdump(*m)).Error("Update partition")
return nil
}

type Database struct {
Expand Down
12 changes: 12 additions & 0 deletions pkg/metastore/ms_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/treeverse/lakefs/pkg/catalog"
"github.com/treeverse/lakefs/pkg/logging"
)

type ReadClient interface {
Expand Down Expand Up @@ -43,14 +44,24 @@ func CopyOrMerge(ctx context.Context, fromClient Client, toClient Client, fromDB
}

func copyOrMergeWithTransformLocation(ctx context.Context, fromClient, toClient Client, fromDB, fromTable, toDB, toTable, serde string, partition []string, transformLocation func(location string) (string, error)) error {
log := logging.Default().WithFields(logging.Fields{
"from_db": fromDB,
"from_table": fromTable,
"to_db": toDB,
"to_table": toTable,
"serde": serde,
"partition_len": len(partition),
})
if len(partition) > 0 {
log.Debug("CopyPartition")
return CopyPartition(ctx, fromClient, toClient, fromDB, fromTable, toDB, toTable, serde, partition, transformLocation)
}
hasTable, err := toClient.HasTable(ctx, toDB, toTable)
if err != nil {
return err
}
if !hasTable {
log.Debug("Copy")
table, err := fromClient.GetTable(ctx, fromDB, fromTable)
if err != nil {
return err
Expand All @@ -61,6 +72,7 @@ func copyOrMergeWithTransformLocation(ctx context.Context, fromClient, toClient
}
return Copy(ctx, table, partitions, toDB, toTable, serde, toClient, transformLocation)
}
log.Debug("Merge")
table, err := fromClient.GetTable(ctx, fromDB, fromTable)
if err != nil {
return err
Expand Down

0 comments on commit 931be4a

Please sign in to comment.