Skip to content

Commit

Permalink
Feat: Allow a custom equals parameter for ObservableStream (mobxjs#771)
Browse files Browse the repository at this point in the history
* feat: allow a custom equals parameter for ObservableStream

Signed-off-by: Minsu Lee <amond@amond.net>

* feat: allow a custom equals parameter for ObservableStream

Signed-off-by: Minsu Lee <amond@amond.net>

Signed-off-by: Minsu Lee <amond@amond.net>
Co-authored-by: Pavan Podila <pavanpodila@users.noreply.github.com>
  • Loading branch information
2 people authored and tlvenn committed Dec 31, 2022
1 parent 3448a84 commit 0f91678
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 14 deletions.
5 changes: 4 additions & 1 deletion docs/docs/api/observable.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,8 @@ class LoadingIndicator extends StatelessWidget {

## ObservableStream

#### `ObservableStream(Stream<T> stream, {T initialValue, bool cancelOnError, ReactiveContext context})`
#### `ObservableStream(Stream<T> stream, {T initialValue, bool cancelOnError, ReactiveContext context, String? name,
EqualityComparer<dynamic>? equals})`

- **`Stream<T> stream`**: The stream that is tracked for `status` and `value`
changes.
Expand All @@ -380,6 +381,8 @@ class LoadingIndicator extends StatelessWidget {
- **`ReactiveContext context`**: the context to which this observable-stream is
bound. By default, all `ObservableStream`s are bound to the singleton
`mainContext` of the application.
- **`String? name`**: This string is used as a debug name.
- **`EqualityComparer<dynamic>? equals`**: It acts as a comparison function for comparing the previous value with the next value. If this function considers the values to be equal, then the observers will not be re-evaluated.

Similar to `ObservableFuture`, an **`ObservableStream`** provides a reactive
wrapper around a `Stream`. This gives an easy way to observe and re-render
Expand Down
4 changes: 4 additions & 0 deletions mobx/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 2.1.1

- Allow a custom equals parameter for ObservableStream - [@amondnet](https://github.com/amondnet)

## 2.1.0

- `ObservableSet` now uses `Set` to maintain order of iteration
Expand Down
2 changes: 1 addition & 1 deletion mobx/lib/src/api/async/observable_future.dart
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class ObservableFuture<T> implements Future<T>, ObservableValue<T?> {

@override
ObservableStream<T> asStream() => ObservableStream._(
_context, _future.asStream(), value, false, '${name}_asStream');
_context, _future.asStream(), value, false, '${name}_asStream', null);

@override
ObservableFuture<T> catchError(Function onError,
Expand Down
38 changes: 28 additions & 10 deletions mobx/lib/src/api/async/observable_stream.dart
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,21 @@ class ObservableStream<T> implements Stream<T>, ObservableValue<T?> {
///
/// If `cancelOnError` is `true`, the stream will be cancelled when an error
/// event is emitted by the source stream. The default value is `false`.
///
/// It is possible to override equality comparison of new values with [equals].
ObservableStream(Stream<T> stream,
{T? initialValue,
bool cancelOnError = false,
ReactiveContext? context,
String? name})
: this._(
context ?? mainContext, stream, initialValue, cancelOnError, name);
String? name,
EqualityComparer<dynamic>? equals})
: this._(context ?? mainContext, stream, initialValue, cancelOnError,
name, equals);

ObservableStream._(ReactiveContext context, this._stream, this._initialValue,
this._cancelOnError, String? name)
: _context = context {
this._cancelOnError, String? name, EqualityComparer<dynamic>? equals)
: _context = context,
_equals = equals {
_name = name ?? _context.nameFor('ObservableStream<$T>');
}

Expand All @@ -56,14 +60,20 @@ class ObservableStream<T> implements Stream<T>, ObservableValue<T?> {
final Stream<T> _stream;

late String _name;

String get name => _name;

final EqualityComparer<dynamic>? _equals;

_ObservableStreamController<T>? _controllerField;

_ObservableStreamController<T> get _controller {
if (_controllerField == null) {
_controllerField = _ObservableStreamController<T>(
_context, _stream, _initialValue,
cancelOnError: _cancelOnError, name: '$name.StreamController');
cancelOnError: _cancelOnError,
name: '$name.StreamController',
equals: _equals);
_initialValue = null;
}
return _controllerField!;
Expand Down Expand Up @@ -123,10 +133,12 @@ class ObservableStream<T> implements Stream<T>, ObservableValue<T?> {
/// Create a new stream with the provided initialValue and cancelOnError.
ObservableStream<T> configure(
{T? initialValue, bool cancelOnError = false}) =>
ObservableStream._(_context, _stream, initialValue, cancelOnError, name);
ObservableStream._(
_context, _stream, initialValue, cancelOnError, name, _equals);

ObservableStream<R> _wrap<R>(Stream<R> stream) =>
ObservableStream._(_context, stream, null, _cancelOnError, name);
ObservableStream<R> _wrap<R>(Stream<R> stream,
{EqualityComparer<dynamic>? equals}) =>
ObservableStream._(_context, stream, null, _cancelOnError, name, _equals ?? equals);

ObservableFuture<R> _wrapFuture<R>(Future<R> future) =>
ObservableFuture._(_context, future, FutureStatus.pending, null, name);
Expand Down Expand Up @@ -323,6 +335,7 @@ class _ObservableStreamController<T> {
T? initialValue, {
required this.cancelOnError,
required this.name,
EqualityComparer<dynamic>? equals,
}) : _initialStreamValue = origStream.isBroadcast ? null : initialValue,
_actions =
ActionController(context: context, name: '$name.ActionController'),
Expand All @@ -332,7 +345,8 @@ class _ObservableStreamController<T> {
name: '$name.status'),
_valueType = Observable(_ValueType.value,
context: context, name: '$name.valueType'),
_data = Observable(initialValue, context: context, name: '$name.data') {
_data = Observable<dynamic>(initialValue,
context: context, name: '$name.data', equals: equals) {
_status
..onBecomeObserved(_listen)
..onBecomeUnobserved(_unsubscribe);
Expand Down Expand Up @@ -366,19 +380,23 @@ class _ObservableStreamController<T> {
final ActionController _actions;

final Observable<_ValueType> _valueType;

_ValueType get valueType => _valueType.value;

final Observable _data;

dynamic get data => _data.value;

final Observable<StreamStatus> _status;

StreamStatus get status => _status.value;

late final Stream<T> stream = _controller.stream;
late final StreamController<T> _controller;

int _listenCount = 0;
bool _isCancelled = false;

bool get isCancelled => _isCancelled;

Future<void> _onCancel() async {
Expand Down
2 changes: 1 addition & 1 deletion mobx/lib/version.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Generated via set_version.dart. !!!DO NOT MODIFY BY HAND!!!

/// The current version as per `pubspec.yaml`.
const version = '2.1.0';
const version = '2.1.1';
2 changes: 1 addition & 1 deletion mobx/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: mobx
version: 2.1.0
version: 2.1.1
description: "MobX is a library for reactively managing the state of your applications. Use the power of observables, actions, and reactions to supercharge your Dart and Flutter apps."

homepage: https://github.com/mobxjs/mobx.dart
Expand Down
18 changes: 18 additions & 0 deletions mobx/test/observable_stream_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,24 @@ void main() {
reason: 'sub on non-broadcast stream already cancelled');
});

test('can observe value with custom equals', () async {
final ctrl = StreamController<int>();
final stream = ObservableStream(ctrl.stream,
initialValue: 3, equals: (_, __) => false);

final subValues = <int>[];
final sub = stream.listen(subValues.add);

ctrl.add(3);
ctrl.add(3);
ctrl.add(3);
await pumpEventQueue();
expect(stream.value, equals(3), reason: 'with subs, with updates again');
expect(subValues, equals([3, 3, 3, 3]));

await sub.cancel();
});

<String, StreamTestBody>{
'asBroadcastStream': (s) {
final stream = s.asBroadcastStream();
Expand Down

0 comments on commit 0f91678

Please sign in to comment.