Skip to content

Commit daa5b70

Browse files
author
Marcelo Vanzin
committed
[SPARK-23020][core] Fix another race in the in-process launcher test.
First the bad news: there's an unfixable race in the launcher code. (By unfixable I mean it would take a lot more effort than this change to fix it.) The good news is that it should only affect super short lived applications, such as the one run by the flaky test, so it's possible to work around it in our test. The fix also uncovered an issue with the recently added "closeAndWait()" method; closing the connection would still possibly cause data loss, so this change waits a while for the connection to finish itself, and closes the socket if that times out. The existing connection timeout is reused so that if desired it's possible to control how long to wait. As part of that I also restored the old behavior that disconnect() would force a disconnection from the child app; the "wait for data to arrive" approach is only taken when disposing of the handle. I tested this by inserting a bunch of sleeps in the test and the socket handling code in the launcher library; with those I was able to reproduce the error from the jenkins jobs. With the changes, even with all the sleeps still in place, all tests pass.
1 parent a23187f commit daa5b70

File tree

6 files changed

+83
-34
lines changed

6 files changed

+83
-34
lines changed

core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -157,12 +157,24 @@ private void inProcessLauncherTestImpl() throws Exception {
157157

158158
SparkAppHandle handle = null;
159159
try {
160-
handle = new InProcessLauncher()
161-
.setMaster("local")
162-
.setAppResource(SparkLauncher.NO_RESOURCE)
163-
.setMainClass(InProcessTestApp.class.getName())
164-
.addAppArgs("hello")
165-
.startApplication(listener);
160+
synchronized (InProcessTestApp.LOCK) {
161+
handle = new InProcessLauncher()
162+
.setMaster("local")
163+
.setAppResource(SparkLauncher.NO_RESOURCE)
164+
.setMainClass(InProcessTestApp.class.getName())
165+
.addAppArgs("hello")
166+
.startApplication(listener);
167+
168+
// SPARK-23020: see doc for InProcessTestApp.LOCK for a description of the race. Here
169+
// we wait until we know that the connection between the app and the launcher has been
170+
// established before allowing the app to finish.
171+
final SparkAppHandle _handle = handle;
172+
eventually(Duration.ofSeconds(5), Duration.ofMillis(10), () -> {
173+
assertNotEquals(SparkAppHandle.State.UNKNOWN, _handle.getState());
174+
});
175+
176+
InProcessTestApp.LOCK.wait(5000);
177+
}
166178

167179
waitFor(handle);
168180
assertEquals(SparkAppHandle.State.FINISHED, handle.getState());
@@ -193,10 +205,26 @@ public static void main(String[] args) throws Exception {
193205

194206
public static class InProcessTestApp {
195207

208+
/**
209+
* SPARK-23020: there's a race caused by a child app finishing too quickly. This would cause
210+
* the InProcessAppHandle to dispose of itself even before the child connection was properly
211+
* established, so no state changes would be detected for the application and its final
212+
* state would be LOST.
213+
*
214+
* It's not really possible to fix that race safely in the handle code itself without changing
215+
* the way in-process apps talk to the launcher library, so we work around that in the test by
216+
* synchronizing on this object.
217+
*/
218+
public static final Object LOCK = new Object();
219+
196220
public static void main(String[] args) throws Exception {
197221
assertNotEquals(0, args.length);
198222
assertEquals(args[0], "hello");
199223
new SparkContext().stop();
224+
225+
synchronized (LOCK) {
226+
LOCK.notifyAll();
227+
}
200228
}
201229

202230
}

launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,22 @@
1818
package org.apache.spark.launcher;
1919

2020
import java.io.IOException;
21-
import java.util.ArrayList;
2221
import java.util.List;
22+
import java.util.concurrent.CopyOnWriteArrayList;
2323
import java.util.concurrent.atomic.AtomicReference;
2424
import java.util.logging.Level;
2525
import java.util.logging.Logger;
2626

2727
abstract class AbstractAppHandle implements SparkAppHandle {
2828

29-
private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());
29+
private static final Logger LOG = Logger.getLogger(AbstractAppHandle.class.getName());
3030

3131
private final LauncherServer server;
3232

3333
private LauncherServer.ServerConnection connection;
3434
private List<Listener> listeners;
3535
private AtomicReference<State> state;
36-
private String appId;
36+
private volatile String appId;
3737
private volatile boolean disposed;
3838

3939
protected AbstractAppHandle(LauncherServer server) {
@@ -42,9 +42,9 @@ protected AbstractAppHandle(LauncherServer server) {
4242
}
4343

4444
@Override
45-
public synchronized void addListener(Listener l) {
45+
public void addListener(Listener l) {
4646
if (listeners == null) {
47-
listeners = new ArrayList<>();
47+
listeners = new CopyOnWriteArrayList<>();
4848
}
4949
listeners.add(l);
5050
}
@@ -71,16 +71,14 @@ public void stop() {
7171

7272
@Override
7373
public synchronized void disconnect() {
74-
if (!isDisposed()) {
75-
if (connection != null) {
76-
try {
77-
connection.closeAndWait();
78-
} catch (IOException ioe) {
79-
// no-op.
80-
}
74+
if (connection != null && connection.isOpen()) {
75+
try {
76+
connection.close();
77+
} catch (IOException ioe) {
78+
// no-op.
8179
}
82-
dispose();
8380
}
81+
dispose();
8482
}
8583

8684
void setConnection(LauncherServer.ServerConnection connection) {
@@ -100,7 +98,18 @@ boolean isDisposed() {
10098
*/
10199
synchronized void dispose() {
102100
if (!isDisposed()) {
101+
// First wait for all data from the connection to be read. Then unregister the handle.
102+
// Otherwise, unregistering might cause the server to be stopped and all child connections
103+
// to be closed.
104+
if (connection != null) {
105+
try {
106+
connection.waitForClose();
107+
} catch (IOException ioe) {
108+
// no-op.
109+
}
110+
}
103111
server.unregister(this);
112+
104113
// Set state to LOST if not yet final.
105114
setState(State.LOST, false);
106115
this.disposed = true;
@@ -127,11 +136,13 @@ void setState(State s, boolean force) {
127136
current = state.get();
128137
}
129138

130-
LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
131-
new Object[] { current, s });
139+
if (s != State.LOST) {
140+
LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
141+
new Object[] { current, s });
142+
}
132143
}
133144

134-
synchronized void setAppId(String appId) {
145+
void setAppId(String appId) {
135146
this.appId = appId;
136147
fireEvent(true);
137148
}

launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ void monitorChild() {
112112
}
113113
}
114114

115-
disconnect();
115+
dispose();
116116
}
117117
}
118118

launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ synchronized void start(String appName, Method main, String[] args) {
6666
setState(State.FAILED);
6767
}
6868

69-
disconnect();
69+
dispose();
7070
});
7171

7272
app.setName(String.format(THREAD_NAME_FMT, THREAD_IDS.incrementAndGet(), appName));

launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ public void run() {
238238
};
239239
ServerConnection clientConnection = new ServerConnection(client, timeout);
240240
Thread clientThread = factory.newThread(clientConnection);
241+
clientConnection.setConnectionThread(clientThread);
241242
synchronized (clients) {
242243
clients.add(clientConnection);
243244
}
@@ -290,17 +291,15 @@ class ServerConnection extends LauncherConnection {
290291

291292
private TimerTask timeout;
292293
private volatile Thread connectionThread;
293-
volatile AbstractAppHandle handle;
294+
private volatile AbstractAppHandle handle;
294295

295296
ServerConnection(Socket socket, TimerTask timeout) throws IOException {
296297
super(socket);
297298
this.timeout = timeout;
298299
}
299300

300-
@Override
301-
public void run() {
302-
this.connectionThread = Thread.currentThread();
303-
super.run();
301+
void setConnectionThread(Thread t) {
302+
this.connectionThread = t;
304303
}
305304

306305
@Override
@@ -363,17 +362,28 @@ public void close() throws IOException {
363362
/**
364363
* Close the connection and wait for any buffered data to be processed before returning.
365364
* This ensures any changes reported by the child application take effect.
365+
*
366+
* This method allows a short period for the connection thread to finish by itself (same amount
367+
* of time as the connection timeout, which is configurable). This should be fine for
368+
* well-behaved applications, where they close the connection when the app handle detects the
369+
* app has finished.
370+
*
371+
* In case the connection is not closed within the grace period, this method forcefully closes
372+
* it and any subsequent data that may arrive will be ignored.
366373
*/
367-
public void closeAndWait() throws IOException {
368-
close();
369-
374+
public void waitForClose() throws IOException {
370375
Thread connThread = this.connectionThread;
371376
if (Thread.currentThread() != connThread) {
372377
try {
373-
connThread.join();
378+
connThread.join(getConnectionTimeout());
374379
} catch (InterruptedException ie) {
375380
// Ignore.
376381
}
382+
383+
if (connThread.isAlive()) {
384+
LOG.log(Level.WARNING, "Timed out waiting for child connection to close.");
385+
close();
386+
}
377387
}
378388
}
379389

launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,8 @@ public void infoChanged(SparkAppHandle handle) {
9494
Message stopMsg = client.inbound.poll(30, TimeUnit.SECONDS);
9595
assertTrue(stopMsg instanceof Stop);
9696
} finally {
97-
handle.kill();
9897
close(client);
98+
handle.kill();
9999
client.clientThread.join();
100100
}
101101
}

0 commit comments

Comments
 (0)