diff --git a/.gitignore b/.gitignore
index debad77ec2ad3..a20fd4fc904a9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -62,6 +62,9 @@ unit-tests.log
ec2/lib/
rat-results.txt
scalastyle.txt
+conf/spark-defaults.conf.bak
+conf/*.conf
+conf/conf.cloudera.yarn
scalastyle-output.xml
R-unit-tests.log
R/unit-tests.out
diff --git a/Capfile b/Capfile
new file mode 100644
index 0000000000000..d85d15adaa432
--- /dev/null
+++ b/Capfile
@@ -0,0 +1,69 @@
+require 'bundler/setup'
+require 'capistrano_recipes/deploy/packserv'
+
+set :application, "spark"
+set :user, "deploy"
+set :shared_work_path, "/u/apps/spark/shared/work"
+set :shared_logs_path, "/u/apps/spark/shared/log"
+set :shared_conf_path, "/u/apps/spark/shared/conf"
+set :spark_jar_path, "hdfs://hadoop-production/user/sparkles"
+set :gateway, nil
+set :keep_releases, 5
+set :branch, fetch(:branch, `git symbolic-ref --short HEAD`.gsub(/\s/, ""))
+
+DATANODES = (2..48).map {|i| "dn%02d.chi.shopify.com" % i }
+OTHERNODES = ["hadoop-etl1.chi.shopify.com", "hadoop-misc4.chi.shopify.com", "spark-etl1.chi.shopify.com", "reportify-etl4.chi.shopify.com"]
+BROKEN = [] # Nodes that are down; don't try to send code to them
+
+task :production do
+ role :app, *(DATANODES + OTHERNODES - BROKEN)
+ role :history, "hadoop-rm.chi.shopify.com"
+ role :uploader, "spark-etl1.chi.shopify.com"
+end
+
+namespace :deploy do
+ task :cleanup do
+ count = fetch(:keep_releases, 5).to_i
+ run "ls -1dt /u/apps/spark/releases/* | tail -n +#{count + 1} | xargs rm -rf"
+ end
+
+ task :upload_to_hdfs, :roles => :uploader, :on_no_matching_servers => :continue do
+ run "hdfs dfs -copyFromLocal -f #{release_path}/lib/spark-assembly-*.jar #{fetch(:spark_jar_path)}/spark-assembly-#{fetch(:sha)}.jar"
+ run "hdfs dfs -copyFromLocal -f #{release_path}/python/lib/pyspark.zip #{fetch(:spark_jar_path)}/pyspark-#{fetch(:sha)}.zip"
+ run "hdfs dfs -copyFromLocal -f #{release_path}/python/lib/py4j-*.zip #{fetch(:spark_jar_path)}/py4j-#{fetch(:sha)}.zip"
+ end
+
+ task :prevent_gateway do
+ set :gateway, nil
+ end
+
+ task :symlink_shared do
+ run "ln -nfs #{shared_work_path} #{release_path}/work"
+ run "ln -nfs #{shared_logs_path} #{release_path}/logs"
+ run "rm -rf #{release_path}/conf && ln -nfs #{shared_conf_path} #{release_path}/conf"
+ end
+
+ task :remind_us_to_update_starscream do
+ puts "****************************************************************"
+ puts "*"
+ puts "* Remember to update starscream/conf/config.yml"
+ puts "*"
+ puts "* spark_production"
+ puts "* conf_options:"
+ puts "* <<: *spark_remote"
+ puts "* spark.yarn.jar: \"#{fetch(:spark_jar_path)}/spark-assembly-\033[31m#{fetch(:sha)}\033[0m.jar\""
+ puts "*"
+ puts "****************************************************************"
+ end
+
+ task :restart do
+ end
+
+ after 'deploy:initialize_variables', 'deploy:prevent_gateway' # capistrano recipes packserv deploy always uses a gateway
+ before 'deploy:symlink_current', 'deploy:symlink_shared'
+ before 'deploy:test_spark_jar', 'deploy:initialize_variables'
+ before 'deploy:upload_to_hdfs', 'deploy:initialize_variables'
+ after 'deploy:unpack', 'deploy:upload_to_hdfs'
+ after 'deploy:restart', 'deploy:cleanup'
+ after 'deploy:cleanup', 'deploy:remind_us_to_update_starscream'
+end
diff --git a/Gemfile b/Gemfile
new file mode 100644
index 0000000000000..b6d208818e7e4
--- /dev/null
+++ b/Gemfile
@@ -0,0 +1,7 @@
+# A sample Gemfile
+source "https://rubygems.org"
+
+group :deploy do
+ gem 'capistrano', '~> 2'
+ gem 'capistrano-recipes', git: "git@github.com:Shopify/capistrano-recipes", ref: '57bd4ed4accc5561d4774ec2f072bb71bd1b2ea7'
+end
diff --git a/Gemfile.lock b/Gemfile.lock
new file mode 100644
index 0000000000000..1c695014d451d
--- /dev/null
+++ b/Gemfile.lock
@@ -0,0 +1,34 @@
+GIT
+ remote: git@github.com:Shopify/capistrano-recipes
+ revision: 57bd4ed4accc5561d4774ec2f072bb71bd1b2ea7
+ ref: 57bd4ed4accc5561d4774ec2f072bb71bd1b2ea7
+ specs:
+ capistrano-recipes (1.1.0)
+ capistrano (~> 2.15.5)
+ json (>= 1.8.1)
+
+GEM
+ remote: https://rubygems.org/
+ specs:
+ capistrano (2.15.5)
+ highline
+ net-scp (>= 1.0.0)
+ net-sftp (>= 2.0.0)
+ net-ssh (>= 2.0.14)
+ net-ssh-gateway (>= 1.1.0)
+ highline (1.6.21)
+ json (1.8.1)
+ net-scp (1.1.2)
+ net-ssh (>= 2.6.5)
+ net-sftp (2.1.2)
+ net-ssh (>= 2.6.5)
+ net-ssh (2.8.0)
+ net-ssh-gateway (1.2.0)
+ net-ssh (>= 2.6.5)
+
+PLATFORMS
+ ruby
+
+DEPENDENCIES
+ capistrano (~> 2)
+ capistrano-recipes!
diff --git a/README.md b/README.md
index 380422ca00dbe..4d0202f857efc 100644
--- a/README.md
+++ b/README.md
@@ -1,14 +1,9 @@
-# Apache Spark
+# Shopify's Apache Spark
-Spark is a fast and general cluster computing system for Big Data. It provides
-high-level APIs in Scala, Java, and Python, and an optimized engine that
-supports general computation graphs for data analysis. It also supports a
-rich set of higher-level tools including Spark SQL for SQL and DataFrames,
-MLlib for machine learning, GraphX for graph processing,
-and Spark Streaming for stream processing.
-
-
+Spark is a fast and general cluster computing system for Big Data.
+This is Shopify's clone with Shopify-specific customizations, mostly
+around configuration.
## Online Documentation
@@ -17,82 +12,14 @@ guide, on the [project web page](http://spark.apache.org/documentation.html)
and [project wiki](https://cwiki.apache.org/confluence/display/SPARK).
This README file only contains basic setup instructions.
-## Building Spark
-
-Spark is built using [Apache Maven](http://maven.apache.org/).
-To build Spark and its example programs, run:
-
- build/mvn -DskipTests clean package
-
-(You do not need to do this if you downloaded a pre-built package.)
-More detailed documentation is available from the project site, at
-["Building Spark"](http://spark.apache.org/docs/latest/building-spark.html).
-
-## Interactive Scala Shell
-
-The easiest way to start using Spark is through the Scala shell:
-
- ./bin/spark-shell
-
-Try the following command, which should return 1000:
-
- scala> sc.parallelize(1 to 1000).count()
-
-## Interactive Python Shell
-
-Alternatively, if you prefer Python, you can use the Python shell:
-
- ./bin/pyspark
-
-And run the following command, which should also return 1000:
-
- >>> sc.parallelize(range(1000)).count()
-
-## Example Programs
-
-Spark also comes with several sample programs in the `examples` directory.
-To run one of them, use `./bin/run-example [params]`. For example:
-
- ./bin/run-example SparkPi
-
-will run the Pi example locally.
-
-You can set the MASTER environment variable when running examples to submit
-examples to a cluster. This can be a mesos:// or spark:// URL,
-"yarn-cluster" or "yarn-client" to run on YARN, and "local" to run
-locally with one thread, or "local[N]" to run locally with N threads. You
-can also use an abbreviated class name if the class is in the `examples`
-package. For instance:
-
- MASTER=spark://host:7077 ./bin/run-example SparkPi
-
-Many of the example programs print usage help if no params are given.
-
-## Running Tests
-
-Testing first requires [building Spark](#building-spark). Once Spark is built, tests
-can be run using:
-
- ./dev/run-tests
-
-Please see the guidance on how to
-[run tests for a module, or individual tests](https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools).
+## Building Shopify Spark
-## A Note About Hadoop Versions
+You can build Shopify Spark using `script/setup`, or continuously and incrementally using `script/watch`.
-Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported
-storage systems. Because the protocols have changed in different versions of
-Hadoop, you must build Spark against the same version that your cluster runs.
+## Testing Shopify Spark
-Please refer to the build documentation at
-["Specifying the Hadoop Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version)
-for detailed guidance on building for a particular distribution of Hadoop, including
-building for particular Hive and Hive Thriftserver distributions. See also
-["Third Party Hadoop Distributions"](http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html)
-for guidance on building a Spark application that works with a particular
-distribution.
+To test a Shopify Spark build, assemble the Spark jar with `script/setup` or Maven, then unset the `spark.yarn.jar` property in the defaults.conf (or the config) of the application you are using. Spark will then upload your local assembly to your YARN application's staging directory; no deploy is involved.
-## Configuration
+## Deploying Shopify Spark
-Please refer to the [Configuration guide](http://spark.apache.org/docs/latest/configuration.html)
-in the online documentation for an overview on how to configure Spark.
+The cap deploy script is only for deploying Shopify Spark to production. To deploy, run `bundle exec cap production deploy`.
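
For illustration only (not part of this diff), here is a minimal PySpark sketch of the testing workflow described above: leave `spark.yarn.jar` unset so Spark uploads the locally built assembly, with a comment showing how a deployed environment would pin it instead. The master, app name, and `<sha>` placeholder are assumptions.

    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setMaster("yarn-client").setAppName("assembly-smoke-test")
    # spark.yarn.jar is deliberately left unset: Spark will upload the locally
    # assembled jar to this YARN application's staging directory.
    # A deployed environment would pin it instead, e.g.:
    # conf.set("spark.yarn.jar",
    #          "hdfs://hadoop-production/user/sparkles/spark-assembly-<sha>.jar")
    sc = SparkContext(conf=conf)
    print(sc.parallelize(range(1000)).count())  # quick sanity check, expect 1000
    sc.stop()
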
diff --git a/SHOPIFY_HADOOP_OPTIONS b/SHOPIFY_HADOOP_OPTIONS
new file mode 100644
index 0000000000000..e51a043249fa9
--- /dev/null
+++ b/SHOPIFY_HADOOP_OPTIONS
@@ -0,0 +1 @@
+-Phadoop-2.4 -Dhadoop.version=2.6.0 -Pyarn -Phive
diff --git a/assembly/pom.xml b/assembly/pom.xml
index e9c6d26ccddc7..2acab016987d9 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -92,27 +92,6 @@
true
-
-
- org.apache.maven.plugins
- maven-antrun-plugin
-
-
- package
-
- run
-
-
-
-
-
-
-
-
-
-
-
-
org.apache.maven.plugins
@@ -162,6 +141,27 @@
+
+
+ org.apache.maven.plugins
+ maven-antrun-plugin
+
+
+ package
+
+ run
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/conf/java-opts b/conf/java-opts
new file mode 100644
index 0000000000000..c80852aa70a64
--- /dev/null
+++ b/conf/java-opts
@@ -0,0 +1 @@
+-Djava.security.krb5.realm= -Djava.security.krb5.kdc= -Djava.security.krb5.conf=/dev/null
diff --git a/conf/log4j.properties b/conf/log4j.properties
new file mode 100644
index 0000000000000..be016d0f03577
--- /dev/null
+++ b/conf/log4j.properties
@@ -0,0 +1,22 @@
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console, file
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=WARN
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+# Settings for the file appender that captures more verbose output
+log4j.appender.file=org.apache.log4j.RollingFileAppender
+log4j.appender.file.File=/tmp/spark.log
+log4j.appender.file.MaxFileSize=20MB
+log4j.appender.file.Threshold=INFO
+log4j.appender.file.MaxBackupIndex=1
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1} %m%n
+
+# Settings to quiet third party logs that are too verbose
+log4j.logger.org.eclipse.jetty=WARN
+log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
+log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
diff --git a/conf/spark-defaults.conf b/conf/spark-defaults.conf
new file mode 100644
index 0000000000000..d4fff194c27f5
--- /dev/null
+++ b/conf/spark-defaults.conf
@@ -0,0 +1,3 @@
+# Shopify doesn't set defaults here; instead, each client specifies its own set of defaults.
+# That way, each client can choose defaults appropriate to it and vary them by environment,
+# and no client has to reason about an overridden set of values that differs from the defaults listed in the docs.
diff --git a/conf/spark-env.sh b/conf/spark-env.sh
new file mode 100755
index 0000000000000..4b00e86334fa2
--- /dev/null
+++ b/conf/spark-env.sh
@@ -0,0 +1,31 @@
+#!/usr/bin/env bash
+
+echoerr() { echo "$@" 1>&2; }
+FWDIR="$(cd `dirname $0`/..; pwd)"
+
+
+if [ "$(uname)" == "Darwin" ]; then
+ case "$PYTHON_ENV" in
+ 'remote_development')
+ echoerr "Sparkify: Connecting to chicago spark cluster ..."
+ # Figure out the local IP to bind spark to for shell <-> master communication
+ vpn_interface=tap0;
+ get_ip_command="ifconfig $vpn_interface 2>&1 | grep 'inet' | awk '{print \$2}'"
+ if ifconfig $vpn_interface > /dev/null 2>&1; then
+ export SPARK_LOCAL_IP=`bash -c "$get_ip_command"`
+ else
+ echoerr "ERROR: could not find an VPN interface to connect to the Shopify Spark Cluster! Please connect your VPN client! See https://vault-unicorn.shopify.com/VPN---Servers ."
+ exit 1
+ fi
+
+ export HADOOP_CONF_DIR=$FWDIR/conf/conf.cloudera.yarn
+ ;;
+ 'test'|'development')
+ export SPARK_LOCAL_IP=127.0.0.1
+ ;;
+ esac
+fi
+
+if which ipython > /dev/null; then
+ export IPYTHON=1
+fi
diff --git a/core/pom.xml b/core/pom.xml
index 95f36eb348698..edce5f6332f00 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -380,7 +380,7 @@
<groupId>net.razorvine</groupId>
<artifactId>pyrolite</artifactId>
- <version>4.4</version>
+ <version>4.9</version>
<groupId>net.razorvine</groupId>
diff --git a/core/src/main/java/com/bealetech/metrics/reporting/Statsd.java b/core/src/main/java/com/bealetech/metrics/reporting/Statsd.java
new file mode 100644
index 0000000000000..ab6c9f458d102
--- /dev/null
+++ b/core/src/main/java/com/bealetech/metrics/reporting/Statsd.java
@@ -0,0 +1,201 @@
+package com.bealetech.metrics.reporting;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * A client to a StatsD server.
+ */
+public class Statsd implements Closeable {
+
+ private static final Logger logger = LoggerFactory.getLogger(Statsd.class);
+
+ private static final Pattern WHITESPACE = Pattern.compile("[\\s]+");
+
+ private static final String DRIVER = "driver";
+ private static final String FENCED_DRIVER = "";
+ private static final String EXECUTOR = "executor";
+
+ private static final String DRIVER_MATCH = ".driver.";
+ private static final String FENCED_DRIVER_MATCH = "..";
+ private static final String EXECUTOR_MATCH = ".executor.";
+
+ public static enum StatType { COUNTER, TIMER, GAUGE }
+
+ private final String host;
+ private final int port;
+
+ private String prefix = "spark";
+ private String appPrefix = "spark.app-";
+ private String yarnAppPrefix = "spark.application_";
+
+ private boolean prependNewline = false;
+
+ private ByteArrayOutputStream outputData;
+ private DatagramSocket datagramSocket;
+ private Writer writer;
+
+ public Statsd(String host, int port) {
+ this.host = host;
+ this.port = port;
+
+ outputData = new ByteArrayOutputStream();
+ }
+
+ public void connect() throws IllegalStateException, SocketException {
+ if(datagramSocket != null) {
+ throw new IllegalStateException("Already connected");
+ }
+
+ prependNewline = false;
+
+ datagramSocket = new DatagramSocket();
+
+ outputData.reset();
+ this.writer = new BufferedWriter(new OutputStreamWriter(outputData));
+ }
+
+ public void setNamePrefix(String namePrefix) {
+ prefix = namePrefix;
+ appPrefix = namePrefix + ".app-";
+ yarnAppPrefix = namePrefix + ".application_";
+ }
+
+ private String buildMetricName(String rawName) throws IllegalArgumentException {
+ rawName = WHITESPACE.matcher(rawName).replaceAll("-");
+
+ // Non-yarn worker metrics
+ if (rawName.startsWith(appPrefix)) {
+ String[] parts = rawName.split("\\.");
+ if (parts.length < 5) {
+ throw new IllegalArgumentException("A spark app metric name must contain at least 4 parts: " + rawName);
+ }
+
+ StringBuilder stringBuilder = new StringBuilder(prefix);
+ if (DRIVER.equals(parts[2])) {
+ // e.g. spark.app-20141209201233-0145.driver.BlockManager.memory.maxMem_MB
+ stringBuilder.append(rawName.substring(rawName.indexOf(DRIVER_MATCH)));
+ } else if (EXECUTOR.equals(parts[3])) {
+ // e.g. spark.app-20141209201027-0139.31.executor.filesystem.file.read_bytes
+ stringBuilder.append(rawName.substring(rawName.indexOf(EXECUTOR_MATCH)));
+ } else if ("jvm".equals(parts[3])) {
+ // spark.app-20141212193256-0012.15.jvm.total.max
+ stringBuilder.append(rawName.substring(rawName.indexOf(".jvm.")));
+ } else {
+ throw new IllegalArgumentException("Unrecognized metric name pattern: " + rawName);
+ }
+
+ return stringBuilder.toString();
+ } else if (rawName.startsWith(yarnAppPrefix)) {
+ String[] parts = rawName.split("\\.");
+
+ StringBuilder stringBuilder = new StringBuilder(prefix);
+
+ if (DRIVER.equals(parts[2])) {
+ // e.g. spark.application_1418834509223_0044.driver.jvm.non-heap.used
+ stringBuilder.append(rawName.substring(rawName.indexOf(DRIVER_MATCH)));
+ } else if (FENCED_DRIVER.equals(parts[2])) {
+ stringBuilder.append(rawName.substring(rawName.indexOf(FENCED_DRIVER_MATCH)));
+ } else if (EXECUTOR.equals(parts[3])) {
+ // spark.app-20141212193256-0012.15.executor.filesystem.total.max
+ stringBuilder.append(rawName.substring(rawName.indexOf(EXECUTOR_MATCH)));
+ } else if ("jvm".equals(parts[3])) {
+ // spark.app-20141212193256-0012.15.jvm.total.max
+ stringBuilder.append(rawName.substring(rawName.indexOf(".jvm.")));
+ } else if ("".equals(parts[2])) {
+ stringBuilder.append(rawName.substring(rawName.indexOf("..")));
+ } else {
+ throw new IllegalArgumentException("Unrecognized metric name pattern: " + rawName);
+ }
+
+ return stringBuilder.toString();
+ }
+
+ return rawName;
+ }
+
+ public void send(String name, String value, StatType statType) throws IOException {
+ String statTypeStr = "";
+ switch (statType) {
+ case COUNTER:
+ statTypeStr = "c";
+ break;
+ case GAUGE:
+ statTypeStr = "g";
+ break;
+ case TIMER:
+ statTypeStr = "ms";
+ break;
+ }
+
+ String tags = null; // TODO: Would be nice to get the job name and job user as tags
+
+ try {
+ name = buildMetricName(name);
+ } catch (IllegalArgumentException e) {
+ logger.error("Error sending to Statsd:", e);
+ return; // Drop metrics that we can't process so we don't push metrics with app names (e.g. 20141209201233-0145)
+ }
+
+ try {
+ if (prependNewline) {
+ writer.write("\n");
+ }
+ writer.write(name);
+ writer.write(":");
+ writer.write(value);
+ writer.write("|");
+ writer.write(statTypeStr);
+ if (tags != null) {
+ writer.write("|");
+ writer.write(tags);
+ }
+ prependNewline = true;
+ writer.flush();
+ } catch (IOException e) {
+ logger.error("Error sending to Statsd:", e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (datagramSocket != null) {
+   DatagramPacket packet = newPacket(outputData);
+   if (packet != null) {
+     packet.setData(outputData.toByteArray());
+     datagramSocket.send(packet);
+   }
+   datagramSocket.close();
+ }
+
+ this.datagramSocket = null;
+ this.writer = null;
+ }
+
+ private DatagramPacket newPacket(ByteArrayOutputStream out) {
+ byte[] dataBuffer;
+
+ if (out != null) {
+ dataBuffer = out.toByteArray();
+ }
+ else {
+ dataBuffer = new byte[8192];
+ }
+
+ try {
+ return new DatagramPacket(dataBuffer, dataBuffer.length, InetAddress.getByName(this.host), this.port);
+ } catch (Exception e) {
+ return null;
+ }
+ }
+}
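
The renaming in `buildMetricName` exists so the volatile application id (e.g. `app-20141209201233-0145`) and numeric executor id never become statsd key components. As a hedged illustration only (not part of the patch), a rough Python equivalent of that rewrite:

    import re

    def normalize(raw_name, prefix="spark"):
        # Drop the per-application id (and a numeric executor id, if present)
        # so statsd keys stay stable across Spark application runs.
        m = re.match(r"^spark\.(?:app-|application_)[^.]+(\..*)$", raw_name)
        if not m:
            return raw_name
        rest = re.sub(r"^\.\d+(?=\.)", "", m.group(1))
        return prefix + rest

    print(normalize("spark.app-20141209201233-0145.driver.BlockManager.memory.maxMem_MB"))
    # -> spark.driver.BlockManager.memory.maxMem_MB
    print(normalize("spark.app-20141209201027-0139.31.executor.filesystem.file.read_bytes"))
    # -> spark.executor.filesystem.file.read_bytes
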
diff --git a/core/src/main/java/com/bealetech/metrics/reporting/StatsdReporter.java b/core/src/main/java/com/bealetech/metrics/reporting/StatsdReporter.java
new file mode 100644
index 0000000000000..85bb53784546a
--- /dev/null
+++ b/core/src/main/java/com/bealetech/metrics/reporting/StatsdReporter.java
@@ -0,0 +1,302 @@
+package com.bealetech.metrics.reporting;
+
+import com.codahale.metrics.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Locale;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A reporter which publishes metric values to a Statsd server.
+ *
+ * @see Statsd
+ */
+public class StatsdReporter extends ScheduledReporter {
+
+ /**
+ * Returns a new {@link Builder} for {@link StatsdReporter}.
+ *
+ * @param registry the registry to report
+ * @return a {@link Builder} instance for a {@link StatsdReporter}
+ */
+ public static Builder forRegistry(MetricRegistry registry) {
+ return new Builder(registry);
+ }
+
+ /**
+ * A builder for {@link StatsdReporter} instances. Defaults to not using a prefix, using the
+ * default clock, converting rates to events/second, converting durations to milliseconds, and
+ * not filtering metrics.
+ */
+ public static class Builder {
+ private final MetricRegistry registry;
+ private String prefix;
+ private TimeUnit rateUnit;
+ private TimeUnit durationUnit;
+ private MetricFilter filter;
+
+ private Builder(MetricRegistry registry) {
+ this.registry = registry;
+ this.prefix = null;
+ this.rateUnit = TimeUnit.SECONDS;
+ this.durationUnit = TimeUnit.MILLISECONDS;
+ this.filter = MetricFilter.ALL;
+ }
+
+ /**
+ * Prefix all metric names with the given string.
+ *
+ * @param prefix the prefix for all metric names
+ * @return {@code this}
+ */
+ public Builder prefixedWith(String prefix) {
+ this.prefix = prefix;
+ return this;
+ }
+
+ /**
+ * Convert rates to the given time unit.
+ *
+ * @param rateUnit a unit of time
+ * @return {@code this}
+ */
+ public Builder convertRatesTo(TimeUnit rateUnit) {
+ this.rateUnit = rateUnit;
+ return this;
+ }
+
+ /**
+ * Convert durations to the given time unit.
+ *
+ * @param durationUnit a unit of time
+ * @return {@code this}
+ */
+ public Builder convertDurationsTo(TimeUnit durationUnit) {
+ this.durationUnit = durationUnit;
+ return this;
+ }
+
+ /**
+ * Only report metrics which match the given filter.
+ *
+ * @param filter a {@link MetricFilter}
+ * @return {@code this}
+ */
+ public Builder filter(MetricFilter filter) {
+ this.filter = filter;
+ return this;
+ }
+
+ /**
+ * Builds a {@link StatsdReporter} with the given properties, sending metrics using the
+ * given {@link Statsd} client.
+ *
+ * @param statsd a {@link Statsd} client
+ * @return a {@link StatsdReporter}
+ */
+ public StatsdReporter build(Statsd statsd) {
+ return new StatsdReporter(registry,
+ statsd,
+ prefix,
+ filter,
+ rateUnit,
+ durationUnit);
+ }
+ }
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(StatsdReporter.class);
+
+ private final Statsd statsd;
+ private final String prefix;
+
+ public StatsdReporter(MetricRegistry registry,
+ Statsd statsd,
+ String prefix,
+ MetricFilter filter,
+ TimeUnit rateUnit,
+ TimeUnit durationUnit) {
+ super(registry, "statsd-reporter", filter, rateUnit, durationUnit);
+
+ this.statsd = statsd;
+ this.statsd.setNamePrefix(prefix);
+ this.prefix = prefix;
+ }
+
+ @Override
+ public void report(SortedMap<String, Gauge> gauges,
+                    SortedMap<String, Counter> counters,
+                    SortedMap<String, Histogram> histograms,
+                    SortedMap<String, Meter> meters,
+                    SortedMap<String, Timer> timers) {
+
+ try {
+ statsd.connect();
+
+ for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
+ reportGauge(entry.getKey(), entry.getValue());
+ }
+
+ for (Map.Entry<String, Counter> entry : counters.entrySet()) {
+ reportCounter(entry.getKey(), entry.getValue());
+ }
+
+ for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
+ reportHistogram(entry.getKey(), entry.getValue());
+ }
+
+ for (Map.Entry<String, Meter> entry : meters.entrySet()) {
+ reportMetered(entry.getKey(), entry.getValue());
+ }
+
+ for (Map.Entry<String, Timer> entry : timers.entrySet()) {
+ reportTimer(entry.getKey(), entry.getValue());
+ }
+
+ } catch(IOException e) {
+ LOGGER.warn("Unable to report to StatsD", statsd, e);
+ } finally {
+ try {
+ statsd.close();
+ } catch (IOException e) {
+ LOGGER.debug("Error disconnecting from StatsD server", statsd, e);
+ }
+ }
+ }
+
+ private void reportTimer(String name, Timer timer) throws IOException {
+ final Snapshot snapshot = timer.getSnapshot();
+
+ statsd.send(prefix(name, "max"),
+ format(convertDuration(snapshot.getMax())),
+ Statsd.StatType.TIMER);
+ statsd.send(prefix(name, "mean"),
+ format(convertDuration(snapshot.getMean())),
+ Statsd.StatType.TIMER);
+ statsd.send(prefix(name, "min"),
+ format(convertDuration(snapshot.getMin())),
+ Statsd.StatType.TIMER);
+ statsd.send(prefix(name, "stddev"),
+ format(convertDuration(snapshot.getStdDev())),
+ Statsd.StatType.TIMER);
+ statsd.send(prefix(name, "p50"),
+ format(convertDuration(snapshot.getMedian())),
+ Statsd.StatType.TIMER);
+ statsd.send(prefix(name, "p75"),
+ format(convertDuration(snapshot.get75thPercentile())),
+ Statsd.StatType.TIMER);
+ statsd.send(prefix(name, "p95"),
+ format(convertDuration(snapshot.get95thPercentile())),
+ Statsd.StatType.TIMER);
+ statsd.send(prefix(name, "p98"),
+ format(convertDuration(snapshot.get98thPercentile())),
+ Statsd.StatType.TIMER);
+ statsd.send(prefix(name, "p99"),
+ format(convertDuration(snapshot.get99thPercentile())),
+ Statsd.StatType.TIMER);
+ statsd.send(prefix(name, "p999"),
+ format(convertDuration(snapshot.get999thPercentile())),
+ Statsd.StatType.TIMER);
+
+ reportMetered(name, timer);
+ }
+
+ private void reportMetered(String name, Metered meter) throws IOException {
+ statsd.send(prefix(name, "count"), format(meter.getCount()), Statsd.StatType.GAUGE);
+ statsd.send(prefix(name, "m1_rate"),
+ format(convertRate(meter.getOneMinuteRate())),
+ Statsd.StatType.TIMER);
+ statsd.send(prefix(name, "m5_rate"),
+ format(convertRate(meter.getFiveMinuteRate())),
+ Statsd.StatType.TIMER);
+ statsd.send(prefix(name, "m15_rate"),
+ format(convertRate(meter.getFifteenMinuteRate())),
+ Statsd.StatType.TIMER);
+ statsd.send(prefix(name, "mean_rate"),
+ format(convertRate(meter.getMeanRate())),
+ Statsd.StatType.TIMER);
+ }
+
+ private void reportHistogram(String name, Histogram histogram) throws IOException {
+ final Snapshot snapshot = histogram.getSnapshot();
+ statsd.send(prefix(name, "count"),
+ format(histogram.getCount()),
+ Statsd.StatType.GAUGE);
+ statsd.send(prefix(name, "max"),
+ format(snapshot.getMax()),
+ Statsd.StatType.TIMER);
+ statsd.send(prefix(name, "mean"),
+ format(snapshot.getMean()),
+ Statsd.StatType.TIMER);
+ statsd.send(prefix(name, "min"),
+ format(snapshot.getMin()),
+ Statsd.StatType.TIMER);
+ statsd.send(prefix(name, "stddev"),
+ format(snapshot.getStdDev()),
+ Statsd.StatType.TIMER);
+ statsd.send(prefix(name, "p50"),
+ format(snapshot.getMedian()),
+ Statsd.StatType.TIMER);
+ statsd.send(prefix(name, "p75"),
+ format(snapshot.get75thPercentile()),
+ Statsd.StatType.TIMER);
+ statsd.send(prefix(name, "p95"),
+ format(snapshot.get95thPercentile()),
+ Statsd.StatType.TIMER);
+ statsd.send(prefix(name, "p98"),
+ format(snapshot.get98thPercentile()),
+ Statsd.StatType.TIMER);
+ statsd.send(prefix(name, "p99"),
+ format(snapshot.get99thPercentile()),
+ Statsd.StatType.TIMER);
+ statsd.send(prefix(name, "p999"),
+ format(snapshot.get999thPercentile()),
+ Statsd.StatType.TIMER);
+ }
+
+ private void reportCounter(String name, Counter counter) throws IOException {
+ statsd.send(prefix(name, "count"),
+ format(counter.getCount()),
+ Statsd.StatType.COUNTER);
+ }
+
+ private void reportGauge(String name, Gauge gauge) throws IOException {
+ final String value = format(gauge.getValue());
+ if (value != null) {
+ statsd.send(prefix(name), value,
+ Statsd.StatType.GAUGE);
+ }
+ }
+
+ private String format(Object o) {
+ if (o instanceof Float) {
+ return format(((Float) o).doubleValue());
+ } else if (o instanceof Double) {
+ return format(((Double) o).doubleValue());
+ } else if (o instanceof Byte) {
+ return format(((Byte) o).longValue());
+ } else if (o instanceof Short) {
+ return format(((Short) o).longValue());
+ } else if (o instanceof Integer) {
+ return format(((Integer) o).longValue());
+ } else if (o instanceof Long) {
+ return format(((Long) o).longValue());
+ }
+ return null;
+ }
+
+ private String prefix(String... components) {
+ return MetricRegistry.name(prefix, components);
+ }
+
+ private String format(long n) {
+ return Long.toString(n);
+ }
+
+ private String format(double v) {
+ return String.format(Locale.US, "%2.2f", v);
+ }
+}
diff --git a/core/src/main/java/com/shopify/metrics/reporting/LogReporter.java b/core/src/main/java/com/shopify/metrics/reporting/LogReporter.java
new file mode 100644
index 0000000000000..22e15e261e9e1
--- /dev/null
+++ b/core/src/main/java/com/shopify/metrics/reporting/LogReporter.java
@@ -0,0 +1,217 @@
+package com.shopify.metrics.reporting;
+
+import com.codahale.metrics.*;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.log4j.Level;
+import org.apache.log4j.RollingFileAppender;
+import org.apache.log4j.PatternLayout;
+
+import java.io.*;
+import java.util.regex.Pattern;
+import java.nio.charset.Charset;
+import java.util.Locale;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A reporter which appends the measurements for each metric to a rolling log file.
+ */
+
+public class LogReporter extends ScheduledReporter {
+
+ private static final Logger LOGGER = Logger.getLogger(LogReporter.class);
+ private static final Pattern WHITESPACE = Pattern.compile("[\\s]+");
+
+ public static Builder forRegistry(MetricRegistry registry) {
+ return new Builder(registry);
+ }
+
+ public static class Builder {
+ private final MetricRegistry registry;
+ private Locale locale;
+ private TimeUnit rateUnit;
+ private TimeUnit durationUnit;
+ private Clock clock;
+ private MetricFilter filter;
+
+ private Builder(MetricRegistry registry) {
+ this.registry = registry;
+ this.locale = Locale.getDefault();
+ this.rateUnit = TimeUnit.SECONDS;
+ this.durationUnit = TimeUnit.MILLISECONDS;
+ this.clock = Clock.defaultClock();
+ this.filter = MetricFilter.ALL;
+ }
+
+ public Builder formatFor(Locale locale){
+ this.locale = locale;
+ return this;
+ }
+
+ public Builder convertRatesTo(TimeUnit rateUnit){
+ this.rateUnit = rateUnit;
+ return this;
+ }
+
+ public Builder convertDurationsTo(TimeUnit durationUnit){
+ this.durationUnit = durationUnit;
+ return this;
+ }
+
+ public Builder withClock(Clock clock) {
+ this.clock = clock;
+ return this;
+ }
+
+ public Builder filter(MetricFilter filter){
+ this.filter = filter;
+ return this;
+ }
+
+ public LogReporter build(String file, String maxFileSize, int maxBackupIndex) {
+ return new LogReporter(registry, file, maxFileSize, maxBackupIndex, locale, rateUnit, durationUnit, clock, filter);
+ }
+ }
+
+ private final Locale locale;
+ private final Clock clock;
+ private final Logger logger;
+
+ private LogReporter(MetricRegistry registry,
+ String file,
+ String maxFileSize,
+ int maxBackupIndex,
+ Locale locale,
+ TimeUnit rateUnit,
+ TimeUnit durationUnit,
+ Clock clock,
+ MetricFilter filter) {
+
+ super(registry, "log-reporter", filter, rateUnit, durationUnit);
+ this.logger = Logger.getLogger("com.shopify.metrics");
+ this.logger.setAdditivity(false);
+
+ try {
+ PatternLayout layout = new PatternLayout("%d{ISO8601} %c %m%n");
+ RollingFileAppender logfile = new RollingFileAppender(layout, file);
+
+ LOGGER.info(String.format("Creating metrics output file: %s", file));
+ logfile.setMaxFileSize(maxFileSize);
+ logfile.setMaxBackupIndex(maxBackupIndex);
+
+ this.logger.setLevel(Level.INFO);
+ this.logger.addAppender(logfile);
+ } catch (IOException e) {
+ LOGGER.error("Could not add appender", e);
+ }
+
+ this.locale = locale;
+ this.clock = clock;
+ }
+
+ @Override
+ public void report(SortedMap<String, Gauge> gauges,
+                    SortedMap<String, Counter> counters,
+                    SortedMap<String, Histogram> histograms,
+                    SortedMap<String, Meter> meters,
+                    SortedMap<String, Timer> timers) {
+
+ final long timestamp = TimeUnit.MILLISECONDS.toSeconds(clock.getTime());
+
+ for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
+ reportGauge(timestamp, entry.getKey(), entry.getValue());
+ }
+
+ for (Map.Entry<String, Counter> entry : counters.entrySet()) {
+ reportCounter(timestamp, entry.getKey(), entry.getValue());
+ }
+
+ for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
+ reportHistogram(timestamp, entry.getKey(), entry.getValue());
+ }
+
+ for (Map.Entry<String, Meter> entry : meters.entrySet()) {
+ reportMeter(timestamp, entry.getKey(), entry.getValue());
+ }
+
+ for (Map.Entry<String, Timer> entry : timers.entrySet()) {
+ reportTimer(timestamp, entry.getKey(), entry.getValue());
+ }
+ }
+
+ private void reportTimer(long timestamp, String name, Timer timer ) {
+
+ final Snapshot snapshot = timer.getSnapshot();
+
+ report(timestamp,
+ name,
+ "count=%d max=%f mean=%f min=%f stddev= %f p50=%f p75=%f p95=%f p98=%f p99=%f p999=%f mean_rate=%f m1_rate=%f m5_rate=%f m15_rate=%f rate_unit=calls/%s duration_unit=%s",
+ timer.getCount(),
+ convertDuration(snapshot.getMax()),
+ convertDuration(snapshot.getMean()),
+ convertDuration(snapshot.getMin()),
+ convertDuration(snapshot.getStdDev()),
+ convertDuration(snapshot.getMedian()),
+ convertDuration(snapshot.get75thPercentile()),
+ convertDuration(snapshot.get95thPercentile()),
+ convertDuration(snapshot.get98thPercentile()),
+ convertDuration(snapshot.get99thPercentile()),
+ convertDuration(snapshot.get999thPercentile()),
+ convertRate(timer.getMeanRate()),
+ convertRate(timer.getOneMinuteRate()),
+ convertRate(timer.getFiveMinuteRate()),
+ convertRate(timer.getFifteenMinuteRate()),
+ getRateUnit(),
+ getDurationUnit());
+ }
+
+ private void reportMeter(long timestamp, String name, Meter meter) {
+ report(timestamp,
+ name,
+ "count=%d mean_rate=%f m1_rate=%f m5_rate=%f m15_rate=%f rate_unit=events/%s",
+ meter.getCount(),
+ convertRate(meter.getMeanRate()),
+ convertRate(meter.getOneMinuteRate()),
+ convertRate(meter.getFiveMinuteRate()),
+ convertRate(meter.getFifteenMinuteRate()),
+ getRateUnit());
+ }
+
+ private void reportHistogram(long timestamp, String name, Histogram histogram) {
+ final Snapshot snapshot = histogram.getSnapshot();
+
+ report(timestamp,
+ name,
+ "count=%d max=%d mean=%f min=%f stddev=%f p50=%f p75=%f p95=%f p98=%f p99=%f p999=%f",
+ histogram.getCount(),
+ snapshot.getMax(),
+ snapshot.getMean(),
+ snapshot.getMin(),
+ snapshot.getStdDev(),
+ snapshot.getMedian(),
+ snapshot.get75thPercentile(),
+ snapshot.get95thPercentile(),
+ snapshot.get98thPercentile(),
+ snapshot.get99thPercentile(),
+ snapshot.get999thPercentile());
+ }
+
+ private void reportGauge(long timestamp, String name, Gauge gauge){
+ report(timestamp, name, "value=%s", gauge.getValue());
+ }
+
+ private void reportCounter(long timestamp, String name, Counter counter) {
+ report(timestamp, name,"count=%d", counter.getCount());
+ }
+
+ private void report(long timestamp, String name, String line, Object... values) {
+ String metrics = String.format(line, values);
+ this.logger.info(String.format(locale, "event_at=%d %s %s", timestamp, name, metrics));
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/LogSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/LogSink.scala
new file mode 100644
index 0000000000000..ba4618ed9e53e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/LogSink.scala
@@ -0,0 +1,89 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.metrics.sink
+
+import java.io.File
+import java.util.{Locale, Properties}
+import java.util.concurrent.TimeUnit
+
+import com.codahale.metrics.MetricRegistry
+import com.shopify.metrics.reporting.LogReporter
+
+import org.apache.spark.SecurityManager
+import org.apache.spark.metrics.MetricsSystem
+
+private[spark] class LogSink(val property: Properties, val registry: MetricRegistry,
+ securityMgr: SecurityManager) extends Sink {
+
+ val LOG_KEY_PERIOD = "period"
+ val LOG_KEY_UNIT = "unit"
+ val LOG_KEY_FILE = "file"
+ val LOG_KEY_MAX_FILE_SIZE = "maxFileSize"
+ val LOG_KEY_MAX_BACKUP_INDEX = "maxFileIndex"
+
+ val LOG_DEFAULT_PERIOD = 10
+ val LOG_DEFAULT_UNIT = "SECONDS"
+ val LOG_DEFAULT_FILE = "/tmp/metrics"
+ val LOG_DEFAULT_MAX_FILE_SIZE = "50mb"
+ val LOG_DEFAULT_BACKUP_INDEX = 10
+
+ val pollPeriod = Option(property.getProperty(LOG_KEY_PERIOD)) match {
+ case Some(s) => s.toInt
+ case None => LOG_DEFAULT_PERIOD
+ }
+
+ val pollUnit: TimeUnit = Option(property.getProperty(LOG_KEY_UNIT)) match {
+ case Some(s) => TimeUnit.valueOf(s.toUpperCase())
+ case None => TimeUnit.valueOf(LOG_DEFAULT_UNIT)
+ }
+
+ MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+
+ val pollFile = Option(property.getProperty(LOG_KEY_FILE)) match {
+ case Some(s) => s
+ case None => LOG_DEFAULT_FILE
+ }
+
+ val maxFileSize = Option(property.getProperty(LOG_KEY_MAX_FILE_SIZE)) match {
+ case Some(s) => s.toString
+ case None => LOG_DEFAULT_MAX_FILE_SIZE
+ }
+
+ val maxBackupIndex = Option(property.getProperty(LOG_KEY_MAX_BACKUP_INDEX)) match {
+ case Some(s) => s.toInt
+ case None => LOG_DEFAULT_BACKUP_INDEX
+ }
+
+ val reporter: LogReporter = LogReporter.forRegistry(registry)
+ .formatFor(Locale.US)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .build(pollFile, maxFileSize, maxBackupIndex)
+
+ override def start() {
+ reporter.start(pollPeriod, pollUnit)
+ }
+
+ override def stop() {
+ reporter.stop()
+ }
+
+ override def report() {
+ reporter.report()
+ }
+ }
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala
new file mode 100644
index 0000000000000..402d4968e51ec
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import java.net.InetSocketAddress
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import com.codahale.metrics.MetricRegistry
+import com.bealetech.metrics.reporting.{Statsd, StatsdReporter}
+
+import org.apache.spark.SecurityManager
+import org.apache.spark.metrics.MetricsSystem
+
+private[spark] class StatsdSink(val property: Properties, val registry: MetricRegistry,
+ securityMgr: SecurityManager) extends Sink {
+ val STATSD_DEFAULT_PERIOD = 10
+ val STATSD_DEFAULT_UNIT = "SECONDS"
+ val STATSD_DEFAULT_PREFIX = ""
+
+ val STATSD_KEY_HOST = "host"
+ val STATSD_KEY_PORT = "port"
+ val STATSD_KEY_PERIOD = "period"
+ val STATSD_KEY_UNIT = "unit"
+ val STATSD_KEY_PREFIX = "prefix"
+
+ def propertyToOption(prop: String) = Option(property.getProperty(prop))
+
+ if (!propertyToOption(STATSD_KEY_HOST).isDefined) {
+ throw new Exception("Statsd sink requires 'host' property.")
+ }
+
+ if (!propertyToOption(STATSD_KEY_PORT).isDefined) {
+ throw new Exception("Statsd sink requires 'port' property.")
+ }
+
+ val host = propertyToOption(STATSD_KEY_HOST).get
+ val port = propertyToOption(STATSD_KEY_PORT).get.toInt
+
+ val pollPeriod = propertyToOption(STATSD_KEY_PERIOD) match {
+ case Some(s) => s.toInt
+ case None => STATSD_DEFAULT_PERIOD
+ }
+
+ val pollUnit = propertyToOption(STATSD_KEY_UNIT) match {
+ case Some(s) => TimeUnit.valueOf(s.toUpperCase())
+ case None => TimeUnit.valueOf(STATSD_DEFAULT_UNIT)
+ }
+
+ val prefix = propertyToOption(STATSD_KEY_PREFIX).getOrElse(STATSD_DEFAULT_PREFIX)
+
+ MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+
+ val statsd: Statsd = new Statsd(host, port)
+
+ val reporter: StatsdReporter = StatsdReporter.forRegistry(registry)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .prefixedWith(prefix)
+ .build(statsd)
+
+ override def start() {
+ reporter.start(pollPeriod, pollUnit)
+ }
+
+ override def stop() {
+ reporter.stop()
+ }
+
+ override def report() {
+ try {
+ reporter.report()
+ } catch {
+ case e: NullPointerException => println("StatsD reporter errored upon exit");
+ }
+ }
+}
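
For reference (not included in this diff), a hedged example of how these sinks might be wired up in `conf/metrics.properties`; the host, port, and file path are placeholders, while the option names mirror the property keys read by `StatsdSink` and `LogSink`:

    *.sink.statsd.class=org.apache.spark.metrics.sink.StatsdSink
    *.sink.statsd.host=localhost
    *.sink.statsd.port=8125
    *.sink.statsd.period=10
    *.sink.statsd.unit=seconds
    *.sink.statsd.prefix=spark

    *.sink.log.class=org.apache.spark.metrics.sink.LogSink
    *.sink.log.file=/tmp/metrics
    *.sink.log.maxFileSize=50mb
    *.sink.log.maxFileIndex=10
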
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 660702f6e6fd0..3e1d1e27f728e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -420,6 +420,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
unknownExecutors.foreach { id =>
logWarning(s"Executor to kill $id does not exist!")
}
+ executorsPendingToRemove --= knownExecutors
// If we do not wish to replace the executors we kill, sync the target number of executors
// with the cluster manager to avoid allocating new ones. When computing the new target,
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index 7f06d4288c872..9426ae32150ac 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+import re
+import resource
import numbers
import os
@@ -26,7 +28,7 @@
import gc
from errno import EINTR, EAGAIN
from socket import AF_INET, SOCK_STREAM, SOMAXCONN
-from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN, SIGINT
+from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN, SIGINT, SIGPROF
from pyspark.worker import main as worker_main
from pyspark.serializers import read_int, write_int
@@ -51,12 +53,39 @@ def worker(sock):
# it's useful for debugging (show the stacktrace before exit)
signal.signal(SIGINT, signal.default_int_handler)
+ # Shopify addition: signal handler that toggles yappi profiling on SIGPROF
+ profiling = [False]
+
+ def handle_sigprof(*args):
+ import yappi
+
+ if not profiling[0]:
+ profiling[0] = True
+ yappi.start()
+ else:
+ profiling[0] = False
+ yappi.get_func_stats().print_all()
+ yappi.get_thread_stats().print_all()
+ signal.signal(SIGPROF, handle_sigprof)
+
+ # Blocks until the socket is closed by draining the input stream
+ # until it raises an exception or returns EOF.
+ def waitSocketClose(sock):
+ try:
+ while True:
+ # Empty string is returned upon EOF (and only then).
+ if sock.recv(4096) == '':
+ return
+ except:
+ pass
+
# Read the socket using fdopen instead of socket.makefile() because the latter
# seems to be very slow; note that we need to dup() the file descriptor because
# otherwise writes also cause a seek that makes us miss data on the read side.
infile = os.fdopen(os.dup(sock.fileno()), "rb", 65536)
outfile = os.fdopen(os.dup(sock.fileno()), "wb", 65536)
exit_code = 0
+
try:
worker_main(infile, outfile)
except SystemExit as exc:
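
To make the SIGPROF hook above concrete: the first signal starts yappi in the worker, the second stops it and prints the collected function and thread stats. A hypothetical helper (not part of the patch), run on the worker's host, might look like:

    import os
    import signal
    import time

    def profile_pyspark_worker(pid, seconds=30):
        """Toggle yappi profiling on a running PySpark worker process."""
        os.kill(pid, signal.SIGPROF)   # first signal: handler calls yappi.start()
        time.sleep(seconds)            # let the worker run under the profiler
        os.kill(pid, signal.SIGPROF)   # second signal: handler prints the stats
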
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index fa8e0a0574a62..e48a0e625fb0b 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -53,6 +53,7 @@
from pyspark.traceback_utils import SCCallSiteSync
from py4j.java_collections import ListConverter, MapConverter
+from statsd import DogStatsd as statsd
__all__ = ["RDD"]
@@ -1694,10 +1695,13 @@ def partitionBy(self, numPartitions, partitionFunc=portable_hash):
def add_shuffle_key(split, iterator):
+ client = statsd()
buckets = defaultdict(list)
+ record_count = 0
c, batch = 0, min(10 * numPartitions, 1000)
for k, v in iterator:
+ record_count += 1
buckets[partitionFunc(k) % numPartitions].append((k, v))
c += 1
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 411b4dbf481f1..3664e3c31a126 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -161,7 +161,15 @@ def _read_with_length(self, stream):
obj = stream.read(length)
if len(obj) < length:
raise EOFError
- return self.loads(obj)
+
+ try:
+ result = self.loads(obj)
+ except TypeError as e:
+ print >>sys.stderr, "Error while decoding"
+ print >>sys.stderr, obj.encode('hex')
+ raise
+
+ return result
def dumps(self, obj):
"""
diff --git a/python/pyspark/statsd.py b/python/pyspark/statsd.py
new file mode 100644
index 0000000000000..d0c8f92ac984a
--- /dev/null
+++ b/python/pyspark/statsd.py
@@ -0,0 +1,145 @@
+"""
+DogStatsd is a Python client for DogStatsd, a Statsd fork for Datadog.
+"""
+
+import logging
+from random import random
+from time import time
+import socket
+
+try:
+ from itertools import imap
+except ImportError:
+ imap = map
+
+
+log = logging.getLogger('dogstatsd')
+
+
+class DogStatsd(object):
+
+ def __init__(self, host='localhost', port=8125):
+ """
+ Initialize a DogStatsd object.
+
+ >>> statsd = DogStatsd()
+
+ :param host: the host of the DogStatsd server.
+ :param port: the port of the DogStatsd server.
+ """
+ self._host = None
+ self._port = None
+ self.socket = None
+ self.connect(host, port)
+
+ def connect(self, host, port):
+ """
+ Connect to the statsd server on the given host and port.
+ """
+ self._host = host
+ self._port = int(port)
+ self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ self.socket.connect((self._host, self._port))
+
+ def gauge(self, metric, value, tags=None, sample_rate=1):
+ """
+ Record the value of a gauge, optionally setting a list of tags and a
+ sample rate.
+
+ >>> statsd.gauge('users.online', 123)
+ >>> statsd.gauge('active.connections', 1001, tags=["protocol:http"])
+ """
+ return self._send(metric, 'g', value, tags, sample_rate)
+
+ def increment(self, metric, value=1, tags=None, sample_rate=1):
+ """
+ Increment a counter, optionally setting a value, tags and a sample
+ rate. Tags is an optional list of key value pairs separated by
+ colons.
+
+ >>> statsd.increment('page.views')
+ >>> statsd.increment('files.transferred', 124)
+ """
+ self._send(metric, 'c', value, tags, sample_rate)
+
+ def decrement(self, metric, value=1, tags=None, sample_rate=1):
+ """
+ Decrement a counter, optionally setting a value, tags and a sample
+ rate.
+
+ >>> statsd.decrement('files.remaining')
+ >>> statsd.decrement('active.connections', 2)
+ """
+ self._send(metric, 'c', -value, tags, sample_rate)
+
+ def histogram(self, metric, value, tags=None, sample_rate=1):
+ """
+ Sample a histogram value, optionally setting tags and a sample rate.
+
+ >>> statsd.histogram('uploaded.file.size', 1445)
+ >>> statsd.histogram('album.photo.count', 26, tags=["gender:female"])
+ """
+ self._send(metric, 'h', value, tags, sample_rate)
+
+ def timing(self, metric, value, tags=None, sample_rate=1):
+ """
+ Record a timing, optionally setting tags and a sample rate.
+
+ >>> statsd.timing("query.response.time", 1234)
+ """
+ self._send(metric, 'ms', value, tags, sample_rate)
+
+ def timed(self, metric, tags=None, sample_rate=1):
+ """
+ A decorator that will measure the distribution of a function's run time.
+ Optionally specify a list of tags or a sample rate.
+ ::
+
+ @statsd.timed('user.query.time', sample_rate=0.5)
+ def get_user(user_id):
+ # Do what you need to ...
+ pass
+
+ # Is equivalent to ...
+ start = time.time()
+ try:
+ get_user(user_id)
+ finally:
+ statsd.timing('user.query.time', time.time() - start)
+ """
+ def wrapper(func):
+ def wrapped(*args, **kwargs):
+ start = time()
+ result = func(*args, **kwargs)
+ self.timing(metric, time() - start, tags=tags, sample_rate=sample_rate)
+ return result
+ wrapped.__name__ = func.__name__
+ wrapped.__doc__ = func.__doc__
+ wrapped.__dict__.update(func.__dict__)
+ return wrapped
+ return wrapper
+
+ def set(self, metric, value, tags=None, sample_rate=1):
+ """
+ Sample a set value.
+
+ >>> statsd.set('visitors.uniques', 999)
+ """
+ self._send(metric, 's', value, tags, sample_rate)
+
+ def _send(self, metric, metric_type, value, tags, sample_rate):
+ if sample_rate != 1 and random() > sample_rate:
+ return
+
+ payload = [metric, ":", value, "|", metric_type]
+ if sample_rate != 1:
+ payload.extend(["|@", sample_rate])
+ if tags:
+ if not type(tags) is list:
+ tags = [tags]
+ payload.extend(["|#", ",".join([str(tag) for tag in tags])])
+
+ try:
+ self.socket.send("".join(imap(str, payload)))
+ except socket.error:
+ log.exception("Error submitting metric %s" % metric)
diff --git a/script/compile b/script/compile
new file mode 100755
index 0000000000000..429217d4970d8
--- /dev/null
+++ b/script/compile
@@ -0,0 +1,41 @@
+#!/usr/bin/env bash
+
+set -x
+set -e
+
+FWDIR="$(cd `dirname $0`/..; pwd)"
+GIT_SHA=$(git rev-parse HEAD)
+GIT_BRANCH=$(git name-rev HEAD)
+GIT_DESC=$(git describe HEAD)
+GIT_HUMAN=$(git log --pretty=format:"%s (%an)" HEAD...HEAD~1)
+
+# Compile spark jars into dist folder
+export LOCAL_SBT_DIR=1
+export SBT_HOME=$FWDIR/sbt
+export SCALA_HOME=$HOME/.sbt/boot/scala-2.10.4
+
+# Find JAVA_HOME on OS X and Linux
+uname=$(uname)
+if [[ "$uname" == "Darwin" ]]; then
+ JAVA_HOME=$(/usr/libexec/java_home -v 1.7)
+else
+ JAVA_HOME=/usr/lib/jvm/default-java/
+fi
+export JAVA_HOME=$JAVA_HOME
+
+./make-distribution.sh --skip-java-test $(cat SHOPIFY_HADOOP_OPTIONS)
+
+if [ "$?" != "0" ] || [ -e "$FWDIR/lib/spark-assembly*hadoop*.jar" ]; then
+ echo "Failed to make spark distro using sbt."
+ exit 1
+fi
+
+# Remove everything not in dist or conf
+find * -maxdepth 0 -name 'dist' -o -name 'conf' -prune -o -exec rm -rf '{}' ';'
+
+# Copy everything out of dist that doesn't exist already
+mv -n dist/* .
+echo $GIT_SHA > ./GIT_SHA
+echo $GIT_BRANCH >> ./GIT_DESC
+echo $GIT_DESC >> ./GIT_DESC
+echo $GIT_HUMAN >> ./GIT_DESC
diff --git a/script/get_config b/script/get_config
new file mode 100755
index 0000000000000..32ba5e9f9d901
--- /dev/null
+++ b/script/get_config
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+
+set -x
+set -e
+
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+CONFIG_PATH=$DIR/../conf/conf.cloudera.yarn
+
+rm -rf $CONFIG_PATH
+mkdir $CONFIG_PATH
+curl -o /tmp/cloudera_configs.zip http://util-ng.chi.shopify.com:7180/cmf/services/47/client-config
+unzip -j /tmp/cloudera_configs.zip -d $CONFIG_PATH
+
+# The topology script isn't available locally so we just don't run it
+perl -0777 -i -pe 's/\s*\n\s*net.topology.script.file.name<\/name>\n\s*[^<]+?<\/value>\n\s*<\/property>//igs' $CONFIG_PATH/core-site.xml
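
The perl one-liner above strips the `net.topology.script.file.name` property because the referenced topology script only exists on the cluster. Should the regex ever prove brittle, a hedged Python sketch of the same clean-up (not part of this diff):

    import xml.etree.ElementTree as ET

    def strip_topology_script(path):
        # Remove the <property> whose <name> is net.topology.script.file.name
        # from a Hadoop core-site.xml; the script is not available locally.
        tree = ET.parse(path)
        root = tree.getroot()  # <configuration>
        for prop in list(root.findall("property")):
            if prop.findtext("name") == "net.topology.script.file.name":
                root.remove(prop)
        tree.write(path)

    strip_topology_script("conf/conf.cloudera.yarn/core-site.xml")
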
diff --git a/script/setup b/script/setup
new file mode 100755
index 0000000000000..edee707a893a6
--- /dev/null
+++ b/script/setup
@@ -0,0 +1,13 @@
+#!/usr/bin/env bash
+
+set -x
+set -e
+
+if [ "$CI" = "" ]; then
+ ./script/get_config
+fi
+
+FWDIR="$(cd `dirname $0`/..; pwd)"
+export HADOOP_OPTIONS="$(cat $FWDIR/SHOPIFY_HADOOP_OPTIONS)"
+export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)
+build/mvn $HADOOP_OPTIONS -DskipTests clean package
diff --git a/script/watch b/script/watch
new file mode 100755
index 0000000000000..b747765b9f635
--- /dev/null
+++ b/script/watch
@@ -0,0 +1,9 @@
+#!/usr/bin/env bash
+
+set -x
+set -e
+
+FWDIR="$(cd `dirname $0`/..; pwd)"
+export HADOOP_OPTIONS="$(cat $FWDIR/SHOPIFY_HADOOP_OPTIONS)"
+export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
+mvn $HADOOP_OPTIONS scala:cc