Skip to content

Commit 81f863f

Browse files
committed
Merge branch 'upmaster' into guava-cache-to-caffeine
2 parents 706a68d + de62b5a commit 81f863f

File tree

206 files changed

+6638
-4291
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

206 files changed

+6638
-4291
lines changed

.github/workflows/build_and_test.yml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ jobs:
199199
name: "Build modules (${{ format('{0}, {1} job', needs.configure-jobs.outputs.branch, needs.configure-jobs.outputs.type) }}): ${{ matrix.modules }}"
200200
runs-on: ubuntu-20.04
201201
container:
202-
image: dongjoon/apache-spark-github-action-image:20210602
202+
image: dongjoon/apache-spark-github-action-image:20210730
203203
strategy:
204204
fail-fast: false
205205
matrix:
@@ -266,8 +266,6 @@ jobs:
266266
- name: Run tests
267267
env: ${{ fromJSON(needs.configure-jobs.outputs.envs) }}
268268
run: |
269-
# TODO(SPARK-36345): Install mlflow>=1.0 and sklearn in Python 3.9 of the base image
270-
python3.9 -m pip install 'mlflow>=1.0' sklearn
271269
# TODO(SPARK-36361): Install coverage in Python 3.9 and PyPy 3 in the base image
272270
python3.9 -m pip install coverage
273271
pypy3 -m pip install coverage

common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,12 +206,15 @@ public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
206206
*
207207
* @param appId applicationId.
208208
* @param shuffleId shuffle id.
209+
* @param shuffleMergeId shuffleMergeId is used to uniquely identify merging process
210+
* of shuffle by an indeterminate stage attempt.
209211
* @param reduceId reduce id.
210212
* @param callback callback the handle the reply.
211213
*/
212214
public void sendMergedBlockMetaReq(
213215
String appId,
214216
int shuffleId,
217+
int shuffleMergeId,
215218
int reduceId,
216219
MergedBlockMetaResponseCallback callback) {
217220
long requestId = requestId();
@@ -222,7 +225,8 @@ public void sendMergedBlockMetaReq(
222225
handler.addRpcRequest(requestId, callback);
223226
RpcChannelListener listener = new RpcChannelListener(requestId, callback);
224227
channel.writeAndFlush(
225-
new MergedBlockMetaRequest(requestId, appId, shuffleId, reduceId)).addListener(listener);
228+
new MergedBlockMetaRequest(requestId, appId, shuffleId, shuffleMergeId,
229+
reduceId)).addListener(listener);
226230
}
227231

