Skip to content

Commit

Permalink
Support Spark4 Column Node API
Browse files Browse the repository at this point in the history
  • Loading branch information
EnricoMi committed Aug 20, 2024
1 parent 568eac0 commit 18e4532
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 3 deletions.
26 changes: 26 additions & 0 deletions src/main/scala-spark-3.5/org/apache/spark/extension/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2024 G-Research
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.Expression

package object extension {
implicit class ExpressionExtension(expr: Expression) {
def toColumn: Column = new Column(expr)
}
}
26 changes: 26 additions & 0 deletions src/main/scala-spark-4.0/org/apache/spark/extension/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2024 G-Research
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.Expression

package object extension {
implicit class ExpressionExtension(expr: Expression) {
def toColumn: Column = Column(expr)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package uk.co.gresearch.spark.diff.comparator

import org.apache.spark.extension.ExpressionExtension
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.encoderFor
import org.apache.spark.sql.catalyst.expressions.codegen.Block.BlockHelper
Expand All @@ -31,8 +32,7 @@ trait EquivDiffComparator[T] extends DiffComparator {

private trait ExpressionEquivDiffComparator[T] extends EquivDiffComparator[T] {
def equiv(left: Expression, right: Expression): EquivExpression[T]
def equiv(left: Column, right: Column): Column =
new Column(equiv(left.expr, right.expr).asInstanceOf[Expression])
def equiv(left: Column, right: Column): Column = equiv(left.expr, right.expr).toColumn
}

trait TypedEquivDiffComparator[T] extends EquivDiffComparator[T] with TypedDiffComparator
Expand Down
3 changes: 2 additions & 1 deletion src/main/scala/uk/co/gresearch/spark/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package uk.co.gresearch

import org.apache.spark.extension.ExpressionExtension
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.NamedExpression
Expand Down Expand Up @@ -271,7 +272,7 @@ package object spark extends Logging with SparkVersion with BuildVersion {
* result tick value column
*/
def timestampToDotNetTicks(timestampColumn: Column): Column =
unixEpochTenthMicrosToDotNetTicks(new Column(UnixMicros.unixMicros(timestampColumn.expr)) * 10)
unixEpochTenthMicrosToDotNetTicks(UnixMicros.unixMicros(timestampColumn.expr).toColumn * 10)

/**
* Convert a Spark timestamp to a .Net `DateTime.Ticks` timestamp. The input column must be of TimestampType.
Expand Down

0 comments on commit 18e4532

Please sign in to comment.