
Commit 86fa433

Merge remote-tracking branch 'upstream/master' into SPARK-24058
2 parents: 526fa4a + 9498e52

215 files changed: +6177 additions, -1968 deletions


R/pkg/NAMESPACE

Lines changed: 3 additions & 0 deletions

@@ -201,6 +201,8 @@ exportMethods("%<=>%",
               "approxCountDistinct",
               "approxQuantile",
               "array_contains",
+              "array_max",
+              "array_min",
               "array_position",
               "asc",
               "ascii",
@@ -256,6 +258,7 @@ exportMethods("%<=>%",
               "expr",
               "factorial",
               "first",
+              "flatten",
               "floor",
               "format_number",
               "format_string",

R/pkg/R/functions.R

Lines changed: 41 additions & 0 deletions

@@ -206,7 +206,9 @@ NULL
 #' df <- createDataFrame(cbind(model = rownames(mtcars), mtcars))
 #' tmp <- mutate(df, v1 = create_array(df$mpg, df$cyl, df$hp))
 #' head(select(tmp, array_contains(tmp$v1, 21), size(tmp$v1)))
+#' head(select(tmp, array_max(tmp$v1), array_min(tmp$v1)))
 #' head(select(tmp, array_position(tmp$v1, 21)))
+#' head(select(tmp, flatten(tmp$v1)))
 #' tmp2 <- mutate(tmp, v2 = explode(tmp$v1))
 #' head(tmp2)
 #' head(select(tmp, posexplode(tmp$v1)))
@@ -2992,6 +2994,32 @@ setMethod("array_contains",
             column(jc)
           })
 
+#' @details
+#' \code{array_max}: Returns the maximum value of the array.
+#'
+#' @rdname column_collection_functions
+#' @aliases array_max array_max,Column-method
+#' @note array_max since 2.4.0
+setMethod("array_max",
+          signature(x = "Column"),
+          function(x) {
+            jc <- callJStatic("org.apache.spark.sql.functions", "array_max", x@jc)
+            column(jc)
+          })
+
+#' @details
+#' \code{array_min}: Returns the minimum value of the array.
+#'
+#' @rdname column_collection_functions
+#' @aliases array_min array_min,Column-method
+#' @note array_min since 2.4.0
+setMethod("array_min",
+          signature(x = "Column"),
+          function(x) {
+            jc <- callJStatic("org.apache.spark.sql.functions", "array_min", x@jc)
+            column(jc)
+          })
+
 #' @details
 #' \code{array_position}: Locates the position of the first occurrence of the given value
 #' in the given array. Returns NA if either of the arguments are NA.
@@ -3008,6 +3036,19 @@ setMethod("array_position",
             column(jc)
           })
 
+#' @details
+#' \code{flatten}: Transforms an array of arrays into a single array.
+#'
+#' @rdname column_collection_functions
+#' @aliases flatten flatten,Column-method
+#' @note flatten since 2.4.0
+setMethod("flatten",
+          signature(x = "Column"),
+          function(x) {
+            jc <- callJStatic("org.apache.spark.sql.functions", "flatten", x@jc)
+            column(jc)
+          })
+
 #' @details
 #' \code{map_keys}: Returns an unordered array containing the keys of the map.
 #'
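The three R wrappers above delegate via callJStatic to org.apache.spark.sql.functions, so the same behaviour can be exercised directly from the Scala DataFrame API. A minimal sketch, assuming Spark 2.4+ and a local master; the data mirrors the R tests further down (maxima 3 and 6, minima 1 and 4, flatten yielding [1,2,3,4] and [5,6,7,8]).

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array_max, array_min, flatten}

object ArrayFunctionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("array-fns").getOrCreate()
    import spark.implicits._

    // Same rows as the R test: [1,2,3] and [6,5,4].
    val df = Seq(Tuple1(Seq(1, 2, 3)), Tuple1(Seq(6, 5, 4))).toDF("v1")
    df.select(array_max($"v1"), array_min($"v1")).show()

    // Array of arrays, as in the flatten test below.
    val nested = Seq(Tuple1(Seq(Seq(1, 2), Seq(3, 4))),
                     Tuple1(Seq(Seq(5, 6), Seq(7, 8)))).toDF("v1")
    nested.select(flatten($"v1")).show()

    spark.stop()
  }
}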

R/pkg/R/generics.R

Lines changed: 12 additions & 0 deletions

@@ -757,6 +757,14 @@ setGeneric("approxCountDistinct", function(x, ...) { standardGeneric("approxCountDistinct") })
 #' @name NULL
 setGeneric("array_contains", function(x, value) { standardGeneric("array_contains") })
 
+#' @rdname column_collection_functions
+#' @name NULL
+setGeneric("array_max", function(x) { standardGeneric("array_max") })
+
+#' @rdname column_collection_functions
+#' @name NULL
+setGeneric("array_min", function(x) { standardGeneric("array_min") })
+
 #' @rdname column_collection_functions
 #' @name NULL
 setGeneric("array_position", function(x, value) { standardGeneric("array_position") })
@@ -910,6 +918,10 @@ setGeneric("explode_outer", function(x) { standardGeneric("explode_outer") })
 #' @name NULL
 setGeneric("expr", function(x) { standardGeneric("expr") })
 
+#' @rdname column_collection_functions
+#' @name NULL
+setGeneric("flatten", function(x) { standardGeneric("flatten") })
+
 #' @rdname column_datetime_diff_functions
 #' @name NULL
 setGeneric("from_utc_timestamp", function(y, x) { standardGeneric("from_utc_timestamp") })

R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 16 additions & 3 deletions

@@ -1479,11 +1479,18 @@ test_that("column functions", {
   df5 <- createDataFrame(list(list(a = "010101")))
   expect_equal(collect(select(df5, conv(df5$a, 2, 16)))[1, 1], "15")
 
-  # Test array_contains(), array_position(), element_at() and sort_array()
+  # Test array_contains(), array_max(), array_min(), array_position(), element_at()
+  # and sort_array()
   df <- createDataFrame(list(list(list(1L, 2L, 3L)), list(list(6L, 5L, 4L))))
   result <- collect(select(df, array_contains(df[[1]], 1L)))[[1]]
   expect_equal(result, c(TRUE, FALSE))
 
+  result <- collect(select(df, array_max(df[[1]])))[[1]]
+  expect_equal(result, c(3, 6))
+
+  result <- collect(select(df, array_min(df[[1]])))[[1]]
+  expect_equal(result, c(1, 4))
+
   result <- collect(select(df, array_position(df[[1]], 1L)))[[1]]
   expect_equal(result, c(1, 0))
 
@@ -1495,6 +1502,12 @@ test_that("column functions", {
   result <- collect(select(df, sort_array(df[[1]])))[[1]]
   expect_equal(result, list(list(1L, 2L, 3L), list(4L, 5L, 6L)))
 
+  # Test flatten()
+  df <- createDataFrame(list(list(list(list(1L, 2L), list(3L, 4L))),
+                             list(list(list(5L, 6L), list(7L, 8L)))))
+  result <- collect(select(df, flatten(df[[1]])))[[1]]
+  expect_equal(result, list(list(1L, 2L, 3L, 4L), list(5L, 6L, 7L, 8L)))
+
   # Test map_keys(), map_values() and element_at()
   df <- createDataFrame(list(list(map = as.environment(list(x = 1, y = 2)))))
   result <- collect(select(df, map_keys(df$map)))[[1]]
@@ -2197,8 +2210,8 @@ test_that("join(), crossJoin() and merge() on a DataFrame", {
   expect_equal(count(where(join(df, df2), df$name == df2$name)), 3)
   # cartesian join
   expect_error(tryCatch(count(join(df, df2)), error = function(e) { stop(e) }),
-               paste0(".*(org.apache.spark.sql.AnalysisException: Detected cartesian product for",
-                      " INNER join between logical plans).*"))
+               paste0(".*(org.apache.spark.sql.AnalysisException: Detected implicit cartesian",
+                      " product for INNER join between logical plans).*"))
 
   joined <- crossJoin(df, df2)
   expect_equal(names(joined), c("age", "name", "name", "test"))

common/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java

Lines changed: 7 additions & 5 deletions

@@ -17,10 +17,12 @@
 
 package org.apache.spark.unsafe.types;
 
-import org.apache.spark.unsafe.Platform;
-
 import java.util.Arrays;
 
+import com.google.common.primitives.Ints;
+
+import org.apache.spark.unsafe.Platform;
+
 public final class ByteArray {
 
   public static final byte[] EMPTY_BYTE = new byte[0];
@@ -77,17 +79,17 @@ public static byte[] subStringSQL(byte[] bytes, int pos, int len) {
 
   public static byte[] concat(byte[]... inputs) {
     // Compute the total length of the result
-    int totalLength = 0;
+    long totalLength = 0;
     for (int i = 0; i < inputs.length; i++) {
       if (inputs[i] != null) {
-        totalLength += inputs[i].length;
+        totalLength += (long)inputs[i].length;
      } else {
        return null;
      }
    }
 
     // Allocate a new byte array, and copy the inputs one by one into it
-    final byte[] result = new byte[totalLength];
+    final byte[] result = new byte[Ints.checkedCast(totalLength)];
     int offset = 0;
     for (int i = 0; i < inputs.length; i++) {
       int len = inputs[i].length;

common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java

Lines changed: 4 additions & 4 deletions

@@ -29,8 +29,8 @@
 import com.esotericsoftware.kryo.KryoSerializable;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
-
 import com.google.common.primitives.Ints;
+
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
@@ -877,17 +877,17 @@ public UTF8String lpad(int len, UTF8String pad) {
    */
   public static UTF8String concat(UTF8String... inputs) {
     // Compute the total length of the result.
-    int totalLength = 0;
+    long totalLength = 0;
     for (int i = 0; i < inputs.length; i++) {
       if (inputs[i] != null) {
-        totalLength += inputs[i].numBytes;
+        totalLength += (long)inputs[i].numBytes;
       } else {
         return null;
       }
     }
 
     // Allocate a new byte array, and copy the inputs one by one into it.
-    final byte[] result = new byte[totalLength];
+    final byte[] result = new byte[Ints.checkedCast(totalLength)];
     int offset = 0;
     for (int i = 0; i < inputs.length; i++) {
       int len = inputs[i].numBytes;
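Both ByteArray.concat and UTF8String.concat now accumulate the total length in a long and narrow it back with Guava's Ints.checkedCast, so a combined size above Int.MaxValue fails fast with an IllegalArgumentException instead of wrapping into a negative array size at allocation time. A minimal Scala sketch of the same guard, assuming Guava on the classpath; the Option return type is illustrative (the Java originals return null for a null input).

import com.google.common.primitives.Ints

object ConcatSketch {
  def concat(inputs: Array[Byte]*): Option[Array[Byte]] = {
    if (inputs.contains(null)) {
      None                                           // mirror the null contract of the diffs
    } else {
      // Sum in a Long so the total cannot silently overflow Int.
      val totalLength = inputs.map(_.length.toLong).sum
      // Throws IllegalArgumentException if the total exceeds Int.MaxValue,
      // rather than attempting to allocate an array of negative size.
      val result = new Array[Byte](Ints.checkedCast(totalLength))
      var offset = 0
      inputs.foreach { in =>
        System.arraycopy(in, 0, result, offset, in.length)
        offset += in.length
      }
      Some(result)
    }
  }
}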

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 18 additions & 13 deletions

@@ -22,7 +22,7 @@ import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor}
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
+import scala.collection.mutable.{HashMap, HashSet, ListBuffer, Map}
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration.Duration
 import scala.reflect.ClassTag
@@ -282,7 +282,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
 
   // For testing
   def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
-      : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
+      : Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = {
     getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1)
   }
 
@@ -296,7 +296,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
   * describing the shuffle blocks that are stored at that block manager.
   */
  def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
-      : Seq[(BlockManagerId, Seq[(BlockId, Long)])]
+      : Iterator[(BlockManagerId, Seq[(BlockId, Long)])]
 
  /**
   * Deletes map output status information for the specified shuffle stage.
@@ -632,17 +632,18 @@ private[spark] class MapOutputTrackerMaster(
    }
  }
 
+  // Get block sizes by executor Id. Note that zero-sized blocks are excluded in the result.
  // This method is only called in local-mode.
  def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
-      : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
+      : Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = {
    logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
    shuffleStatuses.get(shuffleId) match {
      case Some (shuffleStatus) =>
        shuffleStatus.withMapStatuses { statuses =>
          MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses)
        }
      case None =>
-        Seq.empty
+        Iterator.empty
    }
  }
 
@@ -669,8 +670,9 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
  /** Remembers which map output locations are currently being fetched on an executor. */
  private val fetching = new HashSet[Int]
 
+  // Get block sizes by executor Id. Note that zero-sized blocks are excluded in the result.
  override def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
-      : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
+      : Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = {
    logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
    val statuses = getStatuses(shuffleId)
    try {
@@ -841,6 +843,7 @@ private[spark] object MapOutputTracker extends Logging {
   * Given an array of map statuses and a range of map output partitions, returns a sequence that,
   * for each block manager ID, lists the shuffle block IDs and corresponding shuffle block sizes
   * stored at that block manager.
+   * Note that empty blocks are filtered in the result.
   *
   * If any of the statuses is null (indicating a missing location due to a failed mapper),
   * throws a FetchFailedException.
@@ -857,22 +860,24 @@
      shuffleId: Int,
      startPartition: Int,
      endPartition: Int,
-      statuses: Array[MapStatus]): Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
+      statuses: Array[MapStatus]): Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = {
    assert (statuses != null)
-    val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(BlockId, Long)]]
-    for ((status, mapId) <- statuses.zipWithIndex) {
+    val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long)]]
+    for ((status, mapId) <- statuses.iterator.zipWithIndex) {
      if (status == null) {
        val errorMessage = s"Missing an output location for shuffle $shuffleId"
        logError(errorMessage)
        throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
      } else {
        for (part <- startPartition until endPartition) {
-          splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
-            ((ShuffleBlockId(shuffleId, mapId, part), status.getSizeForBlock(part)))
+          val size = status.getSizeForBlock(part)
+          if (size != 0) {
+            splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) +=
+              ((ShuffleBlockId(shuffleId, mapId, part), size))
+          }
        }
      }
    }
-
-    splitsByAddress.toSeq
+    splitsByAddress.iterator
  }
 }

core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 9 additions & 2 deletions

@@ -89,6 +89,7 @@ private[spark] class SecurityManager(
   setViewAclsGroups(sparkConf.get("spark.ui.view.acls.groups", ""));
   setModifyAclsGroups(sparkConf.get("spark.modify.acls.groups", ""));
 
+  private var secretKey: String = _
   logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") +
     "; ui acls " + (if (aclsOn) "enabled" else "disabled") +
     "; users with view permissions: " + viewAcls.toString() +
@@ -321,6 +322,12 @@
       val creds = UserGroupInformation.getCurrentUser().getCredentials()
       Option(creds.getSecretKey(SECRET_LOOKUP_KEY))
         .map { bytes => new String(bytes, UTF_8) }
+        // The secret key may not be found in the current UGI's credentials. This happens when
+        // the UGI is refreshed on the driver side by UGI's loginFromKeytab, which does not copy
+        // the secret key from the original UGI to the new one (ThriftServer's Hive logic does
+        // this). As a workaround, store the secret key in a local variable so it stays visible
+        // across contexts.
+        .orElse(Option(secretKey))
         .orElse(Option(sparkConf.getenv(ENV_AUTH_SECRET)))
         .orElse(sparkConf.getOption(SPARK_AUTH_SECRET_CONF))
         .getOrElse {
@@ -364,8 +371,8 @@
       rnd.nextBytes(secretBytes)
 
       val creds = new Credentials()
-      val secretStr = HashCodes.fromBytes(secretBytes).toString()
-      creds.addSecretKey(SECRET_LOOKUP_KEY, secretStr.getBytes(UTF_8))
+      secretKey = HashCodes.fromBytes(secretBytes).toString()
+      creds.addSecretKey(SECRET_LOOKUP_KEY, secretKey.getBytes(UTF_8))
       UserGroupInformation.getCurrentUser().addCredentials(creds)
     }
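The SecurityManager change caches the generated secret in a field so it can still be resolved after the driver's UGI is refreshed (for example by loginFromKeytab) and the credential is lost. A hedged sketch of the resulting lookup order; the object, method, and key names below are illustrative stand-ins, not Spark's internals.

object CachedSecretSketch {
  @volatile private var cachedSecret: String = _   // set when a secret is generated

  def rememberGenerated(secret: String): Unit = { cachedSecret = secret }

  // Resolution order mirroring the diff: UGI credential, then the cached field,
  // then the environment, then the config; fail if none is present.
  def resolveSecret(fromUgi: Option[String],
                    env: Map[String, String],
                    conf: Map[String, String]): String = {
    fromUgi
      .orElse(Option(cachedSecret))                 // survives a UGI refresh that drops the credential
      .orElse(env.get("SPARK_AUTH_SECRET"))         // assumption: stand-in for ENV_AUTH_SECRET
      .orElse(conf.get("spark.authenticate.secret"))
      .getOrElse(throw new IllegalArgumentException("No authentication secret configured"))
  }
}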

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 6 additions & 2 deletions

@@ -18,6 +18,8 @@
 package org.apache.spark.deploy.history
 
 import java.io.{File, FileNotFoundException, IOException}
+import java.nio.file.Files
+import java.nio.file.attribute.PosixFilePermissions
 import java.util.{Date, ServiceLoader}
 import java.util.concurrent.{ExecutorService, TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}
@@ -130,8 +132,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   // Visible for testing.
   private[history] val listing: KVStore = storePath.map { path =>
-    require(path.isDirectory(), s"Configured store directory ($path) does not exist.")
-    val dbPath = new File(path, "listing.ldb")
+    val perms = PosixFilePermissions.fromString("rwx------")
+    val dbPath = Files.createDirectories(new File(path, "listing.ldb").toPath(),
+      PosixFilePermissions.asFileAttribute(perms)).toFile()
+
     val metadata = new FsHistoryProviderMetadata(CURRENT_LISTING_VERSION,
       AppStatusStore.CURRENT_VERSION, logDir.toString())
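FsHistoryProvider now creates the listing.ldb directory itself, with owner-only POSIX permissions, instead of requiring the configured store directory to already exist. A standalone sketch of the same java.nio call, assuming a POSIX file system; the /tmp path is only for demonstration, and the permission attribute applies only to directories the call actually creates.

import java.io.File
import java.nio.file.Files
import java.nio.file.attribute.PosixFilePermissions

object RestrictedDirSketch {
  def main(args: Array[String]): Unit = {
    // rwx for the owner, nothing for group or others.
    val perms = PosixFilePermissions.fromString("rwx------")
    val dir = Files.createDirectories(
      new File("/tmp/spark-history-sketch/listing.ldb").toPath(),
      PosixFilePermissions.asFileAttribute(perms)).toFile()
    println(s"created ${dir.getAbsolutePath} with owner-only permissions")
  }
}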

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 6 additions & 2 deletions

@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
 import com.google.common.io.Files
 
 import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
+import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState}
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.RpcEndpointRef
@@ -142,7 +142,11 @@ private[deploy] class ExecutorRunner(
   private def fetchAndRunExecutor() {
     try {
       // Launch the process
-      val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
+      val subsOpts = appDesc.command.javaOpts.map {
+        Utils.substituteAppNExecIds(_, appId, execId.toString)
+      }
+      val subsCommand = appDesc.command.copy(javaOpts = subsOpts)
+      val builder = CommandUtils.buildProcessBuilder(subsCommand, new SecurityManager(conf),
         memory, sparkHome.getAbsolutePath, substituteVariables)
       val command = builder.command()
       val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
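ExecutorRunner now rewrites the executor's javaOpts through Utils.substituteAppNExecIds before building the process, so per-application and per-executor placeholders resolve to real ids at launch time. A hedged sketch of what that substitution amounts to; the {{APP_ID}}/{{EXECUTOR_ID}} tokens and the helper below are illustrative, not a copy of the Spark utility.

object JavaOptsSubstitutionSketch {
  // Replace placeholder tokens in a single JVM option string.
  def substituteIds(opt: String, appId: String, execId: String): String =
    opt.replace("{{APP_ID}}", appId).replace("{{EXECUTOR_ID}}", execId)

  def main(args: Array[String]): Unit = {
    val javaOpts = Seq("-Dlog.path=/var/log/{{APP_ID}}/{{EXECUTOR_ID}}.log")
    val substituted = javaOpts.map(substituteIds(_, "app-20180401-0001", "3"))
    println(substituted)   // List(-Dlog.path=/var/log/app-20180401-0001/3.log)
  }
}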
