Skip to content

Commit

Permalink
Fix distribution of existing information when joining an existing list
Browse files Browse the repository at this point in the history
  • Loading branch information
cachapa committed Aug 17, 2023
1 parent d479ff7 commit b0149da
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 143 deletions.
4 changes: 4 additions & 0 deletions lib/extensions.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,7 @@ extension StringX on String {
extension RequestX on Request {
Map<String, String> get queryParameters => requestedUri.queryParameters;
}

extension DateTimeX on DateTime {
String get toUtcString => toUtc().toIso8601String();
}
176 changes: 103 additions & 73 deletions lib/tudo_server.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';

import 'package:crdt_sync/crdt_sync.dart';
Expand All @@ -13,10 +14,42 @@ import 'package:web_socket_channel/web_socket_channel.dart';
import 'db_util.dart';
import 'extensions.dart';

const _queries = {
'users': '''
SELECT users.id, users.name, users.is_deleted, users.hlc FROM
(SELECT user_id, max(created_at) AS created_at FROM
(SELECT list_id FROM user_lists WHERE user_id = ?1 AND is_deleted = 0) AS list_ids
JOIN user_lists ON user_lists.list_id = list_ids.list_id
GROUP BY user_lists.user_id
) AS user_ids
JOIN users ON users.id = user_ids.user_id
''',
'user_lists': '''
SELECT user_lists.list_id, user_id, position, user_lists.created_at, is_deleted, hlc FROM
(SELECT list_id, created_at FROM user_lists WHERE user_id = ?1) AS own_lists
JOIN user_lists ON own_lists.list_id = user_lists.list_id
''',
'lists': '''
SELECT * FROM (SELECT lists.id, lists.name, lists.color, lists.creator_id,
lists.created_at, lists.is_deleted, lists.hlc, lists.node_id,
CASE WHEN lists.modified > user_lists.modified THEN lists.modified ELSE user_lists.modified END AS modified
FROM user_lists
JOIN lists ON list_id = lists.id AND user_id = ?1 AND user_lists.is_deleted = 0) a
''',
'todos': '''
SELECT * FROM (SELECT todos.id, todos.list_id, todos.name, todos.done, todos.position,
todos.creator_id, todos.created_at, todos.done_at, todos.done_by,
todos.is_deleted, todos.hlc, todos.node_id,
CASE WHEN todos.modified > user_lists.modified THEN todos.modified ELSE user_lists.modified END AS modified
FROM user_lists
JOIN todos ON user_lists.list_id = todos.list_id AND user_id = ?1 AND user_lists.is_deleted = 0) a
''',
};

class TudoServer {
final String? apiSecret;
late final SqlCrdt _crdt;
late final CrdtSyncServer _crdtSync;
late final CrdtSyncServer _syncServer;

TudoServer(this.apiSecret);

Expand All @@ -28,22 +61,29 @@ class TudoServer {
String? dbUsername,
String? dbPassword,
}) async {
_crdt = await PostgresCrdt.open(
database,
host: dbHost,
port: dbPort,
username: dbUsername,
password: dbPassword,
);
try {
_crdt = await PostgresCrdt.open(
database,
host: dbHost,
port: dbPort,
username: dbUsername,
password: dbPassword,
);
} catch (e) {
print('Failed to open Postgres database.');
rethrow;
}
await DbUtil.createTables(_crdt);
_crdtSync = CrdtSyncServer(
_syncServer = CrdtSyncServer(
_crdt,
// verbose: true,
verbose: true,
);

final router = Router()
..head('/check_version', _checkVersion)
..get('/auth/<userId>', _auth)
..post('/lists/<userId>/<listId>', _joinList)
..get('/changeset/<userId>/<peerId>', _getChangeset)
..get('/ws/<userId>', _wsHandler);

final handler = Pipeline()
Expand All @@ -62,76 +102,65 @@ class TudoServer {
: Response(HttpStatus.upgradeRequired);

/// By the time we arrive here, both the secret and credentials have been validated
Response _auth(Request request) => Response(HttpStatus.noContent);
Response _auth(Request request, String userId) =>
Response(HttpStatus.noContent);

Future<Response> _joinList(
Request request, String userId, String listId) async {
await _crdt.transaction((txn) async {
final maxPosition = (await txn.query('''
SELECT max(position) as max_position FROM user_lists
WHERE user_id = ?1 AND is_deleted = 0
''', [userId])).first['max_position'] as int? ?? -1;
await txn.execute('''
INSERT INTO user_lists (user_id, list_id, created_at, position, is_deleted)
VALUES (?1, ?2, ?3, ?4, 0)
ON CONFLICT (user_id, list_id) DO UPDATE SET
created_at = ?3,
position = ?4,
is_deleted = 0
''', [userId, listId, DateTime.now().toUtcString, maxPosition + 1]);
});
return Response(HttpStatus.created);
}

Future<Response> _getChangeset(
Request request, String userId, String peerId) async {
final changeset = await CrdtSync.getChangeset(
_crdt,
_queries.map((table, sql) => MapEntry(table, (sql, [userId]))),
isClient: false,
peerId: peerId,
afterHlc: Hlc.zero(_crdt.nodeId),
);
return Response.ok(jsonEncode(changeset));
}

Future<Response> _wsHandler(Request request, String userId) async {
final handler = webSocketHandler(
pingInterval: Duration(seconds: 20),
(WebSocketChannel webSocket) async {
late String remoteNodeId;
await _crdtSync.handle(
(WebSocketChannel webSocket) {
CrdtSync(
_crdt,
webSocket,
tables: ['users', 'user_lists', 'lists', 'todos'],
onConnect: (nodeId, __) {
remoteNodeId = nodeId;
print(
'${userId.short} (${nodeId.short}): connect [${_crdtSync.clientCount}]');
},
isClient: false,
changesetQueries:
_queries.map((table, sql) => MapEntry(table, (sql, [userId]))),
onConnect: (nodeId, __) => print(
'${userId.short} (${nodeId.short}): connect [${_syncServer.clientCount}]'),
onDisconnect: (nodeId, code, reason) => print(
'${userId.short} (${nodeId.short}): disconnect [${_crdtSync.clientCount}] $code ${reason ?? ''}'),
onChangesetReceived: (recordCounts) => print(
'⬇️ ${userId.short} (${remoteNodeId.short}) ${recordCounts.entries.map((e) => '${e.key}: ${e.value}').join(', ')}'),
onChangesetSent: (recordCounts) => print(
'⬆️ ${userId.short} (${remoteNodeId.short}) ${recordCounts.entries.map((e) => '${e.key}: ${e.value}').join(', ')}'),
queryBuilder: (table, lastModified, remoteNodeId) =>
_queryBuilder(userId, table, lastModified, remoteNodeId),
'${userId.short} (${nodeId.short}): disconnect [${_syncServer.clientCount}] $code ${reason ?? ''}'),
onChangesetReceived: (nodeId, recordCounts) => print(
'⬇️ ${userId.short} (${nodeId.short}) ${recordCounts.entries.map((e) => '${e.key}: ${e.value}').join(', ')}'),
onChangesetSent: (nodeId, recordCounts) => print(
'⬆️ ${userId.short} (${nodeId.short}) ${recordCounts.entries.map((e) => '${e.key}: ${e.value}').join(', ')}'),
// verbose: true,
);
},
);
return await handler(request);
}

(String, List<Object?>)? _queryBuilder(
String userId, String table, Hlc lastModified, String remoteNodeId) {
final query = switch (table) {
'users' => '''
SELECT users.id, users.name, users.is_deleted, users.hlc FROM
(SELECT user_id, max(created_at) AS created_at FROM
(SELECT list_id FROM user_lists WHERE user_id = ?1 AND is_deleted = 0) AS list_ids
JOIN user_lists ON user_lists.list_id = list_ids.list_id
GROUP BY user_lists.user_id
) AS user_ids
JOIN users ON users.id = user_ids.user_id
WHERE node_id != ?2
AND modified > CASE WHEN user_ids.created_at >= ?3 THEN '' ELSE ?3 END
''',
'user_lists' => '''
SELECT user_lists.list_id, user_id, position, user_lists.created_at, is_deleted, hlc FROM
(SELECT list_id, created_at FROM user_lists WHERE user_id = ?1) AS own_lists
JOIN user_lists ON own_lists.list_id = user_lists.list_id
WHERE node_id != ?2
AND modified > CASE WHEN own_lists.created_at >= ?3 THEN '' ELSE ?3 END
''',
'lists' => '''
SELECT lists.id, lists.name, lists.color, lists.creator_id,
lists.created_at, lists.is_deleted, lists.hlc FROM user_lists
JOIN lists ON list_id = lists.id AND user_id = ?1 AND user_lists.is_deleted = 0
WHERE lists.node_id != ?2
AND lists.modified > CASE WHEN user_lists.created_at >= ?3 THEN '' ELSE ?3 END
''',
'todos' => '''
SELECT todos.id, todos.list_id, todos.name, todos.done, todos.position,
todos.creator_id, todos.created_at, todos.done_at, todos.done_by,
todos.is_deleted, todos.hlc FROM user_lists
JOIN todos ON user_lists.list_id = todos.list_id AND user_id = ?1 AND user_lists.is_deleted = 0
WHERE todos.node_id != ?2
AND todos.modified > CASE WHEN user_lists.created_at >= ?3 THEN '' ELSE ?3 END
''',
_ => throw "$table: I've never seen this table in my life!"
};
return (query, [userId, remoteNodeId, lastModified]);
}

Handler _validateVersion(Handler innerHandler) => (request) =>
_isVersionSupported(request) ? innerHandler(request) : Response(426);

Expand Down Expand Up @@ -160,16 +189,17 @@ class TudoServer {
}

final userId =
request.headers['user_id'] ?? request.url.pathSegments.last;
final token = request.requestedUri.queryParameters['token'];
request.headers['user_id'] ?? request.url.pathSegments[1];
final token = request.headers[HttpHeaders.authorizationHeader]
?.replaceFirst('bearer ', '') ??
request.requestedUri.queryParameters['token'];

// Validate user id length
if (userId.length != 36) {
return Response.forbidden('Invalid user id: $userId');
}

// Validate token length
// print(token!.length);
if (token == null || token.length < 32 || token.length > 128) {
return Response.forbidden('Invalid token: $token');
}
Expand All @@ -182,7 +212,7 @@ class TudoServer {
// happen atomically
final result = await txn
.query('SELECT token FROM auth WHERE user_id = ?1', [userId]);
knownToken = result.isEmpty ? null : result.first['token'] as String?;
knownToken = result.firstOrNull?['token'] as String?;

// Associate new token with user id
if (knownToken == null) {
Expand All @@ -205,7 +235,7 @@ class TudoServer {
final userAgent = request.headers[HttpHeaders.userAgentHeader]!;
final version = Version.parse(userAgent.substring(
userAgent.indexOf('/') + 1, userAgent.indexOf(' ')));
return version >= Version(2, 3, 1);
return version >= Version(2, 3, 3);
}
}

Expand Down
Loading

0 comments on commit b0149da

Please sign in to comment.