Task: Support Azure Unity Catalog export (#7554)
* Task: Support Azure Unity Catalog export

* Fix test

* Fix transform func name

* Fix test for real

* CR Fixes

* CR Fixes 2
N-o-Z authored Mar 14, 2024
1 parent 305080d commit ccd6db6
Showing 9 changed files with 227 additions and 21 deletions.
12 changes: 11 additions & 1 deletion docs/howto/hooks/lua.md
@@ -233,6 +233,11 @@ Sets the object at the given bucket and key to the value of the supplied value string
Deletes the object at the given key
`path_uri` - A valid Azure blob storage URI in the form of `https://myaccount.blob.core.windows.net/mycontainer/myblob`

### `azure/abfss_transform_path(path)`

Transforms an HTTPS Azure URL to an ABFSS-scheme URL. Used by the `delta_exporter` function to support Azure Unity Catalog use cases.
`path` - A valid Azure blob storage URL in the form of `https://myaccount.blob.core.windows.net/mycontainer/myblob`
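
For example, a minimal sketch of the expected transformation (the `abfss` form follows the `abfss://<container>@<account>.dfs.core.windows.net/<path>` layout asserted by the test added in this change):

```lua
local azure = require("azure")

local https_path = "https://myaccount.blob.core.windows.net/mycontainer/myblob"
-- Expected result: abfss://mycontainer@myaccount.dfs.core.windows.net/myblob
local abfss_path = azure.abfss_transform_path(https_path)
print(abfss_path)
```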

### `crypto`

### `crypto/aes/encryptCBC(key, plaintext)`
@@ -476,7 +481,7 @@ Parameters:

A package used to export Delta Lake tables from lakeFS to external cloud storage.

### `lakefs/catalogexport/delta_exporter.export_delta_log(action, table_def_names, write_object, delta_client, table_descriptors_path)`
### `lakefs/catalogexport/delta_exporter.export_delta_log(action, table_def_names, write_object, delta_client, table_descriptors_path, path_transformer)`

The function used to export Delta Lake tables.
The return value is a table mapping table names to the external table location (from which it is possible to query the data) and the latest Delta table version's metadata.
@@ -490,6 +495,7 @@ Parameters:
- `write_object`: A writer function with `function(bucket, key, data)` signature, used to write the exported Delta Log (e.g. `aws/s3_client.put_object` or `azure/blob_client.put_object`)
- `delta_client`: A Delta Lake client that implements `get_table: function(repo, ref, prefix)`
- `table_descriptors_path`: The path under which the table descriptors of the provided `table_def_names` reside
- `path_transformer`: (Optional) A `function(path)` used to transform the path fields of the saved Delta logs, as well as the saved table's physical path (used to support Azure Unity Catalog use cases; see the sketch below)
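
For an Azure export, a minimal sketch of passing a `path_transformer` (adapted from the Azure hook added in this change; `args.azure.*`, `args.lakefs.*`, and `args.table_names` are supplied by the action configuration):

```lua
local azure = require("azure")
local formats = require("formats")
local delta_exporter = require("lakefs/catalogexport/delta_exporter")

local table_descriptors_path = "_lakefs_tables"
local sc = azure.blob_client(args.azure.storage_account, args.azure.access_key)
local function write_object(_, key, buf)
  return sc.put_object(key, buf)
end
local delta_client = formats.delta_client(args.lakefs.access_key_id, args.lakefs.secret_access_key)

-- azure.abfss_transform_path rewrites the exported paths to the abfss:// scheme
local delta_table_details = delta_exporter.export_delta_log(
  action, args.table_names, write_object, delta_client, table_descriptors_path, azure.abfss_transform_path)

-- The result maps each table name to its exported location and latest metadata
for t, details in pairs(delta_table_details) do
  print(t .. " exported to " .. details["path"] .. "\n")
end
```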

Delta export example for AWS S3:

@@ -739,6 +745,10 @@ The registration will use the following paths to register the table:
`<catalog>.<branch name>.<table_name>` where the branch name will be used as the schema name.
The return value is a table with mapping of table names to registration request status.

**Note for Azure users:** Databricks catalog external locations are supported only for ADLS Gen2 storage accounts.
When exporting Delta tables using the `lakefs/catalogexport/delta_exporter.export_delta_log` function, the `path_transformer` must be
used to convert the path scheme to `abfss`. The built-in `azure` Lua library provides this functionality via `abfss_transform_path`.

Parameters:

- `action(table)`: The global action table
97 changes: 97 additions & 0 deletions esti/catalog_export_test.go
@@ -613,3 +613,100 @@ func getStorageNamespace(t *testing.T, ctx context.Context, repo string) string
require.NotNil(t, resp.JSON200)
return resp.JSON200.StorageNamespace
}

func TestDeltaCatalogExportAbfss(t *testing.T) {
requireBlockstoreType(t, block.BlockstoreTypeAzure)
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)

accessKeyID := viper.GetString("access_key_id")
secretAccessKey := viper.GetString("secret_access_key")
testData := &exportHooksTestData{
Repository: repo,
Branch: mainBranch,
LakeFSAccessKeyID: accessKeyID,
LakeFSSecretAccessKey: secretAccessKey,
AzureStorageAccount: viper.GetString("azure_storage_account"),
AzureAccessKey: viper.GetString("azure_storage_access_key"),
}

tmplDir, err := fs.Sub(exportHooksFiles, "export_hooks_files/delta")
require.NoError(t, err)
err = fs.WalkDir(tmplDir, "data", func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if !d.IsDir() {
buf, err := fs.ReadFile(tmplDir, path)
if err != nil {
return err
}
uploadResp, err := uploadContent(ctx, repo, mainBranch, strings.TrimPrefix(path, "data/"), string(buf))
if err != nil {
return err
}
require.Equal(t, http.StatusCreated, uploadResp.StatusCode())
}
return nil
})
require.NoError(t, err)

headCommit := uploadAndCommitObjects(t, ctx, repo, mainBranch, map[string]string{
"_lakefs_actions/delta_export.yaml": renderTplFileAsStr(t, testData, tmplDir, "azure_adls/_lakefs_actions/delta_export.yaml"),
})

runs := waitForListRepositoryRunsLen(ctx, t, repo, headCommit.Id, 1)
run := runs.Results[0]
require.Equal(t, "completed", run.Status)

amount := apigen.PaginationAmount(1)
tasks, err := client.ListRunHooksWithResponse(ctx, repo, run.RunId, &apigen.ListRunHooksParams{
Amount: &amount,
})
require.NoError(t, err)
require.NotNil(t, tasks.JSON200)
require.Equal(t, 1, len(tasks.JSON200.Results))
require.Equal(t, "delta_exporter", tasks.JSON200.Results[0].HookId)
validateExportAbfss(t, ctx, headCommit.Id, testData)
}

func validateExportAbfss(t *testing.T, ctx context.Context, commit string, testData *exportHooksTestData) {
resp, err := client.GetRepositoryWithResponse(ctx, testData.Repository)
require.NoError(t, err)
require.NotNil(t, resp.JSON200)
namespaceURL, err := url.Parse(resp.JSON200.StorageNamespace)
require.NoError(t, err)
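// Exported Delta log key layout under the storage namespace:
// <prefix>/_lakefs/exported/<branch>/<short commit>/test_table/_delta_log/00000000000000000000.json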
keyTempl := "%s/_lakefs/exported/%s/%s/test_table/_delta_log/00000000000000000000.json"
tableStat, err := client.StatObjectWithResponse(ctx, testData.Repository, mainBranch, &apigen.StatObjectParams{
Path: "tables/test-table/test partition/0-845b8a42-579e-47ee-9935-921dd8d2ba7d-0.parquet",
})
require.NoError(t, err)
require.NotNil(t, tableStat.JSON200)
u, err := url.Parse(tableStat.JSON200.PhysicalAddress)
require.NoError(t, err)
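// Rebuild the expected abfss:// address from the object's https:// physical address:
// abfss://<container>@<storage account>.dfs.core.windows.net/<path>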
storageAccount, _, found := strings.Cut(u.Host, ".")
require.True(t, found)
container, path, found := strings.Cut(strings.TrimPrefix(u.Path, "/"), "/")
require.True(t, found)
expectedPath := fmt.Sprintf("abfss://%s@%s.dfs.core.windows.net/%s", container, storageAccount, path)
var reader io.ReadCloser
azClient, err := azure.BuildAzureServiceClient(params.Azure{
StorageAccount: testData.AzureStorageAccount,
StorageAccessKey: testData.AzureAccessKey,
})
require.NoError(t, err)

containerName, prefix, _ := strings.Cut(namespaceURL.Path, uri.PathSeparator)
key := fmt.Sprintf(keyTempl, strings.TrimPrefix(prefix, "/"), mainBranch, commit[:6])
readResp, err := azClient.NewContainerClient(containerName).NewBlockBlobClient(key).DownloadStream(ctx, nil)
require.NoError(t, err)
reader = readResp.Body

defer func() {
err := reader.Close()
require.NoError(t, err)
}()
contents, err := io.ReadAll(reader)
require.NoError(t, err)
require.Contains(t, string(contents), expectedPath)
}
16 changes: 8 additions & 8 deletions esti/delete_test.go
@@ -18,7 +18,7 @@ import (
"github.com/treeverse/lakefs/pkg/api/apiutil"
)

func found(ctx context.Context, repo, ref, path string) (bool, error) {
func objectFound(ctx context.Context, repo, ref, path string) (bool, error) {
res, err := client.GetObjectWithResponse(ctx, repo, ref, &apigen.GetObjectParams{Path: path})
if err == nil && res.HTTPResponse.StatusCode == http.StatusOK {
return true, nil
@@ -37,15 +37,15 @@ func TestDeleteStaging(t *testing.T) {

_, _ = uploadFileRandomData(ctx, t, repo, mainBranch, objPath)

f, err := found(ctx, repo, mainBranch, objPath)
f, err := objectFound(ctx, repo, mainBranch, objPath)
assert.NoError(t, err)
assert.True(t, f, "uploaded object found")

resp, err := client.DeleteObjectWithResponse(ctx, repo, mainBranch, &apigen.DeleteObjectParams{Path: objPath})
require.NoError(t, err, "failed to delete object")
require.Equal(t, http.StatusNoContent, resp.StatusCode())

f, err = found(ctx, repo, mainBranch, objPath)
f, err = objectFound(ctx, repo, mainBranch, objPath)
assert.NoError(t, err)
assert.False(t, f, "deleted object found")
}
@@ -57,7 +57,7 @@ func TestDeleteCommitted(t *testing.T) {

_, _ = uploadFileRandomData(ctx, t, repo, mainBranch, objPath)

f, err := found(ctx, repo, mainBranch, objPath)
f, err := objectFound(ctx, repo, mainBranch, objPath)
assert.NoError(t, err)
assert.True(t, f, "uploaded object found")

@@ -69,7 +69,7 @@
require.NoError(t, err, "failed to delete object")
require.Equal(t, http.StatusNoContent, getResp.StatusCode())

f, err = found(ctx, repo, mainBranch, objPath)
f, err = objectFound(ctx, repo, mainBranch, objPath)
assert.NoError(t, err)
assert.False(t, f, "deleted object found")
}
@@ -118,7 +118,7 @@ func TestDeleteObjectsReadOnlyRepository(t *testing.T) {
require.NoError(t, err, "failed to delete object")
require.Equal(t, http.StatusNoContent, deleteResp.StatusCode())

f, err := found(ctx, repoName, mainBranch, objPath)
f, err := objectFound(ctx, repoName, mainBranch, objPath)
assert.NoError(t, err)
assert.False(t, f, "deleted object found")
}
@@ -130,7 +130,7 @@ func TestCommitDeleteCommitted(t *testing.T) {

_, _ = uploadFileRandomData(ctx, t, repo, mainBranch, objPath)

f, err := found(ctx, repo, mainBranch, objPath)
f, err := objectFound(ctx, repo, mainBranch, objPath)
assert.NoError(t, err)
assert.True(t, f, "uploaded object found")

@@ -150,7 +150,7 @@
require.NoError(t, err, "commit delete file")
require.Equal(t, http.StatusCreated, commitResp.StatusCode())

f, err = found(ctx, repo, mainBranch, objPath)
f, err = objectFound(ctx, repo, mainBranch, objPath)
assert.NoError(t, err)
assert.False(t, f, "deleted object found")
}
@@ -0,0 +1,48 @@
name: Delta Exporter
on:
post-commit:
branches: ["{{ .Branch }}*"]
hooks:
- id: delta_exporter
type: lua
properties:
script: |
local azure = require("azure")
local formats = require("formats")
local delta_exporter = require("lakefs/catalogexport/delta_exporter")
local json = require("encoding/json")
local table_descriptors_path = "_lakefs_tables"
local sc = azure.blob_client(args.azure.storage_account, args.azure.access_key)
local function write_object(_, key, buf)
return sc.put_object(key,buf)
end
local delta_client = formats.delta_client(args.lakefs.access_key_id, args.lakefs.secret_access_key)
local delta_table_details = delta_exporter.export_delta_log(action, args.table_names, write_object, delta_client, table_descriptors_path, azure.abfss_transform_path)
for t, details in pairs(delta_table_details) do
if details["path"] == nil then
error("Delta Lake exported table \"" .. t .. "\"'s location is not available\n")
end
print("Delta Lake exported table \"" .. t .. "\"'s location: " .. details["path"] .. "\n")
if details["metadata"] == nil then
error("Delta Lake exported table \"" .. t .. "\"'s metadata is not available\n")
end
print("Delta Lake exported table \"" .. t .. "\"'s metadata:\n")
for k, v in pairs(details["metadata"]) do
if type(v) ~= "table" then
print("\t" .. k .. " = " .. v .. "\n")
else
print("\t" .. k .. " = " .. json.marshal(v) .. "\n")
end
end
end
args:
azure:
storage_account: "{{ .AzureStorageAccount }}"
access_key: "{{ .AzureAccessKey }}"
lakefs: # provide credentials of a user that has access to the script and Delta Table
access_key_id: "{{ .LakeFSAccessKeyID }}"
secret_access_key: "{{ .LakeFSSecretAccessKey }}"
table_names:
- test-table
22 changes: 11 additions & 11 deletions esti/rollback_test.go
@@ -17,7 +17,7 @@ func TestResetAll(t *testing.T) {

// upload file
_, objContent := uploadFileRandomData(ctx, t, repo, mainBranch, objPath)
f, err := found(ctx, repo, mainBranch, objPath)
f, err := objectFound(ctx, repo, mainBranch, objPath)
require.NoError(t, err)
require.True(t, f, "uploaded object found")

@@ -64,7 +64,7 @@ func TestHardReset(t *testing.T) {

// upload file
_, _ = uploadFileRandomData(ctx, t, repo, mainBranch, objPath)
f, err := found(ctx, repo, mainBranch, objPath)
f, err := objectFound(ctx, repo, mainBranch, objPath)
require.NoError(t, err)
require.True(t, f, "uploaded object found")

@@ -110,12 +110,12 @@ func TestResetPath(t *testing.T) {

// upload files
_, objContent1 := uploadFileRandomData(ctx, t, repo, mainBranch, objPath1)
f, err := found(ctx, repo, mainBranch, objPath1)
f, err := objectFound(ctx, repo, mainBranch, objPath1)
require.NoError(t, err)
require.True(t, f, "uploaded object found")

uploadFileRandomData(ctx, t, repo, mainBranch, objPath2)
f, err = found(ctx, repo, mainBranch, objPath2)
f, err = objectFound(ctx, repo, mainBranch, objPath2)
require.NoError(t, err)
require.True(t, f, "uploaded object found")

@@ -164,7 +164,7 @@
require.Equal(t, objContent1, body, fmt.Sprintf("path: %s, expected: %s, actual:%s", objPath1, objContent1, body))

// assert file2 doesn't exists
f, err = found(ctx, repo, mainBranch, objPath2)
f, err = objectFound(ctx, repo, mainBranch, objPath2)
require.NoError(t, err)
require.False(t, f, "object not found")
}
@@ -177,12 +177,12 @@ func TestResetObject(t *testing.T) {

// upload files
_, objContent1 := uploadFileRandomData(ctx, t, repo, mainBranch, objPath1)
f, err := found(ctx, repo, mainBranch, objPath1)
f, err := objectFound(ctx, repo, mainBranch, objPath1)
require.NoError(t, err)
require.True(t, f, "uploaded object found")

uploadFileRandomData(ctx, t, repo, mainBranch, objPath2)
f, err = found(ctx, repo, mainBranch, objPath2)
f, err = objectFound(ctx, repo, mainBranch, objPath2)
require.NoError(t, err)
require.True(t, f, "uploaded object found")

@@ -230,7 +230,7 @@
require.Equal(t, objContent1, body, fmt.Sprintf("path: %s, expected: %s, actual:%s", objPath1, objContent1, body))

// assert file2 doesn't exists
f, err = found(ctx, repo, mainBranch, objPath2)
f, err = objectFound(ctx, repo, mainBranch, objPath2)
require.NoError(t, err)
require.False(t, f, "object not found")
}
@@ -243,7 +243,7 @@ func TestRevert(t *testing.T) {

// upload file1
uploadFileRandomData(ctx, t, repo, mainBranch, objPath1)
f, err := found(ctx, repo, mainBranch, objPath1)
f, err := objectFound(ctx, repo, mainBranch, objPath1)
require.NoError(t, err)
require.True(t, f, "uploaded object found")

Expand All @@ -260,7 +260,7 @@ func TestRevert(t *testing.T) {

// upload file2
_, objContent2 := uploadFileRandomData(ctx, t, repo, mainBranch, objPath2)
f, err = found(ctx, repo, mainBranch, objPath2)
f, err = objectFound(ctx, repo, mainBranch, objPath2)
require.NoError(t, err)
require.True(t, f, "uploaded object found")

@@ -281,7 +281,7 @@
"failed to revert commit %s repo %s branch %s", commitId, repo, mainBranch)

// assert file1 doesn't exist
f, err = found(ctx, repo, mainBranch, objPath1)
f, err = objectFound(ctx, repo, mainBranch, objPath1)
require.NoError(t, err)
require.False(t, f, "object not found")

25 changes: 25 additions & 0 deletions examples/hooks/unity_table_export_azure.lua
@@ -0,0 +1,25 @@
--[[
An end-to-end example: it first exports Delta Lake tables from lakeFS, then registers the exported tables
with Unity Catalog.
]]

local azure = require("azure")
local formats = require("formats")
local databricks = require("databricks")
local delta_exporter = require("lakefs/catalogexport/delta_exporter")
local unity_exporter = require("lakefs/catalogexport/unity_exporter")

local table_descriptors_path = "_lakefs_tables"
local sc = azure.blob_client(args.azure.storage_account, args.azure.access_key)
local function write_object(_, key, buf)
return sc.put_object(key,buf)
end
local delta_client = formats.delta_client(args.lakefs.access_key_id, args.lakefs.secret_access_key)
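-- Pass azure.abfss_transform_path so the exported Delta log paths use the abfss:// scheme required by Unity Catalog external locations (ADLS Gen2)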
local delta_table_details = delta_exporter.export_delta_log(action, args.table_defs, write_object, delta_client, table_descriptors_path, azure.abfss_transform_path)

-- Register the exported table in Unity Catalog:
local databricks_client = databricks.client(args.databricks_host, args.databricks_token)
local registration_statuses = unity_exporter.register_tables(action, "_lakefs_tables", delta_table_details, databricks_client, args.warehouse_id)
for t, status in pairs(registration_statuses) do
print("Unity catalog registration for table \"" .. t .. "\" completed with status: " .. status .. "\n")
end