@@ -37,10 +37,12 @@
import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -91,6 +93,7 @@ public class ParquetHiveSerDe extends AbstractSerDe implements SchemaInference {

private ObjectInspector objInspector;
private ParquetHiveRecord parquetRow;
private ObjectInspector writableObjectInspector;

public ParquetHiveSerDe() {
parquetRow = new ParquetHiveRecord();
@@ -114,6 +117,7 @@ public void initialize(Configuration configuration, Properties tableProperties,
}
}
this.objInspector = new ArrayWritableObjectInspector(completeTypeInfo, prunedTypeInfo);
this.writableObjectInspector = TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(completeTypeInfo);
}

@Override
@@ -143,7 +147,14 @@ public Writable serialize(final Object obj, final ObjectInspector objInspector)
}

parquetRow.value = obj;
parquetRow.inspector= (StructObjectInspector)objInspector;
// The ObjectInspector coming from the Operator may carry type infos that differ from the table column type infos, which leads to issues like HIVE-26877,
// so compare the ObjectInspector created during the initialize phase of this SerDe with the one coming from the Operator;
// if they differ, use the ObjectInspector created during the initialize phase, which matches the table schema.
if (!ObjectInspectorUtils.compareTypes(writableObjectInspector, objInspector)) {
parquetRow.inspector = (StructObjectInspector) writableObjectInspector;
} else {
parquetRow.inspector = (StructObjectInspector) objInspector;
}
Comment on lines +153 to +157
Contributor:
Does this mean that we can always just use writableObjectInspector? And if so, do we even need objInspector?
In the else block, can we use writableObjectInspector if its type is the same as objInspector's?

Contributor Author:

Yeah, actually my initial commit (commit 1) had the same solution, but it broke 53 test cases. Analyzing those failures showed that when the data comes from a TEXT format table, the ObjectInspector coming from the Operator is of type Lazy*ObjectInspector (for some types like Int and String), whereas the ObjectInspector created by ParquetHiveSerDe is of type Writable*ObjectInspector.

For example, consider the string data type: the ObjectInspector coming from the Operator is a LazyStringObjectInspector, which represents the underlying primitive as a LazyString, whereas WritableStringObjectInspector represents it as Text. This causes a ClassCastException when retrieving the actual data.

So we cannot always use the ObjectInspector created during the initialization phase.
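
To make the mismatch concrete, here is a minimal stand-alone sketch of that failure mode (this snippet is not part of the patch, and the class and variable names are mine):

import org.apache.hadoop.hive.serde2.lazy.LazyString;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.primitive.LazyPrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector;

public class InspectorMismatchSketch {
  public static void main(String[] args) {
    // Rows read from a TEXT table arrive wrapped in lazy objects such as LazyString.
    LazyString lazyValue = new LazyString(
        LazyPrimitiveObjectInspectorFactory.LAZY_STRING_OBJECT_INSPECTOR);

    // The inspector built in initialize() is the Writable flavor, which
    // internally casts whatever it is given to org.apache.hadoop.io.Text.
    WritableStringObjectInspector writableOI =
        PrimitiveObjectInspectorFactory.writableStringObjectInspector;

    // Throws ClassCastException: LazyString cannot be cast to Text.
    writableOI.getPrimitiveWritableObject(lazyValue);
  }
}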

Contributor:

I don't know much about this, to be honest (I will have to look into it in detail), but do you think it would be a better solution to somehow pass the correct objInspector to this method?
Because as it stands, we are calling the serialize method and asking it to use a particular inspector, and then, with this patch, we may not even use that inspector.

Contributor:

Indeed, it would be better for performance to change the initialize method so that it sets objInspector to the one that's actually used.
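
One possible reading of that suggestion, as a hedged sketch rather than part of the patch: since the Operator's inspector only becomes available at serialize time, the choice could at least be resolved once and cached instead of calling compareTypes on every row. The field name resolvedInspector is hypothetical, and the sketch assumes the Operator passes an equivalent inspector on every call.

// Hypothetical caching variant (not the actual patch): decide which inspector
// to trust on the first serialize() call and reuse that decision afterwards.
private StructObjectInspector resolvedInspector;

@Override
public Writable serialize(final Object obj, final ObjectInspector objInspector) throws SerDeException {
  if (resolvedInspector == null) {
    resolvedInspector = ObjectInspectorUtils.compareTypes(writableObjectInspector, objInspector)
        ? (StructObjectInspector) objInspector
        : (StructObjectInspector) writableObjectInspector;
  }
  parquetRow.value = obj;
  parquetRow.inspector = resolvedInspector;
  return parquetRow;
}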

Member:

The if statement here is a bit strange, because it says: whenever there is a type disagreement I will use the original inspector, and whenever the types are equal I will trust what is given to me.

The fact that all tests pass implies that most of the time (if not always) the types are equal in the existing tests, so we are essentially hitting the else branch.

Type inequality seems to be an outlier, and maybe ctas_dec_pre_scale_issue.q is the only test that covers it. Does the proposed solution work if you add more column types to the table/queries?

CREATE TABLE table_a (cint int, cstring string, cdec decimal(12,7));
INSERT INTO table_a VALUES (100, 'Bob', 12345.6789101);
...
CREATE TABLE target AS
SELECT ta.cint, ta.cstring, ta.cdec
FROM table_a ta
...

return parquetRow;
}

21 changes: 21 additions & 0 deletions ql/src/test/queries/clientpositive/ctas_dec_pre_scale_issue.q
@@ -0,0 +1,21 @@
CREATE TABLE table_a (col_dec_a decimal(12,7));
CREATE TABLE table_b(col_dec_b decimal(15,5));
INSERT INTO table_a VALUES (12345.6789101);
INSERT INTO table_b VALUES (1234567891.01112);

set hive.default.fileformat=parquet;

explain create table target as
select table_a.col_dec_a target_col
from table_a
left outer join table_b on
table_a.col_dec_a = table_b.col_dec_b;

create table target as
select table_a.col_dec_a target_col
from table_a
left outer join table_b on
table_a.col_dec_a = table_b.col_dec_b;

desc target;
select * from target;
224 changes: 224 additions & 0 deletions ql/src/test/results/clientpositive/llap/ctas_dec_pre_scale_issue.q.out
@@ -0,0 +1,224 @@
PREHOOK: query: CREATE TABLE table_a (col_dec_a decimal(12,7))
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@table_a
POSTHOOK: query: CREATE TABLE table_a (col_dec_a decimal(12,7))
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@table_a
PREHOOK: query: CREATE TABLE table_b(col_dec_b decimal(15,5))
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@table_b
POSTHOOK: query: CREATE TABLE table_b(col_dec_b decimal(15,5))
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@table_b
PREHOOK: query: INSERT INTO table_a VALUES (12345.6789101)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@table_a
POSTHOOK: query: INSERT INTO table_a VALUES (12345.6789101)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@table_a
POSTHOOK: Lineage: table_a.col_dec_a SCRIPT []
PREHOOK: query: INSERT INTO table_b VALUES (1234567891.01112)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@table_b
POSTHOOK: query: INSERT INTO table_b VALUES (1234567891.01112)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@table_b
POSTHOOK: Lineage: table_b.col_dec_b SCRIPT []
PREHOOK: query: explain create table target as
select table_a.col_dec_a target_col
from table_a
left outer join table_b on
table_a.col_dec_a = table_b.col_dec_b
PREHOOK: type: CREATETABLE_AS_SELECT
PREHOOK: Input: default@table_a
PREHOOK: Input: default@table_b
PREHOOK: Output: database:default
PREHOOK: Output: default@target
POSTHOOK: query: explain create table target as
select table_a.col_dec_a target_col
from table_a
left outer join table_b on
table_a.col_dec_a = table_b.col_dec_b
POSTHOOK: type: CREATETABLE_AS_SELECT
POSTHOOK: Input: default@table_a
POSTHOOK: Input: default@table_b
POSTHOOK: Output: database:default
POSTHOOK: Output: default@target
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1
Stage-4 depends on stages: Stage-0, Stage-2
Stage-3 depends on stages: Stage-4
Stage-0 depends on stages: Stage-1

STAGE PLANS:
Stage: Stage-1
Tez
#### A masked pattern was here ####
Edges:
Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
Map Operator Tree:
TableScan
alias: table_a
Statistics: Num rows: 1 Data size: 112 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
expressions: col_dec_a (type: decimal(12,7))
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 112 Basic stats: COMPLETE Column stats: COMPLETE
Reduce Output Operator
key expressions: _col0 (type: decimal(17,7))
null sort order: z
sort order: +
Map-reduce partition columns: _col0 (type: decimal(17,7))
Statistics: Num rows: 1 Data size: 112 Basic stats: COMPLETE Column stats: COMPLETE
Execution mode: vectorized, llap
LLAP IO: all inputs
Map 4
Map Operator Tree:
TableScan
alias: table_b
filterExpr: col_dec_b is not null (type: boolean)
Statistics: Num rows: 1 Data size: 112 Basic stats: COMPLETE Column stats: COMPLETE
Filter Operator
predicate: col_dec_b is not null (type: boolean)
Statistics: Num rows: 1 Data size: 112 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
expressions: col_dec_b (type: decimal(15,5))
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 112 Basic stats: COMPLETE Column stats: COMPLETE
Reduce Output Operator
key expressions: _col0 (type: decimal(17,7))
null sort order: z
sort order: +
Map-reduce partition columns: _col0 (type: decimal(17,7))
Statistics: Num rows: 1 Data size: 112 Basic stats: COMPLETE Column stats: COMPLETE
Execution mode: vectorized, llap
LLAP IO: all inputs
Reducer 2
Execution mode: llap
Reduce Operator Tree:
Merge Join Operator
condition map:
Left Outer Join 0 to 1
keys:
0 _col0 (type: decimal(17,7))
1 _col0 (type: decimal(17,7))
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 112 Basic stats: COMPLETE Column stats: COMPLETE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 112 Basic stats: COMPLETE Column stats: COMPLETE
table:
input format: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
output format: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
serde: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
name: default.target
Select Operator
expressions: _col0 (type: decimal(12,7))
outputColumnNames: col1
Statistics: Num rows: 1 Data size: 112 Basic stats: COMPLETE Column stats: COMPLETE
Comment on lines +128 to +131
Member:
This Select Operator is interesting. Although it is not involved when writing to the table, it seems to have the correct data type for the column that results from the join, so if we had something similar before calling the File Output Operator we wouldn't hit the problem in the first place.

Group By Operator
aggregations: min(col1), max(col1), count(1), count(col1), compute_bit_vector_hll(col1)
minReductionHashAggr: 0.4
mode: hash
outputColumnNames: _col0, _col1, _col2, _col3, _col4
Statistics: Num rows: 1 Data size: 384 Basic stats: COMPLETE Column stats: COMPLETE
Reduce Output Operator
null sort order:
sort order:
Statistics: Num rows: 1 Data size: 384 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col0 (type: decimal(12,7)), _col1 (type: decimal(12,7)), _col2 (type: bigint), _col3 (type: bigint), _col4 (type: binary)
Reducer 3
Execution mode: vectorized, llap
Reduce Operator Tree:
Group By Operator
aggregations: min(VALUE._col0), max(VALUE._col1), count(VALUE._col2), count(VALUE._col3), compute_bit_vector_hll(VALUE._col4)
mode: mergepartial
outputColumnNames: _col0, _col1, _col2, _col3, _col4
Statistics: Num rows: 1 Data size: 384 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
expressions: 'DECIMAL' (type: string), _col0 (type: decimal(12,7)), _col1 (type: decimal(12,7)), (_col2 - _col3) (type: bigint), COALESCE(ndv_compute_bit_vector(_col4),0) (type: bigint), _col4 (type: binary)
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5
Statistics: Num rows: 1 Data size: 475 Basic stats: COMPLETE Column stats: COMPLETE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 475 Basic stats: COMPLETE Column stats: COMPLETE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

Stage: Stage-2
Dependency Collection

Stage: Stage-4
Create Table
columns: target_col decimal(12,7)
name: default.target
input format: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
output format: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
serde name: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe

Stage: Stage-3
Stats Work
Basic Stats Work:
Column Stats Desc:
Columns: target_col
Column Types: decimal(12,7)
Table: default.target

Stage: Stage-0
Move Operator
files:
hdfs directory: true
#### A masked pattern was here ####

PREHOOK: query: create table target as
select table_a.col_dec_a target_col
from table_a
left outer join table_b on
table_a.col_dec_a = table_b.col_dec_b
PREHOOK: type: CREATETABLE_AS_SELECT
PREHOOK: Input: default@table_a
PREHOOK: Input: default@table_b
PREHOOK: Output: database:default
PREHOOK: Output: default@target
POSTHOOK: query: create table target as
select table_a.col_dec_a target_col
from table_a
left outer join table_b on
table_a.col_dec_a = table_b.col_dec_b
POSTHOOK: type: CREATETABLE_AS_SELECT
POSTHOOK: Input: default@table_a
POSTHOOK: Input: default@table_b
POSTHOOK: Output: database:default
POSTHOOK: Output: default@target
POSTHOOK: Lineage: target.target_col SIMPLE [(table_a)table_a.FieldSchema(name:col_dec_a, type:decimal(12,7), comment:null), ]
PREHOOK: query: desc target
PREHOOK: type: DESCTABLE
PREHOOK: Input: default@target
POSTHOOK: query: desc target
POSTHOOK: type: DESCTABLE
POSTHOOK: Input: default@target
target_col decimal(12,7)
PREHOOK: query: select * from target
PREHOOK: type: QUERY
PREHOOK: Input: default@target
#### A masked pattern was here ####
POSTHOOK: query: select * from target
POSTHOOK: type: QUERY
POSTHOOK: Input: default@target
#### A masked pattern was here ####
12345.6789101