Skip to content

Conversation

@kiszk
Copy link
Member

@kiszk kiszk commented May 18, 2016

What changes were proposed in this pull request?

This PR generates Java code to store a computed float/double value of each column into ```ColumnarBatch` when DataFrame.cache() is called. This is done in whole stage code generation.

Even when data is read from ParquetReader (data is kept in ColumnarBatch), the computed value is stored into UnsafeRow for now. Then, the data is stored into CachedBatch for DataFrame.cache(). This leads to data format conversions from columnar storage to row-oriented storage and from row-oriented storage to columnar storage. This PR avoid conversions by storing the computed value into a columnar storege.

This PR handles only float and double that are stored in a column without compression. Another PR will handle other primitive types that may be stored in a column in a compressed format. This is for ease of review by reducing the size of PR

This PR will consist of three parts.

  1. Store the computed value into CachedBatch when the original value is read from ColumnarStorage.
  2. Create CachedBatch for df.cache() from the ColumnarStorage
  3. Decide whether 1. will be done or not based on the successor operation. If the successor operation is not df.cache(), 1. will not occur.

This PR generates Java code for columnar cache only if types in all columns, which are accessed in operations, are primitive

Motivating example:

sc.parallelize(1 to 8, 1).map(i => i.toFloat).toDF("f").write.parquet(parquetDir)
val parquetDF = sqlContext.read.parquet(parquetDir)
val df = parquetDF.selectExpr("f + 1").cache.show

Generated code