228232
/**

common/network-common/src/main/java/org/apache/spark/network/protocol/MergedBlockMetaRequest.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,20 @@ public class MergedBlockMetaRequest extends AbstractMessage implements RequestMe
3232
public final long requestId;
3333
public final String appId;
3434
public final int shuffleId;
35+
public final int shuffleMergeId;
3536
public final int reduceId;
3637

37-
public MergedBlockMetaRequest(long requestId, String appId, int shuffleId, int reduceId) {
38+
public MergedBlockMetaRequest(
39+
long requestId,
40+
String appId,
41+
int shuffleId,
42+
int shuffleMergeId,
43+
int reduceId) {
3844
super(null, false);
3945
this.requestId = requestId;
4046
this.appId = appId;
4147
this.shuffleId = shuffleId;
48+
this.shuffleMergeId = shuffleMergeId;
4249
this.reduceId = reduceId;
4350
}
4451

@@ -49,36 +56,39 @@ public Type type() {
4956

5057
@Override
5158
public int encodedLength() {
52-
return 8 + Encoders.Strings.encodedLength(appId) + 4 + 4;
59+
return 8 + Encoders.Strings.encodedLength(appId) + 4 + 4 + 4;
5360
}
5461

5562
@Override
5663
public void encode(ByteBuf buf) {
5764
buf.writeLong(requestId);
5865
Encoders.Strings.encode(buf, appId);
5966
buf.writeInt(shuffleId);
67+
buf.writeInt(shuffleMergeId);
6068
buf.writeInt(reduceId);
6169
}
6270

6371
public static MergedBlockMetaRequest decode(ByteBuf buf) {
6472
long requestId = buf.readLong();
6573
String appId = Encoders.Strings.decode(buf);
6674
int shuffleId = buf.readInt();
75+
int shuffleMergeId = buf.readInt();
6776
int reduceId = buf.readInt();
68-
return new MergedBlockMetaRequest(requestId, appId, shuffleId, reduceId);
77+
return new MergedBlockMetaRequest(requestId, appId, shuffleId, shuffleMergeId, reduceId);
6978
}
7079

7180
@Override
7281
public int hashCode() {
73-
return Objects.hashCode(requestId, appId, shuffleId, reduceId);
82+
return Objects.hashCode(requestId, appId, shuffleId, shuffleMergeId, reduceId);
7483
}
7584

7685
@Override
7786
public boolean equals(Object other) {
7887
if (other instanceof MergedBlockMetaRequest) {
7988
MergedBlockMetaRequest o = (MergedBlockMetaRequest) other;
80-
return requestId == o.requestId && shuffleId == o.shuffleId && reduceId == o.reduceId
81-
&& Objects.equal(appId, o.appId);
89+
return requestId == o.requestId && shuffleId == o.shuffleId &&
90+
shuffleMergeId == o.shuffleMergeId && reduceId == o.reduceId &&
91+
Objects.equal(appId, o.appId);
8292
}
8393
return false;
8494
}
@@ -89,6 +99,7 @@ public String toString() {
8999
.append("requestId", requestId)
90100
.append("appId", appId)
91101
.append("shuffleId", shuffleId)
102+
.append("shuffleMergeId", shuffleMergeId)
92103
.append("reduceId", reduceId)
93104
.toString();
94105
}

common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,14 +152,14 @@ public MergedBlockMetaReqHandler getMergedBlockMetaReqHandler() {
152152
TransportClient reverseClient = mock(TransportClient.class);
153153
TransportRequestHandler requestHandler = new TransportRequestHandler(channel, reverseClient,
154154
rpcHandler, 2L, null);
155-
MergedBlockMetaRequest validMetaReq = new MergedBlockMetaRequest(19, "app1", 0, 0);
155+
MergedBlockMetaRequest validMetaReq = new MergedBlockMetaRequest(19, "app1", 0, 0, 0);
156156
requestHandler.handle(validMetaReq);
157157
assertEquals(1, responseAndPromisePairs.size());
158158
assertTrue(responseAndPromisePairs.get(0).getLeft() instanceof MergedBlockMetaSuccess);
159159
assertEquals(2,
160160
((MergedBlockMetaSuccess) (responseAndPromisePairs.get(0).getLeft())).getNumChunks());
161161

162-
MergedBlockMetaRequest invalidMetaReq = new MergedBlockMetaRequest(21, "app1", -1, 1);
162+
MergedBlockMetaRequest invalidMetaReq = new MergedBlockMetaRequest(21, "app1", -1, 0, 1);
163163
requestHandler.handle(invalidMetaReq);
164164
assertEquals(2, responseAndPromisePairs.size());
165165
assertTrue(responseAndPromisePairs.get(1).getLeft() instanceof RpcFailure);

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@
3333
import org.apache.spark.network.client.RpcResponseCallback;
3434
import org.apache.spark.network.client.TransportClient;
3535
import org.apache.spark.network.client.TransportClientFactory;
36-
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
37-
import org.apache.spark.network.shuffle.protocol.GetLocalDirsForExecutors;
38-
import org.apache.spark.network.shuffle.protocol.LocalDirsForExecutors;
36+
import org.apache.spark.network.shuffle.checksum.Cause;
37+
import org.apache.spark.network.shuffle.protocol.*;
38+
import org.apache.spark.network.util.TransportConf;
3939

4040
/**
4141
* Provides an interface for reading both shuffle files and RDD blocks, either from an Executor
@@ -46,6 +46,45 @@ public abstract class BlockStoreClient implements Closeable {
4646

4747
protected volatile TransportClientFactory clientFactory;
4848
protected String appId;
49+
protected TransportConf transportConf;
50+
51+
/**
52+
* Send the diagnosis request for the corrupted shuffle block to the server.
53+
*
54+
* @param host the host of the remote node.
55+
* @param port the port of the remote node.
56+
* @param execId the executor id.
57+
* @param shuffleId the shuffleId of the corrupted shuffle block
58+
* @param mapId the mapId of the corrupted shuffle block
59+
* @param reduceId the reduceId of the corrupted shuffle block
60+
* @param checksum the shuffle checksum which was calculated at client side for the corrupted
61+
* shuffle block
62+
* @return The cause of the shuffle block corruption
63+
*/
64+
public Cause diagnoseCorruption(
65+
String host,
66+
int port,
67+
String execId,
68+
int shuffleId,
69+
long mapId,
70+
int reduceId,
71+
long checksum,
72+
String algorithm) {
73+
try {
74+
TransportClient client = clientFactory.createClient(host, port);
75+
ByteBuffer response = client.sendRpcSync(
76+
new DiagnoseCorruption(appId, execId, shuffleId, mapId, reduceId, checksum, algorithm)
77+
.toByteBuffer(),
78+
transportConf.connectionTimeoutMs()
79+
);
80+
CorruptionCause cause =
81+
(CorruptionCause) BlockTransferMessage.Decoder.fromByteBuffer(response);
82+
return cause.cause;
83+
} catch (Exception e) {
84+
logger.warn("Failed to get the corruption cause.");
85+
return Cause.UNKNOWN_ISSUE;
86+
}
87+
}
4988

