
Conversation

@kiszk (Member) commented Jul 7, 2016

What changes were proposed in this pull request?

Waiting for #11956 to be merged.

This PR generates Java code to directly get an array for each column from CachedBatch when DataFrame.cache() is called. This is done as part of whole-stage code generation.

When DataFrame.cache() is called, data is stored in column-oriented storage (a columnar cache) in CachedBatch. This PR avoids the conversion from column-oriented storage to row-oriented storage, and it handles array types stored in a column.

This PR generates code for both row-oriented storage and column-oriented storage only if:

  • InMemoryColumnarTableScan exists in the plan sub-tree. The decision is made at runtime by checking whether the given iterator is a ColumnarIterator.
  • No sort or join exists in the plan sub-tree.

This PR generates Java code for the columnar cache only if the types of all columns accessed by the operations are primitive or array types. A rough sketch of the plan-tree check is shown below.
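
For illustration, here is a minimal sketch of the static plan-tree check described in the list above, written against Spark 2.0-era physical operators. The method name and the exact set of operators checked are assumptions, not the PR's actual implementation; the runtime ColumnarIterator check appears in the generated processNext() shown later.

import org.apache.spark.sql.execution.{SortExec, SparkPlan}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}

// Sketch only: allow the columnar code path when the plan sub-tree contains an
// in-memory table scan and no sort or join operator.
def enableColumnarCodeGen(plan: SparkPlan): Boolean = {
  val hasInMemoryScan = plan.collect { case s: InMemoryTableScanExec => s }.nonEmpty
  val hasSortOrJoin = plan.collect {
    case s: SortExec => s
    case j: SortMergeJoinExec => j
    case j: BroadcastHashJoinExec => j
  }.nonEmpty
  hasInMemoryScan && !hasSortOrJoin
}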

I will add benchmark suites here.

Motivating example

val df = sc.parallelize(Seq(Array(1.0, 2.0), Array(3.0, 4.0)), 1).toDF
df.cache.filter("value[0] > 2.0").show
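
Only the second row, Array(3.0, 4.0), satisfies the filter, so show() should print roughly:

+----------+
|     value|
+----------+
|[3.0, 4.0]|
+----------+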

Generated code
Before applying this PR

/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /**
 * Codegend pipeline for
 * Filter (value#1[0] > 2.0)
 * +- InMemoryTableScan [value#1], [(value#1[0] > 2.0)]
 *    :  +- InMemoryRelation [value#1], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
 *    :     :  +- Scan ExistingRDD[value#1]
 */
/* 006 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private org.apache.spark.sql.execution.metric.SQLMetric filter_numOutputRows;
/* 010 */   private UnsafeRow filter_result;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder filter_holder;
/* 012 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter filter_rowWriter;
/* 013 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter filter_arrayWriter;
/* 014 */
/* 015 */   public GeneratedIterator(Object[] references) {
/* 016 */     this.references = references;
/* 017 */   }
/* 018 */
/* 019 */   public void init(int index, scala.collection.Iterator inputs[]) {
/* 020 */     partitionIndex = index;
/* 021 */     inputadapter_input = inputs[0];
/* 022 */     this.filter_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 023 */     filter_result = new UnsafeRow(1);
/* 024 */     this.filter_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(filter_result, 32);
/* 025 */     this.filter_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(filter_holder, 1);
/* 026 */     this.filter_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
/* 027 */   }
/* 028 */
/* 029 */   protected void processNext() throws java.io.IOException {
/* 030 */     // PRODUCE: Filter (value#1[0] > 2.0)
/* 031 */     // PRODUCE: InputAdapter
/* 032 */     while (inputadapter_input.hasNext()) {
/* 033 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 034 */       // CONSUME: Filter (value#1[0] > 2.0)
/* 035 */       // input[0, array<double>, true]
/* 036 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 037 */       ArrayData inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getArray(0));
/* 038 */
/* 039 */       // (input[0, array<double>, true][0] > 2.0)
/* 040 */       boolean filter_isNull = true;
/* 041 */       boolean filter_value = false;
/* 042 */       // input[0, array<double>, true][0]
/* 043 */       boolean filter_isNull1 = true;
/* 044 */       double filter_value1 = -1.0;
/* 045 */
/* 046 */       if (!inputadapter_isNull) {
/* 047 */         filter_isNull1 = false; // resultCode could change nullability.
/* 048 */
/* 049 */         final int filter_index = (int) 0;
/* 050 */         if (filter_index >= inputadapter_value.numElements() || filter_index < 0 || inputadapter_value.isNullAt(filter_index)) {
/* 051 */           filter_isNull1 = true;
/* 052 */         } else {
/* 053 */           filter_value1 = inputadapter_value.getDouble(filter_index);
/* 054 */         }
/* 055 */
/* 056 */       }
/* 057 */       if (!filter_isNull1) {
/* 058 */         filter_isNull = false; // resultCode could change nullability.
/* 059 */         filter_value = org.apache.spark.util.Utils.nanSafeCompareDoubles(filter_value1, 2.0D) > 0;
/* 060 */
/* 061 */       }
/* 062 */       if (filter_isNull || !filter_value) continue;
/* 063 */
/* 064 */       filter_numOutputRows.add(1);
/* 065 */
/* 066 */       // CONSUME: WholeStageCodegen
/* 067 */       filter_holder.reset();
/* 068 */
/* 069 */       filter_rowWriter.zeroOutNullBytes();
/* 070 */
/* 071 */       if (inputadapter_isNull) {
/* 072 */         filter_rowWriter.setNullAt(0);
/* 073 */       } else {
/* 074 */         // Remember the current cursor so that we can calculate how many bytes are
/* 075 */         // written later.
/* 076 */         final int filter_tmpCursor = filter_holder.cursor;
/* 077 */
/* 078 */         if (inputadapter_value instanceof UnsafeArrayData) {
/* 079 */           final int filter_sizeInBytes = ((UnsafeArrayData) inputadapter_value).getSizeInBytes();
/* 080 */           // grow the global buffer before writing data.
/* 081 */           filter_holder.grow(filter_sizeInBytes);
/* 082 */           ((UnsafeArrayData) inputadapter_value).writeToMemory(filter_holder.buffer, filter_holder.cursor);
/* 083 */           filter_holder.cursor += filter_sizeInBytes;
/* 084 */
/* 085 */         } else {
/* 086 */           final int filter_numElements = inputadapter_value.numElements();
/* 087 */           filter_arrayWriter.initialize(filter_holder, filter_numElements, 8);
/* 088 */
/* 089 */           for (int filter_index1 = 0; filter_index1 < filter_numElements; filter_index1++) {
/* 090 */             if (inputadapter_value.isNullAt(filter_index1)) {
/* 091 */               filter_arrayWriter.setNullAt(filter_index1);
/* 092 */             } else {
/* 093 */               final double filter_element = inputadapter_value.getDouble(filter_index1);
/* 094 */               filter_arrayWriter.write(filter_index1, filter_element);
/* 095 */             }
/* 096 */           }
/* 097 */         }
/* 098 */
/* 099 */         filter_rowWriter.setOffsetAndSize(0, filter_tmpCursor, filter_holder.cursor - filter_tmpCursor);
/* 100 */         filter_rowWriter.alignToWords(filter_holder.cursor - filter_tmpCursor);
/* 101 */       }
/* 102 */       filter_result.setTotalSize(filter_holder.totalSize());
/* 103 */       append(filter_result);
/* 104 */       if (shouldStop()) return;
/* 105 */     }
/* 106 */   }
/* 107 */
/* 108 */ }

After applying this PR

/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /**
 * Codegend pipeline for
 * Filter (value#1[0] > 2.0)
 * +- InMemoryTableScan [value#1], [(value#1[0] > 2.0)]
 *    :  +- InMemoryRelation [value#1], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
 *    :     :  +- Scan ExistingRDD[value#1]
 */
/* 006 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private org.apache.spark.sql.execution.metric.SQLMetric filter_numOutputRows;
/* 010 */   private UnsafeRow filter_result;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder filter_holder;
/* 012 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter filter_rowWriter;
/* 013 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter filter_arrayWriter;
/* 014 */   private scala.collection.Iterator inputadapter_input1;
/* 015 */   private int columnar_batchIdx;
/* 016 */   private int columnar_numRows;
/* 017 */   private org.apache.spark.sql.execution.vectorized.ColumnVector inputadapter_col0;
/* 018 */   private UnsafeRow inputadapter_result;
/* 019 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder inputadapter_holder;
/* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter inputadapter_rowWriter;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter inputadapter_arrayWriter;
/* 022 */   private org.apache.spark.sql.execution.metric.SQLMetric filter_numOutputRows1;
/* 023 */   private UnsafeRow filter_result1;
/* 024 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder filter_holder1;
/* 025 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter filter_rowWriter1;
/* 026 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter filter_arrayWriter1;
/* 027 */   private org.apache.spark.sql.execution.columnar.ColumnarIterator columnar_itr;
/* 028 */
/* 029 */   public GeneratedIterator(Object[] references) {
/* 030 */     this.references = references;
/* 031 */   }
/* 032 */
/* 033 */   public void init(int index, scala.collection.Iterator inputs[]) {
/* 034 */     partitionIndex = index;
/* 035 */     inputadapter_input = inputs[0];
/* 036 */     this.filter_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 037 */     filter_result = new UnsafeRow(1);
/* 038 */     this.filter_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(filter_result, 32);
/* 039 */     this.filter_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(filter_holder, 1);
/* 040 */     this.filter_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
/* 041 */     inputadapter_input1 = inputs[0];
/* 042 */     columnar_batchIdx = 0;
/* 043 */     columnar_numRows = 0;
/* 044 */     inputadapter_col0 = null;
/* 045 */     inputadapter_result = new UnsafeRow(1);
/* 046 */     this.inputadapter_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(inputadapter_result, 32);
/* 047 */     this.inputadapter_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(inputadapter_holder, 1);
/* 048 */     this.inputadapter_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
/* 049 */     this.filter_numOutputRows1 = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 050 */     filter_result1 = new UnsafeRow(1);
/* 051 */     this.filter_holder1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(filter_result1, 32);
/* 052 */     this.filter_rowWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(filter_holder1, 1);
/* 053 */     this.filter_arrayWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
/* 054 */     columnar_itr = null;
/* 055 */   }
/* 056 */
/* 057 */   private void processBatch() throws java.io.IOException {
/* 058 */     // PRODUCE: Filter (value#1[0] > 2.0)
/* 059 */     // PRODUCE: InputAdapter
/* 060 */     while (true) {
/* 061 */       if (columnar_batchIdx == 0) {
/* 062 */         columnar_numRows = columnar_itr.initForColumnar();
/* 063 */         if (columnar_numRows < 0) {
/* 064 */           cleanup();
/* 065 */           break;
/* 066 */         }
/* 067 */         inputadapter_col0 = columnar_itr.getColumn(0);
/* 068 */       }
/* 069 */
/* 070 */       while (columnar_batchIdx < columnar_numRows) {
/* 071 */         int inputadapter_rowIdx = columnar_batchIdx++;
/* 072 */         // CONSUME: Filter (value#1[0] > 2.0)
/* 073 */         // columnVector[inputadapter_col0, inputadapter_rowIdx, array<double>]
/* 074 */         boolean inputadapter_isNull1 = inputadapter_col0.isNullAt(inputadapter_rowIdx);
/* 075 */         ArrayData inputadapter_value1 = inputadapter_isNull1 ? null : (inputadapter_col0.getArray(inputadapter_rowIdx));
/* 076 */
/* 077 */         // (input[0, array<double>, true][0] > 2.0)
/* 078 */         boolean filter_isNull6 = true;
/* 079 */         boolean filter_value6 = false;
/* 080 */         // input[0, array<double>, true][0]
/* 081 */         boolean filter_isNull7 = true;
/* 082 */         double filter_value7 = -1.0;
/* 083 */
/* 084 */         if (!inputadapter_isNull1) {
/* 085 */           filter_isNull7 = false; // resultCode could change nullability.
/* 086 */
/* 087 */           final int filter_index2 = (int) 0;
/* 088 */           if (filter_index2 >= inputadapter_value1.numElements() || filter_index2 < 0 || inputadapter_value1.isNullAt(filter_index2)) {
/* 089 */             filter_isNull7 = true;
/* 090 */           } else {
/* 091 */             filter_value7 = inputadapter_value1.getDouble(filter_index2);
/* 092 */           }
/* 093 */
/* 094 */         }
/* 095 */         if (!filter_isNull7) {
/* 096 */           filter_isNull6 = false; // resultCode could change nullability.
/* 097 */           filter_value6 = org.apache.spark.util.Utils.nanSafeCompareDoubles(filter_value7, 2.0D) > 0;
/* 098 */
/* 099 */         }
/* 100 */         if (filter_isNull6 || !filter_value6) continue;
/* 101 */
/* 102 */         filter_numOutputRows1.add(1);
/* 103 */
/* 104 */         // CONSUME: WholeStageCodegen
/* 105 */         filter_holder1.reset();
/* 106 */
/* 107 */         filter_rowWriter1.zeroOutNullBytes();
/* 108 */
/* 109 */         if (inputadapter_isNull1) {
/* 110 */           filter_rowWriter1.setNullAt(0);
/* 111 */         } else {
/* 112 */           // Remember the current cursor so that we can calculate how many bytes are
/* 113 */           // written later.
/* 114 */           final int filter_tmpCursor1 = filter_holder1.cursor;
/* 115 */
/* 116 */           if (inputadapter_value1 instanceof UnsafeArrayData) {
/* 117 */             final int filter_sizeInBytes1 = ((UnsafeArrayData) inputadapter_value1).getSizeInBytes();
/* 118 */             // grow the global buffer before writing data.
/* 119 */             filter_holder1.grow(filter_sizeInBytes1);
/* 120 */             ((UnsafeArrayData) inputadapter_value1).writeToMemory(filter_holder1.buffer, filter_holder1.cursor);
/* 121 */             filter_holder1.cursor += filter_sizeInBytes1;
/* 122 */
/* 123 */           } else {
/* 124 */             final int filter_numElements1 = inputadapter_value1.numElements();
/* 125 */             filter_arrayWriter1.initialize(filter_holder1, filter_numElements1, 8);
/* 126 */
/* 127 */             for (int filter_index3 = 0; filter_index3 < filter_numElements1; filter_index3++) {
/* 128 */               if (inputadapter_value1.isNullAt(filter_index3)) {
/* 129 */                 filter_arrayWriter1.setNullAt(filter_index3);
/* 130 */               } else {
/* 131 */                 final double filter_element1 = inputadapter_value1.getDouble(filter_index3);
/* 132 */                 filter_arrayWriter1.write(filter_index3, filter_element1);
/* 133 */               }
/* 134 */             }
/* 135 */           }
/* 136 */
/* 137 */           filter_rowWriter1.setOffsetAndSize(0, filter_tmpCursor1, filter_holder1.cursor - filter_tmpCursor1);
/* 138 */           filter_rowWriter1.alignToWords(filter_holder1.cursor - filter_tmpCursor1);
/* 139 */         }
/* 140 */         filter_result1.setTotalSize(filter_holder1.totalSize());
/* 141 */         append(filter_result1);
/* 142 */         if (shouldStop()) return;
/* 143 */       }
/* 144 */       columnar_batchIdx = 0;
/* 145 */     }
/* 146 */   }
/* 147 */
/* 148 */   private void processRow() throws java.io.IOException {
/* 149 */     // PRODUCE: Filter (value#1[0] > 2.0)
/* 150 */     // PRODUCE: InputAdapter
/* 151 */     while (inputadapter_input.hasNext()) {
/* 152 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 153 */       // CONSUME: Filter (value#1[0] > 2.0)
/* 154 */       // input[0, array<double>, true]
/* 155 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 156 */       ArrayData inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getArray(0));
/* 157 */
/* 158 */       // (input[0, array<double>, true][0] > 2.0)
/* 159 */       boolean filter_isNull = true;
/* 160 */       boolean filter_value = false;
/* 161 */       // input[0, array<double>, true][0]
/* 162 */       boolean filter_isNull1 = true;
/* 163 */       double filter_value1 = -1.0;
/* 164 */
/* 165 */       if (!inputadapter_isNull) {
/* 166 */         filter_isNull1 = false; // resultCode could change nullability.
/* 167 */
/* 168 */         final int filter_index = (int) 0;
/* 169 */         if (filter_index >= inputadapter_value.numElements() || filter_index < 0 || inputadapter_value.isNullAt(filter_index)) {
/* 170 */           filter_isNull1 = true;
/* 171 */         } else {
/* 172 */           filter_value1 = inputadapter_value.getDouble(filter_index);
/* 173 */         }
/* 174 */
/* 175 */       }
/* 176 */       if (!filter_isNull1) {
/* 177 */         filter_isNull = false; // resultCode could change nullability.
/* 178 */         filter_value = org.apache.spark.util.Utils.nanSafeCompareDoubles(filter_value1, 2.0D) > 0;
/* 179 */
/* 180 */       }
/* 181 */       if (filter_isNull || !filter_value) continue;
/* 182 */
/* 183 */       filter_numOutputRows.add(1);
/* 184 */
/* 185 */       // CONSUME: WholeStageCodegen
/* 186 */       filter_holder.reset();
/* 187 */
/* 188 */       filter_rowWriter.zeroOutNullBytes();
/* 189 */
/* 190 */       if (inputadapter_isNull) {
/* 191 */         filter_rowWriter.setNullAt(0);
/* 192 */       } else {
/* 193 */         // Remember the current cursor so that we can calculate how many bytes are
/* 194 */         // written later.
/* 195 */         final int filter_tmpCursor = filter_holder.cursor;
/* 196 */
/* 197 */         if (inputadapter_value instanceof UnsafeArrayData) {
/* 198 */           final int filter_sizeInBytes = ((UnsafeArrayData) inputadapter_value).getSizeInBytes();
/* 199 */           // grow the global buffer before writing data.
/* 200 */           filter_holder.grow(filter_sizeInBytes);
/* 201 */           ((UnsafeArrayData) inputadapter_value).writeToMemory(filter_holder.buffer, filter_holder.cursor);
/* 202 */           filter_holder.cursor += filter_sizeInBytes;
/* 203 */
/* 204 */         } else {
/* 205 */           final int filter_numElements = inputadapter_value.numElements();
/* 206 */           filter_arrayWriter.initialize(filter_holder, filter_numElements, 8);
/* 207 */
/* 208 */           for (int filter_index1 = 0; filter_index1 < filter_numElements; filter_index1++) {
/* 209 */             if (inputadapter_value.isNullAt(filter_index1)) {
/* 210 */               filter_arrayWriter.setNullAt(filter_index1);
/* 211 */             } else {
/* 212 */               final double filter_element = inputadapter_value.getDouble(filter_index1);
/* 213 */               filter_arrayWriter.write(filter_index1, filter_element);
/* 214 */             }
/* 215 */           }
/* 216 */         }
/* 217 */
/* 218 */         filter_rowWriter.setOffsetAndSize(0, filter_tmpCursor, filter_holder.cursor - filter_tmpCursor);
/* 219 */         filter_rowWriter.alignToWords(filter_holder.cursor - filter_tmpCursor);
/* 220 */       }
/* 221 */       filter_result.setTotalSize(filter_holder.totalSize());
/* 222 */       append(filter_result);
/* 223 */       if (shouldStop()) return;
/* 224 */     }
/* 225 */   }
/* 226 */
/* 227 */   private void cleanup() {
/* 228 */     inputadapter_col0 = null;
/* 229 */
/* 230 */     columnar_itr = null;
/* 231 */   }
/* 232 */
/* 233 */   protected void processNext() throws java.io.IOException {
/* 234 */     if ((columnar_batchIdx != 0) ||
/* 235 */       (inputadapter_input1 instanceof org.apache.spark.sql.execution.columnar.ColumnarIterator &&
/* 236 */         (columnar_itr = (org.apache.spark.sql.execution.columnar.ColumnarIterator)inputadapter_input1).isSupportColumnarCodeGen())) {
/* 237 */       processBatch();
/* 238 */     } else {
/* 239 */       processRow();
/* 240 */     }
/* 241 */   }
/* 242 */ }

How was this patch tested?

Added new tests in DataFrameCacheSuite.scala.
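
For reference, a hypothetical sketch of such a test follows; the suite skeleton (SharedSQLContext, testImplicits) is an assumption, and the actual assertions in DataFrameCacheSuite.scala may differ.

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.test.SharedSQLContext

// Hypothetical sketch: cache a DataFrame with an array column and filter it,
// exercising the columnar code-generation path added by this PR.
class DataFrameCacheSuite extends QueryTest with SharedSQLContext {
  import testImplicits._

  test("filter on a cached DataFrame with an array column") {
    val df = sparkContext.parallelize(Seq(Array(1.0, 2.0), Array(3.0, 4.0)), 1).toDF("value")
    df.cache()
    checkAnswer(df.filter("value[0] > 2.0"), Row(Seq(3.0, 4.0)) :: Nil)
  }
}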

@SparkQA commented Jul 7, 2016

Test build #61913 has finished for PR 14091 at commit 54df41c.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk kiszk changed the title [SPARK-16412][SQL] Generate Java code that gets an array in each column of CachedBatch when DataFrame.cache() is called [SPARK-16412][SQL][WIP] Generate Java code that gets an array in each column of CachedBatch when DataFrame.cache() is called Jul 8, 2016
remove duplicated final attribute at method declaration
@SparkQA commented Aug 8, 2016

Test build #63350 has finished for PR 14091 at commit 61a4754.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member) commented
Hi @kiszk, I'm just wondering whether this is still WIP (just out of curiosity).

@asfgit closed this in 5d2750a on May 18, 2017
zifeif2 pushed a commit to zifeif2/spark that referenced this pull request Nov 22, 2025
## What changes were proposed in this pull request?

This PR proposes to close PRs ...

  • inactive on review comments for more than a month
  • WIP and inactive for more than a month
  • with a Jenkins build failure and inactive for more than a month
  • suggested to be closed, with no comment against that
  • obviously inappropriate (e.g., Branch 0.5)

To make sure, I left a comment on each PR about a week ago, and I did not receive a response from the authors of the PRs below:

Closes apache#11129
Closes apache#12085
Closes apache#12162
Closes apache#12419
Closes apache#12420
Closes apache#12491
Closes apache#13762
Closes apache#13837
Closes apache#13851
Closes apache#13881
Closes apache#13891
Closes apache#13959
Closes apache#14091
Closes apache#14481
Closes apache#14547
Closes apache#14557
Closes apache#14686
Closes apache#15594
Closes apache#15652
Closes apache#15850
Closes apache#15914
Closes apache#15918
Closes apache#16285
Closes apache#16389
Closes apache#16652
Closes apache#16743
Closes apache#16893
Closes apache#16975
Closes apache#17001
Closes apache#17088
Closes apache#17119
Closes apache#17272
Closes apache#17971

Added:
Closes apache#17778
Closes apache#17303
Closes apache#17872

## How was this patch tested?

N/A

Author: hyukjinkwon <gurwls223@gmail.com>

Closes apache#18017 from HyukjinKwon/close-inactive-prs.