Skip to content

Commit

Permalink
[SPARK-25299] Add an implementation of the SPARK-25299 shuffle storag…
Browse files Browse the repository at this point in the history
…e plugin that asynchronously backs up shuffle data to remote storage (#646)

* Add an implementation of the SPARK-25299 shuffle storage plugin that asynchronously backs up shuffle data to remote storage.

* Don't write dependency reduced pom for async shuffle upload core.

Seems to cause the shade plugin to hang for this particular module for some reason...

* Start adding javadoc

* Rename a class, add more docs

* More docs

* More docs. Rename another s3 -> hadoop reference.

* Remove trailing period

* More documentation.

* S3 -> Hadoop again

* Move a bunch of references  from S3 -> Hadoop or remote

* Fix build

* Add async-shuffle-upload-core to bom

* Try using a create method from the scala object

Something strange with a NoMethodDefError?
  • Loading branch information
mccheah authored Mar 10, 2020
1 parent f0be092 commit 9903e96
Show file tree
Hide file tree
Showing 108 changed files with 10,525 additions and 13 deletions.
4 changes: 3 additions & 1 deletion FORK.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
* yarn: YarnClusterSchedulerBackend, YarnSchedulerBackend

* [SPARK-26626](https://issues.apache.org/jira/browse/SPARK-26626) - Limited the maximum size of repeatedly substituted aliases
* [SPARK-25299](https://issues.apache.org/jira/browse/SPARK-25299) - Adds the complete plugin tree for shuffle byte storage

# Added

* Gradle plugin to easily create custom docker images for use with k8s
* Filter rLibDir by exists so that daemon.R references the correct file [460](https://github.com/palantir/spark/pull/460)
* Implementation of the shuffle I/O plugins from SPARK-25299 that asynchronously backs up shuffle files to remote storage

# Reverted
* [SPARK-25908](https://issues.apache.org/jira/browse/SPARK-25908) - Removal of `monotonicall_increasing_id`, `toDegree`, `toRadians`, `approxCountDistinct`, `unionAll`
Expand All @@ -35,4 +37,4 @@
* [SPARK-26580](https://issues.apache.org/jira/browse/SPARK-26580) - Bring back scala 2.11 behaviour of primitive types null behaviour
* [SPARK-26133](https://issues.apache.org/jira/browse/SPARK-26133) - Old OneHotEncoder
* [SPARK-11215](https://issues.apache.org/jira/browse/SPARK-11215) - StringIndexer multi column support
* [SPARK-26616](https://issues.apache.org/jira/browse/SPARK-26616) - No document frequency in IDFModel
* [SPARK-26616](https://issues.apache.org/jira/browse/SPARK-26616) - No document frequency in IDFModel
131 changes: 131 additions & 0 deletions async-shuffle-upload/async-shuffle-upload-api/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>3.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>spark-async-shuffle-upload-api_2.11</artifactId>
<properties>
<sbt.project.name>async-shuffle-upload-api</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Project Async Shuffle Upload API</name>
<url>http://spark.apache.org/</url>
<dependencies>
<dependency>
<groupId>org.immutables</groupId>
<artifactId>value</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.immutables</groupId>
<artifactId>builder</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
</dependency>
<dependency>
<groupId>com.palantir.safe-logging</groupId>
<artifactId>safe-logging</artifactId>
</dependency>
<dependency>
<groupId>com.palantir.safe-logging</groupId>
<artifactId>preconditions</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-async-shuffle-upload-immutables_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
</dependency>
<!--
This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
them will yield errors.
-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<!-- Disable Scala compilation for modules that don't have Scala. This is because they possibly break our usage of Immutables(?) -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
<executions>
<execution>
<id>eclipse-add-source</id>
<phase>none</phase>
</execution>
<execution>
<id>scala-compile-first</id>
<phase>none</phase>
</execution>
<execution>
<id>scala-test-compile-first</id>
<phase>none</phase>
</execution>
<execution>
<id>attach-scaladocs</id>
<phase>none</phase>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<skipMain>false</skipMain> <!-- skip compile -->
<skip>false</skip> <!-- skip testCompile -->
</configuration>
</plugin>
</plugins>
</build>
</project>

Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.palantir.shuffle.async.api;

public final class SparkShuffleApiConstants {

// Identifiers used by the spark shuffle plugin
public static final String SHUFFLE_BASE_URI_CONF = "spark.shuffle.hadoop.async.base-uri";

public static final String SHUFFLE_PLUGIN_APP_NAME_CONF = "spark.shuffle.hadoop.async.appName";
// Deprecated configurations are picked up as fallbacks when their newer counterparts are not
// specified.
public static final String SHUFFLE_PLUGIN_APP_NAME_CONF_DEPRECATED =
"spark.plugin.shuffle.async.appName";

public static final String SHUFFLE_S3A_CREDS_FILE_CONF =
"spark.shuffle.hadoop.async.s3a.credsFile";
public static final String SHUFFLE_S3A_CREDS_FILE_CONF_DEPRECATED =
"spark.plugin.shuffle.async.s3a.credsFile";

public static final String SHUFFLE_S3A_ENDPOINT_CONF =
"spark.shuffle.hadoop.async.s3a.endpoint";

public static final String METRICS_FACTORY_CLASS_CONF =
"spark.shuffle.hadoop.async.metrics.factory.class";
public static final String METRICS_FACTORY_CLASS_CONF_DEPRECATED =
"spark.plugin.shuffle.async.metricsFactoryClass";

private SparkShuffleApiConstants() {}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.palantir.shuffle.async.api;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.palantir.logsafe.UnsafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalArgumentException;
import org.immutables.value.Value;

import org.apache.spark.palantir.shuffle.async.immutables.ImmutablesStyle;

/**
* Structure for holding AWS credentials for accessing Amazon S3.
* <p>
* Allowing using a file to store AWS credentials when S3a is used as the backing store for
* shuffle files. The path to a file holding the credentials is specified via
* {@link SparkShuffleApiConstants#SHUFFLE_S3A_CREDS_FILE_CONF}.
*/
@ImmutablesStyle
@Value.Immutable
@JsonSerialize(as = ImmutableSparkShuffleAwsCredentials.class)
@JsonDeserialize(as = ImmutableSparkShuffleAwsCredentials.class)
@JsonIgnoreProperties(ignoreUnknown = true)
public abstract class SparkShuffleAwsCredentials {

private static final ObjectMapper MAPPER = new ObjectMapper();

public abstract String accessKeyId();

public abstract String secretAccessKey();

public abstract String sessionToken();

public final byte[] toBytes() {
try {
return MAPPER.writeValueAsString(this).getBytes(StandardCharsets.UTF_8);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

public static SparkShuffleAwsCredentials fromBytes(byte[] bytes) {
try {
return MAPPER.readValue(
new String(bytes, StandardCharsets.UTF_8), SparkShuffleAwsCredentials.class);
} catch (IOException e) {
throw new SafeIllegalArgumentException(
"Could not deserialize bytes as AWS credentials.",
UnsafeArg.of("cause", e));
}
}

public static ImmutableSparkShuffleAwsCredentials.Builder builder() {
return ImmutableSparkShuffleAwsCredentials.builder();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.palantir.shuffle.async.api;

import java.nio.charset.StandardCharsets;

import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;

public final class SparkShuffleAwsCredentialsSuite {

@Test
public void testSerialize() {
SparkShuffleAwsCredentials creds = SparkShuffleAwsCredentials.builder()
.accessKeyId("access-key")
.secretAccessKey("secret-key")
.sessionToken("session-token")
.build();
byte[] bytes = creds.toBytes();
assertThat(new String(bytes, StandardCharsets.UTF_8))
.isEqualTo("{\"accessKeyId\":\"access-key\","
+ "\"secretAccessKey\":\"secret-key\","
+ "\"sessionToken\":\"session-token\"}");
}

@Test
public void testDeserialize() {
String serializedString = "{\"accessKeyId\":\"access-key\","
+ "\"secretAccessKey\":\"secret-key\","
+ "\"sessionToken\":\"session-token\"}";

SparkShuffleAwsCredentials creds =
SparkShuffleAwsCredentials.fromBytes(serializedString.getBytes(StandardCharsets.UTF_8));
}
}
Loading

0 comments on commit 9903e96

Please sign in to comment.