Skip to content

Commit

Permalink
Close idle connections
Browse files Browse the repository at this point in the history
  • Loading branch information
cachapa committed Oct 28, 2023
1 parent fa0f5ff commit 0cbb6f0
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 16 deletions.
41 changes: 32 additions & 9 deletions lib/tudo_server.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import 'dart:convert';
import 'dart:io';

import 'package:crdt_sync/crdt_sync.dart';
import 'package:crdt_sync/crdt_sync_server.dart';
import 'package:postgres_crdt/postgres_crdt.dart';
import 'package:shelf/shelf.dart';
import 'package:shelf/shelf_io.dart' as io;
Expand Down Expand Up @@ -46,12 +47,15 @@ Map<String, Query> _queries(String userId) => {
''',
}.map((table, sql) => MapEntry(table, (sql, [userId])));

// Maximum time clients can remain connected without activity
const maxIdleDuration = Duration(minutes: 5);

final minimumVersion = Version(2, 3, 4);

class TudoServer {
late final SqlCrdt _crdt;

final _clientIds = <String>{};
final _connectedClients = <CrdtSync, DateTime>{};
var _userNames = <String, String>{};

Future<void> serve({
Expand Down Expand Up @@ -168,9 +172,9 @@ class TudoServer {
}

final handler = webSocketHandler(
pingInterval: Duration(seconds: 20),
(WebSocketChannel webSocket) {
CrdtSync.server(
late CrdtSync syncClient;
syncClient = CrdtSync.server(
_crdt,
webSocket,
changesetBuilder: (
Expand All @@ -188,26 +192,45 @@ class TudoServer {
modifiedAfter: modifiedAfter),
validateRecord: _validateRecord,
onConnect: (nodeId, __) {
_clientIds.add(nodeId);
_refreshClient(syncClient);
print(
'${_getName(userId)} (${nodeId.short}): connect [${_clientIds.length}]');
'${_getName(userId)} (${nodeId.short}): connect [${_connectedClients.length}]');
},
onDisconnect: (nodeId, code, reason) {
_clientIds.remove(nodeId);
_connectedClients.remove(syncClient);
print(
'${_getName(userId)} (${nodeId.short}): disconnect [${_connectedClients.length}] $code ${reason ?? ''}');
},
onChangesetReceived: (nodeId, recordCounts) {
_refreshClient(syncClient);
print(
'${_getName(userId)} (${nodeId.short}): disconnect [${_clientIds.length}] $code ${reason ?? ''}');
'⬇️ ${_getName(userId)} (${nodeId.short}) ${recordCounts.entries.map((e) => '${e.key}: ${e.value}').join(', ')}');
},
onChangesetReceived: (nodeId, recordCounts) => print(
'⬇️ ${_getName(userId)} (${nodeId.short}) ${recordCounts.entries.map((e) => '${e.key}: ${e.value}').join(', ')}'),
onChangesetSent: (nodeId, recordCounts) => print(
'⬆️ ${_getName(userId)} (${nodeId.short}) ${recordCounts.entries.map((e) => '${e.key}: ${e.value}').join(', ')}'),
// verbose: true,
);
},
);

return await handler(request);
}

void _refreshClient(CrdtSync syncClient) {
final now = DateTime.now();
// Reset client's idle time
_connectedClients[syncClient] = now;
// Close stale connections
_connectedClients.forEach((client, lastAccess) {
final idleTime = now.difference(lastAccess);
print('idle time: ${idleTime.inSeconds}');
if (idleTime > maxIdleDuration) {
print('Closing idle client: (${syncClient.peerId!.short})');
client.close();
}
});
}

Handler _validateVersion(Handler innerHandler) => (request) {
final userAgent = request.headers[HttpHeaders.userAgentHeader]!;
final version = Version.parse(userAgent.substring(
Expand Down
8 changes: 4 additions & 4 deletions pubspec.lock
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,10 @@ packages:
dependency: "direct main"
description:
name: crdt_sync
sha256: "6472a29a20fda1d43b277b162bff494ae489814be3a926eb0364831b86deea39"
sha256: a1d0ae47104bccdc68fdc1d2a9761848d4d1a9bb7a9073c8b115bd4c7c58a69a
url: "https://pub.dev"
source: hosted
version: "1.0.8"
version: "1.0.9"
crypto:
dependency: transitive
description:
Expand Down Expand Up @@ -269,10 +269,10 @@ packages:
dependency: "direct main"
description:
name: postgres_crdt
sha256: dcf34c2ecb7428a4ddb5a341bc2c78e5c95d4fe51ef5b6f5a77ff59f75c71e96
sha256: "42c1fa401e12f5caceb56bc8b7a0df31e3b74e218ca60c30ce52e648beeabcbf"
url: "https://pub.dev"
source: hosted
version: "2.1.3"
version: "2.1.4"
pub_semver:
dependency: transitive
description:
Expand Down
6 changes: 3 additions & 3 deletions pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ environment:

dependencies:
args: ^2.4.2
crdt_sync: ^1.0.8
# path: ../../crdt_sync
crdt_sync: ^1.0.9
# path: ../../crdt/crdt_sync
http: ^1.1.0
postgres_crdt: ^2.1.3
postgres_crdt: ^2.1.4
# path: ../../crdt/postgres_crdt
rxdart: ^0.27.7
shelf: ^1.4.1
Expand Down

0 comments on commit 0cbb6f0

Please sign in to comment.