WebSocket unimplemented BINARY message handling can result in TEXT message delivery to fail #5193
Comments
WebSock...Data0_0 is the first message, WebSocket_Data1_0 is the second message, and IESWebSocketClnt$WebSocketEventHandler is the client class annotated with @WebSocket. |
9.4.19.v20190610 is fine, so I suspect this may be a bug in the newer versions. |
I'm lost now, because I find this bug occurs when the server is Netty, but does not occur with a Tomcat server. |
I can't see anything strange going on in that debug log, the first frame with Are you using the Javax or the Jetty websocket API? |
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeException;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketFrame;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class IESWebSocketClnt {
private static final SouthLog LOGGER = SouthLogFactory.getLogger(IESWebSocketClnt.class);
public static final int WEBSOCKET_TIMEOUT = 15;
private WebSocketClient webSocketClient;
private Session session;
LogicalConnection connection;
private WebSocketSender wsSender;
private String wsUrl;
private WebSocketEventHandler handler;
private ClientUpgradeRequest request;
private String sessionId;
private RedisData redisData;
private String basicLogInfo = null;
private int statusCode = WebSocketUtil.SWITCHING_PROTOCOLS;
private boolean openWebsocketFlag = false;
public IESWebSocketClnt(String wsUrl, WebSocketClient client, String sessionId) {
this(wsUrl, client, sessionId, new ClientUpgradeRequest());
}
public IESWebSocketClnt(String wsUrl, WebSocketClient client, String sessionId, ClientUpgradeRequest request) {
this.sessionId = sessionId;
this.webSocketClient = client;
this.wsUrl = wsUrl;
this.request = request;
this.handler = new WebSocketEventHandler();
this.basicLogInfo = "start onMessage()!!!,sessionId=" + sessionId + ",Get message from " + wsUrl
+ ",message: ******";
this.open();
}
public void disConnection(int status, String reason) {
if (session != null) {
if (session.isOpen()) {
session.close(status, reason);
if (connection != null) {
connection.disconnect();
}
}
}
}
public Session getConnection() {
return IESWebSocketClnt.this.session;
}
public WebSocketSender getSender() {
return IESWebSocketClnt.this.wsSender;
}
public String getUrl() {
return this.wsUrl;
}
private void open() {
try {
URI echoUri = new URI(wsUrl);
webSocketClient.connect(handler, echoUri, request);
webSocketClient.setMaxTextMessageBufferSize(Integer.MAX_VALUE);
handler.awaitClose(WEBSOCKET_TIMEOUT, TimeUnit.SECONDS);
} catch (IOException e) {
openWebsocketFlag = false;
LOGGER.error("Other Exception");
} catch (InterruptedException e) {
openWebsocketFlag = false;
LOGGER.error("InterruptedException=", e);
} catch (URISyntaxException e) {
openWebsocketFlag = false;
LOGGER.error("URISyntaxException=", e);
} catch (Exception e) {
openWebsocketFlag = false;
LOGGER.error("client open exception : {},", e.getMessage());
}
}
public void clientStop(WebSocketClient client) {
try {
if (client != null) {
client.stop();
}
} catch (Exception e1) {
LOGGER.error("client.stop() Exception");
}
}
@WebSocket
public class WebSocketEventHandler {
private final CountDownLatch closeLatch;
public WebSocketEventHandler() {
this.closeLatch = new CountDownLatch(1);
}
public boolean awaitClose(int duration, TimeUnit unit) throws InterruptedException {
return this.closeLatch.await(duration, unit);
}
@OnWebSocketClose
public void onClose(int statusCode, String reason) {
if (connection != null) {
if (connection.isOpen()) {
connection.disconnect();
}
}
LOGGER.warn("websocket closed!!!SsId={},url: {},type:{},infomation:{}", sessionId, wsUrl,
String.valueOf(statusCode), reason);
this.closeLatch.countDown();
}
@OnWebSocketConnect
public void onConnect(Session session1) {
LOGGER.warn("websocket is opened!!!,ssId={}, url: {}", sessionId, wsUrl);
IESWebSocketClnt.this.session = session1;
connection = ((WebSocketSession) session1).getConnection();
openWebsocketFlag = true;
WebSocketSender websocketSender = new WebSocketSender(session1);
IESWebSocketClnt.this.wsSender = websocketSender;
this.closeLatch.countDown();
}
@OnWebSocketError
public void onError(Throwable thr) {
if (thr instanceof UpgradeException) {
statusCode = ((UpgradeException) thr).getResponseStatusCode();
LOGGER.error("websocket is onError!!!,ssId={}, url:{}, httpcode : {}. ", sessionId, wsUrl, statusCode);
LOGGER.error("websocket is onError!!!UpgradeException: ", thr);
} else {
LOGGER.error("websocket is onError!!!,ssId={}, url:{}.", sessionId, wsUrl);
LOGGER.error("websocket is onError!!!error: ", thr);
}
openWebsocketFlag = false;
this.closeLatch.countDown();
}
@OnWebSocketMessage
public void onMessage(String message) {
LOGGER.warn(basicLogInfo + ". " + message);
updateWSStatus();
MsgQueue.addtoMsgQueue(sessionId, message);
LOGGER.info("end onMessage()");
}
@OnWebSocketFrame
public void onFrame(Frame frame) {
if ("PONG".equals(frame.getType().toString())) {
LOGGER.warn("onFrame()!!!, PONG url = {}", wsUrl);
updateWSStatus();
}
if ("PING".equals(frame.getType().toString())) {
updateWSStatus();
}
}
private void updateWSStatus() {
WebsocketTunnelStatus websocketTunnelStatus = IESWebSocketClntManager.getSessionIdAndStatus()
.get(sessionId);
if (websocketTunnelStatus == null) {
websocketTunnelStatus = new WebsocketTunnelStatus();
} else {
websocketTunnelStatus.updateLstRcvTime();
}
IESWebSocketClntManager.getSessionIdAndStatus().put(sessionId, websocketTunnelStatus);
}
}
public String getSessionId() {
return sessionId;
}
public void setSessionId(String sessionId) {
this.sessionId = sessionId;
}
public boolean getOpenWebsocketFlag() {
return openWebsocketFlag;
}
public void setOpenWebsocketFlag(boolean openWebsocketFlag) {
this.openWebsocketFlag = openWebsocketFlag;
}
public WebSocketClient getWebSocketClient() {
return webSocketClient;
}
public void setWebSocketClient(WebSocketClient webSocketClient) {
this.webSocketClient = webSocketClient;
}
public Session getSession() {
return session;
}
public void setSession(Session session) {
this.session = session;
}
public WebSocketSender getWsSender() {
return wsSender;
}
public void setWsSender(WebSocketSender wsSender) {
this.wsSender = wsSender;
}
public String getWsUrl() {
return wsUrl;
}
public void setWsUrl(String wsUrl) {
this.wsUrl = wsUrl;
}
public WebSocketEventHandler getHandler() {
return handler;
}
public void setHandler(WebSocketEventHandler handler) {
this.handler = handler;
}
public IESWebSocketClnt() {
}
public RedisData getRedisData() {
return redisData;
}
public void setRedisData(RedisData redisData) {
this.redisData = redisData;
}
public int getStatusCode() {
return statusCode;
}
} |
The only strange thing going on is that the AbstractEventDriver does not seem to call back the onMessage function for the first message, but calls it back fine after that. |
@flymoondust can you create a minimal reproduction test case? I can't seem to reproduce this same issue. I can get all of the same Jetty debug logs to occur, but the |
The server is a little complicated: it's implemented with Netty, with nginx in front of it. I can compare some different scenarios and get the debug logs; wait for me to obtain them. I'll also look at the server's Netty code and try to find out how it responds. |
@flymoondust I don't think it would be a problem with the server if we have received and parsed the text frame. We know the frame gets to @joakime any idea how to debug this? |
Why do you have ...

    @OnWebSocketConnect
    public void onConnect(Session session1) {
        ...
        this.closeLatch.countDown(); // <-- this line in onConnect?
    }

You seem to be in a race to close the connection quickly after it's open? |
I think this line would not cause the connection to close. I just use the closeLatch for timing. |
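For readers following the latch discussion: one way to keep the timing behaviour without counting down a "close" latch from the connect callback is to track the two events separately. This is only a minimal sketch under stated assumptions. `ConnectionLatches` and its method names are hypothetical and not part of Jetty; the idea is that the `@OnWebSocketConnect`, `@OnWebSocketError` and `@OnWebSocketClose` methods would call `markConnected()`, `markFailed()` and `markClosed()` respectively.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not a Jetty class): separates "connected" from "closed".
final class ConnectionLatches {
    private final CountDownLatch connectLatch = new CountDownLatch(1);
    private final CountDownLatch closeLatch = new CountDownLatch(1);
    private volatile boolean connected = false;

    // Intended to be called from the connect callback.
    void markConnected() {
        connected = true;
        connectLatch.countDown();
    }

    // Intended to be called from the error callback, so that a failed
    // upgrade releases the waiter without reporting success.
    void markFailed() {
        connectLatch.countDown();
    }

    // Intended to be called from the close callback.
    void markClosed() {
        closeLatch.countDown();
    }

    // Returns true only if the connection was actually established in time.
    boolean awaitConnect(long timeout, TimeUnit unit) throws InterruptedException {
        return connectLatch.await(timeout, unit) && connected;
    }

    // Returns true if the close event arrived within the timeout.
    boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
        return closeLatch.await(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        ConnectionLatches latches = new ConnectionLatches();
        latches.markConnected();
        System.out.println("connected: " + latches.awaitConnect(1, TimeUnit.SECONDS));
        System.out.println("closed: " + latches.awaitClose(10, TimeUnit.MILLISECONDS));
    }
}
```

With this split, `open()` could wait on `awaitConnect(...)` and shutdown code on `awaitClose(...)`, so neither wait is released by the other event.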
Signed-off-by: Joakim Erdfelt <joakim.erdfelt@gmail.com>
I created a PR #5202 that adds way more logging to the EventDriver layer. @flymoondust can you build Jetty from branch |
|
The difference is that the first message doesn't call the function onTextFrame(), but I don't know why.
|
I added an additional log and found that when the first message arrives, activeMessage=org.eclipse.jetty.websocket.common.message.NullMessage@3939a75, and after that activeMessage=null |
Ok that is strange... Ahhh ok I think I see the issue now, there is a bug when we assign the I will put up a PR with some new tests and a fix for this. |
So the scenario is that when a binary message arrives before the first text message, the bug happens? |
And I'm confused about why Jetty 9.4.19.v20190610 does not have the bug. |
@flymoondust Yes, you must have received a binary fin frame just before the text frame. The bug is triggered because you do not have the binary annotation registered, that is when
The bug was introduced in PR #3885 which was released with jetty 9.4.20. So that explains why you do not have the bug in 9.4.19. |
@flymoondust as a workaround you could just register a dummy binary message handler:

    @OnWebSocketMessage
    public void onMessage(byte[] array, int offset, int length)
    {
    }
|
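To make the failure mode discussed in this thread easier to picture, here is a simplified toy model of a frame dispatcher. It is a sketch under assumptions and is NOT Jetty's source code: `ToyEventDriver`, `DiscardSink` and `TextSink` are hypothetical names, and the rule "the discard sink is left in place after a binary FIN frame when no binary handler is registered" is an assumption chosen to match the behaviour reported above (an active `NullMessage` when the first text message arrives, then `null`).

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only; names and logic are illustrative assumptions, not Jetty internals.
final class ToyEventDriver {
    private interface Sink {
        void append(String part);
        void complete();
    }

    // Swallows everything, standing in for a "no handler registered" message.
    private static final class DiscardSink implements Sink {
        public void append(String part) { }
        public void complete() { }
    }

    // Buffers a text message and delivers it to the application on completion.
    private final class TextSink implements Sink {
        private final StringBuilder buffer = new StringBuilder();
        public void append(String part) { buffer.append(part); }
        public void complete() { delivered.add(buffer.toString()); }
    }

    final List<String> delivered = new ArrayList<>();
    private final boolean hasBinaryHandler;
    private Sink activeMessage;

    ToyEventDriver(boolean hasBinaryHandler) {
        this.hasBinaryHandler = hasBinaryHandler;
    }

    void onBinaryFrame(String payload, boolean fin) {
        if (activeMessage == null) {
            // The dummy handler ignores the bytes, so a discard sink models both cases.
            activeMessage = new DiscardSink();
        }
        activeMessage.append(payload);
        if (fin) {
            activeMessage.complete();
            // Modelled bug: without a binary handler the stale sink is never cleared.
            if (hasBinaryHandler) {
                activeMessage = null;
            }
        }
    }

    void onTextFrame(String payload, boolean fin) {
        if (activeMessage == null) {
            activeMessage = new TextSink();
        }
        activeMessage.append(payload); // goes to the stale discard sink if one is left over
        if (fin) {
            activeMessage.complete();
            activeMessage = null;
        }
    }

    public static void main(String[] args) {
        ToyEventDriver textOnly = new ToyEventDriver(false);
        textOnly.onBinaryFrame("success", true);
        textOnly.onTextFrame("first", true);
        textOnly.onTextFrame("second", true);
        System.out.println(textOnly.delivered); // prints [second]

        ToyEventDriver withDummy = new ToyEventDriver(true);
        withDummy.onBinaryFrame("success", true);
        withDummy.onTextFrame("first", true);
        withDummy.onTextFrame("second", true);
        System.out.println(withDummy.delivered); // prints [first, second]
    }
}
```

In this model the first text message after an unhandled binary message is swallowed and later messages arrive normally, which is the symptom reported in this issue; registering a binary handler avoids the stale sink.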
Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
Thanks, I'll try the binary method to fix this. |
Yeah, this method works fine; the bug is fixed by adding the following code
|
Good analysis! But wouldn't his new debug logging output show this errant onBinaryFrame or onBinaryMessage (from the logging branch I created)? |
@flymoondust can you check the full logs you had to confirm whether you received a binary frame right before your problem occurred? |
|
Yeah, a binary message can be found in the old debug log; it's a 'success' message. |
Signed-off-by: Joakim Erdfelt <joakim.erdfelt@gmail.com>
Issue #5193 - fix lost websocket messages when only one OnWebSocketMessage annotation used
Fixed with PR #5208. |
Jetty version 9.4.27.v20200227 to 9.4.31
Java version
OS type/version
Description
When the WebSocket connection is established, the client always misses the first message from the server; after that it works fine.
Following is the debug log: