This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Adding running on k8s.md #552

Open · wants to merge 30 commits into master
Conversation

@foxish (Member) commented Nov 14, 2017:

This is the updated k8s documentation for Spark 2.3.
Pruned and reorganized some content to make it consistent with the other documentation.

cc @apache-spark-on-k8s/contributors @liyinan926

* This will become a table of contents (this text will be scraped).
{:toc}

Spark can run on clusters managed by [Kubernetes](https://kubernetes.io). This feature is currently experimental.
Member:

I think we should call out Kubernetes as a native scheduler backend to differentiate it from other options like helm charts for standalone.


# Prerequisites

* A spark distribution with Kubernetes.
Member:

Add a reference to releases: ([releases](https://github.com/apache-spark-on-k8s/spark/releases)).

Member Author:

This is for upstream, so it shouldn't link back to our fork. Maybe I'll add "Spark 2.3 and above" to clarify?

Member:

Sounds good to me.

[kubectl](https://kubernetes.io/docs/user-guide/prereqs/). If you do not already have a working Kubernetes cluster,
you may setup a test cluster on your local machine using
[minikube](https://kubernetes.io/docs/getting-started-guides/minikube/).
* We recommend that minikube be updated to the most recent version (0.19.0 at the time of this documentation), as some
Member:

The minikube version mentioned here needs to be updated.

Member Author:

@ifilonenko, I think you did some recent testing with minikube. Which version should we recommend for folks running spark/k8s?
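For local testing, a minimal minikube bring-up might look like the following sketch (the sizing flags are illustrative, not a recommendation; Spark drivers and executors need more than minikube's defaults):

```sh
# illustrative sizing only
minikube start --cpus 4 --memory 8192
kubectl cluster-info   # confirm kubectl now points at the local cluster
```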

<img src="img/k8s-cluster-mode.png" title="Spark cluster components" alt="Spark cluster components" />
</p>

spark-submit can be directly used to submit a Spark application to a Kubernetes cluster. The mechanism by which this
Member:

`spark-submit`.

Member Author:

Done

happens is as follows:

* Spark creates a spark driver running within a [Kubernetes pod](https://kubernetes.io/docs/concepts/workloads/pods/pod/).
* The driver creates more executors which are also Kubernetes pods and connects to them, and executes application code.
Member:

Remove "more". which also run in Kubernetes pods and connect to the driver through a Kubernetes headless service.

Member Author:

Done


The local proxy can be started by:

kubectl proxy
Member:

Ditto.

kubectl proxy

Member Author:

Done
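Assuming the proxy's default port of 8001, a submission can then target the proxied apiserver without TLS client configuration; a sketch (remaining flags elided):

```sh
kubectl proxy &
./bin/spark-submit \
  --master k8s://http://127.0.0.1:8001 \
  ...
```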

Kubernetes has the concept of [namespaces](https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/).
Namespaces are a way to divide cluster resources between multiple users (via resource quota). Spark on Kubernetes can
use namespaces to launch spark applications. This is through the `--kubernetes-namespace` or equivalently the
`--spark.kubernetes.namespace` argument to spark-submit.
Member:

--spark.kubernetes.namespace or --conf spark.kubernetes.namespace=?

Member Author:

Done
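A sketch of launching into a dedicated namespace using the `--conf` form settled on in this thread (the namespace name here is arbitrary; remaining flags elided):

```sh
kubectl create namespace spark-jobs
./bin/spark-submit \
  --conf spark.kubernetes.namespace=spark-jobs \
  ...
```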

Some of these include:
* PySpark support
* R support
* Dynamic Executor Scaling
Member:

Inconsistent use of capital letters.

Member Author:

Done


kubectl logs <spark-driver-pod>

Finally, deleting the driver pod will clean up the entire spark application, including all executors, associated
Member:

I would add "status and logs of failed executor pods can be checked in similar ways." at the beginning of this paragraph.

Member Author:

Done
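A sketch of the inspect-and-teardown flow described above, using the same pod-name placeholder:

```sh
kubectl logs <spark-driver-pod>         # application (driver) logs
kubectl delete pod <spark-driver-pod>   # cleans up the app, executors included
```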


Example usage is:

./sbin/build-push-docker-images.sh -r <repo> -t my-tag build
Member:

Better to wrap them in a code block. Ditto for a few places below.

@foxish (Member Author), Nov 29, 2017:

Done
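For completeness, a sketch of the full flow, assuming the script's `push` subcommand mirrors `build`:

```sh
./sbin/build-push-docker-images.sh -r <repo> -t my-tag build
./sbin/build-push-docker-images.sh -r <repo> -t my-tag push
```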


# Prerequisites

* A spark distribution with Kubernetes.

Member:

nit: capitalize Spark


* A spark distribution with Kubernetes.
* A running Kubernetes cluster with access configured to it using
[kubectl](https://kubernetes.io/docs/user-guide/prereqs/). If you do not already have a working Kubernetes cluster,

Member:

Call out the k8s version required?

<tr><th>Component</th><th>Image</th></tr>
<tr>
<td>Spark Driver Image</td>
<td><code>kubespark/spark-driver:v2.3.0</code></td>

Member:

From today's SIG chat: so this should change to the ASF one before release?

Member:

btw, I was referring to the kubespark/ part.

* Dynamic Executor Scaling
* Local file dependency management

You can refer to the [documentation}(https://apache-spark-on-k8s.github.io/userdocs/) if you want to try these features

Member:

[documentation]

Member Author:

Done

</tr>
<tr>
<td><code>spark.kubernetes.driver.docker.image</code></td>
<td><code>spark-driver:2.3.0</code></td>

Member:

shouldn't this be like kubespark/spark-executor:v2.3.0

Member Author:

I removed all references to kubespark for now. Just encouraging building images from source until the discussion about releasing official Docker images concludes.

@foxish (Member Author) left a comment:

Addressed one round of comments.

If you have a Kubernetes cluster set up, one way to discover the apiserver URL is by executing `kubectl cluster-info`.

kubectl cluster-info
Member Author:

Done
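Illustrative output (the address varies per cluster); the reported URL maps onto spark-submit's `--master` with a `k8s://` prefix:

```sh
$ kubectl cluster-info
Kubernetes master is running at https://192.168.99.100:8443
# => --master k8s://https://192.168.99.100:8443
```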


by running `kubectl auth can-i <list|create|edit|delete> pods`.
* The service account credentials used by the driver pods must have appropriate permissions
as well for editing pod spec.
* You must have [Kubernetes DNS](https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/) configured in your cluster.

Member:

Did we not used to have the handy command to run to see if DNS is running?

Member Author:

Hmm.. there is an easy way for minikube, but not in general for any cluster, because one may or may not use kube-dns. There are options like CoreDNS that people may be running as well.
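A heuristic check that covers many setups, assuming the DNS add-on runs in kube-system under the conventional k8s-app=kube-dns label (many CoreDNS deployments reuse that label):

```sh
kubectl get pods -n kube-system -l k8s-app=kube-dns
```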

--deploy-mode cluster \
--class org.apache.spark.examples.SparkPi \
--master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
--kubernetes-namespace default \
Member:

FYI: --kubernetes-namespace has been removed upstream. So this should be changed to --conf spark.kubernetes.namespace=default.

Member Author:

Done
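A sketch of the updated invocation after this change, with the remaining flags elided as in the snippet above:

```sh
./bin/spark-submit \
  --deploy-mode cluster \
  --class org.apache.spark.examples.SparkPi \
  --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
  --conf spark.kubernetes.namespace=default \
  ...
```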

its work. Specifically, at minimum, the service account must be granted a
[`Role` or `ClusterRole`](https://kubernetes.io/docs/admin/authorization/rbac/#role-and-clusterrole) that allows driver
pods to create pods and services. By default, the driver pod is automatically assigned the `default` service account in
the namespace specified by `--kubernetes-namespace`, if no service account is specified when the pod gets created.
Member:

s/--kubernetes-namespace/spark.kubernetes.namespace/.

Member Author:

Done
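A convenience sketch for setting up a dedicated service account; note that the `edit` ClusterRole grants more than the minimal Role described above:

```sh
kubectl create serviceaccount spark
kubectl create clusterrolebinding spark-role --clusterrole=edit \
  --serviceaccount=default:spark --namespace=default
```

The driver can then be pointed at it via `spark.kubernetes.authenticate.driver.serviceAccountName=spark` (assuming that config name; see the configuration table).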

<td>
The namespace that will be used for running the driver and executor pods. When using
<code>spark-submit</code> in cluster mode, this can also be passed to <code>spark-submit</code> via the
<code>--kubernetes-namespace</code> command line argument.
Member:

Remove mentioning of --kubernetes-namespace.

Member Author:

Done


Kubernetes has the concept of [namespaces](https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/).
Namespaces are a way to divide cluster resources between multiple users (via resource quota). Spark on Kubernetes can
use namespaces to launch spark applications. This is through the `--kubernetes-namespace` or equivalently the
Member:

Remove --kubernetes-namespace here.

Member Author:

Done

Docker files are under the `dockerfiles/` directory and can be customized further before
building using the supplied script, or manually.

## Cluster Mode
Member:

Should we include running SparkR (as PySpark is also included)?

Member:

I don't see a mention of PySpark except in Future Work, where R is also listed.

Member:

Ah, that must be in the user docs. Maybe we should include this?

Member Author:

@ifilonenko, this is for upstream in 2.3. We haven't put the R or Python bits in there yet, so they're omitted.

</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.memoryOverhead</code></td>
Member:

FYI: this has been unified into spark.executor.memoryOverhead and is documented in the main repo's configuration.md.

Member:

The same for spark.kubernetes.driver.memoryOverhead, which has been unified into spark.driver.memoryOverhead.

Member Author:

That config is not k8s-specific, right? Then I'm thinking we can remove this entirely.

Member:

Yes, both are no longer k8s-specific, so they can be removed.

@liyinan926 (Member) left a comment:

LGTM with one minor comment.

</td>
</tr>
<tr>
<td><code>spark.executorEnv.[EnvironmentVariableName]</code></td>
Member:

This is already included in the official doc at https://spark.apache.org/docs/latest/configuration.html, so we can remove it.

Member Author:

Done


declare -A path=( [spark-driver]=dockerfiles/driver/Dockerfile \
[spark-executor]=dockerfiles/executor/Dockerfile \
[spark-driver-py]=dockerfiles/driver-py/Dockerfile \
Member:

This and the following entries should not be included in this PR?

Member:

BTW: what about spark-base? Should we include it?

Member Author:

Good point - it needs to be built but not necessarily pushed. I think including it would make sense for now. Pushing it is not the worst thing. Adding.

@foxish (Member Author), Dec 12, 2017:

Oh wait.. just noticed that it's already being built a few lines below in the build method.
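For reference, a sketch of the map with spark-base included, assuming its Dockerfile sits at dockerfiles/spark-base/Dockerfile:

```bash
declare -A path=( [spark-base]=dockerfiles/spark-base/Dockerfile \
                  [spark-driver]=dockerfiles/driver/Dockerfile \
                  [spark-executor]=dockerfiles/executor/Dockerfile \
                  [spark-driver-py]=dockerfiles/driver-py/Dockerfile )
```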

@liyinan926 (Member):

LGTM. Thanks for putting it together!

@foxish (Member Author) commented Dec 12, 2017:

Now in apache#19946

foxish and others added 15 commits December 18, 2017 11:29
…ith container

## What changes were proposed in this pull request?
Changes discussed in apache#19946 (comment)
docker -> container, since with CRI, we are not limited to running only docker images.

## How was this patch tested?
Manual testing

Author: foxish <ramanathana@google.com>

Closes apache#19995 from foxish/make-docker-container.
This change restores the functionality that keeps a limited number of
elements of each type (jobs, stages, etc.), depending on configuration, to avoid
the store growing indefinitely over time.

The feature is implemented by creating a new type (ElementTrackingStore)
that wraps a KVStore and allows triggers to be set up for when elements
of a certain type meet a certain threshold. Triggers don't need to
necessarily only delete elements, but the current API is set up in a way
that makes that use case easier.

The new store also has a trigger for the "close" call, which makes it
easier for listeners to register code for cleaning things up and flushing
partial state to the store.

The old configurations for cleaning up the stored elements from the core
and SQL UIs are now active again, and the old unit tests are re-enabled.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes apache#19751 from vanzin/SPARK-20653.
## What changes were proposed in this pull request?

In [the environment where `/usr/sbin/lsof` does not exist](apache#19695 (comment)), `./dev/run-tests.py` for `maven` causes the following error. This is because the current `./dev/run-tests.py` checks existence of only `/usr/sbin/lsof` and aborts immediately if it does not exist.

This PR changes to check whether `lsof` or `/usr/sbin/lsof` exists.

```
/bin/sh: 1: /usr/sbin/lsof: not found

Usage:
 kill [options] <pid> [...]

Options:
 <pid> [...]            send signal to every <pid> listed
 -<signal>, -s, --signal <signal>
                        specify the <signal> to be sent
 -l, --list=[<signal>]  list all signal names, or convert one to a name
 -L, --table            list all signal names in a nice table

 -h, --help     display this help and exit
 -V, --version  output version information and exit

For more details see kill(1).
Traceback (most recent call last):
  File "./dev/run-tests.py", line 626, in <module>
    main()
  File "./dev/run-tests.py", line 597, in main
    build_apache_spark(build_tool, hadoop_version)
  File "./dev/run-tests.py", line 389, in build_apache_spark
    build_spark_maven(hadoop_version)
  File "./dev/run-tests.py", line 329, in build_spark_maven
    exec_maven(profiles_and_goals)
  File "./dev/run-tests.py", line 270, in exec_maven
    kill_zinc_on_port(zinc_port)
  File "./dev/run-tests.py", line 258, in kill_zinc_on_port
    subprocess.check_call(cmd, shell=True)
  File "/usr/lib/python2.7/subprocess.py", line 541, in check_call
    raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command '/usr/sbin/lsof -P |grep 3156 | grep LISTEN | awk '{ print $2; }' | xargs kill' returned non-zero exit status 123
```
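In shell terms, the new lookup is equivalent to this sketch (the actual patch lives in `dev/run-tests.py`):

```sh
if command -v lsof >/dev/null 2>&1; then
  LSOF=lsof                   # found on PATH
elif [ -x /usr/sbin/lsof ]; then
  LSOF=/usr/sbin/lsof         # fall back to the absolute path
else
  echo "lsof not found" >&2
  exit 1
fi
```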

## How was this patch tested?

manually tested

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes apache#19998 from kiszk/SPARK-22813.
…stribution

## What changes were proposed in this pull request?
1. entrypoint.sh for Kubernetes spark-base image is marked as executable (644 -> 755)
2. make-distribution script will now create kubernetes/dockerfiles directory when Kubernetes support is compiled.

## How was this patch tested?
Manual testing

cc/ ueshin jiangxb1987 mridulm vanzin rxin liyinan926

Author: foxish <ramanathana@google.com>

Closes apache#20007 from foxish/fix-dockerfiles.
## What changes were proposed in this pull request?
Remove useless `zipWithIndex` from `ResolveAliases `.

## How was this patch tested?
The existing tests

Author: gatorsmile <gatorsmile@gmail.com>

Closes apache#20009 from gatorsmile/try22.
…never possible

## What changes were proposed in this pull request?

The current implementation of InMemoryRelation always uses the most expensive execution plan when writing the cache.
With CBO enabled, we can actually have a more exact estimation of the underlying table size...

## How was this patch tested?

existing test

Author: CodingCat <zhunansjtu@gmail.com>
Author: Nan Zhu <CodingCat@users.noreply.github.com>
Author: Nan Zhu <nanzhu@uber.com>

Closes apache#19864 from CodingCat/SPARK-22673.
## What changes were proposed in this pull request?

The equi-height histogram is one of the state-of-the-art statistics for cardinality estimation; it can provide better estimation accuracy and works well for skewed data.

This PR is to improve join estimation based on equi-height histogram. The difference from basic estimation (based on ndv) is the logic for computing join cardinality and the new ndv after join.

The main idea is as follows:
1. find overlapped ranges between two histograms from two join keys;
2. apply the formula `T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1))` in each overlapped range.
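
For illustration, with hypothetical numbers: if one overlapped range holds T(A) = 1,000 rows with V(A.k1) = 100 distinct values and T(B) = 2,000 rows with V(B.k1) = 50, step 2 estimates 1,000 * 2,000 / max(100, 50) = 20,000 output rows for that range; summing over all overlapped ranges yields the join cardinality estimate.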

## How was this patch tested?
Added new test cases.

Author: Zhenhua Wang <wangzhenhua@huawei.com>

Closes apache#19594 from wzhfy/join_estimation_histogram.
## What changes were proposed in this pull request?

When calling explain on a query, the output can contain sensitive information. We should provide a way for an admin/user to redact such information.

Before this PR, the plan of SS is like this
```
== Physical Plan ==
*HashAggregate(keys=[value#6], functions=[count(1)], output=[value#6, count(1)#12L])
+- StateStoreSave [value#6], state info [ checkpoint = file:/private/var/folders/vx/j0ydl5rn0gd9mgrh1pljnw900000gn/T/temporary-91c6fac0-609f-4bc8-ad57-52c189f06797/state, runId = 05a4b3af-f02c-40f8-9ff9-a3e18bae496f, opId = 0, ver = 0, numPartitions = 5], Complete, 0
   +- *HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#18L])
      +- StateStoreRestore [value#6], state info [ checkpoint = file:/private/var/folders/vx/j0ydl5rn0gd9mgrh1pljnw900000gn/T/temporary-91c6fac0-609f-4bc8-ad57-52c189f06797/state, runId = 05a4b3af-f02c-40f8-9ff9-a3e18bae496f, opId = 0, ver = 0, numPartitions = 5]
         +- *HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#18L])
            +- Exchange hashpartitioning(value#6, 5)
               +- *HashAggregate(keys=[value#6], functions=[partial_count(1)], output=[value#6, count#18L])
                  +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
                     +- *MapElements <function1>, obj#5: java.lang.String
                        +- *DeserializeToObject value#30.toString, obj#4: java.lang.String
                           +- LocalTableScan [value#30]
```

After this PR, we can get the following output if users set `spark.redaction.string.regex` to `file:/[\\w_]+`
```
== Physical Plan ==
*HashAggregate(keys=[value#6], functions=[count(1)], output=[value#6, count(1)#12L])
+- StateStoreSave [value#6], state info [ checkpoint = *********(redacted)/var/folders/vx/j0ydl5rn0gd9mgrh1pljnw900000gn/T/temporary-e7da9b7d-3ec0-474d-8b8c-927f7d12ed72/state, runId = 8a9c3761-93d5-4896-ab82-14c06240dcea, opId = 0, ver = 0, numPartitions = 5], Complete, 0
   +- *HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#32L])
      +- StateStoreRestore [value#6], state info [ checkpoint = *********(redacted)/var/folders/vx/j0ydl5rn0gd9mgrh1pljnw900000gn/T/temporary-e7da9b7d-3ec0-474d-8b8c-927f7d12ed72/state, runId = 8a9c3761-93d5-4896-ab82-14c06240dcea, opId = 0, ver = 0, numPartitions = 5]
         +- *HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#32L])
            +- Exchange hashpartitioning(value#6, 5)
               +- *HashAggregate(keys=[value#6], functions=[partial_count(1)], output=[value#6, count#32L])
                  +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
                     +- *MapElements <function1>, obj#5: java.lang.String
                        +- *DeserializeToObject value#27.toString, obj#4: java.lang.String
                           +- LocalTableScan [value#27]
```
## How was this patch tested?
Added a test case

Author: gatorsmile <gatorsmile@gmail.com>

Closes apache#19985 from gatorsmile/redactPlan.
## What changes were proposed in this pull request?
We could get incorrect results by running DecimalPrecision twice. This PR resolves the original issue found in apache#15048 and apache#14797. After this PR, it becomes easier to change it back to using `children` instead of `innerChildren`.

## How was this patch tested?
The existing test.

Author: gatorsmile <gatorsmile@gmail.com>

Closes apache#20000 from gatorsmile/keepPromotePrecision.
…ies for mutable state

## What changes were proposed in this pull request?

This PR is follow-on of apache#19518. This PR tries to reduce the number of constant pool entries used for accessing mutable state.
There are two directions:
1. Primitive type variables should be allocated in the outer class for better performance. Otherwise, this PR allocates an array.
2. The length of an allocated array is capped at 32768 to avoid using a constant pool entry at access time (e.g. `mutableStateArray[32767]`).

Here are some discussions to determine these directions.
1. [[1]](apache#19518 (comment)), [[2]](apache#19518 (comment)), [[3]](apache#19518 (comment)), [[4]](apache#19518 (comment)), [[5]](apache#19518 (comment))
2. [[6]](apache#19518 (comment)), [[7]](apache#19518 (comment)), [[8]](apache#19518 (comment))

This PR modifies the `addMutableState` function in the `CodeGenerator` to check whether the declared state can be compacted into an array. We identify three types of state that cannot be compacted:

- Primitive type state (ints, booleans, etc.) if their number does not exceed a threshold
- Multiple-dimensional array type
- `inline = true`

When `useFreshName = false`, the given name is used.

Much of the code was ported from apache#19518, and much effort went into it. I think this PR should be credited to bdrillard.

With this PR, the following code is generated:
```
/* 005 */ class SpecificMutableProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private InternalRow mutableRow;
/* 009 */   private boolean isNull_0;
/* 010 */   private boolean isNull_1;
/* 011 */   private boolean isNull_2;
/* 012 */   private int value_2;
/* 013 */   private boolean isNull_3;
...
/* 10006 */   private int value_4999;
/* 10007 */   private boolean isNull_5000;
/* 10008 */   private int value_5000;
/* 10009 */   private InternalRow[] mutableStateArray = new InternalRow[2];
/* 10010 */   private boolean[] mutableStateArray1 = new boolean[7001];
/* 10011 */   private int[] mutableStateArray2 = new int[1001];
/* 10012 */   private UTF8String[] mutableStateArray3 = new UTF8String[6000];
/* 10013 */
...
/* 107956 */     private void init_176() {
/* 107957 */       isNull_4986 = true;
/* 107958 */       value_4986 = -1;
...
/* 108004 */     }
...
```

## How was this patch tested?

Added a new test case to `GeneratedProjectionSuite`

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes apache#19811 from kiszk/SPARK-18016.
…sFromConstraints

## What changes were proposed in this pull request?
The optimizer rule `InferFiltersFromConstraints` could cause our batch `Operator Optimizations` to exceed the max iteration limit (i.e., 100), so that the final plan might not be properly optimized. The rule `InferFiltersFromConstraints` could conflict with the other Filter/Join predicate reduction rules. Thus, we need to separate `InferFiltersFromConstraints` from the other rules.

This PR separates `InferFiltersFromConstraints` from the main batch `Operator Optimizations`.

## How was this patch tested?
The existing test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes apache#19149 from gatorsmile/inferFilterRule.
…uality, StackCoercion and Division

## What changes were proposed in this pull request?

Test coverage for `WidenSetOperationTypes`, `BooleanEquality`, `StackCoercion` and `Division`; this is a sub-task of [SPARK-22722](https://issues.apache.org/jira/browse/SPARK-22722).

## How was this patch tested?
N/A

Author: Yuming Wang <wgyumg@gmail.com>

Closes apache#20006 from wangyum/SPARK-22821.
…ion in spill

## What changes were proposed in this pull request?
Currently, the task memory manager throws an OutOfMemory error when an IO exception happens in spill() - https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java#L194. Similarly, there are many other places in the code where, if a task is not able to acquire memory due to an exception, we throw an OutOfMemory error, which kills the entire executor and hence fails all the tasks running on that executor instead of just the one task.

## How was this patch tested?

Unit tests

Author: Sital Kedia <skedia@fb.com>

Closes apache#20014 from sitalkedia/skedia/upstream_SPARK-22827.
## What changes were proposed in this pull request?

Adding date_trunc() as a built-in function.
`date_trunc` is common in other databases, but neither Spark nor Hive has support for it. `date_trunc` is commonly used by data scientists and by business intelligence applications such as Superset (https://github.com/apache/incubator-superset).
We do have `trunc`, but it only works at the 'MONTH' and 'YEAR' levels on DateType input.

date_trunc() in other databases:
AWS Redshift: http://docs.aws.amazon.com/redshift/latest/dg/r_DATE_TRUNC.html
PostgreSQL: https://www.postgresql.org/docs/9.1/static/functions-datetime.html
Presto: https://prestodb.io/docs/current/functions/datetime.html
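
A sketch of the intended usage (the exact signature is subject to review; assuming `date_trunc(fmt, ts)` returning a truncated timestamp):

```sh
./bin/spark-sql -e "SELECT date_trunc('HOUR', timestamp '2015-03-05 09:32:05')"
# expected: 2015-03-05 09:00:00
```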

## How was this patch tested?

Unit tests


Author: Youngbin Kim <ykim828@hotmail.com>

Closes apache#20015 from youngbink/date_trunc.
ferdonline and others added 10 commits December 19, 2017 20:47
## What changes were proposed in this pull request?

This change adds local checkpoint support to datasets and respective bind from Python Dataframe API.

If reliability requirements can be lowered to favor performance, as in cases of further quick transformations followed by a reliable save, localCheckpoint() fits very well.
Furthermore, at the moment reliable checkpoints still incur double computation (see apache#9428).
In general it makes the API more complete as well.

## How was this patch tested?

Python land quick use case:

```python
>>> from time import sleep
>>> from pyspark.sql import types as T
>>> from pyspark.sql import functions as F

>>> def f(x):
    sleep(1)
    return x*2
   ...:

>>> df1 = spark.range(30, numPartitions=6)
>>> df2 = df1.select(F.udf(f, T.LongType())("id"))

>>> %time _ = df2.collect()
CPU times: user 7.79 ms, sys: 5.84 ms, total: 13.6 ms
Wall time: 12.2 s

>>> %time df3 = df2.localCheckpoint()
CPU times: user 2.38 ms, sys: 2.3 ms, total: 4.68 ms
Wall time: 10.3 s

>>> %time _ = df3.collect()
CPU times: user 5.09 ms, sys: 410 µs, total: 5.5 ms
Wall time: 148 ms

>>> sc.setCheckpointDir(".")
>>> %time df3 = df2.checkpoint()
CPU times: user 4.04 ms, sys: 1.63 ms, total: 5.67 ms
Wall time: 20.3 s
```

Author: Fernando Pereira <fernando.pereira@epfl.ch>

Closes apache#19805 from ferdonline/feature_dataset_localCheckpoint.
## What changes were proposed in this pull request?

Like `Parquet`, users can use `ORC` with Apache Spark structured streaming. This PR adds `orc()` to `DataStreamReader`(Scala/Python) in order to support creating streaming dataset with ORC file format more easily like the other file formats. Also, this adds a test coverage for ORC data source and updates the document.

**BEFORE**

```scala
scala> spark.readStream.schema("a int").orc("/tmp/orc_ss").writeStream.format("console").start()
<console>:24: error: value orc is not a member of org.apache.spark.sql.streaming.DataStreamReader
       spark.readStream.schema("a int").orc("/tmp/orc_ss").writeStream.format("console").start()
```

**AFTER**
```scala
scala> spark.readStream.schema("a int").orc("/tmp/orc_ss").writeStream.format("console").start()
res0: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@678b3746

scala>
-------------------------------------------
Batch: 0
-------------------------------------------
+---+
|  a|
+---+
|  1|
+---+
```

## How was this patch tested?

Pass the newly added test cases.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes apache#19975 from dongjoon-hyun/SPARK-22781.