Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Better stream and access token management #1019

Merged
merged 17 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion packages/realtime_client/lib/src/realtime_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ class RealtimeCloseEvent {
required this.code,
required this.reason,
});

@override
String toString() {
return 'RealtimeCloseEvent{code: $code, reason: $reason}';
}
}

class RealtimeClient {
Expand Down Expand Up @@ -353,7 +358,7 @@ class RealtimeClient {

for (final channel in channels) {
if (token != null) {
channel.updateJoinPayload({'user_token': token});
Vinzent03 marked this conversation as resolved.
Show resolved Hide resolved
channel.updateJoinPayload({'access_token': token});
}
if (channel.joinedOnce && channel.isJoined) {
channel.push(ChannelEvents.accessToken, {'access_token': token});
Expand Down
2 changes: 1 addition & 1 deletion packages/realtime_client/test/socket_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ void main() {
});

group('setAuth', () {
final updateJoinPayload = {'user_token': 'token123'};
final updateJoinPayload = {'access_token': 'token123'};
final pushPayload = {'access_token': 'token123'};

test(
Expand Down
50 changes: 44 additions & 6 deletions packages/supabase/lib/src/supabase_stream_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ class _Order {
final bool ascending;
}

class RealtimeSubscribeException implements Exception {
RealtimeSubscribeException(this.status, [this.details]);

final RealtimeSubscribeStatus status;
final Object? details;

@override
String toString() {
return 'RealtimeSubscribeException(status: $status, details: $details)';
}
}

typedef SupabaseStreamEvent = List<Map<String, dynamic>>;

class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
Expand Down Expand Up @@ -64,6 +76,9 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
/// Count of record to be returned
int? _limit;

/// Flag that the stream has at least one time been subscribed to realtime
bool _wasSubscribed = false;

SupabaseStreamBuilder({
required PostgrestQueryBuilder queryBuilder,
required String realtimeTopic,
Expand Down Expand Up @@ -195,12 +210,31 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
}
})
.subscribe((status, [error]) {
if (error != null) {
_addException(error);
switch (status) {
case RealtimeSubscribeStatus.subscribed:
// Reload all data after a reconnect from postgrest
// First data from postgrest gets loaded before the realtime connect
if (_wasSubscribed) {
_getPostgrestData();
}
_wasSubscribed = true;
break;
case RealtimeSubscribeStatus.closed:
_streamController?.close();
break;
case RealtimeSubscribeStatus.timedOut:
_addException(RealtimeSubscribeException(status, error));
break;
case RealtimeSubscribeStatus.channelError:
_addException(RealtimeSubscribeException(status, error));
break;
}
});
_getPostgrestData();
}

PostgrestFilterBuilder query = _queryBuilder.select();
Future<void> _getPostgrestData() async {
PostgrestFilterBuilder<PostgrestList> query = _queryBuilder.select();
if (_streamFilter != null) {
switch (_streamFilter!.type) {
case PostgresChangeFilterType.eq:
Expand All @@ -226,7 +260,7 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
break;
}
}
PostgrestTransformBuilder? transformQuery;
PostgrestTransformBuilder<PostgrestList>? transformQuery;
if (_orderBy != null) {
transformQuery =
query.order(_orderBy!.column, ascending: _orderBy!.ascending);
Expand All @@ -237,11 +271,15 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {

try {
final data = await (transformQuery ?? query);
final rows = SupabaseStreamEvent.from(data as List);
_streamData.addAll(rows);
final rows = SupabaseStreamEvent.from(data);
_streamData = rows;
_addStream();
} catch (error, stackTrace) {
_addException(error, stackTrace);
// In case the postgrest call fails, there is no need to keep the
// realtime connection open
_channel?.unsubscribe();
_streamController?.close();
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/supabase_flutter/lib/src/supabase.dart
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class Supabase {
accessToken: accessToken,
);
_instance._debugEnable = debug ?? kDebugMode;
_instance.log('***** Supabase init completed $_instance');
_instance.log('***** Supabase init completed *****');

_instance._supabaseAuth = SupabaseAuth();
await _instance._supabaseAuth.initialize(options: authOptions);
Expand Down
7 changes: 6 additions & 1 deletion packages/supabase_flutter/lib/src/supabase_auth.dart
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,12 @@ class SupabaseAuth with WidgetsBindingObserver {
case AppLifecycleState.detached:
case AppLifecycleState.inactive:
case AppLifecycleState.paused:
Supabase.instance.client.auth.stopAutoRefresh();
// Realtime channels are kept alive in the background for some amount
// of time after the app is paused. If we stop refreshing the token
// here, the channels will be closed.
if (Supabase.instance.client.realtime.getChannels().isEmpty) {
Supabase.instance.client.auth.stopAutoRefresh();
}
Vinzent03 marked this conversation as resolved.
Show resolved Hide resolved
default:
}
}
Expand Down