Skip to content

Commit

Permalink
added heart beat
Browse files Browse the repository at this point in the history
  • Loading branch information
hiaw committed Dec 11, 2019
1 parent 4863e62 commit 6d79644
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 0 deletions.
4 changes: 4 additions & 0 deletions lib/src/impl/util_write.dart
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ void writeDataFrame(StompConnector connector, String command,
connector.writeEof();
}

void pongMessage(StompConnector connector) {
writeSimpleFrame(connector, SEND, null);
}

///Write a frame from the given stream
Future writeStreamFrame(StompConnector connector, String command,
Map<String, String> headers, Stream<List<int>> stream) {
Expand Down
5 changes: 5 additions & 0 deletions lib/src/stomp_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class _StompClient implements StompClient {
final _ErrorCallback _onError;
final _FaultCallback _onFault;

DateTime lastMessageDate = new DateTime.now();

///<String subscription-id, _Subscriber>
final Map<String, _Subscriber> _subscribers = new HashMap();

Expand Down Expand Up @@ -127,9 +129,11 @@ class _StompClient implements StompClient {

_connector
..onBytes = (List<int> data) {
lastMessageDate = DateTime.now();
_parser.addBytes(data);
}
..onString = (String data) {
lastMessageDate = DateTime.now();
_parser.addString(data);
}
..onError = (error, stackTrace) {
Expand All @@ -139,6 +143,7 @@ class _StompClient implements StompClient {
_disconnected = true;
_subscribers.clear();
_receipts.clear();
cleanTimers();
if (_onDisconnect != null) _onDisconnect(this);
};
}
Expand Down
29 changes: 29 additions & 0 deletions lib/src/stomp_util.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ part of stomp;

const int _SUB_BYTES = 0, _SUB_STRING = 1, _SUB_JSON = 2, _SUB_BLOB = 3;

Timer outgoingTimer, incomingTimer;

///The information of a subscriber
class _Subscriber {
final String id;
Expand Down Expand Up @@ -79,10 +81,37 @@ void _handleHeartbeat(_StompClient client, String heartbeat) {
sy = int.parse(heartbeat.substring(i + 1));
client.heartbeat[0] = _calcHeartbeat(client.heartbeat[0], sy);
client.heartbeat[1] = _calcHeartbeat(client.heartbeat[1], sx);
final int ttlOutgoing = client.heartbeat[0];
if (ttlOutgoing != 0) {
outgoingTimer =
Timer.periodic(new Duration(milliseconds: ttlOutgoing), (_) {
if (!client.isDisconnected) {
print("pong");
pongMessage(client._connector);
}
});
}
final int ttlIncoming = client.heartbeat[1];
if (ttlIncoming != 0) {
incomingTimer =
Timer.periodic(new Duration(milliseconds: ttlIncoming), (_) {
int delta = new DateTime.now()
.difference(client.lastMessageDate)
.inMilliseconds;
if (delta > (ttlIncoming * 2)) {
client.disconnect();
}
});
}
} catch (ex) {
// ignore silently
}
}
}

cleanTimers() {
if (outgoingTimer != null && outgoingTimer.isActive) outgoingTimer.cancel();
if (incomingTimer != null && incomingTimer.isActive) incomingTimer.cancel();
}

int _calcHeartbeat(int a, int b) => a == 0 || b == 0 ? 0 : max(a, b);

0 comments on commit 6d79644

Please sign in to comment.