Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark 3.2 #587

Merged
merged 35 commits into from
Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
30e7004
bumped dev version
metasim Mar 14, 2022
0b4309e
CI fix.
metasim Apr 7, 2022
80e6992
Dependency updates.
metasim Apr 7, 2022
d0e5bd5
Spark 3.1.3
metasim May 4, 2022
ff00b41
bumped dev version
metasim Mar 14, 2022
7f5e078
withNewChildrenInternal
echeipesh Jun 30, 2022
160f351
Try Aggregator implementation
echeipesh Jun 30, 2022
eb8ccb9
more explicit
echeipesh Jun 30, 2022
a92ee4e
Fix UDF style Aggregates
echeipesh Jul 6, 2022
8da8bd7
Bring in the Kryo setup
echeipesh Dec 5, 2022
ef2f4ee
Register functions directly
echeipesh Dec 5, 2022
d12ace5
Bump versions
echeipesh Dec 5, 2022
aab5486
Landsat PDS is gone :(
echeipesh Dec 9, 2022
a3ac4cf
Fix Resample and ResampleNearest
echeipesh Dec 10, 2022
0214fa2
fix masking functions
echeipesh Dec 11, 2022
285e03d
Fix test: 6900 bit at position 4 is 1 -- expect NODATA after mask
echeipesh Dec 11, 2022
725c9d5
TileRasterizerAggregate expects column in rf_raster_proj order
echeipesh Dec 11, 2022
91468b4
Use spark-testing-base - core tests green
echeipesh Dec 11, 2022
72a76a0
Update StacApiDataSourceTest.scala
echeipesh Dec 14, 2022
ae3acc4
disable GeoTrellisDataSourceSpec
echeipesh Dec 14, 2022
460971a
Shade caffeine
echeipesh Dec 14, 2022
b937a70
Merge branch 'develop' into spark-3.2
echeipesh Dec 14, 2022
43e8d3d
boop
echeipesh Dec 14, 2022
d1cfb99
Expressions constructors toSeq conversion
pomadchin Jan 3, 2023
b28a10b
Downgrade scaffeine to 4.1.0 for JDK 8 support in caffeine 2.9
echeipesh Jan 3, 2023
15b420c
pyspark version 3.2.1
echeipesh Jan 3, 2023
05b4c44
why exclude log4j ? tests need it
echeipesh Jan 3, 2023
ec3c5f4
GitHub actions build.
metasim Sep 27, 2021
3a7b90f
Fix formatting
echeipesh Jan 3, 2023
4f24ad5
Fix Expressions arity issue
pomadchin Jan 3, 2023
9be3cb6
Add .jvmopts
pomadchin Jan 3, 2023
61081b7
Fix: Mask operations preserve the target tile cell type
echeipesh Jan 4, 2023
b14adaa
Pin GitHub Actions to ubuntu-20.04
echeipesh Jan 13, 2023
df552b8
Implement withNewChildrenInternal directly
echeipesh Jan 13, 2023
c5cf70c
Remove python build from CI
echeipesh Jan 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,9 @@ package org.locationtech.rasterframes.expressions.transformers

import geotrellis.raster.{NODATA, Tile, isNoData}
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.{Column, TypedColumn}
import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression, ExpressionDescription}
import org.apache.spark.sql.types.DataType
import org.locationtech.rasterframes.expressions.DynamicExtractors.tileExtractor
import org.locationtech.rasterframes.expressions.{RasterResult, row}
import org.locationtech.rasterframes.tileEncoder

