
Conversation

@mn-mikke (Contributor) commented Apr 21, 2018

What changes were proposed in this pull request?

This PR adds the function zip_with_index(array[, indexFirst, startFromZero]), which transforms the input array by wrapping each element in a pair with an index indicating its position in the array.

zip_with_index(array("d", "a", null, "b")) => [("d",1),("a",2),(null,3),("b",4)]
zip_with_index(array("d", "a", null, "b"), true, false) => [(1,"d"),(2,"a"),(3,null),(4,"b")]
zip_with_index(array("d", "a", null, "b"), true, true) => [(0,"d"),(1,"a"),(2,null),(3,"b")]
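
For intuition, the proposed semantics are essentially Scala's zipWithIndex with a configurable starting offset and pair order. A plain-Scala sketch of the behaviour described above (illustrative only, not the Catalyst expression itself):

def zipWithIndexLike[A](xs: Seq[A], indexFirst: Boolean = false, startFromZero: Boolean = false): Seq[Any] =
  xs.zipWithIndex.map { case (x, i) =>
    val idx = if (startFromZero) i else i + 1           // 1-based by default
    if (indexFirst) (idx, x) else (x, idx)              // value first by default
  }

zipWithIndexLike(Seq("d", "a", null, "b"))               // Seq(("d",1), ("a",2), (null,3), ("b",4))
zipWithIndexLike(Seq("d", "a", null, "b"), true, true)   // Seq((0,"d"), (1,"a"), (2,null), (3,"b"))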

How was this patch tested?

New tests added into:

  • CollectionExpressionSuite
  • DataFrameFunctionsSuite

Codegen examples

Primitive type

val df = Seq(
  Seq(1, 3, 4, 2),
  null
).toDF("i")
df.filter($"i".isNotNull || $"i".isNull).select(zip_with_index($"i")).debugCodegen

Result:

/* 033 */         boolean inputadapter_isNull_0 = inputadapter_row_0.isNullAt(0);
/* 034 */         ArrayData inputadapter_value_0 = inputadapter_isNull_0 ?
/* 035 */         null : (inputadapter_row_0.getArray(0));
/* 036 */
/* 037 */         boolean filter_value_0 = true;
/* 038 */
/* 039 */         if (!(!inputadapter_isNull_0)) {
/* 040 */           filter_value_0 = inputadapter_isNull_0;
/* 041 */         }
/* 042 */         if (!filter_value_0) continue;
/* 043 */
/* 044 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 045 */
/* 046 */         boolean project_isNull_0 = inputadapter_isNull_0;
/* 047 */         ArrayData project_value_0 = null;
/* 048 */
/* 049 */         if (!inputadapter_isNull_0) {
/* 050 */           final int project_numElements_0 = inputadapter_value_0.numElements();
/* 051 */
/* 052 */           final int project_structSize_0 = 24;
/* 053 */           final long project_byteArraySize_0 = UnsafeArrayData.calculateSizeOfUnderlyingByteArray(project_numElements_0, 8 + project_structSize_0);
/* 054 */           final int project_structsOffset_0 = UnsafeArrayData.calculateHeaderPortionInBytes(project_numElements_0) + project_numElements_0 * 8;
/* 055 */           if (project_byteArraySize_0 > 2147483632) {
/* 056 */             final Object[] project_internalRowArray_0 = new Object[project_numElements_0];
/* 057 */             for (int z = 0; z < project_numElements_0; z++) {
/* 058 */               project_internalRowArray_0[z] = new org.apache.spark.sql.catalyst.expressions.GenericInternalRow(new Object[]{inputadapter_value_0.getInt(z), z + 1});
/* 059 */             }
/* 060 */             project_value_0 = new org.apache.spark.sql.catalyst.util.GenericArrayData(project_internalRowArray_0);
/* 061 */
/* 062 */           } else {
/* 063 */             final byte[] project_byteArray_0 = new byte[(int)project_byteArraySize_0];
/* 064 */             UnsafeArrayData project_unsafeArrayData_0 = new UnsafeArrayData();
/* 065 */             Platform.putLong(project_byteArray_0, 16, project_numElements_0);
/* 066 */             project_unsafeArrayData_0.pointTo(project_byteArray_0, 16, (int)project_byteArraySize_0);
/* 067 */             UnsafeRow project_unsafeRow_0 = new UnsafeRow(2);
/* 068 */             for (int z = 0; z < project_numElements_0; z++) {
/* 069 */               long offset = project_structsOffset_0 + z * project_structSize_0;
/* 070 */               project_unsafeArrayData_0.setLong(z, (offset << 32) + project_structSize_0);
/* 071 */               project_unsafeRow_0.pointTo(project_byteArray_0, 16 + offset, project_structSize_0);
/* 072 */               if (false && inputadapter_value_0.isNullAt(z)) {
/* 073 */                 project_unsafeRow_0.setNullAt(0);
/* 074 */               } else {
/* 075 */                 project_unsafeRow_0.setInt(
/* 076 */                   0,
/* 077 */                   inputadapter_value_0.getInt(z)
/* 078 */                 );
/* 079 */               }
/* 080 */               project_unsafeRow_0.setInt(1, z + 1);
/* 081 */             }
/* 082 */             project_value_0 = project_unsafeArrayData_0;
/* 083 */           }
/* 084 */
/* 085 */         }

Non-primitive type

val df = Seq(
  Seq("d", "a", "f", "g"),
  null
).toDF("s")
df.filter($"s".isNotNull || $"s".isNull).select(zip_with_index($"s")).debugCodegen

Result:

/* 033 */         boolean inputadapter_isNull_0 = inputadapter_row_0.isNullAt(0);
/* 034 */         ArrayData inputadapter_value_0 = inputadapter_isNull_0 ?
/* 035 */         null : (inputadapter_row_0.getArray(0));
/* 036 */
/* 037 */         boolean filter_value_0 = true;
/* 038 */
/* 039 */         if (!(!inputadapter_isNull_0)) {
/* 040 */           filter_value_0 = inputadapter_isNull_0;
/* 041 */         }
/* 042 */         if (!filter_value_0) continue;
/* 043 */
/* 044 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 045 */
/* 046 */         boolean project_isNull_0 = inputadapter_isNull_0;
/* 047 */         ArrayData project_value_0 = null;
/* 048 */
/* 049 */         if (!inputadapter_isNull_0) {
/* 050 */           final int project_numElements_0 = inputadapter_value_0.numElements();
/* 051 */
/* 052 */           final Object[] project_internalRowArray_0 = new Object[project_numElements_0];
/* 053 */           for (int z = 0; z < project_numElements_0; z++) {
/* 054 */             project_internalRowArray_0[z] = new org.apache.spark.sql.catalyst.expressions.GenericInternalRow(new Object[]{inputadapter_value_0.getUTF8String(z), z + 1});
/* 055 */           }
/* 056 */           project_value_0 = new org.apache.spark.sql.catalyst.util.GenericArrayData(project_internalRowArray_0);
/* 057 */
/* 058 */         }

@mn-mikke (Contributor Author)

cc @gatorsmile @ueshin @kiszk

@HyukjinKwon (Member)

ok to test

Member

nit: there's one more leading space here.

Member

nit: // scalastyle:on line.size.limit

Contributor Author

Done.

Member

Let's avoid using a default value in APIs. It doesn't work in Java.

@gatorsmile (Member)

Which database has this function?

Member

nit: How about val (valuePosition, indexPosition) = if (indexFirstValue) ("1", "0") else ("0", "1")?

@mn-mikke (Contributor Author)

@gatorsmile I'm not aware of any. From user experience, I strongly feel that such a function is missing, especially once the transform function is introduced.

@SparkQA commented Apr 21, 2018

Test build #89679 has finished for PR 21121 at commit 551d04d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Apr 21, 2018

Test build #89683 has finished for PR 21121 at commit a599544.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Apr 22, 2018

Test build #89686 has finished for PR 21121 at commit 06348c3.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

Wrong doc?

Contributor Author

Good spot. Thanks!

Member

Should the index be 0-based or 1-based? Other array functions seem to be 1-based.

Contributor Author

That's a really good question! The newly added functions element_at and array_position are 1-based, but on the other hand, getItem from the Column class is 0-based. What about adding one extra parameter and letting users decide whether the array will be indexed from 0 or 1?
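
For reference, a quick spark-shell sketch of the existing behaviour being contrasted (current Spark APIs):

val df = Seq(Seq("a", "b", "c")).toDF("arr")
df.selectExpr("element_at(arr, 1)").show()   // "a" -- element_at is 1-based
df.select($"arr".getItem(0)).show()          // "a" -- Column.getItem is 0-based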

@viirya (Member) commented Apr 22, 2018

Are we sure the input is always an unsafe-backed array? What if it is GenericArrayData?

Member

Ah, I see. You just use an unsafe-backed array as the output.

@viirya (Member) commented Apr 22, 2018

Btw, if we use GenericArrayData as the output array, can't we avoid this limit?

Contributor Author

I like your suggestion. So instead of throwing the exception, the function will execute a similar piece of code as in genCodeForNonPrimitiveElements...
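
Roughly, the fallback discussed here would mirror the non-primitive path: wrap each (element, index) pair in a GenericInternalRow and return a GenericArrayData instead of an UnsafeArrayData. An interpreted-style sketch (illustrative only, not the generated code):

import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}

// getElement abstracts over the typed accessor (getInt, getUTF8String, ...).
def zipWithIndexGeneric(input: ArrayData, getElement: Int => Any): ArrayData = {
  val n = input.numElements()
  val rows = new Array[Any](n)
  var i = 0
  while (i < n) {
    rows(i) = new GenericInternalRow(Array[Any](getElement(i), i + 1))
    i += 1
  }
  new GenericArrayData(rows)
}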

Member

Ah, we can alleviate this limitation (up to MAX_ARRAY_LENGTH elements) if we use GenericArrayData. BTW, we have to do the same check in genCodeForNonPrimitiveElements, too.

Member

nvm, this is zip that does not involve concat of multiple arrays.

@SparkQA commented Apr 24, 2018

Test build #89802 has finished for PR 21121 at commit fd4e473.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ZipWithIndex(child: Expression, indexFirst: Expression, startFromZero: Expression)

@SparkQA commented Apr 24, 2018

Test build #89803 has finished for PR 21121 at commit 67915f2.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Apr 25, 2018

Test build #89804 has finished for PR 21121 at commit 4e9b140.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk (Member) commented Apr 25, 2018

Can we remove the null check if containsNull is false even when elementType is not a primitive type? For example, ArrayType(StringType, false).
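
For illustration, a schema where that optimization would apply (current Spark API; whether the generated code actually skips the check is what this comment asks about):

import org.apache.spark.sql.types.{ArrayType, StringType}

// containsNull = false declares that the array has no null elements,
// so the per-element isNullAt() check could be elided.
val noNullStrings = ArrayType(StringType, containsNull = false)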

Contributor Author

Good spot! Thanks.

@SparkQA commented Apr 25, 2018

Test build #89838 has finished for PR 21121 at commit ffcebe3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Apr 25, 2018

Test build #89857 has finished for PR 21121 at commit fd71544.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class CachedRDDBuilder(
  • case class InMemoryRelation(

@mn-mikke force-pushed the feature/array-api-zip_with_index-to-master branch from fd71544 to 51c8199 on April 26, 2018 13:49
@mn-mikke force-pushed the feature/array-api-zip_with_index-to-master branch from 51c8199 to bcd52bd on April 26, 2018 14:03
@SparkQA commented Apr 26, 2018

Test build #89887 has finished for PR 21121 at commit 51c8199.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ArrayJoin(
  • case class Flatten(child: Expression) extends UnaryExpression
  • case class MonthsBetween(
  • trait QueryPlanConstraints extends ConstraintHelper
  • trait ConstraintHelper
  • case class CachedRDDBuilder(
  • case class InMemoryRelation(
  • case class WriteToContinuousDataSource(
  • case class WriteToContinuousDataSourceExec(writer: StreamWriter, query: SparkPlan)

@SparkQA commented Apr 26, 2018

Test build #89888 has finished for PR 21121 at commit bcd52bd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ArrayJoin(
  • case class Flatten(child: Expression) extends UnaryExpression
  • case class MonthsBetween(
  • trait QueryPlanConstraints extends ConstraintHelper
  • trait ConstraintHelper
  • case class CachedRDDBuilder(
  • case class InMemoryRelation(
  • case class WriteToContinuousDataSource(
  • case class WriteToContinuousDataSourceExec(writer: StreamWriter, query: SparkPlan)

@ueshin (Member) commented Apr 27, 2018

I'm still not sure we really need this function.
If the purpose is only the transform use case you mentioned at #21121 (comment), how about adding a second parameter to transform to pass the index? That would seem to perform better because we don't need to materialize the struct.
It's also up to @hvanhovell who I believe is working on transform, so I'd like to wait for his opinion as well.

@lokm01 commented Apr 27, 2018

@ueshin Currently we use our own implementation of zipWithIndex when we do explode and need to preserve the ordering of the array elements (especially if there is a shuffle involved in the subsequent transformation).

Sure, once transform becomes available, it will be much better and more performant to use that, but since we're dealing with production applications, we would like to start rewriting these jobs with those small "drop-in" replacements for functions such as zipWithIndex before going for a major rewrite with HOFs in Spark SQL.

I've seen many threads in the community that recommend the same approach when dealing with these difficult array cases - I'm pretty sure it will benefit other users.
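
For context, one common shape of such a drop-in helper (a hypothetical UDF-based sketch, not the commenter's actual implementation):

import org.apache.spark.sql.functions.{explode, udf}

// Hypothetical zipWithIndex helper: pairs each element with its 1-based position.
val zipWithIndexUdf = udf { xs: Seq[String] =>
  if (xs == null) null else xs.zipWithIndex.map { case (x, i) => (x, i + 1) }
}

// Explode while keeping the original position, so ordering survives later shuffles.
val df = Seq(Seq("d", "a", "f", "g")).toDF("s")
df.select(explode(zipWithIndexUdf($"s")).as("e"))
  .select($"e._1".as("value"), $"e._2".as("idx"))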

@mn-mikke (Contributor Author)

@ueshin What about combining zip_with_index with map_from_entries?

@Tagar commented May 1, 2018

Would this cover https://issues.apache.org/jira/browse/SPARK-23074 as well? Thanks.

@rxin (Contributor) commented May 1, 2018

@lokm01 wouldn't @ueshin's suggestion on adding a second parameter to transform work for you? You can just do something similar to transform(x, (entry, index) -> struct(entry, index)). Perhaps zip_with_index is just an alias for that.
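
For reference, a minimal sketch of that approach using the index-aware transform lambda (the two-argument form later available in Spark SQL; the struct field names are whatever struct assigns):

// Roughly equivalent to zip_with_index, with a 0-based index.
val df = Seq(Seq("d", "a", null, "b")).toDF("arr")
df.selectExpr("transform(arr, (x, i) -> struct(x, i)) AS zipped")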

@mn-mikke (Contributor Author) commented May 2, 2018

@rxin Oh, I see. In that case, I'm happy to close the PR. @hvanhovell Can you confirm that the transform function will pass the index into lambda functions?

@AmplabJenkins

Can one of the admins verify this patch?

@ueshin (Member) commented Aug 9, 2018

@mn-mikke I think we can close this since we've added transform which can take the index argument as suggested.

@mn-mikke (Contributor Author) commented Aug 9, 2018

Sure, closing ...
