Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,15 @@ public class RssMRConfig {
public static final int RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE =
RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE;

public static final String RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL =
MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL;
public static final long RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE =
RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE;
public static final String RSS_CLIENT_ASSIGNMENT_RETRY_TIMES =
MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES;
public static final int RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE =
RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE;

public static final String RSS_CONF_FILE = "rss_conf.xml";

public static final Set<String> RSS_MANDATORY_CLUSTER_CONF = Sets.newHashSet(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.RetryUtils;
import org.apache.uniffle.storage.util.StorageType;

public class RssMRAppMaster extends MRAppMaster {
Expand Down Expand Up @@ -128,25 +130,9 @@ public static void main(String[] args) {
}
assignmentTags.add(Constants.SHUFFLE_SERVER_VERSION);

int requiredAssignmentShuffleServersNum = conf.getInt(
RssMRConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER,
RssMRConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE
);

ApplicationAttemptId applicationAttemptId = RssMRUtils.getApplicationAttemptId();
String appId = applicationAttemptId.toString();

ShuffleAssignmentsInfo response =
client.getShuffleAssignments(
appId,
0,
numReduceTasks,
1,
Sets.newHashSet(assignmentTags),
requiredAssignmentShuffleServersNum
);

Map<ShuffleServerInfo, List<PartitionRange>> serverToPartitionRanges = response.getServerToPartitionRanges();
final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
Expand All @@ -157,40 +143,9 @@ public Thread newThread(Runnable r) {
}
}
);
if (serverToPartitionRanges == null || serverToPartitionRanges.isEmpty()) {
return;
}

long heartbeatInterval = conf.getLong(RssMRConfig.RSS_HEARTBEAT_INTERVAL,
RssMRConfig.RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE);
long heartbeatTimeout = conf.getLong(RssMRConfig.RSS_HEARTBEAT_TIMEOUT, heartbeatInterval / 2);
scheduledExecutorService.scheduleAtFixedRate(
() -> {
try {
client.sendAppHeartbeat(appId, heartbeatTimeout);
LOG.info("Finish send heartbeat to coordinator and servers");
} catch (Exception e) {
LOG.warn("Fail to send heartbeat to coordinator and servers", e);
}
},
heartbeatInterval / 2,
heartbeatInterval,
TimeUnit.MILLISECONDS);

JobConf extraConf = new JobConf();
extraConf.clear();
// write shuffle worker assignments to submit work directory
// format is as below:
// mapreduce.rss.assignment.partition.1:server1,server2
// mapreduce.rss.assignment.partition.2:server3,server4
// ...
response.getPartitionToServers().entrySet().forEach(entry -> {
List<String> servers = Lists.newArrayList();
for (ShuffleServerInfo server : entry.getValue()) {
servers.add(server.getHost() + ":" + server.getPort());
}
extraConf.set(RssMRConfig.RSS_ASSIGNMENT_PREFIX + entry.getKey(), StringUtils.join(servers, ","));
});

// get remote storage from coordinator if necessary
boolean dynamicConfEnabled = conf.getBoolean(RssMRConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED,
Expand Down Expand Up @@ -233,14 +188,80 @@ public Thread newThread(Runnable r) {
}
conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, originalAttempts + inc);
}

int requiredAssignmentShuffleServersNum = conf.getInt(
RssMRConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER,
RssMRConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE
);

// retryInterval must be bigger than `rss.server.heartbeat.timeout`, or it may return the same result
long retryInterval = conf.getLong(RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL,
RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE);
int retryTimes = conf.getInt(RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES,
RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE);
ShuffleAssignmentsInfo response;
try {
response = RetryUtils.retry(() -> {
ShuffleAssignmentsInfo shuffleAssignments =
client.getShuffleAssignments(
appId,
0,
numReduceTasks,
1,
Sets.newHashSet(assignmentTags),
requiredAssignmentShuffleServersNum
);

Map<ShuffleServerInfo, List<PartitionRange>> serverToPartitionRanges =
shuffleAssignments.getServerToPartitionRanges();

if (serverToPartitionRanges == null || serverToPartitionRanges.isEmpty()) {
return null;
}
LOG.info("Start to register shuffle");
long start = System.currentTimeMillis();
serverToPartitionRanges.entrySet().forEach(entry -> {
client.registerShuffle(
entry.getKey(), appId, 0, entry.getValue(), remoteStorage);
});
LOG.info("Finish register shuffle with " + (System.currentTimeMillis() - start) + " ms");
return shuffleAssignments;
}, retryInterval, retryTimes);
} catch (Throwable throwable) {
throw new RssException("registerShuffle failed!", throwable);
}

