Skip to content

Commit

Permalink
实验性断线重连 #70
Browse files Browse the repository at this point in the history
Signed-off-by: liuyanghejerry <liuyanghejerry@126.com>
  • Loading branch information
liuyanghejerry committed Nov 22, 2014
1 parent f581d3c commit 5e5e15c
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 39 deletions.
122 changes: 88 additions & 34 deletions src/common/network/clientsocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,18 @@ ClientSocket::ClientSocket(QObject *parent) :
{
initRouter();
connect(this, &ClientSocket::newData,
this, &ClientSocket::onPending);
this, &ClientSocket::onInputPending);
connect(this, &ClientSocket::managerPack,
this, &ClientSocket::onManagerPack);
connect(this, &ClientSocket::disconnected,
this, &ClientSocket::onServerDisconnected);
connect(this, &ClientSocket::cmdPack,
[this](const QJsonObject &map) {
qDebug()<<"onCmdData"<<map;
router_.onData(map);
});
connect(loopTimer_, &QTimer::timeout,
this, &ClientSocket::processPending);
this, &ClientSocket::processInputPending);
loopTimer_->start(WAIT_TIME);

connect(heartBeatTimer_, &QTimer::timeout,
Expand All @@ -69,7 +76,7 @@ void ClientSocket::setPoolEnabled(bool on)
poolEnabled_ = on;
if(!poolEnabled_){
loopTimer_->start(WAIT_TIME);
processPending();
processInputPending();
}else{
loopTimer_->stop();
}
Expand Down Expand Up @@ -171,13 +178,6 @@ void ClientSocket::requestNewRoom(const QJsonObject &m)
state_ = REQUESTING_NEWROOM;
}

void ClientSocket::tryJoinRoom(const QString &url)
{
auto decoded_info = decodeRoomUrl(url);
setPasswd(decoded_info.passwd);
tryJoinRoom(QHostAddress(decoded_info.addr), decoded_info.port);
}

void ClientSocket::onResponseRoomList(const QJsonObject &obj)
{
if(!obj["result"].toBool())
Expand Down Expand Up @@ -287,27 +287,26 @@ void ClientSocket::onResponseLogin(const QJsonObject &map)
//
}

void ClientSocket::tryJoinRoom(const QString &url)
{
auto decoded_info = decodeRoomUrl(url);
setPasswd(decoded_info.passwd);
tryJoinRoom(QHostAddress(decoded_info.addr), decoded_info.port);
}

void ClientSocket::tryJoinRoom(const QHostAddress &addr, const int port)
{
qDebug()<<"tryJoinRoom"<<addr<<port;
close();
disconnect(this, &ClientSocket::connected, this, 0);
disconnect(this, &ClientSocket::cmdPack, this, 0);
connect(this, &ClientSocket::connected,
[this]() {
state_ = JOINING_ROOM;
QJsonObject map;
map.insert("request", QString("login"));
map.insert("name", userName());
map.insert("password", passwd());

qDebug()<<"try auto join room";
sendCmdPack(map);
state_ = JOINING_ROOM;
});
connect(this, &ClientSocket::cmdPack,
[this](const QJsonObject &map) {
qDebug()<<"onCmdData"<<map;
router_.onData(map);
});
connectToHost(addr, port);
state_ = CONNECTING_ROOM;
Expand Down Expand Up @@ -616,6 +615,61 @@ void ClientSocket::onManagerPack(const QJsonObject &data)
router_.onData(data);
}

void ClientSocket::trySendData(const QByteArray &content)
{
if(state_ == ROOM_OFFLINE) {
outputPool_.push_back(content);
} else {
if(outputPool_.count()){
outputPool_.push_back(content);
processOutputPending();
}else{
sendData(content);
}
}
}

void ClientSocket::processOutputPending()
{
while(!outputPool_.isEmpty() && state_ == ROOM_JOINED){
sendData(outputPool_.takeFirst());
// QApplication::processEvents();
}
}

void ClientSocket::onServerDisconnected()
{
qDebug()<<"server disconnected"<<state_;
switch(state_) {
case MANAGER_CONNECTED:
case REQUESTING_ROOMLIST:
case REQUESTING_NEWROOM:
state_ = MANAGER_DISCONNECTED;
break;
case ROOM_JOINED:
// TODO: what if user is kicked?
state_ = ROOM_OFFLINE;
emit roomOfflined();
tryRejoinRoom();
break;
default:
break;
}
}

void ClientSocket::tryRejoinRoom()
{
qDebug()<<"try to re-joining room...";
tryJoinRoom(address(), port());
// FIXME: what if we exit this room and go into another one?
connect(this, &ClientSocket::roomJoined,
[this](){
emit newClientId(clientId());
setPoolEnabled(false);
processOutputPending();
});
}

