Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/stream_feeds/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## Next release
- Update API client with renaming `addReaction` to `addActivityReaction` and `deleteReaction` to `deleteActivityReaction`.
- Update `activity.currentFeed` capabilities when adding or updating activity from websocket events.

## 0.3.0
- [BREAKING] Renamed `AppLifecycleStateProvider` to `LifecycleStateProvider` and `AppLifecycleState` to `LifecycleState`.
Expand Down
7 changes: 7 additions & 0 deletions packages/stream_feeds/lib/src/client/feeds_client_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import '../models/push_notifications_config.dart';
import '../repository/activities_repository.dart';
import '../repository/app_repository.dart';
import '../repository/bookmarks_repository.dart';
import '../repository/capabilities_repository.dart';
import '../repository/comments_repository.dart';
import '../repository/devices_repository.dart';
import '../repository/feeds_repository.dart';
Expand Down Expand Up @@ -163,6 +164,7 @@ class StreamFeedsClientImpl implements StreamFeedsClient {
_feedsRepository = FeedsRepository(feedsApi);
_moderationRepository = ModerationRepository(feedsApi);
_pollsRepository = PollsRepository(feedsApi);
_capabilitiesRepository = CapabilitiesRepository(feedsApi);

moderation = ModerationClient(_moderationRepository);

Expand Down Expand Up @@ -193,6 +195,7 @@ class StreamFeedsClientImpl implements StreamFeedsClient {
late final FeedsRepository _feedsRepository;
late final ModerationRepository _moderationRepository;
late final PollsRepository _pollsRepository;
late final CapabilitiesRepository _capabilitiesRepository;

// TODO: Fill this with correct values
late final _systemEnvironmentManager = SystemEnvironmentManager(
Expand Down Expand Up @@ -272,6 +275,7 @@ class StreamFeedsClientImpl implements StreamFeedsClient {
pollsRepository: _pollsRepository,
eventsEmitter: events,
onReconnectEmitter: onReconnectEmitter,
capabilitiesRepository: _capabilitiesRepository,
);
}

Expand Down Expand Up @@ -306,7 +310,9 @@ class StreamFeedsClientImpl implements StreamFeedsClient {
activitiesRepository: _activitiesRepository,
commentsRepository: _commentsRepository,
pollsRepository: _pollsRepository,
capabilitiesRepository: _capabilitiesRepository,
eventsEmitter: events,
initialActivityData: initialData,
);
}

Expand All @@ -316,6 +322,7 @@ class StreamFeedsClientImpl implements StreamFeedsClient {
query: query,
currentUserId: user.id,
activitiesRepository: _activitiesRepository,
capabilitiesRepository: _capabilitiesRepository,
eventsEmitter: events,
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class OwnCapabilitiesBatchResponse with _$OwnCapabilitiesBatchResponse {
});

@override
final Map<String, List<String>> capabilities;
final Map<String, List<FeedOwnCapability>> capabilities;

@override
final String duration;
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions packages/stream_feeds/lib/src/models/activity_data.dart
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,17 @@ class ActivityData with _$ActivityData {
/// Custom data associated with the activity.
@override
final Map<String, Object?>? custom;

/// Creates a new [ActivityData] instance with the updated ownCapabilities on [currentFeed].
ActivityData copyWithCurrentFeedCapabilities({
required List<FeedOwnCapability> capabilities,
}) {
return copyWith(
currentFeed: currentFeed?.copyWith(
ownCapabilities: capabilities,
),
);
}
}

/// Extension type for activity visibility settings.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import 'dart:async';

import '../../stream_feeds.dart' as api;
import '../../stream_feeds.dart';
import '../utils/batcher.dart';

class CapabilitiesRepository {
CapabilitiesRepository(api.DefaultApi api) : _api = api;

final api.DefaultApi _api;
final Map<String, List<FeedOwnCapability>> _capabilities = {};

late final Batcher<String, Result<Map<String, List<FeedOwnCapability>>>>
_fetchBatcher = Batcher(
action: (feeds) => _fetchWithRetry(feeds: feeds),
);

Future<Result<Map<String, List<FeedOwnCapability>>>> fetchCapabilities({
required List<String> feeds,
}) async {
final result = await _api.ownCapabilitiesBatch(
ownCapabilitiesBatchRequest: api.OwnCapabilitiesBatchRequest(
feeds: feeds,
),
);

return result.map((response) {
_mergeWithCache(response.capabilities);
return response.capabilities;
});
}

void cacheCapabilitiesForFeeds(List<FeedData> feeds) {
for (final feed in feeds) {
cacheCapabilities(feed.id, feed.ownCapabilities);
}
}

void cacheCapabilities(String feed, List<FeedOwnCapability> capabilities) {
_capabilities[feed] = capabilities;
}

Future<List<FeedOwnCapability>?> getCapabilities(String feed) async {
return _capabilities[feed] ??
(await _fetchBatchedFeedCapabilities(feed)).getOrNull();
}
Comment on lines +43 to +46
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I read it correctly that on failures we just give up? I'm thinking if it makes sense to have some retry

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good idea, I was not sure yet what to do. Should we just add the list of failed feeds to the next batch, or shall we retry only this specific batch once, so the next batch is clean and can also be retried?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking of adding them to the next batch, but no strong opinion. We could also check what they do in JS with @isekovanic

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now I've added a retry after half a second, but we can improve on this later

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently I have not added a retry mechanism on JS, but the queued up IDs will simply not be purged unless the request is successful. So they'll be there the next time something arrives.

But, a retry is much better in my opinion as well, since otherwise you might need to wait. I'll add a retry as well on my side.


void dispose() {
_fetchBatcher.dispose();
}

Future<Result<Map<String, List<FeedOwnCapability>>>> _fetchWithRetry({
required List<String> feeds,
bool isRetry = false,
}) async {
final result = await fetchCapabilities(feeds: feeds);
if (result.shouldRetry() && !isRetry) {
await Future<void>.delayed(const Duration(milliseconds: 500));
return _fetchWithRetry(feeds: feeds, isRetry: true);
}
Comment on lines +57 to +60
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it going to loop forever in case of no internet? Maybe better to limit it to n times. We can use the backoff from rate_limiter.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because of the isRetry: true we only retry once.

return result;
}

Future<Result<List<FeedOwnCapability>>> _fetchBatchedFeedCapabilities(
String feed,
) async {
final capabilities = await _fetchBatcher.add(feed);
return capabilities.map((capabilities) => capabilities[feed] ?? []);
}

void _mergeWithCache(Map<String, List<FeedOwnCapability>> capabilities) {
_capabilities.addAll(capabilities);
}
}

extension on Result<Map<String, List<FeedOwnCapability>>> {
bool shouldRetry() {
switch (this) {
case api.Success():
return false;

case final api.Failure failure:
final error = failure.error;
if (error is! StreamDioException) {
return false;
}
final exception = error.exception;
if (exception is! HttpClientException) {
return false;
}
final statusCode = exception.statusCode;
if (statusCode == null) {
return false;
}
return statusCode < 100 || statusCode >= 500;
}
}
}
10 changes: 9 additions & 1 deletion packages/stream_feeds/lib/src/state/activity.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import '../models/request/activity_add_comment_request.dart';
import '../models/request/activity_update_comment_request.dart';
import '../models/threaded_comment_data.dart';
import '../repository/activities_repository.dart';
import '../repository/capabilities_repository.dart';
import '../repository/comments_repository.dart';
import '../repository/polls_repository.dart';
import 'activity_comment_list.dart';
Expand All @@ -40,6 +41,7 @@ class Activity with Disposable {
required this.activitiesRepository,
required this.commentsRepository,
required this.pollsRepository,
required this.capabilitiesRepository,
required this.eventsEmitter,
ActivityData? initialActivityData,
}) {
Expand Down Expand Up @@ -76,6 +78,7 @@ class Activity with Disposable {
final ActivitiesRepository activitiesRepository;
final CommentsRepository commentsRepository;
final PollsRepository pollsRepository;
final CapabilitiesRepository capabilitiesRepository;

late final ActivityCommentList _commentsList;

Expand All @@ -101,7 +104,12 @@ class Activity with Disposable {
Future<Result<ActivityData>> get() async {
final result = await activitiesRepository.getActivity(activityId);

result.onSuccess(_stateNotifier.onActivityUpdated);
result.onSuccess((activity) {
_stateNotifier.onActivityUpdated(activity);
if (activity.currentFeed case final feed?) {
capabilitiesRepository.cacheCapabilitiesForFeeds([feed]);
}
});

// Query the comments as well (state will be updated automatically)
await queryComments();
Expand Down
25 changes: 20 additions & 5 deletions packages/stream_feeds/lib/src/state/activity_list.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import 'package:stream_core/stream_core.dart';
import '../models/activity_data.dart';
import '../models/query_configuration.dart';
import '../repository/activities_repository.dart';
import '../repository/capabilities_repository.dart';
import 'activity_list_state.dart';
import 'event/activity_list_event_handler.dart';
import 'query/activities_query.dart';
Expand All @@ -26,6 +27,7 @@ class ActivityList with Disposable {
required this.query,
required this.currentUserId,
required this.activitiesRepository,
required this.capabilitiesRepository,
required this.eventsEmitter,
}) {
_stateNotifier = ActivityListStateNotifier(
Expand All @@ -34,13 +36,17 @@ class ActivityList with Disposable {
);

// Attach event handlers for real-time updates
final handler = ActivityListEventHandler(state: _stateNotifier);
final handler = ActivityListEventHandler(
state: _stateNotifier,
capabilitiesRepository: capabilitiesRepository,
);
_eventsSubscription = eventsEmitter.listen(handler.handleEvent);
}

final ActivitiesQuery query;
final String currentUserId;
final ActivitiesRepository activitiesRepository;
final CapabilitiesRepository capabilitiesRepository;

late final ActivityListStateNotifier _stateNotifier;

Expand Down Expand Up @@ -93,10 +99,19 @@ class ActivityList with Disposable {
final result = await activitiesRepository.queryActivities(query);

result.onSuccess(
(activitiesData) => _stateNotifier.onQueryMoreActivities(
activitiesData,
QueryConfiguration(filter: query.filter, sort: query.sort),
),
(activitiesData) {
_stateNotifier.onQueryMoreActivities(
activitiesData,
QueryConfiguration(filter: query.filter, sort: query.sort),
);

capabilitiesRepository.cacheCapabilitiesForFeeds(
activitiesData.items
.map((activity) => activity.currentFeed)
.nonNulls
.toList(),
);
},
);

return result.map((activitiesData) => activitiesData.items);
Expand Down
Loading