Expand All @@ -45,36 +42,26 @@ import org.locationtech.rasterframes.tileEncoder
..."""
)
case class InverseMaskByDefined(targetTile: Expression, maskTile: Expression)
extends BinaryExpression
extends BinaryExpression with MaskExpression
with CodegenFallback
with RasterResult {
override def nodeName: String = "rf_inverse_mask"

def dataType: DataType = targetTile.dataType
def left: Expression = targetTile
def right: Expression = maskTile

protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): Expression =
InverseMaskByDefined(newLeft, newRight)

override def checkInputDataTypes(): TypeCheckResult = {
if (!tileExtractor.isDefinedAt(targetTile.dataType)) {
TypeCheckFailure(s"Input type '${targetTile.dataType}' does not conform to a raster type.")
} else if (!tileExtractor.isDefinedAt(maskTile.dataType)) {
TypeCheckFailure(s"Input type '${maskTile.dataType}' does not conform to a raster type.")
} else TypeCheckSuccess
}

private lazy val targetTileExtractor = tileExtractor(targetTile.dataType)
private lazy val maskTileExtractor = tileExtractor(maskTile.dataType)
override def checkInputDataTypes(): TypeCheckResult = checkTileDataTypes()

override protected def nullSafeEval(targetInput: Any, maskInput: Any): Any = {
val (targetTile, targetCtx) = targetTileExtractor(row(targetInput))
val (mask, maskCtx) = maskTileExtractor(row(maskInput))

val result = targetTile.dualCombine(mask)
{ (v, m) => if (isNoData(m)) v else NODATA }
val result = maskEval(targetTile, mask,
{ (v, m) => if (isNoData(m)) v else NODATA },
{ (v, m) => if (isNoData(m)) v else NODATA }
)
toInternalRow(result, targetCtx)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@

package org.locationtech.rasterframes.expressions.transformers

import geotrellis.raster.{NODATA, Tile, d2i}
import geotrellis.raster.{NODATA, Tile}
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.{Column, TypedColumn}
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription, TernaryExpression}
import org.apache.spark.sql.types.DataType
import org.locationtech.rasterframes.expressions.DynamicExtractors.{intArgExtractor, tileExtractor}
import org.locationtech.rasterframes.expressions.DynamicExtractors.intArgExtractor
import org.locationtech.rasterframes.expressions.{RasterResult, row}
import org.locationtech.rasterframes.tileEncoder

Expand All @@ -47,12 +46,11 @@ import org.locationtech.rasterframes.tileEncoder
..."""
)
case class InverseMaskByValue(targetTile: Expression, maskTile: Expression, maskValue: Expression)
extends TernaryExpression
extends TernaryExpression with MaskExpression
with CodegenFallback
with RasterResult {
override def nodeName: String = "rf_inverse_mask_by_value"

def dataType: DataType = targetTile.dataType
def first: Expression = targetTile
def second: Expression = maskTile
def third: Expression = maskValue
Expand All @@ -61,27 +59,22 @@ case class InverseMaskByValue(targetTile: Expression, maskTile: Expression, mask
InverseMaskByValue(newFirst, newSecond, newThird)

override def checkInputDataTypes(): TypeCheckResult = {
if (!tileExtractor.isDefinedAt(targetTile.dataType)) {
TypeCheckFailure(s"Input type '${targetTile.dataType}' does not conform to a raster type.")
} else if (!tileExtractor.isDefinedAt(maskTile.dataType)) {
TypeCheckFailure(s"Input type '${maskTile.dataType}' does not conform to a raster type.")
} else if (!intArgExtractor.isDefinedAt(maskValue.dataType)) {
if (!intArgExtractor.isDefinedAt(maskValue.dataType)) {
TypeCheckFailure(s"Input type '${maskValue.dataType}' isn't an integral type.")
} else TypeCheckSuccess
} else checkTileDataTypes()
}

private lazy val targetTileExtractor = tileExtractor(targetTile.dataType)
private lazy val maskTileExtractor = tileExtractor(maskTile.dataType)
private lazy val maskValueExtractor = intArgExtractor(maskValue.dataType)

override protected def nullSafeEval(targetInput: Any, maskInput: Any, maskValueInput: Any): Any = {
val (targetTile, targetCtx) = targetTileExtractor(row(targetInput))
val (mask, maskCtx) = maskTileExtractor(row(maskInput))
val maskValue = maskValueExtractor(maskValueInput).value

val result = targetTile.dualCombine(mask)
val result = maskEval(targetTile, mask,
{ (v, m) => if (m != maskValue) NODATA else v },
{ (v, m) => if (m != maskValue) NODATA else v }
{ (v, m) => if (d2i(m) != maskValue) NODATA else v }
)
toInternalRow(result, targetCtx)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,9 @@
package org.locationtech.rasterframes.expressions.transformers
import geotrellis.raster.{NODATA, Tile, isNoData}
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.{Column, TypedColumn}
import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression, ExpressionDescription}
import org.apache.spark.sql.types.DataType
import org.locationtech.rasterframes.expressions.DynamicExtractors.{tileExtractor}
import org.locationtech.rasterframes.expressions.{RasterResult, row}
import org.locationtech.rasterframes.tileEncoder

Expand All @@ -44,36 +41,26 @@ import org.locationtech.rasterframes.tileEncoder
..."""
)
case class MaskByDefined(targetTile: Expression, maskTile: Expression)
extends BinaryExpression
extends BinaryExpression with MaskExpression
with CodegenFallback
with RasterResult {
override def nodeName: String = "rf_mask"

def dataType: DataType = targetTile.dataType
def left: Expression = targetTile
def right: Expression = maskTile

protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): Expression =
MaskByDefined(newLeft, newRight)

override def checkInputDataTypes(): TypeCheckResult = {
if (!tileExtractor.isDefinedAt(targetTile.dataType)) {
TypeCheckFailure(s"Input type '${targetTile.dataType}' does not conform to a raster type.")
} else if (!tileExtractor.isDefinedAt(maskTile.dataType)) {
TypeCheckFailure(s"Input type '${maskTile.dataType}' does not conform to a raster type.")
} else TypeCheckSuccess
}

private lazy val targetTileExtractor = tileExtractor(targetTile.dataType)
private lazy val maskTileExtractor = tileExtractor(maskTile.dataType)
override def checkInputDataTypes(): TypeCheckResult = checkTileDataTypes()

override protected def nullSafeEval(targetInput: Any, maskInput: Any): Any = {
val (targetTile, targetCtx) = targetTileExtractor(row(targetInput))
val (mask, maskCtx) = maskTileExtractor(row(maskInput))

val result = targetTile.dualCombine(mask)
{ (v, m) => if (isNoData(m)) NODATA else v }
val result = maskEval(targetTile, mask,
{ (v, m) => if (isNoData(m)) NODATA else v },
{ (v, m) => if (isNoData(m)) NODATA else v }
)
toInternalRow(result, targetCtx)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@

package org.locationtech.rasterframes.expressions.transformers

import geotrellis.raster.{NODATA, Tile, d2i}
import geotrellis.raster.{NODATA, Tile}
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.{Column, TypedColumn}
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription, TernaryExpression}
import org.apache.spark.sql.types.{DataType}
import org.locationtech.rasterframes.expressions.DynamicExtractors.{intArgExtractor, tileExtractor}
import org.locationtech.rasterframes.expressions.DynamicExtractors.intArgExtractor
import org.locationtech.rasterframes.expressions.{RasterResult, row}
import org.locationtech.rasterframes.tileEncoder

Expand All @@ -46,42 +45,36 @@ import org.locationtech.rasterframes.tileEncoder
> SELECT _FUNC_(target, mask, maskValue);
..."""
)
case class MaskByValue(dataTile: Expression, maskTile: Expression, maskValue: Expression)
extends TernaryExpression
case class MaskByValue(targetTile: Expression, maskTile: Expression, maskValue: Expression)
extends TernaryExpression with MaskExpression
with CodegenFallback
with RasterResult {
override def nodeName: String = "rf_mask_by_value"

def dataType: DataType = dataTile.dataType
def first: Expression = dataTile
def first: Expression = targetTile
def second: Expression = maskTile
def third: Expression = maskValue

protected def withNewChildrenInternal(newFirst: Expression, newSecond: Expression, newThird: Expression): Expression =
MaskByValue(newFirst, newSecond, newThird)

override def checkInputDataTypes(): TypeCheckResult = {
if (!tileExtractor.isDefinedAt(dataTile.dataType)) {
TypeCheckFailure(s"Input type '${dataTile.dataType}' does not conform to a raster type.")
} else if (!tileExtractor.isDefinedAt(maskTile.dataType)) {
TypeCheckFailure(s"Input type '${maskTile.dataType}' does not conform to a raster type.")
} else if (!intArgExtractor.isDefinedAt(maskValue.dataType)) {
if (!intArgExtractor.isDefinedAt(maskValue.dataType)) {
TypeCheckFailure(s"Input type '${maskValue.dataType}' isn't an integral type.")
} else TypeCheckSuccess
} else checkTileDataTypes()
}

private lazy val dataTileExtractor = tileExtractor(dataTile.dataType)
private lazy val maskTileExtractor = tileExtractor(maskTile.dataType)
private lazy val maskValueExtractor = intArgExtractor(maskValue.dataType)

override protected def nullSafeEval(targetInput: Any, maskInput: Any, maskValueInput: Any): Any = {
val (targetTile, targetCtx) = dataTileExtractor(row(targetInput))
val (targetTile, targetCtx) = targetTileExtractor(row(targetInput))
val (mask, maskCtx) = maskTileExtractor(row(maskInput))
val maskValue = maskValueExtractor(maskValueInput).value

val result = targetTile.dualCombine(mask)
val result = maskEval(targetTile, mask,
{ (v, m) => if (m == maskValue) NODATA else v },
{ (v, m) => if (m == maskValue) NODATA else v }
{ (v, m) => if (d2i(m) == maskValue) NODATA else v }
)
toInternalRow(result, targetCtx)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@

package org.locationtech.rasterframes.expressions.transformers

import geotrellis.raster.{NODATA, Tile, d2i}
import geotrellis.raster.{NODATA, Tile}
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription, TernaryExpression}
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Column, TypedColumn}
import org.locationtech.rasterframes.expressions.DynamicExtractors.{intArrayExtractor, tileExtractor}
import org.locationtech.rasterframes.expressions.DynamicExtractors.intArrayExtractor
import org.locationtech.rasterframes.expressions.{RasterResult, row}
import org.locationtech.rasterframes.tileEncoder

Expand All @@ -48,12 +47,11 @@ import org.locationtech.rasterframes.tileEncoder
..."""
)
case class MaskByValues(targetTile: Expression, maskTile: Expression, maskValues: Expression)
extends TernaryExpression
extends TernaryExpression with MaskExpression
with CodegenFallback
with RasterResult {
override def nodeName: String = "rf_mask_by_values"

def dataType: DataType = targetTile.dataType
def first: Expression = targetTile
def second: Expression = maskTile
def third: Expression = maskValues
Expand All @@ -62,26 +60,21 @@ case class MaskByValues(targetTile: Expression, maskTile: Expression, maskValues
MaskByValues(newFirst, newSecond, newThird)

override def checkInputDataTypes(): TypeCheckResult =
if (!tileExtractor.isDefinedAt(targetTile.dataType)) {
TypeCheckFailure(s"Input type '${targetTile.dataType}' does not conform to a raster type.")
} else if (!tileExtractor.isDefinedAt(maskTile.dataType)) {
TypeCheckFailure(s"Input type '${maskTile.dataType}' does not conform to a raster type.")
} else if (!intArrayExtractor.isDefinedAt(maskValues.dataType)) {
if (!intArrayExtractor.isDefinedAt(maskValues.dataType)) {
TypeCheckFailure(s"Input type '${maskValues.dataType}' does not translate to an array<int>.")
} else TypeCheckSuccess
} else checkTileDataTypes()

private lazy val targetTileExtractor = tileExtractor(targetTile.dataType)
private lazy val maskTileExtractor = tileExtractor(maskTile.dataType)
private lazy val maskValuesExtractor = intArrayExtractor(maskValues.dataType)

override protected def nullSafeEval(targetInput: Any, maskInput: Any, maskValuesInput: Any): Any = {
val (targetTile, targetCtx) = targetTileExtractor(row(targetInput))
val (mask, maskCtx) = maskTileExtractor(row(maskInput))
val maskValues: Array[Int] = maskValuesExtractor(maskValuesInput.asInstanceOf[ArrayData])

val result = targetTile.dualCombine(mask)
val result = maskEval(targetTile, mask,
{ (v, m) => if (maskValues.contains(m)) NODATA else v },
{ (v, m) => if (maskValues.contains(m)) NODATA else v }
{ (v, m) => if (maskValues.contains(d2i(m))) NODATA else v }
)

toInternalRow(result, targetCtx)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* This software is licensed under the Apache 2 license, quoted below.
*
* Copyright 2019 Astraea, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* [http://www.apache.org/licenses/LICENSE-2.0]
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
* SPDX-License-Identifier: Apache-2.0
*
*/

package org.locationtech.rasterframes.expressions.transformers

import geotrellis.raster.Tile
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.DataType
import org.locationtech.rasterframes.expressions.DynamicExtractors.tileExtractor

import spire.syntax.cfor._

trait MaskExpression { self: Expression =>

  /** Tile whose cell values are conditionally replaced by the mask logic. */
  def targetTile: Expression

  /** Tile supplying the per-cell mask values (read as integers). */
  def maskTile: Expression

  /** The result column carries the same Catalyst type as the target tile. */
  def dataType: DataType = targetTile.dataType

  // Lazy so they are only resolved after checkTileDataTypes() has validated
  // that both child expressions actually have raster-compatible types.
  protected lazy val targetTileExtractor = tileExtractor(targetTile.dataType)
  protected lazy val maskTileExtractor = tileExtractor(maskTile.dataType)

  /**
   * Shared type-check for all masking expressions: both the target and the
   * mask child must have a type the tile extractor understands.
   */
  def checkTileDataTypes(): TypeCheckResult = {
    if (!tileExtractor.isDefinedAt(targetTile.dataType)) {
      TypeCheckFailure(s"Input type '${targetTile.dataType}' does not conform to a raster type.")
    } else if (!tileExtractor.isDefinedAt(maskTile.dataType)) {
      TypeCheckFailure(s"Input type '${maskTile.dataType}' does not conform to a raster type.")
    } else TypeCheckSuccess
  }

  /**
   * Cell-wise combination of the target tile with the mask tile, preserving
   * the target tile's cell type (the result is a mutable copy of the target).
   *
   * The mask cell is always read as an Int (`maskTile.get`); the target cell
   * is read as Double or Int depending on whether the target cell type is
   * floating point, and the matching combiner is applied.
   *
   * @param targetTile tile providing the values and the result cell type
   * @param maskTile   tile providing mask values; assumed to have the same
   *                   dimensions as `targetTile` — TODO confirm callers
   *                   guarantee this, as no bounds check is performed here
   * @param maskInt    combiner used when the target cell type is integral
   * @param maskDouble combiner used when the target cell type is floating point
   * @return a new tile with the mask applied
   */
  def maskEval(targetTile: Tile, maskTile: Tile, maskInt: (Int, Int) => Int, maskDouble: (Double, Int) => Double): Tile = {
    val result = targetTile.mutable

    if (targetTile.cellType.isFloatingPoint) {
      cfor(0)(_ < targetTile.rows, _ + 1) { row =>
        cfor(0)(_ < targetTile.cols, _ + 1) { col =>
          val v = targetTile.getDouble(col, row)
          val m = maskTile.get(col, row)
          result.setDouble(col, row, maskDouble(v, m))
        }
      }
    } else {
      cfor(0)(_ < targetTile.rows, _ + 1) { row =>
        cfor(0)(_ < targetTile.cols, _ + 1) { col =>
          val v = targetTile.get(col, row)
          val m = maskTile.get(col, row)
          result.set(col, row, maskInt(v, m))
        }
      }
    }

    result
  }
}