
Conversation

@kiszk (Member) commented May 4, 2016

What changes were proposed in this pull request?

Waiting for #11956 to be merged.

This PR generates Java code that reads the value of each compressed column directly from CachedBatch when DataFrame.cache() is called. This is done as part of whole-stage code generation.

When DataFrame.cache() is called, data is stored in column-oriented form (a columnar cache) in CachedBatch. This PR avoids the conversion from column-oriented storage to row-oriented storage, and handles several primitive types (boolean/byte/short/int/long) that may be stored in a column with a compressed data format.

This PR consists of two parts.

  1. Pass data in CachedBatch to the generated code by using the decompress() method. CachedBatch consists of multiple ByteBuffer arrays, and a ByteBuffer may hold compressed data. If a buffer is compressed, it is decompressed and then passed to the generated code; if it is not compressed, it is passed through as-is (see the sketch after this list).
  2. Generate code for both row-oriented storage and column-oriented storage, but only if
    • InMemoryColumnarTableScan exists in the plan sub-tree (decided at runtime by checking whether the given iterator is a ColumnarIterator), and
    • no sort or join exists in the plan sub-tree.
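
A minimal, self-contained sketch of the decompress-or-pass-through dispatch in part 1. ColumnBufferSketch, PassThroughId, and the scheme-id-at-offset-0 header layout are illustrative assumptions, not this PR's actual code:

import java.nio.ByteBuffer

object ColumnBufferSketch {
  val PassThroughId = 0  // assumed id for the uncompressed (PassThrough) scheme

  // Hand the generated code a buffer of plain column values: decompress
  // compressed buffers, pass uncompressed ones through unchanged.
  def prepareForCodegen(buffer: ByteBuffer,
                        decompress: ByteBuffer => ByteBuffer): ByteBuffer = {
    val schemeId = buffer.getInt(0)  // assumed: compression scheme id in the header
    if (schemeId == PassThroughId) buffer else decompress(buffer)
  }
}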

This PR generates Java code for the columnar cache only if the types of all columns accessed by the operations are primitive.
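
An illustrative version of that type check (not the PR's exact code), using the public Catalyst type objects:

import org.apache.spark.sql.types._

// Columnar codegen applies only when every accessed column has one of the
// supported primitive types listed in the description above.
val supportedTypes: Set[DataType] =
  Set(BooleanType, ByteType, ShortType, IntegerType, LongType)

def allAccessedColumnsPrimitive(schema: StructType): Boolean =
  schema.fields.forall(f => supportedTypes.contains(f.dataType))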

This PR improves the performance of an aggregate sum by 2.0x - 5.2x. The benchmark is available here (a rough reproduction sketch follows the results below).

Performance results:

Java HotSpot(TM) 64-Bit Server VM 1.8.0_66-b17 on Linux 2.6.32-504.el6.x86_64
Intel(R) Xeon(R) CPU E5-2667 v2 @ 3.30GHz

Running benchmark: Int Sum with PassThrough cache
  Running case: InternalRow codegen
  Running case: ColumnVector codegen

Int Sum with PassThrough cache:     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
InternalRow codegen                       450 /  459         69.9          14.3       1.0X
ColumnVector codegen                       86 /   92        363.9           2.7       5.2X

Running benchmark: Int Sum with RunLength cache
  Running case: InternalRow codegen
  Running case: ColumnVector codegen

Int Sum with RunLength cache:       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
InternalRow codegen                       475 /  499         66.3          15.1       1.0X
ColumnVector codegen                      169 /  177        185.8           5.4       2.8X

Running benchmark: Int Sum with Dictionary cache
  Running case: InternalRow codegen
  Running case: ColumnVector codegen

Int Sum with Dictionary cache:      Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
InternalRow codegen                       595 /  643         52.9          18.9       1.0X
ColumnVector codegen                      297 /  306        106.1           9.4       2.0X

Running benchmark: Int Sum with Delta cache
  Running case: InternalRow codegen
  Running case: ColumnVector codegen

Int Sum with Delta cache:           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
InternalRow codegen                       450 /  455         69.9          14.3       1.0X
ColumnVector codegen                      211 /  216        149.1           6.7       2.1X

Running benchmark: Long Sum with PassThrough cache
  Running case: InternalRow codegen
  Running case: ColumnVector codegen

Long Sum with PassThrough cache:    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
InternalRow codegen                       353 /  378         44.6          22.4       1.0X
ColumnVector codegen                       94 /  104        167.8           6.0       3.8X
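
The linked benchmark is the authoritative version; the following spark-shell sketch only shows its rough shape, and the row count and timing loop here are illustrative assumptions:

import org.apache.spark.sql.functions.sum

val N = 1024 * 1024 * 30
val df = sc.parallelize(0 until N, 1).toDF("value").cache()
df.count()  // materialize the columnar cache before timing

val start = System.nanoTime()
df.agg(sum("value")).collect()
val elapsedMs = (System.nanoTime() - start) / 1000000
println(s"sum over $N cached ints: $elapsedMs ms")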

Motivating example:

val df = sc.parallelize(0 to 9, 1).toDF()
df.cache().filter("value <= 5").show()

Generated code:

/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /** Codegened pipeline for:
/* 006 */ * Filter (value#1 <= 5)
/* 007 */ +- INPUT
/* 008 */ */
/* 009 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 010 */   private Object[] references;
/* 011 */   private scala.collection.Iterator inputadapter_input;
/* 012 */   private org.apache.spark.sql.execution.metric.LongSQLMetric filter_numOutputRows;
/* 013 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue filter_metricValue;
/* 014 */   private UnsafeRow filter_result;
/* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder filter_holder;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter filter_rowWriter;
/* 017 */   private scala.collection.Iterator inputadapter_input1;
/* 018 */   private int columnar_batchIdx;
/* 019 */   private int columnar_numRows;
/* 020 */   private org.apache.spark.sql.execution.vectorized.ColumnVector inputadapter_col0;
/* 021 */   private UnsafeRow inputadapter_result;
/* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder inputadapter_holder;
/* 023 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter inputadapter_rowWriter;
/* 024 */   private org.apache.spark.sql.execution.metric.LongSQLMetric filter_numOutputRows1;
/* 025 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue filter_metricValue1;
/* 026 */   private UnsafeRow filter_result1;
/* 027 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder filter_holder1;
/* 028 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter filter_rowWriter1;
/* 029 */   private org.apache.spark.sql.execution.columnar.ColumnarIterator columnar_itr;
/* 030 */
/* 031 */   public GeneratedIterator(Object[] references) {
/* 032 */     this.references = references;
/* 033 */   }
/* 034 */
/* 035 */   public void init(int index, scala.collection.Iterator inputs[]) {
/* 036 */     partitionIndex = index;
/* 037 */     inputadapter_input = inputs[0];
/* 038 */     this.filter_numOutputRows = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[0];
/* 039 */     filter_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) filter_numOutputRows.localValue();
/* 040 */     filter_result = new UnsafeRow(1);
/* 041 */     this.filter_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(filter_result, 0);
/* 042 */     this.filter_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(filter_holder, 1);
/* 043 */     inputadapter_input1 = inputs[0];
/* 044 */     columnar_batchIdx = 0;
/* 045 */     columnar_numRows = 0;
/* 046 */     inputadapter_col0 = null;
/* 047 */     inputadapter_result = new UnsafeRow(1);
/* 048 */     this.inputadapter_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(inputadapter_result, 0);
/* 049 */     this.inputadapter_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(inputadapter_holder, 1);
/* 050 */     this.filter_numOutputRows1 = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1];
/* 051 */     filter_metricValue1 = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) filter_numOutputRows1.localValue();
/* 052 */     filter_result1 = new UnsafeRow(1);
/* 053 */     this.filter_holder1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(filter_result1, 0);
/* 054 */     this.filter_rowWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(filter_holder1, 1);
/* 055 */     columnar_itr = null;
/* 056 */   }
/* 057 */
/* 058 */   private void processBatch() throws java.io.IOException {
/* 059 */     /*** PRODUCE: Filter (value#1 <= 5) */
/* 060 */
/* 061 */     /*** PRODUCE: INPUT */
/* 062 */
/* 063 */     while (true) {
/* 064 */       if (columnar_batchIdx == 0) {
/* 065 */         columnar_numRows = columnar_itr.initForColumnar();
/* 066 */         if (columnar_numRows < 0) {
/* 067 */           cleanup();
/* 068 */           break;
/* 069 */         }
/* 070 */         inputadapter_col0 = columnar_itr.getColumn(0);
/* 071 */       }
/* 072 */
/* 073 */       while (columnar_batchIdx < columnar_numRows) {
/* 074 */         int inputadapter_rowIdx = columnar_batchIdx++;
/* 075 */         /*** CONSUME: Filter (value#1 <= 5) */
/* 076 */
/* 077 */         /* columnVector[inputadapter_col0, inputadapter_rowIdx, int] */
/* 078 */         int inputadapter_value1 = inputadapter_col0.getInt(inputadapter_rowIdx);
/* 079 */
/* 080 */         /* (input[0, int] <= 5) */
/* 081 */         boolean filter_value4 = false;
/* 082 */         filter_value4 = inputadapter_value1 <= 5;
/* 083 */         if (!filter_value4) continue;
/* 084 */
/* 085 */         filter_metricValue1.add(1);
/* 086 */
/* 087 */         /*** CONSUME: WholeStageCodegen */
/* 088 */
/* 089 */         filter_rowWriter1.write(0, inputadapter_value1);
/* 090 */         append(filter_result1);
/* 091 */         if (shouldStop()) return;
/* 092 */       }
/* 093 */       columnar_batchIdx = 0;
/* 094 */     }
/* 095 */   }
/* 096 */
/* 097 */   private void processRow() throws java.io.IOException {
/* 098 */     /*** PRODUCE: Filter (value#1 <= 5) */
/* 099 */
/* 100 */     /*** PRODUCE: INPUT */
/* 101 */
/* 102 */     while (inputadapter_input.hasNext()) {
/* 103 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 104 */       /*** CONSUME: Filter (value#1 <= 5) */
/* 105 */
/* 106 */       /* input[0, int] */
/* 107 */       int inputadapter_value = inputadapter_row.getInt(0);
/* 108 */
/* 109 */       /* (input[0, int] <= 5) */
/* 110 */       boolean filter_value = false;
/* 111 */       filter_value = inputadapter_value <= 5;
/* 112 */       if (!filter_value) continue;
/* 113 */
/* 114 */       filter_metricValue.add(1);
/* 115 */
/* 116 */       /*** CONSUME: WholeStageCodegen */
/* 117 */
/* 118 */       filter_rowWriter.write(0, inputadapter_value);
/* 119 */       append(filter_result);
/* 120 */       if (shouldStop()) return;
/* 121 */     }
/* 122 */   }
/* 123 */
/* 124 */   private void cleanup() {
/* 125 */     inputadapter_col0 = null;
/* 126 */
/* 127 */     columnar_itr = null;
/* 128 */   }
/* 129 */
/* 130 */   protected void processNext() throws java.io.IOException {
/* 131 */     if ((columnar_batchIdx != 0) ||
/* 132 */       (inputadapter_input1 instanceof org.apache.spark.sql.execution.columnar.ColumnarIterator &&
/* 133 */         (columnar_itr = (org.apache.spark.sql.execution.columnar.ColumnarIterator)inputadapter_input1).isSupportColumnarCodeGen())) {
/* 134 */       processBatch();
/* 135 */     } else {
/* 136 */       processRow();
/* 137 */     }
/* 138 */   }
/* 139 */ }

How was this patch tested?

Tested with existing test suites, and added test suites for operations on a DataFrame generated by df.cache().
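
A hedged sketch of the kind of test that was added (illustrative, not the suite's actual code): the cached path must produce the same result as the uncached path.

// Compare results of the same filter on uncached and cached DataFrames.
val df = sc.parallelize(0 to 9, 1).toDF()
val expected = df.filter("value <= 5").collect()
val cached = df.cache()
assert(cached.filter("value <= 5").collect().sameElements(expected))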


@SparkQA commented May 4, 2016

Test build #57746 has finished for PR 12894 at commit a79ace5.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public final class ByteBufferColumnVector extends ColumnVector
    • case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)

@SparkQA commented May 4, 2016

Test build #57745 has finished for PR 12894 at commit 1660a64.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 10, 2016

Test build #60286 has finished for PR 12894 at commit f847ef4.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member)

@kiszk, I understand it is painful to keep a PR up-to-date, but shouldn't we at least have Jenkins passing at the end, even if it has conflicts?

@kiszk (Member, Author) commented Jun 3, 2017

@HyukjinKwon Thank you for pointing this out.
This PR will be replaced by https://issues.apache.org/jira/browse/SPARK-20823

@kiszk closed this Jun 3, 2017
