Skip to content

Commit

Permalink
Delta Lake Diff: Change default plugin loading (#5495)
Browse files Browse the repository at this point in the history
* change default behavior of plugin loading to scan in the default plugins directory for known plugins

* get delta lake diff plugin from default path if available

* add homedir.Expand call to custom plugin path

* remove empty line

Co-authored-by: Barak Amar <barak.amar@treeverse.io>

* change version from a pointer to an int

* Delta Lake Diff: Dockerfile (#5496)

* Display images in markdown with support for lakefs:// URIs (#5449)

* Display images in markdown with support for lakefs:// URIs

* change dockerfile to include delta diff

* change Dockerfile to include the Delta Lake diff plugin

* fix docker-compose files

* add a stage with both lakefs and plugins and separate it from a lakefs-only stage

* PR fixes

* Delta Lake Diff: Enable feature (#5508)

* fix hover issue when hovering over a tree-entry-row

* enable Delta Lake diff feature

* remove experimental delta diff button

* PR fixes

* change docker image publish target to lakefs-plugins

* fix bug hunt bugs (#5534)

* remove empty line

* Delta Lake: Update Dockerfile base images to Alpine (#5546)

* change the Dockerfile back to Alpine and build the Delta plugin dynamically

* remove alpine-sdk from dockerfile

* add alpine-sdk

* add comment about the 'RUSTFLAGS=-Ctarget-feature=-crt-static' flag

* change back from bullseye to alpine

---------

Co-authored-by: eladlachmi <110764839+eladlachmi@users.noreply.github.com>

---------

Co-authored-by: Barak Amar <barak.amar@treeverse.io>
Co-authored-by: eladlachmi <110764839+eladlachmi@users.noreply.github.com>
  • Loading branch information
3 people authored Mar 22, 2023
1 parent 181d56b commit 39b9aa1
Show file tree
Hide file tree
Showing 16 changed files with 176 additions and 135 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/docker-publish-exp-image.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ jobs:
uses: docker/build-push-action@v3
with:
context: .
target: lakefs
target: lakefs-plugins
push: true
platforms: linux/amd64,linux/arm64,darwin/amd64,darwin/arm64
build-args: VERSION=${{ steps.version.outputs.tag }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/docker-publish.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ jobs:
uses: docker/build-push-action@v3
with:
context: .
target: lakefs
target: lakefs-plugins
push: true
platforms: linux/amd64,linux/arm64,darwin/amd64,darwin/arm64
build-args: VERSION=${{ steps.version.outputs.tag }}
Expand Down
56 changes: 53 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM --platform=$BUILDPLATFORM golang:1.19.2-alpine AS build
FROM --platform=$BUILDPLATFORM golang:1.19.2-alpine3.16 AS build

ARG VERSION=dev

Expand All @@ -25,8 +25,33 @@ RUN --mount=type=cache,target=/root/.cache/go-build \
GOOS=$TARGETOS GOARCH=$TARGETARCH \
go build -ldflags "-X github.com/treeverse/lakefs/pkg/version.Version=${VERSION}" -o lakectl ./cmd/lakectl

# Build delta diff binary
FROM --platform=$BUILDPLATFORM rust:1.68-alpine3.16 AS build-delta-diff-plugin
RUN apk update && apk add build-base pkgconfig openssl-dev alpine-sdk
RUN cargo new --bin delta-diff
WORKDIR /delta-diff

# 2. Copy our manifests
COPY ./pkg/plugins/diff/delta_diff_server/Cargo.lock ./Cargo.lock
COPY ./pkg/plugins/diff/delta_diff_server/Cargo.toml ./Cargo.toml

# 3. Build only the dependencies to cache them in this layer

# On Alpine, Rust's default target is <arch>-unknown-linux-musl, and musl targets are linked statically by default.
# The build therefore tries to link OpenSSL statically, but the installed OpenSSL libraries are dynamic.
# Setting RUSTFLAGS=-Ctarget-feature=-crt-static forces Rust to produce a dynamically linked binary even though the target is musl.
RUN RUSTFLAGS=-Ctarget-feature=-crt-static cargo build --release
RUN rm src/*.rs

# 4. Now that the dependency is built, copy your source code
COPY ./pkg/plugins/diff/delta_diff_server/src ./src

# 5. Build for release.
RUN rm ./target/release/deps/delta_diff*
RUN RUSTFLAGS=-Ctarget-feature=-crt-static cargo build --release

# lakectl image
FROM --platform=$BUILDPLATFORM alpine:3.15.0 AS lakectl
FROM --platform=$BUILDPLATFORM alpine:3.16.0 AS lakectl
RUN apk add -U --no-cache ca-certificates
WORKDIR /app
ENV PATH /app:$PATH
Expand All @@ -37,7 +62,7 @@ WORKDIR /home/lakefs
ENTRYPOINT ["/app/lakectl"]

# lakefs image
FROM --platform=$BUILDPLATFORM alpine:3.15.0 AS lakefs
FROM --platform=$BUILDPLATFORM alpine:3.16.0 AS lakefs-lakectl

RUN apk add -U --no-cache ca-certificates
# Be Docker compose friendly (i.e. support wait-for)
Expand All @@ -58,3 +83,28 @@ WORKDIR /home/lakefs
ENTRYPOINT ["/app/lakefs"]
CMD ["run"]

# lakefs image with plugins (bundles the Delta Lake diff plugin)
FROM --platform=$BUILDPLATFORM alpine:3.16.0 AS lakefs-plugins

RUN apk add -U --no-cache ca-certificates
RUN apk add openssl-dev libc6-compat alpine-sdk
# Be Docker compose friendly (i.e. support wait-for)
RUN apk add netcat-openbsd

WORKDIR /app
COPY ./scripts/wait-for ./
ENV PATH /app:$PATH
COPY --from=build /build/lakefs /build/lakectl ./
COPY --from=build-delta-diff-plugin /delta-diff/target/release/delta_diff ./

EXPOSE 8000/tcp

# Setup user
RUN addgroup -S lakefs && adduser -S lakefs -G lakefs
USER lakefs
WORKDIR /home/lakefs

RUN mkdir -p /home/lakefs/.lakefs/plugins/diff && ln -s /app/delta_diff /home/lakefs/.lakefs/plugins/diff/delta

ENTRYPOINT ["/app/lakefs"]
CMD ["run"]
2 changes: 1 addition & 1 deletion esti/ops/docker-compose-dynamodb.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ services:
ports:
- "6432:8000"
esti:
image: "golang:1.19.2-alpine"
image: "golang:1.19.2-alpine3.16"
links:
- lakefs:s3.local.lakefs.io
- lakefs:testmultipartupload.s3.local.lakefs.io
Expand Down
6 changes: 3 additions & 3 deletions esti/ops/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ services:
- "8000:8000"
depends_on:
- "postgres"
volumes:
volumes:
- lakefs-app:/app:ro
environment:
- LAKEFS_AUTH_ENCRYPT_SECRET_KEY=some random secret string
Expand Down Expand Up @@ -37,7 +37,7 @@ services:
POSTGRES_USER: lakefs
POSTGRES_PASSWORD: lakefs
esti:
image: "golang:1.19.2-alpine"
image: "golang:1.19.2-alpine3.16"
links:
- lakefs:s3.local.lakefs.io
- lakefs:testmultipartupload.s3.local.lakefs.io
Expand Down Expand Up @@ -70,7 +70,7 @@ services:
go test -v $ESTI_GOTEST_FLAGS ./esti --system-tests $ESTI_FLAGS --skip=".*GC"
volumes:
- lakefs-code:/lakefs
- lakefs-app:/app:ro
- lakefs-app:/app:ro

volumes:
lakefs-code:
Expand Down
12 changes: 9 additions & 3 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,13 @@ type S3AuthInfo struct {
// PluginProps struct holds the properties needed to run a plugin
type PluginProps struct {
Path string `mapstructure:"path"`
Version *int `mapstructure:"version"`
Version int `mapstructure:"version"`
}

// Plugins holds the plugin configuration: the default plugins directory and a map of
// optional per-plugin properties, which take precedence over the default location.
type Plugins struct {
// DefaultPath is the plugins directory, read from the "default_path" configuration key.
DefaultPath string `mapstructure:"default_path"`
// Properties maps a plugin name to its explicitly configured properties,
// read from the "properties" configuration key.
Properties map[string]PluginProps `mapstructure:"properties"`
}

// DiffProps struct holds the properties that define the details necessary to run a diff.
Expand Down Expand Up @@ -326,8 +332,8 @@ type Config struct {
Code string `mapstructure:"code"`
} `mapstructure:"snippets"`
} `mapstructure:"ui"`
Diff map[string]DiffProps `mapstructure:"diff"`
Plugins map[string]PluginProps `mapstructure:"plugins"`
Diff map[string]DiffProps `mapstructure:"diff"`
Plugins Plugins `mapstructure:"plugins"`
}

func NewConfig() (*Config, error) {
Expand Down
7 changes: 1 addition & 6 deletions pkg/config/defaults.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package config

import (
"fmt"
"os"
"time"

"github.com/spf13/viper"
Expand Down Expand Up @@ -112,9 +110,6 @@ func setDefaults(local bool) {
viper.SetDefault("graveler.commit_cache.size", 50_000)
viper.SetDefault("graveler.commit_cache.expiry", 10*time.Minute)
viper.SetDefault("graveler.commit_cache.jitter", 2*time.Second)
}

func DefaultPluginLocation(pluginName string) string {
hd, _ := os.UserHomeDir()
return fmt.Sprintf("%s/.lakefs/plugins/%s", hd, pluginName)
viper.SetDefault("plugins.default_path", "~/.lakefs/plugins")
}
16 changes: 9 additions & 7 deletions pkg/plugins/diff/delta_diff_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ impl TableDiffer for DifferService {
let diff_props: DiffProps = r.props.expect("Missing diff properties");
let left_table_path: TablePath = diff_props.left_table_path.expect("Missing left table's path");
let right_table_path: TablePath = diff_props.right_table_path.expect("Missing right table's path");

let s3_config_map: HashMap<String, String> = utils::construct_storage_config(s3_gateway_config_req);
let left_table_res =
delta_ops::get_delta_table(&s3_config_map, &diff_props.repo, &left_table_path);
Expand Down Expand Up @@ -120,12 +119,14 @@ async fn get_diff_type(left_table_res: &Result<DeltaTable, Status>,
(Err(status), Ok(_)) => {
// left table wasn't found, and it doesn't exist on the base: table created
if matches!(status.code(), Code::NotFound) {
if base_table_res.is_err() && matches!(base_table_res.as_ref().unwrap_err().code(), Code::NotFound) {
Ok(DiffType::Created)
} else if base_table_res.is_ok() { // The table existed on the base branch
if base_table_res.is_err() {
if matches!(base_table_res.as_ref().unwrap_err().code(), Code::NotFound | Code::InvalidArgument) { // didn't exist in base ref or no base ref provided
Ok(DiffType::Created)
} else { // There was other kind of error with the base branch
Err(base_table_res.as_ref().unwrap_err().clone())
}
} else { // The table existed on the base branch
Ok(DiffType::Changed)
} else { // There was other kind of error with the base branch
Err(base_table_res.as_ref().unwrap_err().clone())
}
} else {
Err(left_table_res.as_ref().unwrap_err().clone())
Expand Down Expand Up @@ -166,12 +167,13 @@ impl HistoryAndVersion {
}
}

fn show_history(mut hist: Vec<Map<String, Value>>, table_version: DeltaDataTypeVersion) -> Result<Vec<TableOperation>, Status> {
/// Builds the list of table operations from the given commit entries.
///
/// Entries are processed in the reverse of the order in which they appear in `hist`.
/// The first processed entry is paired with `table_version`, and each following entry
/// with a version one lower than the previous one. Processing stops at the first
/// error returned by `construct_table_op`, and that error is returned.
fn show_history(mut hist: Vec<Map<String, Value>>, mut table_version: DeltaDataTypeVersion) -> Result<Vec<TableOperation>, Status> {
    let mut operations: Vec<TableOperation> = Vec::with_capacity(hist.len());
    // Popping from the back visits the entries in reverse order without reversing in place.
    while let Some(commit_info) = hist.pop() {
        operations.push(construct_table_op(&commit_info, table_version)?);
        table_version -= 1;
    }
    Ok(operations)
}
Expand Down
45 changes: 38 additions & 7 deletions pkg/plugins/diff/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ package tablediff
import (
"context"
"errors"
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/mitchellh/go-homedir"

"github.com/treeverse/lakefs/pkg/config"
"github.com/treeverse/lakefs/pkg/logging"
"github.com/treeverse/lakefs/pkg/plugins"
Expand Down Expand Up @@ -133,7 +137,7 @@ func (s *Service) appendClosingFunction(diffType string, f func()) {
}

// NewService is used to initialize a new Differ service. The returned function is a closing function for the service.
func NewService(diffProps map[string]config.DiffProps, pluginProps map[string]config.PluginProps) (*Service, func()) {
func NewService(diffProps map[string]config.DiffProps, pluginProps config.Plugins) (*Service, func()) {
service := &Service{
pluginHandler: internal.NewManager[Differ](),
closeFunctions: make(map[string]func()),
Expand All @@ -142,16 +146,22 @@ func NewService(diffProps map[string]config.DiffProps, pluginProps map[string]co
return service, service.Close
}

func registerPlugins(service *Service, diffProps map[string]config.DiffProps, pluginProps map[string]config.PluginProps) {
func registerPlugins(service *Service, diffProps map[string]config.DiffProps, pluginProps config.Plugins) {
registerDefaultPlugins(service, pluginProps.DefaultPath)
for n, p := range diffProps {
pluginName := p.PluginName
// If the requested plugin wasn't configured with a path, it will be defined under the default location
pluginPath := config.DefaultPluginLocation(pluginName)
pluginPath := filepath.Join(diffPluginsDefaultPath(pluginProps.DefaultPath), pluginName)
pluginVersion := 1 // default version
if props, ok := pluginProps[pluginName]; ok {
pluginPath = props.Path
if props.Version != nil {
pluginVersion = *props.Version
if props, ok := pluginProps.Properties[pluginName]; ok {
pp, err := homedir.Expand(props.Path)
if err != nil {
logging.Default().Errorf("failed to register a plugin for an invalid path: '%s'", props.Path)
continue
}
pluginPath = pp
if props.Version != 0 {
pluginVersion = props.Version
}
}

Expand All @@ -166,3 +176,24 @@ func registerPlugins(service *Service, diffProps map[string]config.DiffProps, pl
logging.Default().Infof("successfully registered a plugin for diff type: '%s'", n)
}
}

// registerDefaultPlugins registers the known plugins that are found under the default
// plugins directory. Currently the only such plugin is the Delta Lake diff plugin, which
// is looked up as the "delta" entry of the diff plugins directory derived from pluginsPath.
//
// A missing plugin file is not an error: the plugin is simply not registered. Any other
// failure to access the file is logged, and the plugin is skipped.
func registerDefaultPlugins(service *Service, pluginsPath string) {
	deltaPath := filepath.Join(diffPluginsDefaultPath(pluginsPath), "delta")
	if _, err := os.Stat(deltaPath); err != nil {
		// errors.Is is the recommended replacement for the older os.IsNotExist and
		// also matches wrapped errors.
		if !errors.Is(err, os.ErrNotExist) {
			logging.Default().WithError(err).Error("failed to access delta lake diff plugin")
		}
		return
	}

	pid := plugins.PluginIdentity{ProtocolVersion: 1, ExecutableLocation: deltaPath}
	pa := plugins.PluginHandshake{}
	RegisterDeltaLakeDiffPlugin(service, pid, pa)
}

// diffPluginsDefaultPath returns the directory that holds diff plugins: the "diff"
// sub-directory of pluginsPath, with a leading "~" expanded to the user's home directory.
//
// If the home directory cannot be expanded, the error is logged and pluginsPath is used
// unexpanded. homedir.Expand returns an empty string on failure, which would otherwise
// make the result a "diff" directory relative to the process working directory.
func diffPluginsDefaultPath(pluginsPath string) string {
	pp, err := homedir.Expand(pluginsPath)
	if err != nil {
		logging.Default().WithError(err).Error("failed to expand plugins default path")
		pp = pluginsPath
	}
	return filepath.Join(pp, "diff")
}
Loading

0 comments on commit 39b9aa1

Please sign in to comment.