Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Export Hooks: Glue Catalog exporter #6653

Merged
merged 16 commits into from
Oct 3, 2023
123 changes: 123 additions & 0 deletions pkg/actions/lua/lakefs/catalogexport/glue_exporter.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
local pathlib = require("path")
local json = require("encoding/json")
local lakefs = require("lakefs")
local extractor = require("lakefs/catalogexport/table_extractor")
local utils = require("lakefs/catalogexport/internal")
local sym_exporter = require("lakefs/catalogexport/symlink_exporter")

-- Compose a unique Glue table name from the descriptor name, the
-- repository id, the ref (branch or tag) and a short commit digest.
local function get_full_table_name(descriptor, action_info)
    local ref = utils.ref_from_branch_or_tag(action_info)
    local digest = utils.short_digest(action_info.commit_id)
    return string.format("%s_%s_%s_%s", descriptor.name, action_info.repository_id, ref, digest)
end

-- map hive to glue types
-- types not listed here are passed through to Glue unchanged
local typesMapping = {
    integer = "int"
}

-- Convert a single Hive column descriptor into a Glue column input table.
local function hive_col_to_glue(col)
    local glue_type = typesMapping[col.type]
    if glue_type == nil then
        glue_type = col.type
    end
    return {
        Name = col.name,
        Type = glue_type,
        Comment = col.comment,
        Parameters = col.parameters
    }
end

-- Build the Glue PartitionKeys input from the table descriptor.
-- The order of descriptor.partition_columns is preserved, since it
-- determines the partition path layout in storage.
-- Raises an error if a partition name has no matching schema field.
local function hive_partitions_to_glue_input(descriptor)
    local fields = descriptor.schema.fields or {}
    -- index the fields by name in a separate lookup table; the previous
    -- code wrote the name keys into descriptor.schema.fields itself,
    -- mutating the caller's descriptor as a side effect
    local field_by_name = {}
    for _, field in ipairs(fields) do
        field_by_name[field.name] = field
    end
    local partitions = {}
    for _, part_key in ipairs(descriptor.partition_columns) do
        local col = field_by_name[part_key]
        if col == nil then
            error(string.format("partition name `%s` not found in table `%s`", part_key, descriptor.name))
        end
        table.insert(partitions, hive_col_to_glue(col))
    end
    return partitions
end

-- Build the Glue Columns input: every schema field that is not a
-- partition column, converted to Glue column format (partition columns
-- must not appear in the Columns input in Glue).
local function hive_columns_to_glue_input(descriptor)
    local is_partition = {}
    for _, name in ipairs(descriptor.partition_columns) do
        is_partition[name] = true
    end
    local columns = {}
    for _, field in ipairs(descriptor.schema.fields or {}) do
        if not is_partition[field.name] then
            table.insert(columns, hive_col_to_glue(field))
        end
    end
    return columns
end

-- Resolve the default storage location for a table: the symlink export
-- base prefix for the current commit joined with the table name
-- (root of either the partition tree or a flat symlink.txt file).
local function get_table_location(storage_base_prefix, descriptor, action_info)
    local base_uri = sym_exporter.get_storage_uri_prefix(storage_base_prefix, action_info.commit_id, action_info)
    return pathlib.join("/", base_uri, descriptor.name)
end

-- Create a standard AWS Glue table input (i.e. not Apache Iceberg):
-- copy the caller-supplied base input and fill in the computed fields.
-- options:
--   table_name: override the auto-generated table name
local function build_glue_create_table_input(base_input, descriptor, symlink_location, columns, partitions, action_info,
    options)
    local opts = options or {}
    -- work on a deep copy so the caller's base_input is never mutated
    local input = utils.deepcopy(base_input)
    if opts.table_name then
        input.Name = opts.table_name
    else
        input.Name = get_full_table_name(descriptor, action_info)
    end
    input.TableType = "EXTERNAL_TABLE"
    input.PartitionKeys = array(partitions)
    input.StorageDescriptor.Columns = array(columns)
    input.StorageDescriptor.Location = symlink_location
    return input
end

-- Build a Glue TableInput from a Hive-style table descriptor.
-- options:
--   export_base_uri: override the storage namespace used as the export base
local function build_glue_create_table_input_from_hive(descriptor, base_input, action_info, options)
    local opts = options or {}
    -- resolve the symlink root location for this table
    local base_prefix = opts.export_base_uri or action_info.storage_namespace
    local symlink_location = get_table_location(base_prefix, descriptor, action_info)
    -- translate the Hive schema and partitions into Glue inputs
    local glue_partitions = hive_partitions_to_glue_input(descriptor)
    local glue_columns = hive_columns_to_glue_input(descriptor)
    return build_glue_create_table_input(base_input, descriptor, symlink_location, glue_columns, glue_partitions,
        action_info, opts)
end

-- Create a table in AWS Glue from a lakeFS table descriptor.
-- glue: Glue client, db: target Glue database name,
-- table_src_path: path of the table descriptor file in the repository,
-- create_table_table_input: base TableInput the generated values are merged into.
-- options:
--   override_create_table_input: use this TableInput as-is instead of building one
--   debug: print the marshaled input before the create call
-- Returns a table holding the final create_table_input that was sent.
local function export_glue(glue, db, table_src_path, create_table_table_input, action_info, options)
    local opts = options or {}
    -- fetch the table descriptor for the current commit
    local descriptor = extractor.get_table_descriptor(lakefs, action_info.repository_id, action_info.commit_id,
        table_src_path)
    -- build the Glue table creation input unless the caller supplied one
    local table_input = opts.override_create_table_input
    if not table_input then
        table_input = build_glue_create_table_input_from_hive(descriptor, create_table_table_input, action_info, opts)
    end
    local json_input = json.marshal(table_input)
    if opts.debug then
        print("Creating Glue Table - input:", json_input)
    end
    glue.create_table(db, json_input)
    return {
        create_table_input = table_input
    }
