Skip to content

Commit 06feaef

Browse files
Avoid race on receiver by starting streaming sender thread after sending init message
Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12886
1 parent 9bbb449 commit 06feaef

File tree

2 files changed

+15
-12
lines changed

2 files changed

+15
-12
lines changed

CHANGES.txt

+1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
2.2.10
2+
* Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
23
* Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
34
* Coalescing strategy sleeps too much (CASSANDRA-13090)
45
* Make sure compaction stats are updated when compaction is interrupted (Backport from 3.0, CASSANDRA-12100)

src/java/org/apache/cassandra/streaming/ConnectionHandler.java

+14-12
Original file line numberDiff line numberDiff line change
@@ -82,13 +82,11 @@ public void initiate() throws IOException
8282
{
8383
logger.debug("[Stream #{}] Sending stream init for incoming stream", session.planId());
8484
Socket incomingSocket = session.createConnection();
85-
incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION);
86-
incoming.sendInitMessage(incomingSocket, true);
85+
incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION, true);
8786

8887
logger.debug("[Stream #{}] Sending stream init for outgoing stream", session.planId());
8988
Socket outgoingSocket = session.createConnection();
90-
outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION);
91-
outgoing.sendInitMessage(outgoingSocket, false);
89+
outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION, true);
9290
}
9391

9492
/**
@@ -159,13 +157,15 @@ abstract static class MessageHandler implements Runnable
159157

160158
protected int protocolVersion;
161159
protected Socket socket;
160+
private final boolean isOutgoingHandler;
162161

163162
private final AtomicReference<SettableFuture<?>> closeFuture = new AtomicReference<>();
164163
private IncomingStreamingConnection incomingConnection;
165164

166-
protected MessageHandler(StreamSession session)
165+
protected MessageHandler(StreamSession session, boolean isOutgoingHandler)
167166
{
168167
this.session = session;
168+
this.isOutgoingHandler = isOutgoingHandler;
169169
}
170170

171171
protected abstract String name();
@@ -187,14 +187,14 @@ protected static ReadableByteChannel getReadChannel(Socket socket) throws IOExce
187187
}
188188

189189
@SuppressWarnings("resource")
190-
public void sendInitMessage(Socket socket, boolean isForOutgoing) throws IOException
190+
public void sendInitMessage() throws IOException
191191
{
192192
StreamInitMessage message = new StreamInitMessage(
193193
FBUtilities.getBroadcastAddress(),
194194
session.sessionIndex(),
195195
session.planId(),
196196
session.description(),
197-
isForOutgoing,
197+
!isOutgoingHandler,
198198
session.keepSSTableLevel(),
199199
session.isIncremental());
200200
ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
@@ -203,16 +203,18 @@ public void sendInitMessage(Socket socket, boolean isForOutgoing) throws IOExcep
203203
out.flush();
204204
}
205205

206-
public void start(IncomingStreamingConnection connection, int protocolVersion)
206+
public void start(IncomingStreamingConnection connection, int protocolVersion) throws IOException
207207
{
208208
this.incomingConnection = connection;
209-
start(connection.socket, protocolVersion);
209+
start(connection.socket, protocolVersion, false);
210210
}
211211

212-
public void start(Socket socket, int protocolVersion)
212+
public void start(Socket socket, int protocolVersion, boolean initiator) throws IOException
213213
{
214214
this.socket = socket;
215215
this.protocolVersion = protocolVersion;
216+
if (initiator)
217+
sendInitMessage();
216218

217219
new Thread(this, name() + "-" + session.peer).start();
218220
}
@@ -270,7 +272,7 @@ static class IncomingMessageHandler extends MessageHandler
270272
{
271273
IncomingMessageHandler(StreamSession session)
272274
{
273-
super(session);
275+
super(session, false);
274276
}
275277

276278
protected String name()
@@ -330,7 +332,7 @@ public int compare(StreamMessage o1, StreamMessage o2)
330332

331333
OutgoingMessageHandler(StreamSession session)
332334
{
333-
super(session);
335+
super(session, true);
334336
}
335337

336338
protected String name()

0 commit comments

Comments
 (0)