/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /** Codegened pipeline for:
/* 006 */ * Project [(f#7 + 1.0) AS (f + 1)#9]
/* 007 */ +- BatchedScan parquet [f#7] Format: ParquetFormat, InputPaths: file:/C:/Users/ishizaki/AppDa...
/* 008 */ */
/* 009 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 010 */   private Object[] references;
/* 011 */   private scala.collection.Iterator scan_input;
/* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
/* 013 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_scanTime;
/* 014 */   private long scan_scanTime1;
/* 015 */   private org.apache.spark.sql.execution.vectorized.ColumnarBatch scan_batch;
/* 016 */   private int scan_batchIdx;
/* 017 */   private org.apache.spark.sql.execution.vectorized.ColumnVector scan_colInstance0;
/* 018 */   private UnsafeRow scan_result;
/* 019 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder scan_holder;
/* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter scan_rowWriter;
/* 021 */   private org.apache.spark.sql.execution.vectorized.ColumnarBatch project_columnarBatch;
/* 022 */   private org.apache.spark.sql.execution.vectorized.ColumnVector project_colOutInstance0;
/* 023 */
/* 024 */   public GeneratedIterator(Object[] references) {
/* 025 */     this.references = references;
/* 026 */   }
/* 027 */
/* 028 */   public void init(int index, scala.collection.Iterator inputs[]) {
/* 029 */     partitionIndex = index;
/* 030 */     scan_input = inputs[0];
/* 031 */     this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 032 */     this.scan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 033 */     scan_scanTime1 = 0;
/* 034 */     scan_batch = null;
/* 035 */     scan_batchIdx = 0;
/* 036 */     scan_colInstance0 = null;
/* 037 */     scan_result = new UnsafeRow(1);
/* 038 */     this.scan_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(scan_result, 0);
/* 039 */     this.scan_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(scan_holder, 1);
/* 040 */
/* 041 */     project_allocateColumnarStorage();
/* 042 */   }
/* 043 */
/* 044 */   private void scan_nextBatch() throws java.io.IOException {
/* 045 */     long getBatchStart = System.nanoTime();
/* 046 */     if (scan_input.hasNext()) {
/* 047 */       scan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next();
/* 048 */       scan_numOutputRows.add(scan_batch.numRows());
/* 049 */       scan_batchIdx = 0;
/* 050 */       scan_colInstance0 = scan_batch.column(0);
/* 051 */
/* 052 */     }
/* 053 */     scan_scanTime1 += System.nanoTime() - getBatchStart;
/* 054 */   }
/* 055 */
/* 056 */   void project_allocateColumnarStorage() {
/* 057 */     org.apache.spark.sql.types.StructType project_batchSchema =
/* 058 */     new org.apache.spark.sql.types.StructType(
/* 059 */       new org.apache.spark.sql.types.StructField[] {
/* 060 */         new org.apache.spark.sql.types.StructField(
/* 061 */           "col0", org.apache.spark.sql.types.DataTypes.FloatType, true, org.apache.spark.sql.types.Metadata.empty())
/* 062 */
/* 063 */       });
/* 064 */
/* 065 */     project_columnarBatch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(
/* 066 */       project_batchSchema, org.apache.spark.memory.MemoryMode.ON_HEAP);
/* 067 */     registerColumnarBatch(project_columnarBatch);
/* 068 */     project_colOutInstance0 = project_columnarBatch.column(0);
/* 069 */
/* 070 */   }
/* 071 */
/* 072 */   protected void processNext() throws java.io.IOException {
/* 073 */     /*** PRODUCE: Project [(f#7 + 1.0) AS (f + 1)#9] */
/* 074 */
/* 075 */     /*** PRODUCE: BatchedScan parquet [f#7] Format: ParquetFormat, InputPaths: ... */
/* 076 */
/* 077 */     if (scan_batch == null) {
/* 078 */       isColumnarBatchAccessed = true;
/* 079 */       scan_nextBatch();
/* 080 */     }
/* 081 */     int scan_rowWriteIdx = 0;
/* 082 */     while (scan_batch != null) {
/* 083 */       int numRows = scan_batch.numRows();
/* 084 */       while (scan_batchIdx < numRows) {
/* 085 */         int scan_rowIdx = scan_batchIdx++;
/* 086 */         /*** CONSUME: Project [(f#7 + 1.0) AS (f + 1)#9] */
/* 087 */
/* 088 */         /*** CONSUME: WholeStageCodegen */
/* 089 */
/* 090 */         /* (input[0, float] + 1.0) */
/* 091 */         boolean project_isNull = true;
/* 092 */         float project_value = -1.0f;
/* 093 */         /* input[0, float] */
/* 094 */         /* columnVector[scan_colInstance0, scan_rowIdx, float] */
/* 095 */         boolean scan_isNull = scan_colInstance0.isNullAt(scan_rowIdx);
/* 096 */         float scan_value = scan_isNull ? -1.0f : (scan_colInstance0.getFloat(scan_rowIdx));
/* 097 */         if (!scan_isNull) {
/* 098 */           project_isNull = false; // resultCode could change nullability.
/* 099 */           project_value = scan_value + 1.0f;
/* 100 */
/* 101 */         }
/* 102 */         if (project_isNull) {
/* 103 */           project_colOutInstance0.putNull(scan_rowWriteIdx);
/* 104 */         } else {
/* 105 */           System.out.println("rowIdx["+scan_rowWriteIdx+"]: v="+project_value);
/* 106 */           project_colOutInstance0.putFloat(scan_rowWriteIdx, project_value);
/* 107 */         }
/* 108 */         scan_rowWriteIdx++;
/* 109 */         project_columnarBatch.setNumRows(scan_rowWriteIdx);
/* 110 */         if (shouldStop()) return;
/* 111 */       }
/* 112 */       scan_batch = null;
/* 113 */       scan_nextBatch();
/* 114 */     }
/* 115 */     scan_scanTime.add(scan_scanTime1 / (1000 * 1000));
/* 116 */     scan_scanTime1 = 0;
/* 117 */   }
/* 118 */ }

How was this patch tested?

Not tested yet

@SparkQA
Copy link

SparkQA commented May 18, 2016

Test build #58774 has finished for PR 13171 at commit b0b5950.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class ColumnIterator[T] extends Iterator[T]

@SparkQA
Copy link

SparkQA commented May 18, 2016

Test build #58783 has finished for PR 13171 at commit 34151b3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

gentle ping @kiszk. Probably, this is the same instance with #12894 (comment).

@kiszk
Copy link
Member Author

kiszk commented Jun 4, 2017

@HyukjinKwon Thank you for your gentle ping.
While there is no follow-up PR for this, this is out-of-date. Let me close this for now.

@kiszk kiszk closed this Jun 4, 2017
@HyukjinKwon
Copy link
Member

Yea, I just happened to ping here and there. Thanks for bearing with my ping.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants