Skip to content
Merged
2 changes: 1 addition & 1 deletion melos.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ command:
shared_preferences: ^2.5.3
state_notifier: ^1.0.0
stream_feeds: ^0.5.0
stream_core: ^0.3.3
stream_core: ^0.4.0
video_player: ^2.10.0
uuid: ^4.5.1

Expand Down
67 changes: 38 additions & 29 deletions packages/stream_feeds/lib/src/client/feeds_client_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import '../repository/devices_repository.dart';
import '../repository/feeds_repository.dart';
import '../repository/moderation_repository.dart';
import '../repository/polls_repository.dart';
import '../resolvers/resolvers.dart' as event_resolvers;
import '../state/activity.dart';
import '../state/activity_comment_list.dart';
import '../state/activity_list.dart';
Expand All @@ -32,6 +31,7 @@ import '../state/comment_list.dart';
import '../state/comment_reaction_list.dart';
import '../state/comment_reply_list.dart';
import '../state/event/on_activity_added.dart';
import '../state/event/state_update_event.dart';
import '../state/feed.dart';
import '../state/feed_list.dart';
import '../state/follow_list.dart';
Expand Down Expand Up @@ -105,10 +105,6 @@ class StreamFeedsClientImpl implements StreamFeedsClient {
),
messageCodec: const FeedsWsCodec(),
onConnectionEstablished: _authenticateUser,
eventResolvers: [
event_resolvers.pollAnswerCastedFeedEventResolver,
event_resolvers.pollAnswerRemovedFeedEventResolver,
],
wsProvider: wsProvider,
);

Expand Down Expand Up @@ -172,6 +168,12 @@ class StreamFeedsClientImpl implements StreamFeedsClient {
moderation = ModerationClient(_moderationRepository);

// endregion

// Map WebSocket events to state update events
_wsEventToStateMapperSubscription = events.listen((event) {
final stateEvent = StateUpdateEvent.fromWsEvent(event);
stateEvent?.let(_stateUpdateEmitter.tryEmit);
});
}

final String apiKey;
Expand Down Expand Up @@ -235,7 +237,12 @@ class StreamFeedsClientImpl implements StreamFeedsClient {
}

@override
EventEmitter get events => _ws.events;
EventEmitter<WsEvent> get events => _ws.events;

@override
EventEmitter<StateUpdateEvent> get stateUpdateEvents => _stateUpdateEmitter;
late final _stateUpdateEmitter = MutableEventEmitter<StateUpdateEvent>();
StreamSubscription<WsEvent>? _wsEventToStateMapperSubscription;

@override
ConnectionStateEmitter get connectionState => _ws.connectionState;
Expand All @@ -262,6 +269,9 @@ class StreamFeedsClientImpl implements StreamFeedsClient {

@override
Future<void> disconnect() async {
await _wsEventToStateMapperSubscription?.cancel();
await _stateUpdateEmitter.close();

await _connectionRecoveryHandler.dispose();
await _ws.disconnect();
}
Expand All @@ -280,9 +290,9 @@ class StreamFeedsClientImpl implements StreamFeedsClient {
commentsRepository: _commentsRepository,
feedsRepository: _feedsRepository,
pollsRepository: _pollsRepository,
eventsEmitter: events,
onReconnectEmitter: onReconnectEmitter,
capabilitiesRepository: _capabilitiesRepository,
onReconnectEmitter: onReconnectEmitter,
eventsEmitter: _stateUpdateEmitter,
);
}

Expand All @@ -291,7 +301,7 @@ class StreamFeedsClientImpl implements StreamFeedsClient {
return FeedList(
query: query,
feedsRepository: _feedsRepository,
eventsEmitter: events,
eventsEmitter: _stateUpdateEmitter,
);
}

Expand All @@ -300,7 +310,7 @@ class StreamFeedsClientImpl implements StreamFeedsClient {
return FollowList(
query: query,
feedsRepository: _feedsRepository,
eventsEmitter: events,
eventsEmitter: _stateUpdateEmitter,
);
}

Expand All @@ -311,15 +321,15 @@ class StreamFeedsClientImpl implements StreamFeedsClient {
ActivityData? initialData,
}) {
return Activity(
activityId: activityId,
fid: fid,
activityId: activityId,
currentUserId: user.id,
initialActivityData: initialData,
activitiesRepository: _activitiesRepository,
commentsRepository: _commentsRepository,
pollsRepository: _pollsRepository,
capabilitiesRepository: _capabilitiesRepository,
eventsEmitter: events,
initialActivityData: initialData,
eventsEmitter: _stateUpdateEmitter,
);
}

