Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 37 additions & 53 deletions packages/stream_chat/lib/src/client/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ class StreamChatClient {
},
);

_connectionStatusSubscription = wsConnectionStatusStream.pairwise().listen(
(statusPair) {
final [prevStatus, currStatus] = statusPair;
return _onConnectionStatusChanged(prevStatus, currStatus);
},
);

state = ClientState(this);
}

Expand Down Expand Up @@ -220,7 +227,7 @@ class StreamChatClient {
///```
final LogHandlerFunction logHandlerFunction;

StreamSubscription<ConnectionStatus>? _connectionStatusSubscription;
StreamSubscription<List<ConnectionStatus>>? _connectionStatusSubscription;
Comment on lines -223 to +230
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it better to make this a (named) record instead of a list? Should also be 2 states right?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, but then we have to map the value to a record before listening to it. Also the type is not really useful for us here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right, I thought the pairwise() was something from us, but it's a feature from rxdart.


final _eventController = PublishSubject<Event>();

Expand All @@ -238,20 +245,14 @@ class StreamChatClient {
},
);

final _wsConnectionStatusController =
BehaviorSubject.seeded(ConnectionStatus.disconnected);

set _wsConnectionStatus(ConnectionStatus status) =>
_wsConnectionStatusController.add(status);

/// The current status value of the [_ws] connection
ConnectionStatus get wsConnectionStatus =>
_wsConnectionStatusController.value;
ConnectionStatus get wsConnectionStatus => _ws.connectionStatus;

/// This notifies the connection status of the [_ws] connection.
/// Listen to this to get notified when the [_ws] tries to reconnect.
Stream<ConnectionStatus> get wsConnectionStatusStream =>
_wsConnectionStatusController.stream.distinct();
Stream<ConnectionStatus> get wsConnectionStatusStream {
return _ws.connectionStatusStream.distinct();
}