5089
/**
5190
* Fetch a sequence of blocks from a remote node asynchronously,
@@ -167,6 +206,8 @@ public void pushBlocks(
167206
* @param host host of shuffle server
168207
* @param port port of shuffle server.
169208
* @param shuffleId shuffle ID of the shuffle to be finalized
209+
* @param shuffleMergeId shuffleMergeId is used to uniquely identify merging process
210+
* of shuffle by an indeterminate stage attempt.
170211
* @param listener the listener to receive MergeStatuses
171212
*
172213
* @since 3.1.0
@@ -175,6 +216,7 @@ public void finalizeShuffleMerge(
175216
String host,
176217
int port,
177218
int shuffleId,
219+
int shuffleMergeId,
178220
MergeFinalizerListener listener) {
179221
throw new UnsupportedOperationException();
180222
}
@@ -185,6 +227,8 @@ public void finalizeShuffleMerge(
185227
* @param host the host of the remote node.
186228
* @param port the port of the remote node.
187229
* @param shuffleId shuffle id.
230+
* @param shuffleMergeId shuffleMergeId is used to uniquely identify merging process
231+
* of shuffle by an indeterminate stage attempt.
188232
* @param reduceId reduce id.
189233
* @param listener the listener to receive chunk counts.
190234
*
@@ -194,6 +238,7 @@ public void getMergedBlockMeta(
194238
String host,
195239
int port,
196240
int shuffleId,
241+
int shuffleMergeId,
197242
int reduceId,
198243
MergedBlocksMetaListener listener) {
199244
throw new UnsupportedOperationException();

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,14 @@ default boolean shouldLogError(Throwable t) {
5555
class BlockPushErrorHandler implements ErrorHandler {
5656
/**
5757
* String constant used for generating exception messages indicating a block to be merged
58-
* arrives too late on the server side, and also for later checking such exceptions on the
59-
* client side. When we get a block push failure because of the block arrives too late, we
60-
* will not retry pushing the block nor log the exception on the client side.
58+
* arrives too late or stale block push in the case of indeterminate stage retries on the
59+
* server side, and also for later checking such exceptions on the client side. When we get
60+
* a block push failure because of the block push being stale or arrives too late, we will
61+
* not retry pushing the block nor log the exception on the client side.
6162
*/
62-
public static final String TOO_LATE_MESSAGE_SUFFIX =
63-
"received after merged shuffle is finalized";
63+
public static final String TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX =
64+
"received after merged shuffle is finalized or stale block push as shuffle blocks of a"
65+
+ " higher shuffleMergeId for the shuffle is being pushed";
6466

