Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delta Lake Diff: Change default plugin loading #5495

Merged
10 changes: 8 additions & 2 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ type PluginProps struct {
Version *int `mapstructure:"version"`
}

// Plugins holds the plugin-loading configuration: the default directory in which plugins are
// looked up, and optional per-plugin properties that take precedence over that default.
type Plugins struct {
// DefaultPath is the base plugins directory (config key "plugins.default_path").
// A leading "~" is expanded to the user's home directory before use, and diff plugins
// are looked up under its "diff" sub-directory.
DefaultPath string `mapstructure:"default_path"`
// Properties maps a plugin name to its explicit properties (config key "plugins.properties").
// When an entry exists for a plugin, its Path is used instead of the default location.
Properties map[string]PluginProps `mapstructure:"properties"`
}

// DiffProps struct holds the properties that define the details necessary to run a diff.
type DiffProps struct {
PluginName string `mapstructure:"plugin"`
Expand Down Expand Up @@ -326,8 +332,8 @@ type Config struct {
Code string `mapstructure:"code"`
} `mapstructure:"snippets"`
} `mapstructure:"ui"`
Diff map[string]DiffProps `mapstructure:"diff"`
Plugins map[string]PluginProps `mapstructure:"plugins"`
Diff map[string]DiffProps `mapstructure:"diff"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should define the supported formats explicitly rather than leave this dynamic, since it is not actually dynamic.
If the code were able to support new formats dynamically I would keep the map. But now it looks like we just moved the lookup at runtime to the specific format we support.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created an issue for that; it will be handled in the following iteration.

Plugins Plugins `mapstructure:"plugins"`
}

func NewConfig() (*Config, error) {
Expand Down
7 changes: 1 addition & 6 deletions pkg/config/defaults.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package config

import (
"fmt"
"os"
"time"

"github.com/spf13/viper"
Expand Down Expand Up @@ -112,9 +110,6 @@ func setDefaults(local bool) {
viper.SetDefault("graveler.commit_cache.size", 50_000)
viper.SetDefault("graveler.commit_cache.expiry", 10*time.Minute)
viper.SetDefault("graveler.commit_cache.jitter", 2*time.Second)
}

func DefaultPluginLocation(pluginName string) string {
hd, _ := os.UserHomeDir()
return fmt.Sprintf("%s/.lakefs/plugins/%s", hd, pluginName)
viper.SetDefault("plugins.default_path", "~/.lakefs/plugins")
}
42 changes: 37 additions & 5 deletions pkg/plugins/diff/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ package tablediff
import (
"context"
"errors"
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/mitchellh/go-homedir"

"github.com/treeverse/lakefs/pkg/config"
"github.com/treeverse/lakefs/pkg/logging"
"github.com/treeverse/lakefs/pkg/plugins"
Expand Down Expand Up @@ -133,7 +137,7 @@ func (s *Service) appendClosingFunction(diffType string, f func()) {
}

// NewService is used to initialize a new Differ service. The returned function is a closing function for the service.
func NewService(diffProps map[string]config.DiffProps, pluginProps map[string]config.PluginProps) (*Service, func()) {
func NewService(diffProps map[string]config.DiffProps, pluginProps config.Plugins) (*Service, func()) {
service := &Service{
pluginHandler: internal.NewManager[Differ](),
closeFunctions: make(map[string]func()),
Expand All @@ -142,14 +146,20 @@ func NewService(diffProps map[string]config.DiffProps, pluginProps map[string]co
return service, service.Close
}

func registerPlugins(service *Service, diffProps map[string]config.DiffProps, pluginProps map[string]config.PluginProps) {
func registerPlugins(service *Service, diffProps map[string]config.DiffProps, pluginProps config.Plugins) {
registerDefaultPlugins(service, pluginProps.DefaultPath)
for n, p := range diffProps {
pluginName := p.PluginName
// If the requested plugin wasn't configured with a path, it will be defined under the default location
pluginPath := config.DefaultPluginLocation(pluginName)
pluginPath := filepath.Join(diffPluginsDefaultPath(pluginProps.DefaultPath), pluginName)
pluginVersion := 1 // default version
if props, ok := pluginProps[pluginName]; ok {
pluginPath = props.Path
if props, ok := pluginProps.Properties[pluginName]; ok {
pp, err := homedir.Expand(props.Path)
if err != nil {
logging.Default().Errorf("failed to register a plugin for an invalid path: '%s'", props.Path)
continue
}
pluginPath = pp
if props.Version != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove the Version pointer from the configuration as we fall back to version 1 in the code - we can document that 0 is the default.

pluginVersion = *props.Version
}
Expand All @@ -166,3 +176,25 @@ func registerPlugins(service *Service, diffProps map[string]config.DiffProps, pl
logging.Default().Infof("successfully registered a plugin for diff type: '%s'", n)
}
}

func registerDefaultPlugins(service *Service, pluginsPath string) {
diffPluginsDir := diffPluginsDefaultPath(pluginsPath)
deltaPath := filepath.Join(diffPluginsDir, "delta")
_, err := os.Stat(deltaPath)

if err != nil {
Jonathan-Rosenberg marked this conversation as resolved.
Show resolved Hide resolved
if !os.IsNotExist(err) {
logging.Default().WithError(err).Error("failed to access delta lake diff plugin")
}
return
}

pid := plugins.PluginIdentity{ProtocolVersion: 1, ExecutableLocation: deltaPath}
pa := plugins.PluginHandshake{}
RegisterDeltaLakeDiffPlugin(service, pid, pa)
}

// diffPluginsDefaultPath returns the directory in which diff plugins are looked up by default:
// pluginsPath, passed through homedir.Expand, joined with the "diff" sub-directory.
//
// The function cannot report an error to its callers, so a failed expansion is logged instead
// of being silently dropped. The returned value in that case is unchanged from before: whatever
// homedir.Expand returned, joined with "diff".
func diffPluginsDefaultPath(pluginsPath string) string {
	pp, err := homedir.Expand(pluginsPath)
	if err != nil {
		// Surface the failure so a misconfigured default path is diagnosable from the logs.
		logging.Default().WithError(err).Error("failed to expand plugins default path")
	}
	return filepath.Join(pp, "diff")
}
38 changes: 23 additions & 15 deletions pkg/plugins/diff/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tablediff
import (
"context"
"errors"
"path/filepath"
"strconv"
"testing"

Expand Down Expand Up @@ -59,7 +60,7 @@ func Test_registerPlugins(t *testing.T) {
type args struct {
service *Service
diffProps map[string]config.DiffProps
pluginProps map[string]config.PluginProps
pluginProps config.Plugins
}
testCases := []struct {
description string
Expand All @@ -71,14 +72,15 @@ func Test_registerPlugins(t *testing.T) {
{
description: "register delta diff plugin - default path and version - success",
diffTypes: []string{"delta"},
pluginName: pluginName,
args: args{
service: NewMockService(),
diffProps: map[string]config.DiffProps{
"delta": {
PluginName: pluginName,
},
},
pluginProps: nil,
pluginProps: config.Plugins{},
},
},
{
Expand All @@ -92,10 +94,13 @@ func Test_registerPlugins(t *testing.T) {
PluginName: pluginName,
},
},
pluginProps: map[string]config.PluginProps{
pluginName: {
Path: customPluginPath,
Version: &customPluginVersion,
pluginProps: config.Plugins{
DefaultPath: "",
Properties: map[string]config.PluginProps{
pluginName: {
Path: customPluginPath,
Version: &customPluginVersion,
},
},
},
},
Expand All @@ -116,7 +121,7 @@ func Test_registerPlugins(t *testing.T) {
PluginName: pluginName,
},
},
pluginProps: nil,
pluginProps: config.Plugins{},
},
expectedErr: ErrNotFound,
},
Expand All @@ -134,10 +139,13 @@ func Test_registerPlugins(t *testing.T) {
PluginName: pluginName,
},
},
pluginProps: map[string]config.PluginProps{
pluginName: {
Path: customPluginPath,
Version: &customPluginVersion,
pluginProps: config.Plugins{
DefaultPath: "",
Properties: map[string]config.PluginProps{
pluginName: {
Path: customPluginPath,
Version: &customPluginVersion,
},
},
},
},
Expand All @@ -157,17 +165,17 @@ func Test_registerPlugins(t *testing.T) {
t.Errorf("'%s' failed: %s", tc.description, err)
}
pluginDetails := diffs.Diffs[0].OperationContent
tcPath := config.DefaultPluginLocation(tc.args.diffProps[dt].PluginName)
if tc.args.pluginProps[tc.pluginName].Path != "" {
tcPath = tc.args.pluginProps[tc.pluginName].Path
tcPath := filepath.Join(tc.args.pluginProps.DefaultPath, "diff", tc.pluginName)
if tc.args.pluginProps.Properties[tc.pluginName].Path != "" {
tcPath = tc.args.pluginProps.Properties[tc.pluginName].Path
}
if pluginDetails[PluginPath] != tcPath {
t.Errorf("'%s' failed: incorrect plugin path. got '%s' instead of '%s'",
tc.description,
pluginDetails[PluginPath],
tcPath)
}
tcVersion := tc.args.pluginProps[tc.pluginName].Version
tcVersion := tc.args.pluginProps.Properties[tc.pluginName].Version
if tcVersion != nil && pluginDetails[PluginVersion] != strconv.Itoa(*tcVersion) {
t.Errorf("'%s' failed: incorrect plugin version. got '%s' instead of '%s'",
tc.description,
Expand Down