Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task: Support Azure Unity Catalog export #7554

Merged
merged 6 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion docs/howto/hooks/lua.md
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you update the export_delta_log method with the new parameter?

Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@ Sets the object at the given bucket and key to the value of the supplied value s
Deletes the object at the given key
`path_uri` - A valid Azure blob storage uri in the form of `https://myaccount.blob.core.windows.net/mycontainer/myblob`

### `azure/abfss_transform_path(path)`
Isan-Rivkin marked this conversation as resolved.
Show resolved Hide resolved

Transform an HTTPS Azure URL to a ABFSS scheme. Used by the delta_exporter function to support Azure Unity catalog use cases
`path` - A valid Azure blob storage URL in the form of `https://myaccount.blob.core.windows.net/mycontainer/myblob`

### `crypto`

### `crypto/aes/encryptCBC(key, plaintext)`
Expand Down Expand Up @@ -476,7 +481,7 @@ Parameters:

A package used to export Delta Lake tables from lakeFS to an external cloud storage.

### `lakefs/catalogexport/delta_exporter.export_delta_log(action, table_def_names, write_object, delta_client, table_descriptors_path)`
### `lakefs/catalogexport/delta_exporter.export_delta_log(action, table_def_names, write_object, delta_client, table_descriptors_path, path_transformer)`

The function used to export Delta Lake tables.
The return value is a table with mapping of table names to external table location (from which it is possible to query the data) and latest Delta table version's metadata.
Expand All @@ -490,6 +495,7 @@ Parameters:
- `write_object`: A writer function with `function(bucket, key, data)` signature, used to write the exported Delta Log (e.g. `aws/s3_client.put_object` or `azure/blob_client.put_object`)
- `delta_client`: A Delta Lake client that implements `get_table: function(repo, ref, prefix)`
- `table_descriptors_path`: The path under which the table descriptors of the provided `table_def_names` reside
- `path_transformer`: (Optional) A function(path) used for transforming the path of the saved delta logs path fields as well as the saved table physical path (used to support Azure Unity catalog use cases)

Delta export example for AWS S3:

Expand Down Expand Up @@ -739,6 +745,10 @@ The registration will use the following paths to register the table:
`<catalog>.<branch name>.<table_name>` where the branch name will be used as the schema name.
The return value is a table with mapping of table names to registration request status.

**Note: (Azure users)** Databricks catalog external locations is supported only for ADLS Gen2 storage accounts.
When exporting Delta tables using the `lakefs/catalogexport/delta_exporter.export_delta_log` function, the `path_transformer` must be
used to convert the paths scheme to `abfss`. the built-in `azure` lua library provides this functionality in `transformPathToAbfss`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
used to convert the paths scheme to `abfss`. the built-in `azure` lua library provides this functionality in `transformPathToAbfss`.
used to convert the paths scheme to `abfss`. The built-in `azure` lua library provides this functionality in `transformPathToAbfss`.

Just had to...


Parameters:

- `action(table)`: The global action table
Expand Down
97 changes: 97 additions & 0 deletions esti/catalog_export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,3 +613,100 @@ func getStorageNamespace(t *testing.T, ctx context.Context, repo string) string
require.NotNil(t, resp.JSON200)
return resp.JSON200.StorageNamespace
}

func TestDeltaCatalogExportAbfss(t *testing.T) {
requireBlockstoreType(t, block.BlockstoreTypeAzure)
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)

accessKeyID := viper.GetString("access_key_id")
secretAccessKey := viper.GetString("secret_access_key")
testData := &exportHooksTestData{
Repository: repo,
Branch: mainBranch,
LakeFSAccessKeyID: accessKeyID,
LakeFSSecretAccessKey: secretAccessKey,
AzureStorageAccount: viper.GetString("azure_storage_account"),
AzureAccessKey: viper.GetString("azure_storage_access_key"),
}

tmplDir, err := fs.Sub(exportHooksFiles, "export_hooks_files/delta")
require.NoError(t, err)
err = fs.WalkDir(tmplDir, "data", func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if !d.IsDir() {
buf, err := fs.ReadFile(tmplDir, path)
if err != nil {
return err
}
uploadResp, err := uploadContent(ctx, repo, mainBranch, strings.TrimPrefix(path, "data/"), string(buf))
if err != nil {
return err
}
require.Equal(t, http.StatusCreated, uploadResp.StatusCode())
}
return nil
})
require.NoError(t, err)

headCommit := uploadAndCommitObjects(t, ctx, repo, mainBranch, map[string]string{
"_lakefs_actions/delta_export.yaml": renderTplFileAsStr(t, testData, tmplDir, "azure_adls/_lakefs_actions/delta_export.yaml"),
})

