-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[luigi.contrib.hive] WarehouseHiveClient #2826
Conversation
@honnix @Tarrasch @dlstadther could you please review this PR? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this makes sense. I've never used this part of luigi and i don't use a hive warehouse.
Could you ping the other contrib/hive contributors for review?
luigi/contrib/hive.py
Outdated
if ( | ||
partition | ||
and len(partition) > 1 | ||
and not isinstance(partition, collections.OrderedDict) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how does this handle the fact that python 3.7+ dictionaries guarantee insertion order?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree.
I should treat that case separately
sth like
def _is_ordered_dict(dikt):
import sys
if sys.version_info >= (3, 7):
return isinstance(dikt, dict)
return isinstance(dikt, collections.OrderedDict)
def _validate_partition(partition):
"""
If partition is set and its size is more than one and not ordered,
then we're unable to restore its path in the warehouse
"""
if (
partition
and len(partition) > 1
and _is_ordered_dict(partition)
):
raise ValueError('Unable to restore table/partition location')
Agree?
@dlstadther
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think collections.OrderedDict still exists in py3.7 so we'd still want to support that. If we want to allow both, i'd think check for ordereddict first then elif 3.7 and dict. else fail.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dlstadther
I added handling of dicts in py3.7+ and modified a unit-test test_table_exists_ambiguous_partition
for that
Could you please have a look at that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to check if order exists when we require one or less elements?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@stroykova this code in comments is sketch, have a look at "Files changed" tab, please
@rizzatti |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hello!
I did not use hive warehouse. So I can not check warehouse specific features.
I used HiveClient, and this PR looks good implementing one more hive feature.
Added two small suggestions. Up to you to decide are they important in hive warehouse context.
return '/'.join([ | ||
'{}={}'.format(k, v) for (k, v) in six.iteritems(partition or {}) | ||
]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is good to use inheritance here. Code is very similar to parent class
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for suggestion
In parent class this method is declared abstract
I agreee that this code looks very similar to other inheritors of base class
luigi/contrib/hive.py
Outdated
ignored_files = get_ignored_file_masks() | ||
filenames = self.hdfs_client.listdir(path) | ||
if ignored_files is None: | ||
return True |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to check ignored_files before calling listdir? It will be a little faster.
I am not familiar with hive warehouse, so I do not know is it needed to call listdir for existence.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
absolutely agree
@stroykova thank you for review so much |
done |
Description
I've added WarehouseHiveClient which allows to determine if table\partition exists when it's files exist on hdfs
Motivation and Context
Sometimes it's not enough to have an entry in hive metastore that table\partition exists.
In particular cases we want to be sure if files for that table/partition actually exists
Using of this client is suitable if you want to be sure that your task doesn't write zero rows.
It can be suitable for tasks like "insert overwrite table (...) partition (...)"
Have you tested this? If so, how?
I've provided several unit tests for that