void ClientSocket::sendHeartbeat()
{
QJsonObject obj;
Expand All @@ -627,28 +681,28 @@ void ClientSocket::sendHeartbeat()

void ClientSocket::sendDataPack(const QByteArray &content)
{
sendData(assamblePack(true, DATA, content));
trySendData(assamblePack(true, DATA, content));
}

void ClientSocket::sendDataPack(const QJsonObject &content)
{
sendData(assamblePack(true, DATA, jsonToBuffer(content)));
trySendData(assamblePack(true, DATA, jsonToBuffer(content)));
}

void ClientSocket::sendCmdPack(const QJsonObject &content)
{
sendData(assamblePack(true, COMMAND, jsonToBuffer(content)));
trySendData(assamblePack(true, COMMAND, jsonToBuffer(content)));
}

void ClientSocket::sendManagerPack(const QJsonObject &content)
{
sendData(assamblePack(true, MANAGER, jsonToBuffer(content)));
trySendData(assamblePack(true, MANAGER, jsonToBuffer(content)));
}

void ClientSocket::cancelPendings()
{
canceled_ = true;
pool_.clear();
inputPool_.clear();
}

void ClientSocket::close()
Expand Down Expand Up @@ -768,30 +822,30 @@ QByteArray ClientSocket::assamblePack(bool compress, PACK_TYPE pt, const QByteAr
return result;
}

void ClientSocket::onPending(const QByteArray& bytes)
void ClientSocket::onInputPending(const QByteArray& bytes)
{
if(canceled_){
return;
}

if(poolEnabled_){
pool_.append(bytes);
inputPool_.append(bytes);
}else{
if(pool_.count()){
pool_.append(bytes);
processPending();
if(inputPool_.count()){
inputPool_.append(bytes);
processInputPending();
}else{
dispatch(bytes);
}
}
}

void ClientSocket::processPending()
void ClientSocket::processInputPending()
{
// qDebug()<<"process pending";
while(!pool_.isEmpty() && !canceled_){
if(dispatch(pool_.first())){
pool_.pop_front();
while(!inputPool_.isEmpty() && !canceled_){
if(dispatch(inputPool_.first())){
inputPool_.pop_front();
}else{
break;
}
Expand Down Expand Up @@ -879,7 +933,7 @@ void ClientSocket::reset()
roomname_.clear();
canvassize_ = QSize();
loopTimer_->start(WAIT_TIME);
pool_.clear();
inputPool_.clear();
// mutex_.unlock();
}

Expand Down
16 changes: 12 additions & 4 deletions src/common/network/clientsocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class ClientSocket : public Socket
JOINING_ROOM,
ROOM_JOINED,
ROOM_EXITED,
ROOM_DISCONNECTED,
ROOM_OFFLINE,
ROOM_KICKED
};

Expand Down Expand Up @@ -124,6 +124,7 @@ class ClientSocket : public Socket
void roomListFetched(QHash<QString, QJsonObject>);
void roomCreated();
void roomJoined();
void roomOfflined();

void roomAboutToClose();
void layerAllCleared();
Expand All @@ -132,6 +133,8 @@ class ClientSocket : public Socket
void getKicked();
void delayGet(int);

void newClientId(const QString&);

void dataPack(const QJsonObject&);
void msgPack(const QJsonObject&);
void cmdPack(const QJsonObject&);
Expand Down Expand Up @@ -163,7 +166,8 @@ public slots:
quint64 schedualDataLength_;
quint64 leftDataLength_;
Router<> router_;
QList<QByteArray> pool_;
QList<QByteArray> inputPool_;
QList<QByteArray> outputPool_;
State state_;
QAtomicInt roomDelay_;
QTimer *loopTimer_;
Expand All @@ -184,11 +188,15 @@ private slots:
void setSchedualDataLength(quint64 length);
ParserResult parserPack(const QByteArray& data);
QByteArray assamblePack(bool compress, PACK_TYPE pt, const QByteArray& bytes);
void onPending(const QByteArray& bytes);
void processPending();
void onInputPending(const QByteArray& bytes);
void processInputPending();
bool dispatch(const QByteArray& bytes);
void onNewMessage(const QJsonObject &map);
void onManagerPack(const QJsonObject &data);
void trySendData(const QByteArray &content);
void processOutputPending();
void onServerDisconnected();
void tryRejoinRoom();
};

#endif // CLIENTSOCKET_H
6 changes: 5 additions & 1 deletion src/painttyDesktop/widgets/canvasbackend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ CanvasBackend::CanvasBackend(QObject *parent)

fullspeed_replay = settings.value("canvas/fullspeed_replay",
true).toBool();

// when re-join one room, clientId should be refreshed
connect(&client_socket, &ClientSocket::newClientId,
[this] (const QString& clientId){
cached_clientid_ = clientId;
});
connect(&client_socket, &ClientSocket::dataPack,
this, &CanvasBackend::onIncomingData);
connect(this, &CanvasBackend::newDataGroup,
Expand Down

0 comments on commit 5e5e15c

Please sign in to comment.