/// Default log handler function for the [StreamChatClient] logger.
static void defaultLogHandler(LogRecord record) {
Expand Down Expand Up @@ -447,16 +448,6 @@ class StreamChatClient {
throw StreamChatError('Connection already available for ${user.id}');
}

_wsConnectionStatus = ConnectionStatus.connecting;

// skipping `ws` seed connection status -> ConnectionStatus.disconnected
// otherwise `client.wsConnectionStatusStream` will emit in order
// 1. ConnectionStatus.disconnected -> client seed status
// 2. ConnectionStatus.connecting -> client connecting status
// 3. ConnectionStatus.disconnected -> ws seed status
_connectionStatusSubscription =
_ws.connectionStatusStream.skip(1).listen(_connectionStatusHandler);

try {
final event = await _ws.connect(
user,
Expand All @@ -479,13 +470,7 @@ class StreamChatClient {
/// This will not trigger default auto-retry mechanism for reconnection.
/// You need to call [openConnection] to reconnect to [_ws].
void closeConnection() {
if (wsConnectionStatus == ConnectionStatus.disconnected) return;

logger.info('Closing web-socket connection for ${state.currentUser?.id}');
_wsConnectionStatus = ConnectionStatus.disconnected;

_connectionStatusSubscription?.cancel();
_connectionStatusSubscription = null;

// Stop listening to events
state.cancelEventSubscription();
Expand Down Expand Up @@ -513,19 +498,25 @@ class StreamChatClient {
return _eventController.add(event);
}

void _connectionStatusHandler(ConnectionStatus status) async {
final previousState = wsConnectionStatus;
final currentState = _wsConnectionStatus = status;
void _onConnectionStatusChanged(
ConnectionStatus prevStatus,
ConnectionStatus currStatus,
) async {
// If the status hasn't changed, we don't need to do anything.
if (prevStatus == currStatus) return;

if (previousState != currentState) {
handleEvent(Event(
type: EventType.connectionChanged,
online: status == ConnectionStatus.connected,
));
}
final wasConnected = prevStatus == ConnectionStatus.connected;
final isConnected = currStatus == ConnectionStatus.connected;

// Notify the connection status change event
handleEvent(Event(
type: EventType.connectionChanged,
online: isConnected,
));

if (currentState == ConnectionStatus.connected &&
previousState != ConnectionStatus.connected) {
final connectionRecovered = !wasConnected && isConnected;

if (connectionRecovered) {
// connection recovered
final cids = state.channels.keys.toList(growable: false);
if (cids.isNotEmpty) {
Expand Down Expand Up @@ -2025,6 +2016,9 @@ class StreamChatClient {
Future<void> disconnectUser({bool flushChatPersistence = false}) async {
logger.info('Disconnecting user : ${state.currentUser?.id}');

// closing web-socket connection
closeConnection();

// resetting state.
state.dispose();
state = ClientState(this);
Expand All @@ -2035,27 +2029,17 @@ class StreamChatClient {
_connectionIdManager.reset();

// closing persistence connection.
await closePersistenceConnection(flush: flushChatPersistence);

// closing web-socket connection
return closeConnection();
return closePersistenceConnection(flush: flushChatPersistence);
}

/// Call this function to dispose the client
Future<void> dispose() async {
logger.info('Disposing new StreamChatClient');

// disposing state.
state.dispose();

// closing persistence connection.
await closePersistenceConnection();

// closing web-socket connection.
closeConnection();
logger.info('Disposing StreamChatClient');

await disconnectUser();
await _ws.dispose();
await _eventController.close();
await _wsConnectionStatusController.close();
await _connectionStatusSubscription?.cancel();
}
}

Expand Down
38 changes: 31 additions & 7 deletions packages/stream_chat/lib/src/ws/websocket.dart
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class WebSocket with TimerHelper {
this.reconnectionMonitorInterval = 10,
this.healthCheckInterval = 20,
this.reconnectionMonitorTimeout = 40,
this.maxReconnectAttempts = 6,
this.queryParameters = const {},
}) : _logger = logger;

Expand Down Expand Up @@ -93,6 +94,11 @@ class WebSocket with TimerHelper {
/// connection unhealthy
final int reconnectionMonitorTimeout;

/// Maximum number of reconnection attempts before giving up.
///
/// Default is 6 attempts. ~30 seconds of reconnection attempts
final int maxReconnectAttempts;

User? _user;
String? _connectionId;
DateTime? _lastEventAt;
Expand All @@ -105,18 +111,20 @@ class WebSocket with TimerHelper {
///
String? get connectionId => _connectionId;

final _connectionStatusController =
BehaviorSubject.seeded(ConnectionStatus.disconnected);

set _connectionStatus(ConnectionStatus status) =>
_connectionStatusController.safeAdd(status);
final _connectionStatusController = BehaviorSubject.seeded(
ConnectionStatus.disconnected,
);

/// The current connection status value
ConnectionStatus get connectionStatus => _connectionStatusController.value;
set _connectionStatus(ConnectionStatus status) {
_connectionStatusController.safeAdd(status);
}

/// This notifies of connection status changes
Stream<ConnectionStatus> get connectionStatusStream =>
_connectionStatusController.stream.distinct();
Stream<ConnectionStatus> get connectionStatusStream {
return _connectionStatusController.stream.distinct();
}

void _initWebSocketChannel(Uri uri) {
_logger?.info('Initiating connection with $baseUrl');
Expand Down Expand Up @@ -250,6 +258,12 @@ class WebSocket with TimerHelper {
void _reconnect({bool refreshToken = false}) async {
_logger?.info('Retrying connection : $_reconnectAttempt');
if (_reconnectRequestInProgress) return;

if (_reconnectAttempt >= maxReconnectAttempts) {
_logger?.severe('Max reconnect attempts reached: $maxReconnectAttempts');
return disconnect();
}

_reconnectRequestInProgress = true;

_stopMonitoringEvents();
Expand Down Expand Up @@ -495,4 +509,14 @@ class WebSocket with TimerHelper {

_closeWebSocketChannel();
}

/// Disposes the web-socket connection and releases resources
Future<void> dispose() async {
_logger?.info('Disposing web-socket connection');

_stopMonitoringEvents();
_unsubscribeFromWebSocketChannel();
_closeWebSocketChannel();
_connectionStatusController.close();
}
}
33 changes: 9 additions & 24 deletions packages/stream_chat/test/src/client/client_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -149,21 +149,6 @@ void main() {
}
});

test('should throw if connection is already in progress', () async {
expect(client.state.currentUser, isNull);
try {
await client.connectAnonymousUser();
await client.openConnection();
} catch (e) {
expect(e, isA<StreamChatError>());
final err = e as StreamChatError;
expect(
err.message.contains('Connection already in progress for'),
isTrue,
);
}
});

test('should throw if connection is already available', () async {
expect(client.state.currentUser, isNull);
try {
Expand Down Expand Up @@ -499,7 +484,6 @@ void main() {
group('Client with connected user with persistence', () {
const apiKey = 'test-api-key';
late final api = FakeChatApi();
late final ws = FakeWebSocket();
late final persistence = MockPersistenceClient();

final user = User(id: 'test-user-id');
Expand All @@ -518,6 +502,7 @@ void main() {
when(() => persistence.updateLastSyncAt(any()))
.thenAnswer((_) => Future.value());
when(persistence.getLastSyncAt).thenAnswer((_) async => null);
final ws = FakeWebSocket();
client = StreamChatClient(apiKey, chatApi: api, ws: ws)
..chatPersistenceClient = persistence;
await client.connectUser(user, token);
Expand All @@ -526,8 +511,8 @@ void main() {
expect(client.wsConnectionStatus, ConnectionStatus.connected);
});

tearDown(() {
client.dispose();
tearDown(() async {
await client.dispose();
});

group('`.sync`', () {
Expand Down Expand Up @@ -815,7 +800,6 @@ void main() {
const apiKey = 'test-api-key';
const userId = 'test-user-id';
late final api = FakeChatApi();
late final ws = FakeWebSocket();

final user = User(id: userId);
final token = Token.development(user.id).rawValue;
Expand All @@ -832,15 +816,16 @@ void main() {
});

setUp(() async {
final ws = FakeWebSocket();
client = StreamChatClient(apiKey, chatApi: api, ws: ws);
await client.connectUser(user, token);
await delay(300);
expect(client.persistenceEnabled, isFalse);
expect(client.wsConnectionStatus, ConnectionStatus.connected);
});

tearDown(() {
client.dispose();
tearDown(() async {
await client.dispose();
});

group('`.sync`', () {
Expand Down Expand Up @@ -3490,22 +3475,22 @@ void main() {
group('PersistenceConnectionTests', () {
const apiKey = 'test-api-key';
late final api = FakeChatApi();
late final ws = FakeWebSocket();

final user = User(id: 'test-user-id');
final token = Token.development(user.id).rawValue;

late StreamChatClient client;

setUp(() async {
final ws = FakeWebSocket();
client = StreamChatClient(apiKey, chatApi: api, ws: ws);
expect(client.persistenceEnabled, isFalse);
});

tearDown(() {
tearDown(() async {
client.chatPersistenceClient = null;
expect(client.persistenceEnabled, isFalse);
client.dispose();
await client.dispose();
});

test('openPersistenceConnection connects the client to the user', () async {
Expand Down
Loading