Table Extractor Hook and _lakefs_tables format #6589

Merged: 30 commits merged on Sep 19, 2023

Conversation

@Isan-Rivkin (Contributor) commented Sep 12, 2023

Closes #6573

This PR includes:

  • Initial export hook support: _lakefs_tables/ support in the Lua runtime.
  • Lua runtime: loading .lua packages into actions, not only .go ones.

Reviewer info:

  • No breaking changes.
  • Not included: tests; we need to prioritize adding tests for the lakeFS Lua package in general.

Usage example (with screenshots 😮)

  1. Prerequisite - table data must exist! Let's write a bunch of Hive-compatible table data to lakeFS under the prefix tables/animals1:
$ lakectl fs ls lakefs://lua/main/tables/animals1/ --recursive

object          2023-09-11 18:40:41 +0300 IDT    210 B           tables/animals1/type=axolotl/weight=12/name=doe/animals.csv
object          2023-09-11 18:40:44 +0300 IDT    210 B           tables/animals1/type=axolotl/weight=12/name=ok/animals.csv
object          2023-09-11 18:40:46 +0300 IDT    210 B           tables/animals1/type=axolotl/weight=22/name=john/animals.csv
object          2023-09-11 18:40:48 +0300 IDT    210 B           tables/animals1/type=axolotl/weight=22/name=tom/animals.csv
object          2023-09-11 18:40:50 +0300 IDT    210 B           tables/animals1/type=axolotl/weight=3/name=goget/animals.csv
object          2023-09-11 18:40:51 +0300 IDT    210 B           tables/animals1/type=bird/weight=3/name=flycard/animals.csv
object          2023-09-11 18:40:53 +0300 IDT    210 B           tables/animals1/type=cat/weight=1/name=foo/animals.csv
object          2023-09-11 18:40:54 +0300 IDT    210 B           tables/animals1/type=cat/weight=12/name=goodboi/animals.csv
object          2023-09-11 18:40:55 +0300 IDT    210 B           tables/animals1/type=cat/weight=99/name=meow/animals.csv
object          2023-09-11 18:40:57 +0300 IDT    210 B           tables/animals1/type=dog/weight=1/name=badboi/animals.csv
object          2023-09-11 18:40:58 +0300 IDT    210 B           tables/animals1/type=dog/weight=34/name=name2/animals.csv
object          2023-09-11 18:41:07 +0300 IDT    210 B           tables/animals1/type=dog/weight=43/name=snoop/animals.csv
object          2023-09-11 18:41:09 +0300 IDT    210 B           tables/animals1/type=fish/weight=12/name=bloop/animals.csv
object          2023-09-12 10:19:27 +0300 IDT    210 B           tables/animals1/type=fish/weight=3/name=avi/animals.csv
object          2023-09-12 10:19:23 +0300 IDT    210 B           tables/animals1/type=fish/weight=3/name=dan/animals.csv
object          2023-09-12 10:19:26 +0300 IDT    210 B           tables/animals1/type=fish/weight=3/name=haim/animals.csv
object          2023-09-12 10:19:25 +0300 IDT    210 B           tables/animals1/type=fish/weight=3/name=ronen/animals.csv
object          2023-09-11 18:41:10 +0300 IDT    210 B           tables/animals1/type=fish/weight=3/name=water/animals.csv
  2. Table definition: add a table spec to _lakefs_tables/hive.yaml - the table records live under the prefix tables/animals1:
name: animals
type: hive
path: tables/animals1
partition_columns: ['type', 'weight']
schema:
  type: struct
  fields:
    - name: weight
      type: integer
      nullable: false
      metadata: {}
    - name: name
      type: string
      nullable: false
      metadata: {}
    - name: type
      type: string
      nullable: true
      metadata:
        comment: axolotl, cat, dog, fish etc  
  3. Add our Lua script (this is the interesting part), scripts/table_extractor.lua:
-- local yaml = require("encoding/yaml")
-- local strings = require("strings")
local extractor = require("lakefs/catalogexport/table_extractor")
local hive = require("lakefs/catalogexport/hive")
local lakefs_client = require("lakefs")
local utils = require("lakefs/catalogexport/internal")

-- config 
local repo_id = action.repository_id
local commit_id = action.commit_id
local ref = utils.ref_from_branch_or_tag(action)


function test_table_extractor() 
    local files = extractor.list_table_descriptor_entries(lakefs_client, repo_id, commit_id)
    for _, file in ipairs(files) do 
        local descriptor = extractor.get_table_descriptor(lakefs_client, repo_id, commit_id, file.path)
        local base_path = descriptor.path 
        local pager = hive.extract_partition_pager(lakefs_client, repo_id, commit_id, base_path, descriptor.partition_columns)
        local sha = utils.short_digest(commit_id)
        print(string.format('%s table representation for `lakefs://%s/%s/%s` digest: %s', descriptor.type, repo_id, ref, descriptor.path, sha))
        for part_key, entries in pager do
            print("Partition Result: " .. part_key .. " #: " .. tostring(#entries))
            for _, entry in ipairs(entries) do
                print(" > path: " .. entry.path .. " physical: " .. entry.physical_address)
            end
        end        
    end
end 

test_table_extractor()

  4. Finally, invoke the script with a hook (add _lakefs_hooks/table_extractor.yaml):
name: table extractor
on:
  post-create-branch:
    branches: ["main*"]
  post-commit:
    branches: ["main*"]
hooks:
  - id: table_extractor
    type: lua
    properties:
      script_path: scripts/table_extractor.lua
  5. How it looks:

[screenshot of the hook run output]

@Isan-Rivkin Isan-Rivkin self-assigned this Sep 12, 2023
@Isan-Rivkin Isan-Rivkin added the include-changelog PR description should be included in next release changelog label Sep 13, 2023
@Isan-Rivkin:
@nopcoder Merged and updated your PR 👍

@nopcoder left a comment:
partial review - common and lib.

)

//go:embed *.lua
var modulePath embed.FS
@nopcoder:
Can we generalize this one and have an FS for all our Lua files? Adding modulePath to the Lua loader search path will load our code too.

@Isan-Rivkin:
fixed by you 💪

Comment on lines 21 to 22
SHORT_DIGEST_LEN=6,
LAKEFS_DEFAULT_PAGE_SIZE=30,
@nopcoder:

  • short digest looks like a helper function
  • default page size - prefer a default per type of request rather than a global one; also not sure catalog export is the place for them. You can have DEFAULT_EXPORT_PAGE_SIZE and DEFAULT_HIVE_PAGE_SIZE.

@Isan-Rivkin:
Fixed both!

tag_events = { ["pre-create-tag"] = true, ["post-create-tag"] = true }
branch_events = { ["pre-create-branch"] = true, ["post-create-branch"] = true }
commit_events = { ["post-commit"] = true, ["post-merge"] = true }
local ref
@nopcoder:
unused

@Isan-Rivkin:
👍

local common = require("lakefs/catalogexport/common")

-- resolve ref value from action.action.event_type
function ref_from_branch_or_tag()
@nopcoder:
Although action is a global variable injected by the runtime, if you are building common code in lib (and I'm not sure this belongs in this package), take the table as the first argument.

@Isan-Rivkin:
💯 also added locals to the dict.

local ref
if tag_events[action.event_type] then
return action.tag_id, nil
elseif branch_events[action.event_type] or commit_events[action.event_type] then
@nopcoder:
why does commit get branch_id and not commit_id?

@Isan-Rivkin:
Because I have the commit_id; I need the reference to the HEAD of the current commit (specifically, when creating symlinks both ref and commit_id are used).

Comment on lines 6 to 8
tag_events = { ["pre-create-tag"] = true, ["post-create-tag"] = true }
branch_events = { ["pre-create-branch"] = true, ["post-create-branch"] = true }
commit_events = { ["post-commit"] = true, ["post-merge"] = true }
@nopcoder:
you are using three tables for a 2-item lookup - use an if

@Isan-Rivkin:
👍

local common = require("lakefs/catalogexport/common")

-- resolve ref value from action.action.event_type
function ref_from_branch_or_tag()
@nopcoder:
Something like this - verify there is a commit based on the event, given that we want to raise an error (or, alternatively, return an empty ref):

-- Based on the event type, resolve the ref value from the action.
function action_event_ref(action)
    local ref
    if action.event_type == "pre-create-tag" or action.event_type == "post-create-tag" then
        return action.tag_id
    elseif action.event_type == "pre-create-branch" or action.event_type == "post-create-branch" then
        ref = action.branch_id
    elseif action.event_type == "post-commit" or action.event_type == "post-merge" then
        ref = action.commit_id
    else
        error("unsupported event type: " .. action.event_type)
    end
    return ref
end

@Isan-Rivkin:
did something similar, see 👍

Comment on lines 1 to 17
function lakefs_object_it(lakefs_client, repo_id, commit_id, after, prefix, page_size, delimiter)
    local next_offset = after
    local has_more = true
    return function()
        if not has_more then
            return nil
        end
        local code, resp = lakefs_client.list_objects(repo_id, commit_id, next_offset, prefix, delimiter, page_size)
        if code ~= 200 then
            error("lakeFS: could not list objects in: " .. prefix .. ", error: " .. resp.message)
        end
        local objects = resp.results
        has_more = resp.pagination.has_more
        next_offset = resp.pagination.next_offset
        return objects
    end
end
@nopcoder:
In order to make it an iterator you need to return an object for each call to the callback; the current implementation enables pagination-like iteration.
I suggest making this function more generic - lakefs_paginiated_api - taking a callback function that accepts next_offset, so it will work with all our paginated APIs.
Also, I don't think this code is related here and it should move to a different package.

function lakefs_paginiated_api(api_call, after)
    local next_offset = after or ""
    local has_more = true
    return function()
        if not has_more then
            return nil
        end
        local code, resp = api_call(next_offset)
        if code < 200 or code >= 300 then
            error("lakeFS: api return non-2xx" .. code)
        end
        has_more = resp.pagination.has_more
        next_offset = resp.pagination.next_offset
        return resp.results
    end
end
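
For illustration, a usage sketch of the generic pager above, adapting the list_objects call from the original snippet (hypothetical wiring, not the PR's final code):

local function list_all_objects(lakefs_client, repo_id, commit_id, prefix, page_size)
    local pager = lakefs_paginiated_api(function(next_offset)
        -- adapt the paginated list_objects API to the generic pager; "" delimiter means a recursive listing
        return lakefs_client.list_objects(repo_id, commit_id, next_offset, prefix, "", page_size)
    end)
    for entries in pager do
        for _, entry in ipairs(entries) do
            print(entry.path)
        end
    end
end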

@Isan-Rivkin:
fixed 👍


function TableExtractor.new(repository_id, ref, commit_id)
local self = setmetatable({}, TableExtractor)
self.tables_registry_base = pathlib.join(pathlib.default_separator(), '_lakefs_tables/')
@nopcoder:
what does this call provide?

@Isan-Rivkin:
removed

self._iter_page_size, "")
for entries in iter do
for _, entry in ipairs(entries) do
is_hidden = pathlib.is_hidden(pathlib.parse(entry.path).base_name)
@nopcoder:

  1. is_hidden should accept a prefix
  2. the hidden check should be the result of checking the prefix after parse, as we do parse the path

@Isan-Rivkin:
Changed the code a bit now to make sure the rest of the path is considered.
Trying to make it work with only the is_hidden params (suffix, separator) makes it look very confusing and convoluted.

Comment on lines 151 to 152
is_hidden = pathlib.is_hidden(pathlib.parse(entry.path).base_name)
is_yaml = strings.has_suffix(entry.path, ".yaml")
@nopcoder:

  1. use local for temporary variables
  2. suggestion: we are not doing lazy evaluation - if a complex if is hard to read we can extract the check into a function

@Isan-Rivkin:
done 👍

end

-- list all YAML files in _lakefs_tables
function TableExtractor:list_table_definitions()
@nopcoder:
This module should expose this as a plain function - I don't see a reason to have it as an object initialized with repo, commit and ref, when listing doesn't use the ref and getting a table only uses it in one specific case.
It should be possible to get a list of tables from a function.
About get_table - it does more than "get": it uses the ref we provide to HiveTable. So getting a table is just parsing a YAML object; the rest is related to Hive and should be handled by a hive function.

@Isan-Rivkin:
changed everything :)

end

-- Define methods
function HiveTable:name()
@nopcoder:
I don't see a reason to generate a class for a Hive table - we can just return a table with this information.

@Isan-Rivkin:
removed
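
For illustration, a minimal sketch of the plain-table approach discussed above (hypothetical constructor name; the fields follow the _lakefs_tables/hive.yaml spec shown in the description):

local function new_hive_table(descriptor)
    -- plain Lua table instead of a class; fields mirror the YAML table descriptor
    return {
        name = descriptor.name,
        path = descriptor.path,
        partition_columns = descriptor.partition_columns,
        schema = descriptor.schema,
    }
end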

Comment on lines 8 to 33
-- return partition table from path based on columns in partition_cols
function get_partition_values(partition_cols, path)
    local vals = {}
    splitted_path = strings.split(path, pathlib.default_separator())
    for _, part in pairs(splitted_path) do
        for _, col in ipairs(partition_cols) do
            local prefix = col .. "="
            if strings.has_prefix(part, prefix) then
                vals[col] = part:sub(#prefix + 1)
            end
        end
    end
    return vals
end

-- extract partition substr from full path
function extract_partition_prefix_from_path(partition_cols, path)
    local partitions = get_partition_values(partition_cols, path)
    local partitions_list = {}
    -- iterate partition_cols to maintain order
    for _, col in pairs(partition_cols) do
        table.insert(partitions_list, col .. "=" .. partitions[col])
    end
    local partition_key = table.concat(partitions_list, pathlib.default_separator())
    return partition_key
end
@nopcoder:
If I got it right, extract_partition_prefix_from_path uses get_partition_values and the end result is getting the part of the object path based on the partitions / columns.

I assume that the columns are ordered based on the same structure as the object path.
Also, that the number of partitions must match the path information.

Based on the above I suggest the following:

function extract_partitions_path(partitions, path)
    local idx = 0
    for _, partition in ipairs(partitions) do
        local pattern = partition .. "=[^/]*"
        local i, j = string.find(path, pattern, idx)
        if i == nil then
            return nil
        end
        idx = j + 1
    end
    return string.sub(path, 1, idx)
end

It should return the relevant part of the path based on the partitioning used to write the data.

@Isan-Rivkin:
Much better, thanks! Did almost that (added a comment in the code) 👍

end

-- Hive format partition iterator each result set is a collection of files under the same partition
function lakefs_hive_partition_pager(client, repo_id, commit_id, base_path, page_size, delimiter, partition_cols)
@nopcoder:
This is relevant to several functions here - how do we plan to support delimiter?
As I see it, we can use a delimiter and pass it to list objects as long as it is not the default delimiter.
When using a delimiter for listing we will not get a recursive listing under the prefix - which means we will not get any partition information.

Please verify that we need recursive listing, and if that is the case I prefer not to pass any delimiter to these APIs.

@Isan-Rivkin:
Removed the support for delimiter, we only need recursive listing.

for entries in iter do
    for _, entry in ipairs(entries) do
        is_hidden = pathlib.is_hidden(pathlib.parse(entry.path).base_name)
        if not is_hidden and entry.path_type == "object" then
@nopcoder:
In the case of recursive listing, all entries are objects

@Isan-Rivkin:
removed 👍

if not has_more then
return nil
end
local partition_entries = {}
@nopcoder:
Shouldn't partition_entries be at the same level as target_partition? Otherwise it means the caller will have to handle multiple calls that have the same target.

@Isan-Rivkin:
It should be like this; not sure what you mean? From testing it looks fine.

-- break if current entry does not belong to the target_partition
if partition_key ~= target_partition then
-- next time start searching AFTER the last used key
after = partition_entries[#partition_entries].path
@nopcoder:
Since we are not using yield and we trust our after value, this can have a very bad impact on our users: we may end up doing a call per record, where each call pulls a page of records, and keep requesting per record in the case of unique values.

@Isan-Rivkin:
Agreed, but when we need to we can add coroutines (yield) to the runtime and improve this; for now I think that's the tradeoff of paging over partitions.

@nopcoder:
But the issue is not with paging - it's the multiple listings over lakeFS because of partitioning.

@Isan-Rivkin commented Sep 14, 2023:

@nopcoder I addressed all of the comments, including refactoring the OOP table extractor!
See the "internal" usage in the description.
Please re-review.

@nopcoder:

answered some comments - review now

@nopcoder left a comment:
Added my comments in the body.
About the layout, I think it will be more useful to expose the capability per file. For example, if we want a class for HiveTableExporter - we will need to require the file and just use it as a class.
The implementation will return the class, and when we require it we use the variable and call new for new instances.

Comment on lines 10 to 12
if entry == nil or entry.path_type ~= "object" then
return false
end
@nopcoder:
Don't think we need to check for api bugs in this function

@Isan-Rivkin:
You're right, removed the nil check; kept the object check since an entry can be other things as well.

return false
end
-- remove _lakefs_tables/ from path
local suffix = entry.path:sub(#tables_base, #entry.path)
@nopcoder:
I don't think "suffix" reflects what we extract there - we got the path under _lakefs_tables.
If this function is for general use - we need to match the prefix before we remove it.

@Isan-Rivkin:
Added a check for prefix match and changed it a bit.
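
For illustration, a rough sketch of the prefix-aware check being discussed (hypothetical helper name; strings is the module already required elsewhere in this PR, and tables_base stands for the _lakefs_tables/ prefix):

local strings = require("strings")

-- hypothetical helper: strip the _lakefs_tables/ prefix, but only after matching it
local function relative_descriptor_path(entry, tables_base)
    if not strings.has_prefix(entry.path, tables_base) then
        return nil
    end
    -- the path under _lakefs_tables/ (rather than a file "suffix")
    return entry.path:sub(#tables_base + 1)
end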

Comment on lines 15 to 17
local is_hidden = pathlib.is_hidden(suffix)
local is_yaml = strings.has_suffix(entry.path, ".yaml")
return not is_hidden and is_yaml
@nopcoder:
I would return early - we extracted the check into a function to enable that

@Isan-Rivkin:
👍
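
And a minimal early-return version of the descriptor check (hypothetical; pathlib stands for the path helper already used by the PR's code, and relative_descriptor_path is the sketch above):

local function is_table_descriptor(entry, tables_base)
    local rel_path = relative_descriptor_path(entry, tables_base)
    if rel_path == nil then
        return false
    end
    -- return early instead of accumulating booleans
    if pathlib.is_hidden(rel_path) then
        return false
    end
    return strings.has_suffix(entry.path, ".yaml")
end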


local LAKEFS_TABLES_BASE = "_lakefs_tables/"

function _is_table_obj(entry, tables_base)
@nopcoder:
why the _ prefix in the function name?

@Isan-Rivkin (Sep 14, 2023):
common convention for private

@nopcoder:
you are not exporting it - it is private

@Isan-Rivkin:
👍

Comment on lines 38 to 46
-- table as parsed YAML object
function get_table_descriptor(client, repo_id, commit_id, logical_path)
    code, content = client.get_object(repo_id, commit_id, logical_path)
    if code ~= 200 then
        error("could not fetch data file: HTTP " .. tostring(code))
    end
    descriptor = yaml.unmarshal(content)
    return descriptor
end
@nopcoder:
not sure why we need this get+unmarshal function for a specific API

@Isan-Rivkin:
convenience

@nopcoder:
You are bloating the code before we have a use-case.
Let's agree that if we are not going to use it more than once we should remove it.

@Isan-Rivkin:
Not sure what you mean, we are going to use it more than once: the next PRs will have one or more exporters calling get_table_descriptor, potentially more than once depending on the table pattern.
For example, if we have a Symlink exporter with 3 tables.

@nopcoder:
I understand the internal use - but I am not sure why we need to expose it.
We can expose a helper function that verifies 2xx, and from there you can compose any API wrapper that verifies and unmarshals as needed.
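
For illustration, a rough sketch of the composition being suggested (hypothetical names, not the PR's actual API; assumes the encoding/yaml module and the client.get_object call shown above):

local yaml = require("encoding/yaml")

-- raise an error unless the lakeFS API returned a 2xx status
local function check_ok(code)
    if code < 200 or code >= 300 then
        error("lakeFS: API returned non-2xx status: " .. tostring(code))
    end
end

-- compose the generic check with a specific call: fetch an object and parse it as YAML
local function get_yaml_object(client, repo_id, commit_id, path)
    local code, content = client.get_object(repo_id, commit_id, path)
    check_ok(code)
    return yaml.unmarshal(content)
end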

Comment on lines 10 to 16
local re = regexp.compile(pattern)
local match = re.find(path, pattern)
if match == "" then
    return nil
end
-- expanding the pattern to a match regex because string.find() does not implement pattern matching https://github.com/Shopify/go-lua/blob/main/string.go#L37
local i, j = string.find(path, match, idx)
@nopcoder:
If this is the case and we have to compile the pattern - I suggest updating the implementation to match the partition name + "=" and look for "/" after a successful match.
It will be much better than compiling an expression for each extract.

@Isan-Rivkin:
changed it to call compile only once.
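
For illustration, a rough sketch of the no-regex variant described above (hypothetical; assumes go-lua's string.find performs a plain substring search, as the code comment above notes):

local function extract_partitions_path(partition_cols, path)
    local idx = 1
    for _, col in ipairs(partition_cols) do
        -- find "col=" starting from the current offset
        local i, j = string.find(path, col .. "=", idx)
        if i == nil then
            return nil
        end
        -- the partition value ends at the next path separator
        local slash = string.find(path, "/", j + 1)
        if slash == nil then
            return nil
        end
        idx = slash + 1
    end
    return path:sub(1, idx - 1)
end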

@Isan-Rivkin:

@nopcoder please re-review - updated based on comments, plus a different implementation for the partition pager.

end
end

function lakefs_object_pager(lakefs_client, repo_id, commit_id, after, prefix, page_size, delimiter)
@nopcoder:
The lakefs_paginiated_api is still not exported

-- remove entry only if its part of the current partition
table.remove(page, 1)
end
until not true -- check if has while True
@nopcoder:
Prefer while true do ... end - it reads more naturally top-down as an endless loop

@Isan-Rivkin:
👍

Comment on lines 97 to 100
return {
HiveTableExtractor = HiveTableExtractor,
lakefs_hive_partition_it = lakefs_hive_partition_it
}
@nopcoder:

  1. Think we need to return HiveTableExtractor here
  2. is lakefs_hive_partition_it still relevant?

@Isan-Rivkin:
Yes it's relevant - in general for iterating hive-like partitions, but not in the HiveTable context. There are other compatible tables.
Exporters will be able to use this for a Hive-partitioned base table but extend it to a different structure.
In GlueExporter we will want to iterate hive-like partitions but will have a different table spec.

@nopcoder:

  1. how is this one used?
  2. why is it part of lib? or under catalog export?


return {
list_table_descriptor_files = list_table_descriptor_files,
get_table_descriptor = get_table_descriptor,
HiveTableExtractor = hive.HiveTableExtractor,
@nopcoder:
we already expose hive

@Isan-Rivkin:

@nopcoder updated, see the usage example below (using the internal code):

local extractor = require("lakefs/catalogexport/table_extractor")
local hive = require("lakefs/catalogexport/hive")
local lakefs_client = require("lakefs")
local utils = require("lakefs/catalogexport/internal/utils")

-- config 
local repo_id = action.repository_id
local commit_id = action.commit_id
local ref = utils.ref_from_branch_or_tag(action)


function test_table_extractor() 
    local files = extractor.list_table_descriptor_files(lakefs_client, repo_id, commit_id)
    for _, file in ipairs(files) do 
        local descriptor = extractor.get_table_descriptor(lakefs_client, repo_id, commit_id, file.path)
        local base_path = descriptor.path 
        local page_size = hive.TableExtractor.DEFAULT_PAGE_SIZE_PARTITION
        local pager = hive.TableExtractor.lakefs_hive_partition_pager(lakefs_client, repo_id, commit_id, base_path, page_size, descriptor.partition_columns)
        local sha = utils.short_digest(commit_id, utils.DEFAULT_SHORT_DIGEST_LEN)
        print(string.format('%s table representation for `lakefs://%s/%s/%s` digest: %s', descriptor.type, repo_id, ref, descriptor.path, sha))
        for part_key, entries in pager do
            print("Partition Result: " .. part_key .. " #: " .. tostring(#entries))
            for _, entry in ipairs(entries) do
                print(" > path: " .. entry.path .. " physical: " .. entry.physical_address)
            end
        end        
    end
end 

test_table_extractor()


-- Hive format partition iterator each result set is a collection of files under the same partition
function lakefs_hive_partition_pager(client, repo_id, commit_id, base_path, page_size, partition_cols)
local prefix = base_path
@nopcoder:
if you call it prefix - change the argument name rather than reassigning

@nopcoder:
did you push the fix? there is still base_path and prefix

end

-- Hive format partition iterator each result set is a collection of files under the same partition
function lakefs_hive_partition_pager(client, repo_id, commit_id, base_path, page_size, partition_cols)
@nopcoder:

  1. reorder the arguments to enable default values - page_size for example: if we do not pass it to the call we should use the default
  2. think we can drop the lakefs_ prefix, or just keep partition_pager

@Isan-Rivkin:

  1. it's already re-ordered, you're not rebased
  2. I prefer to keep the prefix explicit since there are going to be iterations around tables and objects in the export destination as well.
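
As an aside, a minimal sketch of the optional-trailing-argument pattern being referred to (hypothetical signature and default value, not the PR's actual one):

local DEFAULT_PAGE_SIZE = 30

local function partition_pager(client, repo_id, commit_id, prefix, partition_cols, page_size)
    -- fall back to the default when the caller omits page_size
    page_size = page_size or DEFAULT_PAGE_SIZE
    -- ... list and group entries per partition, as in the PR's pager
end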


return {
TableExtractor = {
DEFAULT_PAGE_SIZE_PARTITION = 30,
@nopcoder:

  1. no need to expose this one - we should use it when page_size is not provided in the call
  2. we can just expose the pager, or a short version of the function:

local hive = require('lakefs/catalogexport/hive')
hive.partition_pager(...)

@Isan-Rivkin:
doesn't exist anymore, rebase

end
local code, resp = api_call(next_offset)
if code < 200 or code >= 300 then
error("lakeFS: api return non-2xx" .. code)
@nopcoder:
tostring the code (my bug)

@nopcoder:
bug is still there

end

return {
DEFAULT_SHORT_DIGEST_LEN=DEFAULT_SHORT_DIGEST_LEN,
@nopcoder:
No need to expose this one - you use it internally

end

-- list all YAML files under _lakefs_tables/*
function list_table_descriptor_files(client, repo_id, commit_id)
@nopcoder:
Maybe "entries" instead of "files", as we return an entry with a logical and physical location

@nopcoder (Sep 19, 2023):
we do not plan to add more internal stuff - I think a temporary internal.lua under catalogexport is enough

@nopcoder:
the file is under an internal path - do we plan to keep the internal folder?

(Several additional review threads on pkg/actions/lua/lakefs/catalogexport/hive.lua were resolved.)


end

return {
TableExtractor = {
@nopcoder:
this is the package level - why do we need TableExtractor?

@Isan-Rivkin:
I want to group it under extractor because there is also export functionality that is currently missing but will be added there, and with that more functionality related to extraction.

@nopcoder:
the file is under an internal path - do we plan to keep the internal folder?


return nil
end
local partition_entries = {}
while(true) do
@nopcoder:
Suggested change:
- while(true) do
+ while true do

@Isan-Rivkin Isan-Rivkin enabled auto-merge (squash) September 19, 2023 13:58
@Isan-Rivkin Isan-Rivkin merged commit e036ddf into master Sep 19, 2023
35 checks passed
@Isan-Rivkin Isan-Rivkin deleted the 6573-table-extractor branch September 19, 2023 14:10
Labels: include-changelog (PR description should be included in next release changelog)
Projects: None yet
Development: Successfully merging this pull request may close these issues:
  • Export Hooks: Table extractor
2 participants