runs := waitForListRepositoryRunsLen(ctx, t, repo, headCommit.Id, 1)
run := runs.Results[0]
require.Equal(t, "completed", run.Status)

amount := apigen.PaginationAmount(1)
tasks, err := client.ListRunHooksWithResponse(ctx, repo, run.RunId, &apigen.ListRunHooksParams{
Amount: &amount,
})
require.NoError(t, err)
require.NotNil(t, tasks.JSON200)
require.Equal(t, 1, len(tasks.JSON200.Results))
require.Equal(t, "delta_exporter", tasks.JSON200.Results[0].HookId)
validateExportAbfss(t, ctx, headCommit.Id, testData)
}

func validateExportAbfss(t *testing.T, ctx context.Context, commit string, testData *exportHooksTestData) {
resp, err := client.GetRepositoryWithResponse(ctx, testData.Repository)
require.NoError(t, err)
require.NotNil(t, resp.JSON200)
namespaceURL, err := url.Parse(resp.JSON200.StorageNamespace)
require.NoError(t, err)
keyTempl := "%s/_lakefs/exported/%s/%s/test_table/_delta_log/00000000000000000000.json"
tableStat, err := client.StatObjectWithResponse(ctx, testData.Repository, mainBranch, &apigen.StatObjectParams{
Path: "tables/test-table/test partition/0-845b8a42-579e-47ee-9935-921dd8d2ba7d-0.parquet",
})
require.NoError(t, err)
require.NotNil(t, tableStat.JSON200)
u, err := url.Parse(tableStat.JSON200.PhysicalAddress)
require.NoError(t, err)
storageAccount, _, found := strings.Cut(u.Host, ".")
require.True(t, found)
container, path, found := strings.Cut(strings.TrimPrefix(u.Path, "/"), "/")
require.True(t, found)
expectedPath := fmt.Sprintf("abfss://%s@%s.dfs.core.windows.net/%s", container, storageAccount, path)
var reader io.ReadCloser
azClient, err := azure.BuildAzureServiceClient(params.Azure{
StorageAccount: testData.AzureStorageAccount,
StorageAccessKey: testData.AzureAccessKey,
})
require.NoError(t, err)

containerName, prefix, _ := strings.Cut(namespaceURL.Path, uri.PathSeparator)
key := fmt.Sprintf(keyTempl, strings.TrimPrefix(prefix, "/"), mainBranch, commit[:6])
readResp, err := azClient.NewContainerClient(containerName).NewBlockBlobClient(key).DownloadStream(ctx, nil)
require.NoError(t, err)
reader = readResp.Body

defer func() {
err := reader.Close()
require.NoError(t, err)
}()
contents, err := io.ReadAll(reader)
require.NoError(t, err)
require.Contains(t, string(contents), expectedPath)
}
16 changes: 8 additions & 8 deletions esti/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/treeverse/lakefs/pkg/api/apiutil"
)

