Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delta Lake Diff: Change default plugin loading #5495

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/docker-publish-exp-image.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ jobs:
uses: docker/build-push-action@v3
with:
context: .
target: lakefs
target: lakefs-plugins
push: true
platforms: linux/amd64,linux/arm64,darwin/amd64,darwin/arm64
build-args: VERSION=${{ steps.version.outputs.tag }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/docker-publish.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ jobs:
uses: docker/build-push-action@v3
with:
context: .
target: lakefs
target: lakefs-plugins
push: true
platforms: linux/amd64,linux/arm64,darwin/amd64,darwin/arm64
build-args: VERSION=${{ steps.version.outputs.tag }}
Expand Down
56 changes: 53 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM --platform=$BUILDPLATFORM golang:1.19.2-alpine AS build
FROM --platform=$BUILDPLATFORM golang:1.19.2-alpine3.16 AS build

ARG VERSION=dev

Expand All @@ -25,8 +25,33 @@ RUN --mount=type=cache,target=/root/.cache/go-build \
GOOS=$TARGETOS GOARCH=$TARGETARCH \
go build -ldflags "-X github.com/treeverse/lakefs/pkg/version.Version=${VERSION}" -o lakectl ./cmd/lakectl

# Build delta diff binary
FROM --platform=$BUILDPLATFORM rust:1.68-alpine3.16 AS build-delta-diff-plugin
RUN apk update && apk add build-base pkgconfig openssl-dev alpine-sdk
RUN cargo new --bin delta-diff
WORKDIR /delta-diff

# 2. Copy our manifests
COPY ./pkg/plugins/diff/delta_diff_server/Cargo.lock ./Cargo.lock
COPY ./pkg/plugins/diff/delta_diff_server/Cargo.toml ./Cargo.toml

# 3. Build only the dependencies to cache them in this layer

# Rust's default behavior on Alpine is to build a static binary (the default target is <arch>-unknown-linux-musl, and
# musl targets are assumed to be static). That would link OpenSSL statically, but the installed OpenSSL libraries are dynamic.
# Setting RUSTFLAGS=-Ctarget-feature=-crt-static forces Rust to produce a dynamically linked binary despite targeting musl.
RUN RUSTFLAGS=-Ctarget-feature=-crt-static cargo build --release
RUN rm src/*.rs

# 4. Now that the dependencies are built, copy the source code
COPY ./pkg/plugins/diff/delta_diff_server/src ./src

# 5. Build for release.
RUN rm ./target/release/deps/delta_diff*
RUN RUSTFLAGS=-Ctarget-feature=-crt-static cargo build --release

# lakectl image
FROM --platform=$BUILDPLATFORM alpine:3.15.0 AS lakectl
FROM --platform=$BUILDPLATFORM alpine:3.16.0 AS lakectl
RUN apk add -U --no-cache ca-certificates
WORKDIR /app
ENV PATH /app:$PATH
Expand All @@ -37,7 +62,7 @@ WORKDIR /home/lakefs
ENTRYPOINT ["/app/lakectl"]

# lakefs image
FROM --platform=$BUILDPLATFORM alpine:3.15.0 AS lakefs
FROM --platform=$BUILDPLATFORM alpine:3.16.0 AS lakefs-lakectl

RUN apk add -U --no-cache ca-certificates
# Be Docker compose friendly (i.e. support wait-for)
Expand All @@ -58,3 +83,28 @@ WORKDIR /home/lakefs
ENTRYPOINT ["/app/lakefs"]
CMD ["run"]

# lakefs image with plugins (bundles the delta_diff plugin binary)
FROM --platform=$BUILDPLATFORM alpine:3.16.0 AS lakefs-plugins

RUN apk add -U --no-cache ca-certificates
RUN apk add openssl-dev libc6-compat alpine-sdk
# Be Docker compose friendly (i.e. support wait-for)
RUN apk add netcat-openbsd

WORKDIR /app
COPY ./scripts/wait-for ./
ENV PATH /app:$PATH
COPY --from=build /build/lakefs /build/lakectl ./
COPY --from=build-delta-diff-plugin /delta-diff/target/release/delta_diff ./

EXPOSE 8000/tcp

# Setup user
RUN addgroup -S lakefs && adduser -S lakefs -G lakefs
USER lakefs
WORKDIR /home/lakefs

RUN mkdir -p /home/lakefs/.lakefs/plugins/diff && ln -s /app/delta_diff /home/lakefs/.lakefs/plugins/diff/delta

ENTRYPOINT ["/app/lakefs"]
CMD ["run"]
2 changes: 1 addition & 1 deletion esti/ops/docker-compose-dynamodb.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ services:
ports:
- "6432:8000"
esti:
image: "golang:1.19.2-alpine"
image: "golang:1.19.2-alpine3.16"
links:
- lakefs:s3.local.lakefs.io
- lakefs:testmultipartupload.s3.local.lakefs.io
Expand Down
6 changes: 3 additions & 3 deletions esti/ops/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ services:
- "8000:8000"
depends_on:
- "postgres"
volumes:
volumes:
- lakefs-app:/app:ro
environment:
- LAKEFS_AUTH_ENCRYPT_SECRET_KEY=some random secret string
Expand Down Expand Up @@ -37,7 +37,7 @@ services:
POSTGRES_USER: lakefs
POSTGRES_PASSWORD: lakefs
esti:
image: "golang:1.19.2-alpine"
image: "golang:1.19.2-alpine3.16"
links:
- lakefs:s3.local.lakefs.io
- lakefs:testmultipartupload.s3.local.lakefs.io
Expand Down Expand Up @@ -70,7 +70,7 @@ services:
go test -v $ESTI_GOTEST_FLAGS ./esti --system-tests $ESTI_FLAGS --skip=".*GC"
volumes:
- lakefs-code:/lakefs
- lakefs-app:/app:ro
- lakefs-app:/app:ro

volumes:
lakefs-code:
Expand Down
12 changes: 9 additions & 3 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,13 @@ type S3AuthInfo struct {
// PluginProps struct holds the properties needed to run a plugin
type PluginProps struct {
Path string `mapstructure:"path"`
Version *int `mapstructure:"version"`
Version int `mapstructure:"version"`
}

// Plugins struct holds the plugins dir default location and a map of optional plugin location with higher precedence
type Plugins struct {
// DefaultPath is the base directory under which plugins are looked up when no explicit location is configured.
DefaultPath string `mapstructure:"default_path"`
// Properties maps a plugin name to its explicitly configured properties (path and version).
Properties map[string]PluginProps `mapstructure:"properties"`
}

// DiffProps struct holds the properties that define the details necessary to run a diff.
Expand Down Expand Up @@ -326,8 +332,8 @@ type Config struct {
Code string `mapstructure:"code"`
} `mapstructure:"snippets"`
} `mapstructure:"ui"`
Diff map[string]DiffProps `mapstructure:"diff"`
Plugins map[string]PluginProps `mapstructure:"plugins"`
Diff map[string]DiffProps `mapstructure:"diff"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should define the supported formats explicitly rather than leave this dynamic, because it is not actually dynamic.
If the code were able to support new formats dynamically, I would keep the map. But now it looks like we just moved the lookup of the specific format we support to runtime.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created an issue for that and will be handled in the following iteration.

Plugins Plugins `mapstructure:"plugins"`
}

func NewConfig() (*Config, error) {
Expand Down
7 changes: 1 addition & 6 deletions pkg/config/defaults.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package config

import (
"fmt"
"os"
"time"

"github.com/spf13/viper"
Expand Down Expand Up @@ -112,9 +110,6 @@ func setDefaults(local bool) {
viper.SetDefault("graveler.commit_cache.size", 50_000)
viper.SetDefault("graveler.commit_cache.expiry", 10*time.Minute)
viper.SetDefault("graveler.commit_cache.jitter", 2*time.Second)
}

func DefaultPluginLocation(pluginName string) string {
hd, _ := os.UserHomeDir()
return fmt.Sprintf("%s/.lakefs/plugins/%s", hd, pluginName)
viper.SetDefault("plugins.default_path", "~/.lakefs/plugins")
}
16 changes: 9 additions & 7 deletions pkg/plugins/diff/delta_diff_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ impl TableDiffer for DifferService {
let diff_props: DiffProps = r.props.expect("Missing diff properties");
let left_table_path: TablePath = diff_props.left_table_path.expect("Missing left table's path");
let right_table_path: TablePath = diff_props.right_table_path.expect("Missing right table's path");

let s3_config_map: HashMap<String, String> = utils::construct_storage_config(s3_gateway_config_req);
let left_table_res =
delta_ops::get_delta_table(&s3_config_map, &diff_props.repo, &left_table_path);
Expand Down Expand Up @@ -120,12 +119,14 @@ async fn get_diff_type(left_table_res: &Result<DeltaTable, Status>,
(Err(status), Ok(_)) => {
// left table wasn't found, and it doesn't exist on the base: table created
if matches!(status.code(), Code::NotFound) {
if base_table_res.is_err() && matches!(base_table_res.as_ref().unwrap_err().code(), Code::NotFound) {
Ok(DiffType::Created)
} else if base_table_res.is_ok() { // The table existed on the base branch
if base_table_res.is_err() {
if matches!(base_table_res.as_ref().unwrap_err().code(), Code::NotFound | Code::InvalidArgument) { // didn't exist in base ref or no base ref provided
Ok(DiffType::Created)
} else { // There was other kind of error with the base branch
Err(base_table_res.as_ref().unwrap_err().clone())
}
} else { // The table existed on the base branch
Ok(DiffType::Changed)
} else { // There was other kind of error with the base branch
Err(base_table_res.as_ref().unwrap_err().clone())
}
} else {
Err(left_table_res.as_ref().unwrap_err().clone())
Expand Down Expand Up @@ -166,12 +167,13 @@ impl HistoryAndVersion {
}
}

fn show_history(mut hist: Vec<Map<String, Value>>, table_version: DeltaDataTypeVersion) -> Result<Vec<TableOperation>, Status> {
/// Converts raw history entries into `TableOperation`s.
///
/// Entries are consumed from the back of `hist` towards the front. The last entry is
/// paired with `table_version`, and every preceding entry with a version one lower than
/// the entry processed before it. The first failure from `construct_table_op` is
/// returned immediately.
fn show_history(mut hist: Vec<Map<String, Value>>, mut table_version: DeltaDataTypeVersion) -> Result<Vec<TableOperation>, Status> {
    // Capture the capacity before the loop starts draining `hist`.
    let mut operations: Vec<TableOperation> = Vec::with_capacity(hist.len());
    // Popping from the end visits the entries in the same order as reversing and iterating.
    while let Some(commit) = hist.pop() {
        operations.push(construct_table_op(&commit, table_version)?);
        table_version -= 1;
    }
    Ok(operations)
}
Expand Down
45 changes: 38 additions & 7 deletions pkg/plugins/diff/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ package tablediff
import (
"context"
"errors"
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/mitchellh/go-homedir"

"github.com/treeverse/lakefs/pkg/config"
"github.com/treeverse/lakefs/pkg/logging"
"github.com/treeverse/lakefs/pkg/plugins"
Expand Down Expand Up @@ -133,7 +137,7 @@ func (s *Service) appendClosingFunction(diffType string, f func()) {
}

// NewService is used to initialize a new Differ service. The returned function is a closing function for the service.
func NewService(diffProps map[string]config.DiffProps, pluginProps map[string]config.PluginProps) (*Service, func()) {
func NewService(diffProps map[string]config.DiffProps, pluginProps config.Plugins) (*Service, func()) {
service := &Service{
pluginHandler: internal.NewManager[Differ](),
closeFunctions: make(map[string]func()),
Expand All @@ -142,16 +146,22 @@ func NewService(diffProps map[string]config.DiffProps, pluginProps map[string]co
return service, service.Close
}

func registerPlugins(service *Service, diffProps map[string]config.DiffProps, pluginProps map[string]config.PluginProps) {
func registerPlugins(service *Service, diffProps map[string]config.DiffProps, pluginProps config.Plugins) {
registerDefaultPlugins(service, pluginProps.DefaultPath)
for n, p := range diffProps {
pluginName := p.PluginName
// If the requested plugin wasn't configured with a path, it will be defined under the default location
pluginPath := config.DefaultPluginLocation(pluginName)
pluginPath := filepath.Join(diffPluginsDefaultPath(pluginProps.DefaultPath), pluginName)
pluginVersion := 1 // default version
if props, ok := pluginProps[pluginName]; ok {
pluginPath = props.Path
if props.Version != nil {
pluginVersion = *props.Version
if props, ok := pluginProps.Properties[pluginName]; ok {
pp, err := homedir.Expand(props.Path)
if err != nil {
logging.Default().Errorf("failed to register a plugin for an invalid path: '%s'", props.Path)
continue
}
pluginPath = pp
if props.Version != 0 {
pluginVersion = props.Version
}
}

Expand All @@ -166,3 +176,24 @@ func registerPlugins(service *Service, diffProps map[string]config.DiffProps, pl
logging.Default().Infof("successfully registered a plugin for diff type: '%s'", n)
}
}

// registerDefaultPlugins registers the Delta Lake diff plugin on the given service when an
// entry named "delta" exists under the default diff plugins directory derived from pluginsPath.
// A missing entry is skipped silently; any other error from os.Stat is logged and the
// plugin is not registered.
func registerDefaultPlugins(service *Service, pluginsPath string) {
	deltaPath := filepath.Join(diffPluginsDefaultPath(pluginsPath), "delta")

	if _, err := os.Stat(deltaPath); err != nil {
		// Absence of the plugin is expected and not worth a log line.
		if os.IsNotExist(err) {
			return
		}
		logging.Default().WithError(err).Error("failed to access delta lake diff plugin")
		return
	}

	RegisterDeltaLakeDiffPlugin(
		service,
		plugins.PluginIdentity{ProtocolVersion: 1, ExecutableLocation: deltaPath},
		plugins.PluginHandshake{},
	)
}

// diffPluginsDefaultPath returns the "diff" subdirectory of pluginsPath after passing
// pluginsPath through homedir.Expand.
//
// The error returned by homedir.Expand is discarded, so on failure the result is built
// from whatever path value Expand returned.
func diffPluginsDefaultPath(pluginsPath string) string {
	expanded, _ := homedir.Expand(pluginsPath)
	const diffDir = "diff"
	return filepath.Join(expanded, diffDir)
}
Loading