LOG.info("Start to register shuffle");
long start = System.currentTimeMillis();
serverToPartitionRanges.entrySet().forEach(entry -> {
client.registerShuffle(
entry.getKey(), appId, 0, entry.getValue(), remoteStorage);
if (response == null) {
return;
}
long heartbeatInterval = conf.getLong(RssMRConfig.RSS_HEARTBEAT_INTERVAL,
RssMRConfig.RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE);
long heartbeatTimeout = conf.getLong(RssMRConfig.RSS_HEARTBEAT_TIMEOUT, heartbeatInterval / 2);
scheduledExecutorService.scheduleAtFixedRate(
() -> {
try {
client.sendAppHeartbeat(appId, heartbeatTimeout);
LOG.info("Finish send heartbeat to coordinator and servers");
} catch (Exception e) {
LOG.warn("Fail to send heartbeat to coordinator and servers", e);
}
},
heartbeatInterval / 2,
heartbeatInterval,
TimeUnit.MILLISECONDS);

// write shuffle worker assignments to submit work directory
// format is as below:
// mapreduce.rss.assignment.partition.1:server1,server2
// mapreduce.rss.assignment.partition.2:server3,server4
// ...
response.getPartitionToServers().entrySet().forEach(entry -> {
List<String> servers = Lists.newArrayList();
for (ShuffleServerInfo server : entry.getValue()) {
servers.add(server.getHost() + ":" + server.getPort());
}
extraConf.set(RssMRConfig.RSS_ASSIGNMENT_PREFIX + entry.getKey(), StringUtils.join(servers, ","));
});
LOG.info("Finish register shuffle with " + (System.currentTimeMillis() - start) + " ms");

writeExtraConf(conf, extraConf);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,14 @@ public class RssSparkConfig {
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER))
.createWithDefault(RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE);

public static final ConfigEntry<Long> RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL = createLongBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL))
.createWithDefault(RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE);

public static final ConfigEntry<Integer> RSS_CLIENT_ASSIGNMENT_RETRY_TIMES = createIntegerBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES))
.createWithDefault(RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE);

public static final ConfigEntry<String> RSS_COORDINATOR_QUORUM = createStringBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_COORDINATOR_QUORUM)
.doc("Coordinator quorum"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.RetryUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;

Expand Down Expand Up @@ -220,13 +222,23 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, int numMaps, Shuff

int requiredShuffleServerNumber = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER);

ShuffleAssignmentsInfo response = shuffleWriteClient.getShuffleAssignments(
appId, shuffleId, dependency.partitioner().numPartitions(),
partitionNumPerRange, assignmentTags, requiredShuffleServerNumber);
Map<Integer, List<ShuffleServerInfo>> partitionToServers = response.getPartitionToServers();
// retryInterval must be bigger than `rss.server.heartbeat.timeout`, or it may return the same result
long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL);
int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES);
Map<Integer, List<ShuffleServerInfo>> partitionToServers;
try {
partitionToServers = RetryUtils.retry(() -> {
ShuffleAssignmentsInfo response = shuffleWriteClient.getShuffleAssignments(
appId, shuffleId, dependency.partitioner().numPartitions(),
partitionNumPerRange, assignmentTags, requiredShuffleServerNumber);
registerShuffleServers(appId, shuffleId, response.getServerToPartitionRanges());
return response.getPartitionToServers();
}, retryInterval, retryTimes);
} catch (Throwable throwable) {
throw new RssException("registerShuffle failed!", throwable);
}

startHeartbeat();
registerShuffleServers(appId, shuffleId, response.getServerToPartitionRanges());

LOG.info("RegisterShuffle with ShuffleId[" + shuffleId + "], partitionNum[" + partitionToServers.size() + "]");
return new RssShuffleHandle(shuffleId, appId, numMaps, dependency, partitionToServers, remoteStorage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.RetryUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;

Expand Down Expand Up @@ -258,17 +260,26 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, ShuffleDependency<

int requiredShuffleServerNumber = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER);

ShuffleAssignmentsInfo response = shuffleWriteClient.getShuffleAssignments(
id.get(),
shuffleId,
dependency.partitioner().numPartitions(),
1,
assignmentTags,
requiredShuffleServerNumber);
Map<Integer, List<ShuffleServerInfo>> partitionToServers = response.getPartitionToServers();