Expand All @@ -330,7 +340,7 @@ class StreamFeedsClientImpl implements StreamFeedsClient {
currentUserId: user.id,
activitiesRepository: _activitiesRepository,
capabilitiesRepository: _capabilitiesRepository,
eventsEmitter: events,
eventsEmitter: _stateUpdateEmitter,
);
}

Expand Down Expand Up @@ -359,7 +369,7 @@ class StreamFeedsClientImpl implements StreamFeedsClient {
return ActivityReactionList(
query: query,
activitiesRepository: _activitiesRepository,
eventsEmitter: events,
eventsEmitter: _stateUpdateEmitter,
);
}

Expand All @@ -368,7 +378,7 @@ class StreamFeedsClientImpl implements StreamFeedsClient {
return BookmarkList(
query: query,
bookmarksRepository: _bookmarksRepository,
eventsEmitter: events,
eventsEmitter: _stateUpdateEmitter,
);
}

Expand All @@ -377,27 +387,27 @@ class StreamFeedsClientImpl implements StreamFeedsClient {
return BookmarkFolderList(
query: query,
bookmarksRepository: _bookmarksRepository,
eventsEmitter: events,
eventsEmitter: _stateUpdateEmitter,
);
}

@override
CommentList commentList(CommentsQuery query) {
return CommentList(
query: query,
commentsRepository: _commentsRepository,
eventsEmitter: events,
currentUserId: user.id,
commentsRepository: _commentsRepository,
eventsEmitter: _stateUpdateEmitter,
);
}

@override
ActivityCommentList activityCommentList(ActivityCommentsQuery query) {
return ActivityCommentList(
query: query,
commentsRepository: _commentsRepository,
eventsEmitter: events,
currentUserId: user.id,
commentsRepository: _commentsRepository,
eventsEmitter: _stateUpdateEmitter,
);
}

Expand All @@ -407,7 +417,7 @@ class StreamFeedsClientImpl implements StreamFeedsClient {
query: query,
currentUserId: user.id,
commentsRepository: _commentsRepository,
eventsEmitter: events,
eventsEmitter: _stateUpdateEmitter,
);
}

Expand All @@ -416,7 +426,7 @@ class StreamFeedsClientImpl implements StreamFeedsClient {
return CommentReactionList(
query: query,
commentsRepository: _commentsRepository,
eventsEmitter: events,
eventsEmitter: _stateUpdateEmitter,
);
}

Expand All @@ -425,7 +435,7 @@ class StreamFeedsClientImpl implements StreamFeedsClient {
return MemberList(
query: query,
feedsRepository: _feedsRepository,
eventsEmitter: events,
eventsEmitter: _stateUpdateEmitter,
);
}

Expand All @@ -434,17 +444,17 @@ class StreamFeedsClientImpl implements StreamFeedsClient {
return PollVoteList(
query: query,
pollsRepository: _pollsRepository,
eventsEmitter: events,
eventsEmitter: _stateUpdateEmitter,
);
}

@override
PollList pollList(PollsQuery query) {
return PollList(
query: query,
pollsRepository: _pollsRepository,
eventsEmitter: events,
currentUserId: user.id,
pollsRepository: _pollsRepository,
eventsEmitter: _stateUpdateEmitter,
);
}

Expand All @@ -453,7 +463,6 @@ class StreamFeedsClientImpl implements StreamFeedsClient {
return ModerationConfigList(
query: query,
moderationRepository: _moderationRepository,
eventsEmitter: events,
);
}

Expand Down Expand Up @@ -494,7 +503,7 @@ class StreamFeedsClientImpl implements StreamFeedsClient {
}

