Task: Support Azure Unity Catalog export #7554

Merged: 6 commits, Mar 14, 2024
Changes from 4 commits
5 changes: 5 additions & 0 deletions docs/howto/hooks/lua.md
Contributor:
Can you update the export_delta_log method with the new parameter?

@@ -233,6 +233,11 @@ Sets the object at the given bucket and key to the value of the supplied value string
Deletes the object at the given key
`path_uri` - A valid Azure blob storage uri in the form of `https://myaccount.blob.core.windows.net/mycontainer/myblob`

### `azure/abfss_transform_path(path)`

Transforms an HTTPS Azure URL into an ABFSS-scheme URL. Used by the `delta_exporter` function to support Azure Unity Catalog use cases.
`path` - A valid Azure blob storage URL in the form of `https://myaccount.blob.core.windows.net/mycontainer/myblob`
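
A minimal sketch of the mapping this function performs, re-implemented in plain Lua for illustration (not the built-in; the `dfs.core.windows.net` target form follows the expectations in this PR's tests):

```lua
-- Illustrative only: https://<account>.blob.core.windows.net/<container>/<blob>
-- becomes abfss://<container>@<account>.dfs.core.windows.net/<blob>
local function https_to_abfss(path)
  local account, rest = path:match("^https://([^%.]+)%.blob%.core%.windows%.net/(.+)$")
  local container, blob = rest:match("^([^/]+)/(.+)$")
  return string.format("abfss://%s@%s.dfs.core.windows.net/%s", container, account, blob)
end
```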

### `crypto`

### `crypto/aes/encryptCBC(key, plaintext)`
98 changes: 98 additions & 0 deletions esti/catalog_export_test.go
@@ -613,3 +613,101 @@ func getStorageNamespace(t *testing.T, ctx context.Context, repo string) string
require.NotNil(t, resp.JSON200)
return resp.JSON200.StorageNamespace
}

func TestDeltaCatalogExportAbfss(t *testing.T) {
requireBlockstoreType(t, block.BlockstoreTypeAzure)
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)

accessKeyID := viper.GetString("access_key_id")
secretAccessKey := viper.GetString("secret_access_key")
testData := &exportHooksTestData{
Repository: repo,
Branch: mainBranch,
LakeFSAccessKeyID: accessKeyID,
LakeFSSecretAccessKey: secretAccessKey,
AzureStorageAccount: viper.GetString("azure_storage_account"),
AzureAccessKey: viper.GetString("azure_storage_access_key"),
}
//blockstore := setupCatalogExportTestByStorageType(t, testData)
Contributor:
delete?


tmplDir, err := fs.Sub(exportHooksFiles, "export_hooks_files/delta")
require.NoError(t, err)
err = fs.WalkDir(tmplDir, "data", func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if !d.IsDir() {
buf, err := fs.ReadFile(tmplDir, path)
if err != nil {
return err
}
uploadResp, err := uploadContent(ctx, repo, mainBranch, strings.TrimPrefix(path, "data/"), string(buf))
if err != nil {
return err
}
require.Equal(t, http.StatusCreated, uploadResp.StatusCode())
}
return nil
})
require.NoError(t, err)

headCommit := uploadAndCommitObjects(t, ctx, repo, mainBranch, map[string]string{
"_lakefs_actions/delta_export.yaml": renderTplFileAsStr(t, testData, tmplDir, "azure_adls/_lakefs_actions/delta_export.yaml"),
})

runs := waitForListRepositoryRunsLen(ctx, t, repo, headCommit.Id, 1)
run := runs.Results[0]
require.Equal(t, "completed", run.Status)

amount := apigen.PaginationAmount(1)
tasks, err := client.ListRunHooksWithResponse(ctx, repo, run.RunId, &apigen.ListRunHooksParams{
Amount: &amount,
})
require.NoError(t, err)
require.NotNil(t, tasks.JSON200)
require.Equal(t, 1, len(tasks.JSON200.Results))
require.Equal(t, "delta_exporter", tasks.JSON200.Results[0].HookId)
validateExportAbfss(t, ctx, headCommit.Id, testData)
}

func validateExportAbfss(t *testing.T, ctx context.Context, commit string, testData *exportHooksTestData) {
resp, err := client.GetRepositoryWithResponse(ctx, testData.Repository)
require.NoError(t, err)
require.NotNil(t, resp.JSON200)
namespaceURL, err := url.Parse(resp.JSON200.StorageNamespace)
require.NoError(t, err)
keyTempl := "%s/_lakefs/exported/%s/%s/test_table/_delta_log/00000000000000000000.json"
tableStat, err := client.StatObjectWithResponse(ctx, testData.Repository, mainBranch, &apigen.StatObjectParams{
Path: "tables/test-table/test partition/0-845b8a42-579e-47ee-9935-921dd8d2ba7d-0.parquet",
})
require.NoError(t, err)
require.NotNil(t, tableStat.JSON200)
u, err := url.Parse(tableStat.JSON200.PhysicalAddress)
require.NoError(t, err)
storageAccount, _, found := strings.Cut(u.Host, ".")
require.True(t, found)
container, path, found := strings.Cut(strings.TrimPrefix(u.Path, "/"), "/")
require.True(t, found)
expectedPath := fmt.Sprintf("abfss://%s@%s.dfs.core.windows.net/%s", container, storageAccount, path)
var reader io.ReadCloser
azClient, err := azure.BuildAzureServiceClient(params.Azure{
StorageAccount: testData.AzureStorageAccount,
StorageAccessKey: testData.AzureAccessKey,
})
require.NoError(t, err)

containerName, prefix, _ := strings.Cut(namespaceURL.Path, uri.PathSeparator)
key := fmt.Sprintf(keyTempl, strings.TrimPrefix(prefix, "/"), mainBranch, commit[:6])
readResp, err := azClient.NewContainerClient(containerName).NewBlockBlobClient(key).DownloadStream(ctx, nil)
require.NoError(t, err)
reader = readResp.Body

defer func() {
err := reader.Close()
require.NoError(t, err)
}()
contents, err := io.ReadAll(reader)
require.NoError(t, err)
require.Contains(t, string(contents), expectedPath)
}
16 changes: 8 additions & 8 deletions esti/delete_test.go
@@ -18,7 +18,7 @@ import (
"github.com/treeverse/lakefs/pkg/api/apiutil"
)

func found(ctx context.Context, repo, ref, path string) (bool, error) {
func objectFound(ctx context.Context, repo, ref, path string) (bool, error) {
res, err := client.GetObjectWithResponse(ctx, repo, ref, &apigen.GetObjectParams{Path: path})
if err == nil && res.HTTPResponse.StatusCode == http.StatusOK {
return true, nil
@@ -37,15 +37,15 @@ func TestDeleteStaging(t *testing.T) {

_, _ = uploadFileRandomData(ctx, t, repo, mainBranch, objPath)

f, err := found(ctx, repo, mainBranch, objPath)
f, err := objectFound(ctx, repo, mainBranch, objPath)
assert.NoError(t, err)
assert.True(t, f, "uploaded object found")

resp, err := client.DeleteObjectWithResponse(ctx, repo, mainBranch, &apigen.DeleteObjectParams{Path: objPath})
require.NoError(t, err, "failed to delete object")
require.Equal(t, http.StatusNoContent, resp.StatusCode())

f, err = found(ctx, repo, mainBranch, objPath)
f, err = objectFound(ctx, repo, mainBranch, objPath)
assert.NoError(t, err)
assert.False(t, f, "deleted object found")
}
@@ -57,7 +57,7 @@ func TestDeleteCommitted(t *testing.T) {

_, _ = uploadFileRandomData(ctx, t, repo, mainBranch, objPath)

f, err := found(ctx, repo, mainBranch, objPath)
f, err := objectFound(ctx, repo, mainBranch, objPath)
assert.NoError(t, err)
assert.True(t, f, "uploaded object found")

@@ -69,7 +69,7 @@
require.NoError(t, err, "failed to delete object")
require.Equal(t, http.StatusNoContent, getResp.StatusCode())

f, err = found(ctx, repo, mainBranch, objPath)
f, err = objectFound(ctx, repo, mainBranch, objPath)
assert.NoError(t, err)
assert.False(t, f, "deleted object found")
}
@@ -118,7 +118,7 @@ func TestDeleteObjectsReadOnlyRepository(t *testing.T) {
require.NoError(t, err, "failed to delete object")
require.Equal(t, http.StatusNoContent, deleteResp.StatusCode())

f, err := found(ctx, repoName, mainBranch, objPath)
f, err := objectFound(ctx, repoName, mainBranch, objPath)
assert.NoError(t, err)
assert.False(t, f, "deleted object found")
}
@@ -130,7 +130,7 @@ func TestCommitDeleteCommitted(t *testing.T) {

_, _ = uploadFileRandomData(ctx, t, repo, mainBranch, objPath)

f, err := found(ctx, repo, mainBranch, objPath)
f, err := objectFound(ctx, repo, mainBranch, objPath)
assert.NoError(t, err)
assert.True(t, f, "uploaded object found")

@@ -150,7 +150,7 @@
require.NoError(t, err, "commit delete file")
require.Equal(t, http.StatusCreated, commitResp.StatusCode())

f, err = found(ctx, repo, mainBranch, objPath)
f, err = objectFound(ctx, repo, mainBranch, objPath)
assert.NoError(t, err)
assert.False(t, f, "deleted object found")
}
48 changes: 48 additions & 0 deletions esti/export_hooks_files/delta/azure_adls/_lakefs_actions/delta_export.yaml
@@ -0,0 +1,48 @@
name: Delta Exporter
on:
post-commit:
branches: ["{{ .Branch }}*"]
hooks:
- id: delta_exporter
type: lua
properties:
script: |
local azure = require("azure")
local formats = require("formats")
local delta_exporter = require("lakefs/catalogexport/delta_exporter")
local json = require("encoding/json")

local table_descriptors_path = "_lakefs_tables"
local sc = azure.blob_client(args.azure.storage_account, args.azure.access_key)
local function write_object(_, key, buf)
return sc.put_object(key,buf)
end
local delta_client = formats.delta_client(args.lakefs.access_key_id, args.lakefs.secret_access_key)
local delta_table_details = delta_exporter.export_delta_log(action, args.table_names, write_object, delta_client, table_descriptors_path, azure.abfss_transform_path)

for t, details in pairs(delta_table_details) do
if details["path"] == nil then
error("Delta Lake exported table \"" .. t .. "\"'s location is not available\n")
end
print("Delta Lake exported table \"" .. t .. "\"'s location: " .. details["path"] .. "\n")
if details["metadata"] == nil then
error("Delta Lake exported table \"" .. t .. "\"'s metadata is not available\n")
end
print("Delta Lake exported table \"" .. t .. "\"'s metadata:\n")
for k, v in pairs(details["metadata"]) do
if type(v) ~= "table" then
print("\t" .. k .. " = " .. v .. "\n")
else
print("\t" .. k .. " = " .. json.marshal(v) .. "\n")
end
end
end
args:
azure:
storage_account: "{{ .AzureStorageAccount }}"
access_key: "{{ .AzureAccessKey }}"
lakefs: # provide credentials of a user that has access to the script and Delta Table
access_key_id: "{{ .LakeFSAccessKeyID }}"
secret_access_key: "{{ .LakeFSSecretAccessKey }}"
table_names:
- test-table
22 changes: 11 additions & 11 deletions esti/rollback_test.go
@@ -17,7 +17,7 @@ func TestResetAll(t *testing.T) {

// upload file
_, objContent := uploadFileRandomData(ctx, t, repo, mainBranch, objPath)
f, err := found(ctx, repo, mainBranch, objPath)
f, err := objectFound(ctx, repo, mainBranch, objPath)
require.NoError(t, err)
require.True(t, f, "uploaded object found")

@@ -64,7 +64,7 @@ func TestHardReset(t *testing.T) {

// upload file
_, _ = uploadFileRandomData(ctx, t, repo, mainBranch, objPath)
f, err := found(ctx, repo, mainBranch, objPath)
f, err := objectFound(ctx, repo, mainBranch, objPath)
require.NoError(t, err)
require.True(t, f, "uploaded object found")

@@ -110,12 +110,12 @@ func TestResetPath(t *testing.T) {

// upload files
_, objContent1 := uploadFileRandomData(ctx, t, repo, mainBranch, objPath1)
f, err := found(ctx, repo, mainBranch, objPath1)
f, err := objectFound(ctx, repo, mainBranch, objPath1)
require.NoError(t, err)
require.True(t, f, "uploaded object found")

uploadFileRandomData(ctx, t, repo, mainBranch, objPath2)
f, err = found(ctx, repo, mainBranch, objPath2)
f, err = objectFound(ctx, repo, mainBranch, objPath2)
require.NoError(t, err)
require.True(t, f, "uploaded object found")

@@ -164,7 +164,7 @@
require.Equal(t, objContent1, body, fmt.Sprintf("path: %s, expected: %s, actual:%s", objPath1, objContent1, body))

// assert file2 doesn't exists
f, err = found(ctx, repo, mainBranch, objPath2)
f, err = objectFound(ctx, repo, mainBranch, objPath2)
require.NoError(t, err)
require.False(t, f, "object not found")
}
@@ -177,12 +177,12 @@ func TestResetObject(t *testing.T) {

// upload files
_, objContent1 := uploadFileRandomData(ctx, t, repo, mainBranch, objPath1)
f, err := found(ctx, repo, mainBranch, objPath1)
f, err := objectFound(ctx, repo, mainBranch, objPath1)
require.NoError(t, err)
require.True(t, f, "uploaded object found")

uploadFileRandomData(ctx, t, repo, mainBranch, objPath2)
f, err = found(ctx, repo, mainBranch, objPath2)
f, err = objectFound(ctx, repo, mainBranch, objPath2)
require.NoError(t, err)
require.True(t, f, "uploaded object found")

@@ -230,7 +230,7 @@
require.Equal(t, objContent1, body, fmt.Sprintf("path: %s, expected: %s, actual:%s", objPath1, objContent1, body))

// assert file2 doesn't exists
f, err = found(ctx, repo, mainBranch, objPath2)
f, err = objectFound(ctx, repo, mainBranch, objPath2)
require.NoError(t, err)
require.False(t, f, "object not found")
}
@@ -243,7 +243,7 @@ func TestRevert(t *testing.T) {

// upload file1
uploadFileRandomData(ctx, t, repo, mainBranch, objPath1)
f, err := found(ctx, repo, mainBranch, objPath1)
f, err := objectFound(ctx, repo, mainBranch, objPath1)
require.NoError(t, err)
require.True(t, f, "uploaded object found")

@@ -260,7 +260,7 @@

// upload file2
_, objContent2 := uploadFileRandomData(ctx, t, repo, mainBranch, objPath2)
f, err = found(ctx, repo, mainBranch, objPath2)
f, err = objectFound(ctx, repo, mainBranch, objPath2)
require.NoError(t, err)
require.True(t, f, "uploaded object found")

@@ -281,7 +281,7 @@
"failed to revert commit %s repo %s branch %s", commitId, repo, mainBranch)

// assert file1 doesn't exist
f, err = found(ctx, repo, mainBranch, objPath1)
f, err = objectFound(ctx, repo, mainBranch, objPath1)
require.NoError(t, err)
require.False(t, f, "object not found")

25 changes: 25 additions & 0 deletions examples/hooks/unity_table_export_azure.lua
@@ -0,0 +1,25 @@
--[[
An end-to-end example: it first runs a Delta Lake tables export, then registers
the exported tables with Unity Catalog.
]]

local azure = require("azure")
local formats = require("formats")
local databricks = require("databricks")
local delta_exporter = require("lakefs/catalogexport/delta_exporter")
local unity_exporter = require("lakefs/catalogexport/unity_exporter")

local table_descriptors_path = "_lakefs_tables"
local sc = azure.blob_client(args.azure.storage_account, args.azure.access_key)
local function write_object(_, key, buf)
return sc.put_object(key,buf)
end
local delta_client = formats.delta_client(args.lakefs.access_key_id, args.lakefs.secret_access_key)
local delta_table_details = delta_exporter.export_delta_log(action, args.table_defs, write_object, delta_client, table_descriptors_path, azure.abfss_transform_path)

-- Register the exported table in Unity Catalog:
local databricks_client = databricks.client(args.databricks_host, args.databricks_token)
local registration_statuses = unity_exporter.register_tables(action, "_lakefs_tables", delta_table_details, databricks_client, args.warehouse_id)
for t, status in pairs(registration_statuses) do
print("Unity catalog registration for table \"" .. t .. "\" completed with status: " .. status .. "\n")
end
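
The script above reads its configuration from an `args` table supplied by the action definition; a sketch of its expected shape (field names taken from the script, all values hypothetical):

```lua
-- Hypothetical values only; in a real hook this table comes from the
-- action's YAML `args` block rather than from Lua code.
local args = {
  azure = { storage_account = "myaccount", access_key = "<storage-key>" },
  lakefs = { access_key_id = "<key-id>", secret_access_key = "<secret>" },
  databricks_host = "<databricks-workspace-url>",
  databricks_token = "<api-token>",
  warehouse_id = "<sql-warehouse-id>",
  table_defs = { "test-table" },
}
```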
11 changes: 10 additions & 1 deletion pkg/actions/lua/lakefs/catalogexport/delta_exporter.lua
@@ -40,9 +40,11 @@ end

delta_client:
- get_table: function(repo, ref, prefix)

path_transformer: function(path) used for transforming path scheme (ex: Azure https to abfss)

]]
local function export_delta_log(action, table_def_names, write_object, delta_client, table_descriptors_path)
local function export_delta_log(action, table_def_names, write_object, delta_client, table_descriptors_path, path_transformer)
local repo = action.repository_id
local commit_id = action.commit_id
if not commit_id then
@@ -118,6 +120,9 @@ local function export_delta_log(action, table_def_names, write_object, delta_client, table_descriptors_path)
]]
local u = url.parse(obj_stat["physical_address"])
local physical_path = url.build_url(u["scheme"], u["host"], u["path"])
if path_transformer ~= nil then
physical_path = path_transformer(physical_path)
end
if entry.add ~= nil then
entry.add.path = physical_path
elseif entry.remove ~= nil then
@@ -158,6 +163,10 @@ local function export_delta_log(action, table_def_names, write_object, delta_client, table_descriptors_path)
local version_key = storage_props.key .. "/" .. entry_version
write_object(storage_props.bucket, version_key, table_entry_string)
end
-- Transform the table's physical path if a path_transformer was provided
if path_transformer ~= nil then
table_physical_path = path_transformer(table_physical_path)
end
local table_val = {
path=table_physical_path,
metadata=metadata,
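
Since `path_transformer` is just a function from one path string to another, callers aren't limited to the Azure helper. A hedged sketch of a custom transformer (hypothetical, reusing the setup from the hook examples above):

```lua
local delta_exporter = require("lakefs/catalogexport/delta_exporter")

-- Hypothetical transformer: rewrite s3:// physical paths to s3a://.
-- `action`, `args`, `write_object` and `delta_client` are assumed to be
-- set up exactly as in the hook examples earlier in this PR.
local function s3_to_s3a(path)
  return (path:gsub("^s3://", "s3a://"))
end

local details = delta_exporter.export_delta_log(action, args.table_names,
    write_object, delta_client, "_lakefs_tables", s3_to_s3a)
```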
1 change: 1 addition & 0 deletions pkg/actions/lua/storage/azure/azure.go
@@ -10,6 +10,7 @@ func Open(l *lua.State, ctx context.Context) {
open := func(l *lua.State) int {
lua.NewLibrary(l, []lua.RegistryFunction{
{Name: "blob_client", Function: newBlobClient(ctx)},
{Name: "abfss_transform_path", Function: transformPathToAbfss},
})
return 1
}