-
Notifications
You must be signed in to change notification settings - Fork 365
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Export Hooks: Glue Catalog exporter #6653
Merged
Merged
Changes from all commits
Commits
Show all changes
16 commits
Select commit
Hold shift + click to select a range
1cf8162
prefix array
Isan-Rivkin 6a92708
working create table
Isan-Rivkin 9643f0f
update table
Isan-Rivkin e13b1a0
Merge branch 'master' into 6575-glue-exporter
Isan-Rivkin 3451f33
update
Isan-Rivkin 3d75553
working glue lib
Isan-Rivkin e06ceb4
remove todos
Isan-Rivkin cc9e4d2
fix linter
Isan-Rivkin 2f55597
Merge branch 'master' into 6575-glue-exporter
Isan-Rivkin af6d707
linter fix
Isan-Rivkin 33724d2
review comments
Isan-Rivkin fba4e03
review comments
Isan-Rivkin 3823378
rename aws file
Isan-Rivkin 1222bf1
minor
Isan-Rivkin e49f891
add comments final review fixes
Isan-Rivkin 679f2b2
Merge branch 'master' into 6575-glue-exporter
Isan-Rivkin File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
local pathlib = require("path") | ||
local json = require("encoding/json") | ||
local lakefs = require("lakefs") | ||
local extractor = require("lakefs/catalogexport/table_extractor") | ||
local utils = require("lakefs/catalogexport/internal") | ||
local sym_exporter = require("lakefs/catalogexport/symlink_exporter") | ||
|
||
--[[ | ||
Generate glue table name | ||
@descriptor(Table): object from (i.e _lakefs_tables/my_table.yaml) | ||
@action_info(Table): the global action object | ||
]] | ||
local function get_full_table_name(descriptor, action_info) | ||
local commit_id = action_info.commit_id | ||
local repo_id = action_info.repository_id | ||
local branch_or_tag = utils.ref_from_branch_or_tag(action_info) | ||
local sha = utils.short_digest(commit_id) | ||
return string.format("%s_%s_%s_%s", descriptor.name, repo_id, branch_or_tag, sha) | ||
end | ||
|
||
-- map hive to glue types | ||
local typesMapping = { | ||
integer = "int" | ||
} | ||
|
||
-- helper function to convert hive col to part of glue create table input | ||
local function hive_col_to_glue(col) | ||
return { | ||
Name = col.name, | ||
Type = typesMapping[col.type] or col.type, | ||
Comment = col.comment, | ||
Parameters = col.parameters | ||
} | ||
end | ||
|
||
-- Create list of partitions for Glue input from a Hive descriptor | ||
local function hive_partitions_to_glue_input(descriptor) | ||
local partitions = {} | ||
local cols = descriptor.schema.fields or {} | ||
-- columns list to map by name | ||
for _, c in ipairs(cols) do | ||
cols[c.name] = c | ||
end | ||
-- iterate partitions order and find them in the fields, the order determines the path in storage | ||
for _, part_key in ipairs(descriptor.partition_columns) do | ||
local col = cols[part_key] | ||
if col == nil then | ||
error(string.format("partition name `%s` not found in table `%s`", part_key, descriptor.name)) | ||
end | ||
table.insert(partitions, hive_col_to_glue(col)) | ||
end | ||
return partitions | ||
end | ||
|
||
-- Create list of columns for Glue excluding partitions | ||
local function hive_columns_to_glue_input(descriptor) | ||
-- create set of partition names since they must not appear in the columns input in glue | ||
local partition_names = {} | ||
for _, p in ipairs(descriptor.partition_columns) do | ||
partition_names[p] = true | ||
end | ||
-- create columns as inputs for glue | ||
local columns = {} | ||
local cols = descriptor.schema.fields or {} | ||
for _, col in ipairs(cols) do | ||
if not partition_names[col.name] then -- not a partition | ||
table.insert(columns, hive_col_to_glue(col)) | ||
end | ||
end | ||
return columns | ||
end | ||
|
||
-- default location value (e.g root location of either partitions or flat symlink.txt file) | ||
local function get_table_location(storage_base_prefix, descriptor, action_info) | ||
local commit_id = action_info.commit_id | ||
local export_base_uri = sym_exporter.get_storage_uri_prefix(storage_base_prefix, commit_id, action_info) | ||
return pathlib.join("/", export_base_uri, descriptor.name) | ||
end | ||
|
||
-- create a standard AWS Glue table input (i.e not Apache Iceberg), add input values to base input and configure the rest | ||
local function build_glue_create_table_input(base_input, descriptor, symlink_location, columns, partitions, action_info, | ||
options) | ||
local input = utils.deepcopy(base_input) | ||
local opts = options or {} | ||
input.Name = opts.table_name or get_full_table_name(descriptor, action_info) | ||
input.PartitionKeys = array(partitions) | ||
input.TableType = "EXTERNAL_TABLE" | ||
input.StorageDescriptor.Columns = array(columns) | ||
input.StorageDescriptor.Location = symlink_location | ||
return input | ||
end | ||
|
||
--[[ | ||
create a standard glue table in glue catalog | ||
@glue: AWS glue client | ||
@db(string): glue database name | ||
@table_src_path(string): path to table spec (i.e _lakefs_tables/my_table.yaml) | ||
@create_table_input(Table): struct mapping to table_input in AWS https://docs.aws.amazon.com/glue/latest/webapi/API_CreateTable.html#API_CreateTable_RequestSyntax | ||
should contain inputs describing the data format (i.e InputFormat, OutputFormat, SerdeInfo) since the exporter is agnostic to this. | ||
by default this function will configure table location and schema. | ||
@action_info(Table): the global action object | ||
@options: | ||
- table_name(string): override default glue table name | ||
- debug(boolean) | ||
- export_base_uri(string): override the default prefix in S3 for symlink location i.e s3://other-bucket/path/ | ||
]] | ||
local function export_glue(glue, db, table_src_path, create_table_input, action_info, options) | ||
local opts = options or {} | ||
local repo_id = action_info.repository_id | ||
local commit_id = action_info.commit_id | ||
|
||
-- get table desctiptor from _lakefs_tables/ | ||
local descriptor = extractor.get_table_descriptor(lakefs, repo_id, commit_id, table_src_path) | ||
|
||
-- get table symlink location uri | ||
local base_prefix = opts.export_base_uri or action_info.storage_namespace | ||
local symlink_location = get_table_location(base_prefix, descriptor, action_info) | ||
|
||
-- parse Hive table | ||
local columns = {} | ||
local partitions = {} | ||
if descriptor.type == "hive" then | ||
-- convert hive cols/partitions to glue | ||
partitions = hive_partitions_to_glue_input(descriptor) | ||
columns = hive_columns_to_glue_input(descriptor) | ||
else | ||
error("table " .. descriptor.type .. " in path " .. table_src_path .. " not supported") | ||
end | ||
|
||
-- finallize create glue table input | ||
local table_input = build_glue_create_table_input(create_table_input, descriptor, symlink_location, columns, | ||
partitions, action_info, opts) | ||
|
||
-- create table | ||
local json_input = json.marshal(table_input) | ||
if opts.debug then | ||
print("Creating Glue Table - input:", json_input) | ||
end | ||
glue.create_table(db, json_input) | ||
return { | ||
table_input = table_input | ||
} | ||
end | ||
|
||
return { | ||
get_full_table_name=get_full_table_name, | ||
export_glue = export_glue | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
package aws | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/Shopify/go-lua" | ||
) | ||
|
||
func Open(l *lua.State, ctx context.Context) { | ||
open := func(l *lua.State) int { | ||
lua.NewLibrary(l, []lua.RegistryFunction{ | ||
{Name: "s3_client", Function: newS3Client(ctx)}, | ||
{Name: "glue_client", Function: newGlueClient(ctx)}, | ||
}) | ||
return 1 | ||
} | ||
lua.Require(l, "aws", open, false) | ||
l.Pop(1) | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you see any real usage for this one in the near future? if not inline it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I see usage incase user want's to use
opts.override_create_table_input
and change certain things inbuild_glue_create_table_input
output.Actually I added now export for this function as well outside of the module.