Stream<void> get onReconnectEmitter {
return connectionState.stream.scan(
return connectionState.scan(
(state, connectionStatus, i) => switch (connectionStatus) {
Initialized() || Connecting() => (
wasDisconnected: state.wasDisconnected,
Expand Down
24 changes: 24 additions & 0 deletions packages/stream_feeds/lib/src/feeds_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import 'state/comment_list.dart';
import 'state/comment_reaction_list.dart';
import 'state/comment_reply_list.dart';
import 'state/event/on_activity_added.dart';
import 'state/event/state_update_event.dart';
import 'state/feed.dart';
import 'state/feed_list.dart';
import 'state/follow_list.dart';
Expand Down Expand Up @@ -179,8 +180,31 @@ abstract interface class StreamFeedsClient {
/// events of a specific subtype of [WsEvent].
EventEmitter get events;

/// The connection state emitter for monitoring WebSocket connection status.
///
/// Emits connection state changes including [Connected], [Connecting],
/// [Disconnected], and [Disconnecting] states.
///
/// Example:
/// ```dart
/// client.connectionState.listen((state) {
/// if (state is Connected) {
/// print('Connected to Stream');
/// } else if (state is Disconnected) {
/// print('Disconnected: ${state.source.closeReason}');
/// }
/// });
/// ```
ConnectionStateEmitter get connectionState;

/// The state update events emitter for testing purposes only.
///
/// This emitter is exposed only for internal testing and should not be
/// accessed in production code. Use the [events] emitter for WebSocket
/// events instead.
@visibleForTesting
SharedEmitter<StateUpdateEvent> get stateUpdateEvents;

/// Updates the system environment information used by the client.
///
/// It allows you to set environment-specific information that will be
Expand Down
44 changes: 0 additions & 44 deletions packages/stream_feeds/lib/src/models/poll_data.dart
Original file line number Diff line number Diff line change
Expand Up @@ -220,28 +220,6 @@ extension PollDataMutations on PollData {
);
}

/// Adds or updates an answer in this poll.
///
/// Updates the own votes and answers list by adding or updating [answer]. Only adds answers
/// that belong to [currentUserId].
///
/// Returns a new [PollData] instance with the updated own votes and answers.
PollData upsertAnswer(
PollData updatedPoll,
PollVoteData answer,
String currentUserId,
) {
final updatedOwnVotesAndAnswers = ownVotesAndAnswers.let((it) {
if (answer.userId != currentUserId) return it;
return it.upsert(answer, key: (it) => it.id);
});

return updateWith(
updatedPoll,
ownVotesAndAnswers: updatedOwnVotesAndAnswers,
);
}

/// Removes a vote from this poll.
///
/// Updates the own votes and answers list by removing [vote]. Only removes votes
Expand All @@ -263,28 +241,6 @@ extension PollDataMutations on PollData {
ownVotesAndAnswers: updatedOwnVotesAndAnswers,
);
}

/// Removes an answer from this poll.
///
/// Updates the own votes and answers list by removing [answer]. Only removes answers
/// that belong to [currentUserId].
///
/// Returns a new [PollData] instance with the updated own votes and answers.
PollData removeAnswer(
PollData updatedPoll,
PollVoteData answer,
String currentUserId,
) {
final updatedOwnVotesAndAnswers = ownVotesAndAnswers.let((it) {
if (answer.userId != currentUserId) return it;
return it.where((it) => it.id != answer.id).toList();
});

return updateWith(
updatedPoll,
ownVotesAndAnswers: updatedOwnVotesAndAnswers,
);
}
}

/// Extension function to convert a [PollResponseData] to a [PollData] model.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,10 @@ class ActivitiesRepository {
///
/// Creates a new reaction on the activity with [activityId] using the provided [request] data.
///
/// Returns a [Result] containing the [FeedsReactionData] or an error.
Future<Result<FeedsReactionData>> addActivityReaction(
/// Returns a [Result] containing a record with the updated [ActivityData] and
/// created [FeedsReactionData] or an error.
Future<Result<({ActivityData activity, FeedsReactionData reaction})>>
addActivityReaction(
String activityId,
api.AddReactionRequest request,
) async {
Expand All @@ -204,15 +206,23 @@ class ActivitiesRepository {
addReactionRequest: request,
);

return result.map((response) => response.reaction.toModel());
return result.map(
(response) => (
activity: response.activity.toModel(),
reaction: response.reaction.toModel(),
),
);
}

/// Deletes a reaction from an activity.
///
/// Removes the reaction of [type] from the activity with [activityId].
///
/// Returns a [Result] containing the deleted [FeedsReactionData] or an error.
Future<Result<FeedsReactionData>> deleteActivityReaction(
/// Returns a [Result] containing a record with keys `activity` and `reaction`.
/// The `activity` field contains the updated [ActivityData] after the reaction
/// was removed, and the `reaction` field contains the deleted [FeedsReactionData].
Future<Result<({ActivityData activity, FeedsReactionData reaction})>>
deleteActivityReaction(
String activityId,
String type,
) async {
Expand All @@ -221,7 +231,12 @@ class ActivitiesRepository {
type: type,
);

return result.map((response) => response.reaction.toModel());
return result.map(
(response) => (
activity: response.activity.toModel(),
reaction: response.reaction.toModel(),
),
);
}

/// Queries reactions for a specific activity.
Expand Down
Loading