// retryInterval must be bigger than `rss.server.heartbeat.timeout`, or it may return the same result
long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL);
int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES);
Map<Integer, List<ShuffleServerInfo>> partitionToServers;
try {
partitionToServers = RetryUtils.retry(() -> {
ShuffleAssignmentsInfo response = shuffleWriteClient.getShuffleAssignments(
id.get(),
shuffleId,
dependency.partitioner().numPartitions(),
1,
assignmentTags,
requiredShuffleServerNumber);
registerShuffleServers(id.get(), shuffleId, response.getServerToPartitionRanges());
return response.getPartitionToServers();
}, retryInterval, retryTimes);
} catch (Throwable throwable) {
throw new RssException("registerShuffle failed!", throwable);
}
startHeartbeat();
registerShuffleServers(id.get(), shuffleId, response.getServerToPartitionRanges());

LOG.info("RegisterShuffle with ShuffleId[" + shuffleId + "], partitionNum[" + partitionToServers.size()
+ "], shuffleServerForResult: " + partitionToServers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public class RssClientConfig {
public static final String RSS_CLIENT_READ_BUFFER_SIZE_DEFAULT_VALUE = "14m";
// The tags specified by rss client to determine server assignment.
public static final String RSS_CLIENT_ASSIGNMENT_TAGS = "rss.client.assignment.tags";

public static final String RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL = "rss.client.assignment.retry.interval";
public static final long RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE = 65000;
public static final String RSS_CLIENT_ASSIGNMENT_RETRY_TIMES = "rss.client.assignment.retry.times";
public static final int RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE = 3;

public static final String RSS_ACCESS_TIMEOUT_MS = "rss.access.timeout.ms";
public static final int RSS_ACCESS_TIMEOUT_MS_DEFAULT_VALUE = 10000;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.common.util;

import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RetryUtils {
  private static final Logger LOG = LoggerFactory.getLogger(RetryUtils.class);

  /**
   * Executes {@code cmd}, retrying on any {@link Throwable} up to {@code retryTimes} total
   * attempts, sleeping {@code intervalMs} milliseconds between attempts.
   *
   * @param cmd command to execute
   * @param intervalMs interval in milliseconds between attempts
   * @param retryTimes maximum number of attempts (total attempts, not extra retries)
   * @param <T> return type of the command
   * @return the result of the first successful attempt
   * @throws Throwable the last failure once attempts are exhausted, or
   *                   {@link InterruptedException} if interrupted while sleeping
   */
  public static <T> T retry(RetryCmd<T> cmd, long intervalMs, int retryTimes) throws Throwable {
    return retry(cmd, null, intervalMs, retryTimes, null);
  }

  /**
   * Executes {@code cmd}, retrying up to {@code retryTimes} total attempts with
   * {@code intervalMs} milliseconds between attempts. Only Throwables that are instances of a
   * class in {@code exceptionClasses} trigger a retry; any other Throwable is rethrown
   * immediately.
   *
   * @param cmd command to execute
   * @param callBack callback executed after each failed attempt, before the next retry
   *                 (may be {@code null})
   * @param intervalMs retry interval in milliseconds
   * @param retryTimes maximum number of attempts
   * @param exceptionClasses Throwable classes that are retryable; {@code null} retries on all
   * @param <T> return type of the command
   * @return the result of the first successful attempt
   * @throws Throwable the first non-retryable failure, the final failure once attempts are
   *                   exhausted, or {@link InterruptedException} if interrupted while sleeping
   */
  // NOTE(review): exceptionClasses keeps the raw Class element type to stay source-compatible
  // with existing callers; prefer Set<Class<? extends Throwable>> for new code.
  public static <T> T retry(RetryCmd<T> cmd, RetryCallBack callBack, long intervalMs,
      int retryTimes, Set<Class> exceptionClasses) throws Throwable {
    int attempt = 0;
    while (true) {
      try {
        return cmd.execute();
      } catch (Throwable t) {
        attempt++;
        // Rethrow immediately when the failure is not retryable or attempts are exhausted.
        if ((exceptionClasses != null && !isInstanceOf(exceptionClasses, t))
            || attempt >= retryTimes) {
          throw t;
        }
        // Parameterized logging avoids eager string concatenation.
        LOG.info("Retry due to Throwable, {} {}", t.getClass().getName(), t.getMessage());
        LOG.info("Waiting {} milliseconds before next connection attempt.", intervalMs);
        Thread.sleep(intervalMs);
        if (callBack != null) {
          callBack.execute();
        }
      }
    }
  }

  // Returns true if t is an instance of at least one class in classes.
  private static boolean isInstanceOf(Set<Class> classes, Throwable t) {
    for (Class c : classes) {
      if (c.isInstance(t)) {
        return true;
      }
    }
    return false;
  }

  /** Command whose result is returned by {@link #retry}; may throw to trigger a retry. */
  public interface RetryCmd<T> {
    T execute() throws Throwable;
  }

  /** Callback invoked between failed attempts, e.g. to refresh state before retrying. */
  public interface RetryCallBack {
    void execute() throws Throwable;
  }
}
Loading