6567
/**
6668
* String constant used for generating exception messages indicating the server couldn't
@@ -81,25 +83,54 @@ class BlockPushErrorHandler implements ErrorHandler {
8183
public static final String IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX =
8284
"IOExceptions exceeded the threshold";
8385

86+
/**
87+
* String constant used for generating exception messages indicating the server rejecting a
88+
* shuffle finalize request since shuffle blocks of a higher shuffleMergeId for a shuffle is
89+
* already being pushed. This typically happens in the case of indeterminate stage retries
90+
* where if a stage attempt fails then the entirety of the shuffle output needs to be rolled
91+
* back. For more details refer to SPARK-23243, SPARK-25341 and SPARK-32923.
92+
*/
93+
public static final String STALE_SHUFFLE_FINALIZE_SUFFIX =
94+
"stale shuffle finalize request as shuffle blocks of a higher shuffleMergeId for the"
95+
+ " shuffle is already being pushed";
96+
8497
@Override
8598
public boolean shouldRetryError(Throwable t) {
8699
// If it is a connection time-out or a connection closed exception, no need to retry.
87100
// If it is a FileNotFoundException originating from the client while pushing the shuffle
88-
// blocks to the server, even then there is no need to retry. We will still log this exception
89-
// once which helps with debugging.
101+
// blocks to the server, even then there is no need to retry. We will still log this
102+
// exception once which helps with debugging.
90103
if (t.getCause() != null && (t.getCause() instanceof ConnectException ||
91104
t.getCause() instanceof FileNotFoundException)) {
92105
return false;
93106
}
94-
// If the block is too late, there is no need to retry it
95-
return !Throwables.getStackTraceAsString(t).contains(TOO_LATE_MESSAGE_SUFFIX);
107+
108+
String errorStackTrace = Throwables.getStackTraceAsString(t);
109+
// If the block is too late or stale block push, there is no need to retry it
110+
return !errorStackTrace.contains(TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX);
96111
}
97112

98113
@Override
99114
public boolean shouldLogError(Throwable t) {
100115
String errorStackTrace = Throwables.getStackTraceAsString(t);
101-
return !errorStackTrace.contains(BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX) &&
102-
!errorStackTrace.contains(TOO_LATE_MESSAGE_SUFFIX);
116+
return !(errorStackTrace.contains(BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX) ||
117+
errorStackTrace.contains(TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX));
118+
}
119+
}
120+
121+
class BlockFetchErrorHandler implements ErrorHandler {
122+
public static final String STALE_SHUFFLE_BLOCK_FETCH =
123+
"stale shuffle block fetch request as shuffle blocks of a higher shuffleMergeId for the"
124+
+ " shuffle is available";
125+
126+
@Override
127+
public boolean shouldRetryError(Throwable t) {
128+
return !Throwables.getStackTraceAsString(t).contains(STALE_SHUFFLE_BLOCK_FETCH);
129+
}
130+
131+
@Override
132+
public boolean shouldLogError(Throwable t) {
133+
return !Throwables.getStackTraceAsString(t).contains(STALE_SHUFFLE_BLOCK_FETCH);
103134
}
104135
}
105136
}

0 commit comments

Comments
 (0)