end

return {
    -- exported so callers can build a TableInput themselves, tweak it, and
    -- pass it back via opts.override_create_table_input (see PR discussion)
    build_glue_create_table_input = build_glue_create_table_input,
    build_glue_create_table_input_from_hive = build_glue_create_table_input_from_hive,
    export_glue = export_glue
}
17 changes: 9 additions & 8 deletions pkg/actions/lua/lakefs/catalogexport/hive.lua
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
local pathlib = require("path")
local utils = require("lakefs/catalogexport/internal")
local strings = require("strings")
local DEFAULT_PAGE_SIZE = 30
local DEFAULT_PAGE_SIZE = 30

-- extract partition prefix from full path
local function extract_partitions_path(partitions, path)
Expand All @@ -11,7 +11,7 @@ local function extract_partitions_path(partitions, path)
local idx = 1
local is_partition_prefix = strings.has_prefix(path, partitions[1])
for part_idx, partition in ipairs(partitions) do
local col_substr = "/" .. partition .. "="
local col_substr = "/" .. partition .. "="
-- if partition is the path prefix and we are the that first partition remove /
if part_idx == 1 and is_partition_prefix then
col_substr = partition .. "="
Expand All @@ -20,7 +20,7 @@ local function extract_partitions_path(partitions, path)
if i == nil then
return nil
end
local separator_idx = string.find(path, "/", j+1)
local separator_idx = string.find(path, "/", j + 1)
-- verify / found and there is something in between = ... /
if separator_idx == nil or separator_idx <= (j + 1) then
return nil
Expand All @@ -33,7 +33,8 @@ end
-- Hive format partition iterator each result set is a collection of files under the same partition
local function extract_partition_pager(client, repo_id, commit_id, base_path, partition_cols, page_size)
local target_partition = ""
local pager = utils.lakefs_object_pager(client, repo_id, commit_id, "", base_path,"", page_size or DEFAULT_PAGE_SIZE)
local pager = utils.lakefs_object_pager(client, repo_id, commit_id, "", base_path, "",
page_size or DEFAULT_PAGE_SIZE)
local page = pager()
return function()
if page == nil then
Expand Down Expand Up @@ -66,13 +67,13 @@ local function extract_partition_pager(client, repo_id, commit_id, base_path, pa
size = entry.size_bytes,
checksum = entry.checksum
})
-- remove entry only if its part of the current partition
table.remove(page, 1)
end
-- remove entry (if its part of current partition, hidden files etc) from the entry set
table.remove(page, 1)
end
end
end

return {
extract_partition_pager=extract_partition_pager,
}
extract_partition_pager = extract_partition_pager
}
16 changes: 16 additions & 0 deletions pkg/actions/lua/lakefs/catalogexport/internal.lua
Original file line number Diff line number Diff line change
@@ -1,6 +1,21 @@
local url = require("net/url")
local DEFAULT_SHORT_DIGEST_LEN=6

-- Recursively copy a value. Tables are duplicated key-by-key (keys are
-- copied too) and the metatable is deep-copied onto the result; all
-- other types are returned as-is. Note: does not handle cyclic tables.
local function deepcopy(orig)
    if type(orig) ~= 'table' then
        -- number, string, boolean, etc
        return orig
    end
    local result = {}
    for key, value in pairs(orig) do
        result[deepcopy(key)] = deepcopy(value)
    end
    return setmetatable(result, deepcopy(getmetatable(orig)))
end

-- Return the leading `len` characters of a digest string
-- (defaults to DEFAULT_SHORT_DIGEST_LEN when len is nil).
local function short_digest(digest, len)
    local n = len or DEFAULT_SHORT_DIGEST_LEN
    return digest:sub(1, n)
end
Expand Down Expand Up @@ -52,6 +67,7 @@ local function parse_storage_uri(uri)
end

return {
deepcopy=deepcopy,
parse_storage_uri=parse_storage_uri,
short_digest=short_digest,
ref_from_branch_or_tag=ref_from_branch_or_tag,
Expand Down
5 changes: 3 additions & 2 deletions pkg/actions/lua/lakefs/catalogexport/symlink_exporter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,14 @@ local function export_s3(s3_client, table_src_path, action_info, options)
if opts.debug then
print("S3 writing bucket: " .. location.bucket .. " key: " .. key)
end
put_obj(location.bucket, key, symlink.data)
put_object(location.bucket, key, symlink.data)
end
return {
location = location
}
end

return {
export_s3 = export_s3
export_s3 = export_s3,
get_storage_uri_prefix=get_storage_uri_prefix,
}
19 changes: 19 additions & 0 deletions pkg/actions/lua/storage/aws/aws.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package aws

import (
"context"

"github.com/Shopify/go-lua"
)

// Open registers the "aws" Lua library on the given state, exposing the
// s3_client and glue_client constructors bound to ctx.
func Open(l *lua.State, ctx context.Context) {
	loader := func(l *lua.State) int {
		lua.NewLibrary(l, []lua.RegistryFunction{
			{Name: "s3_client", Function: newS3Client(ctx)},
			{Name: "glue_client", Function: newGlueClient(ctx)},
		})
		return 1
	}
	lua.Require(l, "aws", loader, false)
	// drop the value Require leaves on the stack
	l.Pop(1)
}
Loading