func found(ctx context.Context, repo, ref, path string) (bool, error) {
func objectFound(ctx context.Context, repo, ref, path string) (bool, error) {
res, err := client.GetObjectWithResponse(ctx, repo, ref, &apigen.GetObjectParams{Path: path})
if err == nil && res.HTTPResponse.StatusCode == http.StatusOK {
return true, nil
Expand All @@ -37,15 +37,15 @@ func TestDeleteStaging(t *testing.T) {

_, _ = uploadFileRandomData(ctx, t, repo, mainBranch, objPath)

f, err := found(ctx, repo, mainBranch, objPath)
f, err := objectFound(ctx, repo, mainBranch, objPath)
assert.NoError(t, err)
assert.True(t, f, "uploaded object found")

resp, err := client.DeleteObjectWithResponse(ctx, repo, mainBranch, &apigen.DeleteObjectParams{Path: objPath})
require.NoError(t, err, "failed to delete object")
require.Equal(t, http.StatusNoContent, resp.StatusCode())

f, err = found(ctx, repo, mainBranch, objPath)
f, err = objectFound(ctx, repo, mainBranch, objPath)
assert.NoError(t, err)
assert.False(t, f, "deleted object found")
}
Expand All @@ -57,7 +57,7 @@ func TestDeleteCommitted(t *testing.T) {

_, _ = uploadFileRandomData(ctx, t, repo, mainBranch, objPath)

f, err := found(ctx, repo, mainBranch, objPath)
f, err := objectFound(ctx, repo, mainBranch, objPath)
assert.NoError(t, err)
assert.True(t, f, "uploaded object found")

Expand All @@ -69,7 +69,7 @@ func TestDeleteCommitted(t *testing.T) {
require.NoError(t, err, "failed to delete object")
require.Equal(t, http.StatusNoContent, getResp.StatusCode())

f, err = found(ctx, repo, mainBranch, objPath)
f, err = objectFound(ctx, repo, mainBranch, objPath)
assert.NoError(t, err)
assert.False(t, f, "deleted object found")
}
Expand Down Expand Up @@ -118,7 +118,7 @@ func TestDeleteObjectsReadOnlyRepository(t *testing.T) {
require.NoError(t, err, "failed to delete object")
require.Equal(t, http.StatusNoContent, deleteResp.StatusCode())

f, err := found(ctx, repoName, mainBranch, objPath)
f, err := objectFound(ctx, repoName, mainBranch, objPath)
assert.NoError(t, err)
assert.False(t, f, "deleted object found")
}
Expand All @@ -130,7 +130,7 @@ func TestCommitDeleteCommitted(t *testing.T) {

_, _ = uploadFileRandomData(ctx, t, repo, mainBranch, objPath)

f, err := found(ctx, repo, mainBranch, objPath)
f, err := objectFound(ctx, repo, mainBranch, objPath)
assert.NoError(t, err)
assert.True(t, f, "uploaded object found")

Expand All @@ -150,7 +150,7 @@ func TestCommitDeleteCommitted(t *testing.T) {
require.NoError(t, err, "commit delete file")
require.Equal(t, http.StatusCreated, commitResp.StatusCode())

f, err = found(ctx, repo, mainBranch, objPath)
f, err = objectFound(ctx, repo, mainBranch, objPath)
assert.NoError(t, err)
assert.False(t, f, "deleted object found")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
name: Delta Exporter
on:
post-commit:
branches: ["{{ .Branch }}*"]
hooks:
- id: delta_exporter
type: lua
properties:
script: |
local azure = require("azure")
local formats = require("formats")
local delta_exporter = require("lakefs/catalogexport/delta_exporter")
local json = require("encoding/json")

local table_descriptors_path = "_lakefs_tables"
local sc = azure.blob_client(args.azure.storage_account, args.azure.access_key)
local function write_object(_, key, buf)
return sc.put_object(key,buf)
end
local delta_client = formats.delta_client(args.lakefs.access_key_id, args.lakefs.secret_access_key)
local delta_table_details = delta_exporter.export_delta_log(action, args.table_names, write_object, delta_client, table_descriptors_path, azure.abfss_transform_path)

for t, details in pairs(delta_table_details) do
if details["path"] == nil then
error("Delta Lake exported table \"" .. t .. "\"'s location is not available\n")
end
print("Delta Lake exported table \"" .. t .. "\"'s location: " .. details["path"] .. "\n")
if details["metadata"] == nil then
error("Delta Lake exported table \"" .. t .. "\"'s metadata is not available\n")
end
print("Delta Lake exported table \"" .. t .. "\"'s metadata:\n")
for k, v in pairs(details["metadata"]) do
if type(v) ~= "table" then
print("\t" .. k .. " = " .. v .. "\n")
else
print("\t" .. k .. " = " .. json.marshal(v) .. "\n")
end
end
end
args:
azure:
storage_account: "{{ .AzureStorageAccount }}"
access_key: "{{ .AzureAccessKey }}"
lakefs: # provide credentials of a user that has access to the script and Delta Table
access_key_id: "{{ .LakeFSAccessKeyID }}"
secret_access_key: "{{ .LakeFSSecretAccessKey }}"
table_names:
- test-table
22 changes: 11 additions & 11 deletions esti/rollback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestResetAll(t *testing.T) {

// upload file
_, objContent := uploadFileRandomData(ctx, t, repo, mainBranch, objPath)
f, err := found(ctx, repo, mainBranch, objPath)
f, err := objectFound(ctx, repo, mainBranch, objPath)
require.NoError(t, err)
require.True(t, f, "uploaded object found")

Expand Down Expand Up @@ -64,7 +64,7 @@ func TestHardReset(t *testing.T) {

// upload file
_, _ = uploadFileRandomData(ctx, t, repo, mainBranch, objPath)
f, err := found(ctx, repo, mainBranch, objPath)
f, err := objectFound(ctx, repo, mainBranch, objPath)
require.NoError(t, err)
require.True(t, f, "uploaded object found")

Expand Down Expand Up @@ -110,12 +110,12 @@ func TestResetPath(t *testing.T) {

// upload files
_, objContent1 := uploadFileRandomData(ctx, t, repo, mainBranch, objPath1)
f, err := found(ctx, repo, mainBranch, objPath1)
f, err := objectFound(ctx, repo, mainBranch, objPath1)
require.NoError(t, err)
require.True(t, f, "uploaded object found")

uploadFileRandomData(ctx, t, repo, mainBranch, objPath2)
f, err = found(ctx, repo, mainBranch, objPath2)
f, err = objectFound(ctx, repo, mainBranch, objPath2)
require.NoError(t, err)
require.True(t, f, "uploaded object found")

Expand Down Expand Up @@ -164,7 +164,7 @@ func TestResetPath(t *testing.T) {
require.Equal(t, objContent1, body, fmt.Sprintf("path: %s, expected: %s, actual:%s", objPath1, objContent1, body))

// assert file2 doesn't exists
f, err = found(ctx, repo, mainBranch, objPath2)
f, err = objectFound(ctx, repo, mainBranch, objPath2)
require.NoError(t, err)
require.False(t, f, "object not found")
}
Expand All @@ -177,12 +177,12 @@ func TestResetObject(t *testing.T) {

// upload files
_, objContent1 := uploadFileRandomData(ctx, t, repo, mainBranch, objPath1)
f, err := found(ctx, repo, mainBranch, objPath1)
f, err := objectFound(ctx, repo, mainBranch, objPath1)
require.NoError(t, err)
require.True(t, f, "uploaded object found")

uploadFileRandomData(ctx, t, repo, mainBranch, objPath2)
f, err = found(ctx, repo, mainBranch, objPath2)
f, err = objectFound(ctx, repo, mainBranch, objPath2)
require.NoError(t, err)
require.True(t, f, "uploaded object found")

Expand Down Expand Up @@ -230,7 +230,7 @@ func TestResetObject(t *testing.T) {
require.Equal(t, objContent1, body, fmt.Sprintf("path: %s, expected: %s, actual:%s", objPath1, objContent1, body))

// assert file2 doesn't exists
f, err = found(ctx, repo, mainBranch, objPath2)
f, err = objectFound(ctx, repo, mainBranch, objPath2)
require.NoError(t, err)
require.False(t, f, "object not found")
}
Expand All @@ -243,7 +243,7 @@ func TestRevert(t *testing.T) {

// upload file1
uploadFileRandomData(ctx, t, repo, mainBranch, objPath1)
f, err := found(ctx, repo, mainBranch, objPath1)
f, err := objectFound(ctx, repo, mainBranch, objPath1)
require.NoError(t, err)
require.True(t, f, "uploaded object found")

Expand All @@ -260,7 +260,7 @@ func TestRevert(t *testing.T) {

// upload file2
_, objContent2 := uploadFileRandomData(ctx, t, repo, mainBranch, objPath2)
f, err = found(ctx, repo, mainBranch, objPath2)
f, err = objectFound(ctx, repo, mainBranch, objPath2)
require.NoError(t, err)
require.True(t, f, "uploaded object found")

Expand All @@ -281,7 +281,7 @@ func TestRevert(t *testing.T) {
"failed to revert commit %s repo %s branch %s", commitId, repo, mainBranch)

// assert file1 doesn't exist
f, err = found(ctx, repo, mainBranch, objPath1)
f, err = objectFound(ctx, repo, mainBranch, objPath1)
require.NoError(t, err)
require.False(t, f, "object not found")

Expand Down
25 changes: 25 additions & 0 deletions examples/hooks/unity_table_export_azure.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
--[[
As an exhaustive example, it will first start off with a Delta Lake tables export, then continue to register the table
with Unity Catalog
]]

local azure = require("azure")
local formats = require("formats")
local databricks = require("databricks")
local delta_exporter = require("lakefs/catalogexport/delta_exporter")
local unity_exporter = require("lakefs/catalogexport/unity_exporter")

local table_descriptors_path = "_lakefs_tables"
local sc = azure.blob_client(args.azure.storage_account, args.azure.access_key)
local function write_object(_, key, buf)
return sc.put_object(key,buf)
end
local delta_client = formats.delta_client(args.lakefs.access_key_id, args.lakefs.secret_access_key)
local delta_table_details = delta_exporter.export_delta_log(action, args.table_defs, write_object, delta_client, table_descriptors_path, azure.abfss_transform_path)

-- Register the exported table in Unity Catalog:
local databricks_client = databricks.client(args.databricks_host, args.databricks_token)
local registration_statuses = unity_exporter.register_tables(action, "_lakefs_tables", delta_table_details, databricks_client, args.warehouse_id)
for t, status in pairs(registration_statuses) do
print("Unity catalog registration for table \"" .. t .. "\" completed with status: " .. status .. "\n")
end
Loading
Loading