Skip to content

Conversation

@tejasapatil
Copy link
Contributor

@tejasapatil tejasapatil commented Sep 24, 2016

What changes were proposed in this pull request?

This PR depends on #15300 and includes following changes to have better planning for Hive bucketed tables:

  • HiveTableScanExec now exposes outputPartitioning and outputOrdering as per bucketing spec.
  • InsertIntoHiveTable now exposes requiredChildDistribution and requiredChildOrdering based on the target table's bucketing spec.

Despite of this PR, Spark still won't produce bucketed data as per Hive's bucketing guarantees, but will allow writes IFF user wishes to do so without caring about bucketing guarantees. I am incrementally working on closing the gaps to have complete Hive bucketing support in Spark but those will be separate PRs (eg. PR to add Hive's hashing function #15047)

How was this patch tested?

This PR depends on #15300 to let Spark create hive bucketed tables. Once that gets in, I can write unit tests for this PR.

@SparkQA
Copy link

SparkQA commented Sep 24, 2016

Test build #65862 has finished for PR 15229 at commit 8726cc6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tejasapatil tejasapatil changed the title [SPARK-17654] [SQL] Propagate bucketing information for Hive tables to / from Catalog [WIP] [SPARK-17654] [SQL] Propagate bucketing information for Hive tables to / from Catalog Sep 29, 2016
@tejasapatil tejasapatil force-pushed the SPARK-17654_hive_extract_bucketing branch from 8726cc6 to db82040 Compare September 29, 2016 18:09
@SparkQA
Copy link

SparkQA commented Sep 29, 2016

Test build #66114 has finished for PR 15229 at commit db82040.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tejasapatil tejasapatil changed the title [WIP] [SPARK-17654] [SQL] Propagate bucketing information for Hive tables to / from Catalog [SPARK-17654] [SQL] Propagate bucketing information for Hive tables to / from Catalog Sep 29, 2016
@SparkQA
Copy link

SparkQA commented Sep 29, 2016

Test build #66128 has finished for PR 15229 at commit 23986a8.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Sep 29, 2016

Test build #66125 has finished for PR 15229 at commit 9b61e39.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tejasapatil tejasapatil changed the title [SPARK-17654] [SQL] Propagate bucketing information for Hive tables to / from Catalog [SPARK-17654] [SQL] Propagate bucketing information for Hive tables to planner Sep 29, 2016
@rxin
Copy link
Contributor

rxin commented Oct 14, 2016

@tejasapatil how does HIve store bucketing files?

@carlos-verdes
Copy link

Hi @rxin,

In Hive you have two levels, the partition and the buckets.
The partitons are translated to folders on HDFS, for example:

/apps/hive/warehouse/model_table/date=6

Where model_table is the name of the table and date is the partition.

Inside a folder you will have n files and Hive let you decide how many files you want to create (buckets) and which data you want to store within.

If you create a table like this on Hive:

create table events (
  timestamp: long,
  userId: String,
  event: String
)
partitioned by (event_date int)
clustered by (userId) sorted by (userId, timestamp) into 10 buckets;

Then when it will be only 10 files per partition and all the events for one user will be only on one partition and sorted by time.

If you insert data on this table using the next query on Hive you will see that the clustering policy is respected:

set hive.enforce.bucketing = true;  -- (Note: Not needed in Hive 2.x onward)
from  event_feed_source e
insert overwrite table events
partition (event_date = 20170307)
select e.*, 20170307   
where event_day = 20170307;

However... if you do the next insert with Spark:

sqlContext.sql("insert overwrite table events partition (event_date = 20170307) select e.*,1 from event_feed_source e")

You will see that the data is stored with the same partitioning as it is on the source dataset.

What is the benefit of respecting the Hive clustering policy?
The main benefit is to avoid shuffle and have a control on the number or partitions.

To give an example we have a pipeline that reads thousands of events per user and save them into another table (model), so it means the events table is going to have x times more data than the model table (imagine a factor of 10x).

First point is, if the source data are clustered properly we can read all the events per user without shuffle (I mean to do something like events.groupBy(user).mapValues(_.sortBy(timestamp) will be done without shuffle).

Second point is when we generate the model RDD/Dataser from the event RDD/Dataset. Spark respects the source partitioning (unless you indicate otherwise) which means... is going to save into Hive 10 times the number of files for the model as needed (not respecting the clustering policy on Hive).
This implies that we have 10x more partitions than needed and also that the queries over the model table are not "clustered"... which means full scan every time we need to do a query (a full scan over 10 times the optimal number of partitions).

I hope I clarify the point on Hive clusters ;)

@tejasapatil
Copy link
Contributor Author

@carlos-verdes : Thanks for the information. This is moved under an umbrella jira (SPARK-19256) which has a proposal : https://docs.google.com/document/d/1a8IDh23RAkrkg9YYAeO51F4aGO8-xAlupKwdshve2fc/edit

I believe all your requirements are captured in the proposal. If not, let me know. Meanwhile, I will close this PR and re-open when the right pieces are together.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants