Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Allow a custom equals parameter for ObservableStream #771

Merged
merged 4 commits into from
Oct 12, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/docs/api/observable.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,8 @@ class LoadingIndicator extends StatelessWidget {

## ObservableStream

#### `ObservableStream(Stream<T> stream, {T initialValue, bool cancelOnError, ReactiveContext context})`
#### `ObservableStream(Stream<T> stream, {T initialValue, bool cancelOnError, ReactiveContext context, String? name,
EqualityComparer<dynamic>? equals})`

- **`Stream<T> stream`**: The stream that is tracked for `status` and `value`
changes.
Expand All @@ -380,6 +381,8 @@ class LoadingIndicator extends StatelessWidget {
- **`ReactiveContext context`**: the context to which this observable-stream is
bound. By default, all `ObservableStream`s are bound to the singleton
`mainContext` of the application.
- **`String? name`**: This string is used as a debug name.
- **`EqualityComparer<dynamic>? equals`**: It acts as a comparison function for comparing the previous value with the next value. If this function considers the values to be equal, then the observers will not be re-evaluated.

Similar to `ObservableFuture`, an **`ObservableStream`** provides a reactive
wrapper around a `Stream`. This gives an easy way to observe and re-render
Expand Down
2 changes: 1 addition & 1 deletion flutter_mobx/lib/version.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Generated via set_version.dart. !!!DO NOT MODIFY BY HAND!!!

/// The current version as per `pubspec.yaml`.
const version = '2.0.6+2';
const version = '2.0.6+3';
4 changes: 4 additions & 0 deletions mobx/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 2.1.1

- Allow a custom equals parameter for ObservableStream - [@amondnet](https://github.com/amondnet)

## 2.1.0

- `ObservableSet` now uses `Set` to maintain order of iteration
Expand Down
2 changes: 1 addition & 1 deletion mobx/lib/src/api/async/observable_future.dart
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class ObservableFuture<T> implements Future<T>, ObservableValue<T?> {

@override
ObservableStream<T> asStream() => ObservableStream._(
_context, _future.asStream(), value, false, '${name}_asStream');
_context, _future.asStream(), value, false, '${name}_asStream', null);

@override
ObservableFuture<T> catchError(Function onError,
Expand Down
38 changes: 28 additions & 10 deletions mobx/lib/src/api/async/observable_stream.dart
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,21 @@ class ObservableStream<T> implements Stream<T>, ObservableValue<T?> {
///
/// If `cancelOnError` is `true`, the stream will be cancelled when an error
/// event is emitted by the source stream. The default value is `false`.
///
/// It is possible to override equality comparison of new values with [equals].
ObservableStream(Stream<T> stream,
{T? initialValue,
bool cancelOnError = false,
ReactiveContext? context,
String? name})
: this._(
context ?? mainContext, stream, initialValue, cancelOnError, name);
String? name,
EqualityComparer<dynamic>? equals})
: this._(context ?? mainContext, stream, initialValue, cancelOnError,
name, equals);

ObservableStream._(ReactiveContext context, this._stream, this._initialValue,
this._cancelOnError, String? name)
: _context = context {
this._cancelOnError, String? name, EqualityComparer<dynamic>? equals)
: _context = context,
_equals = equals {
_name = name ?? _context.nameFor('ObservableStream<$T>');
}

Expand All @@ -56,14 +60,20 @@ class ObservableStream<T> implements Stream<T>, ObservableValue<T?> {
final Stream<T> _stream;

late String _name;

String get name => _name;

final EqualityComparer<dynamic>? _equals;

_ObservableStreamController<T>? _controllerField;

_ObservableStreamController<T> get _controller {
if (_controllerField == null) {
_controllerField = _ObservableStreamController<T>(
_context, _stream, _initialValue,
cancelOnError: _cancelOnError, name: '$name.StreamController');
cancelOnError: _cancelOnError,
name: '$name.StreamController',
equals: _equals);
_initialValue = null;
}
return _controllerField!;
Expand Down Expand Up @@ -123,10 +133,12 @@ class ObservableStream<T> implements Stream<T>, ObservableValue<T?> {
/// Create a new stream with the provided initialValue and cancelOnError.
ObservableStream<T> configure(
{T? initialValue, bool cancelOnError = false}) =>
ObservableStream._(_context, _stream, initialValue, cancelOnError, name);
ObservableStream._(
_context, _stream, initialValue, cancelOnError, name, _equals);

ObservableStream<R> _wrap<R>(Stream<R> stream) =>
ObservableStream._(_context, stream, null, _cancelOnError, name);
ObservableStream<R> _wrap<R>(Stream<R> stream,
{EqualityComparer<dynamic>? equals}) =>
ObservableStream._(_context, stream, null, _cancelOnError, name, _equals ?? equals);

ObservableFuture<R> _wrapFuture<R>(Future<R> future) =>
ObservableFuture._(_context, future, FutureStatus.pending, null, name);
Expand Down Expand Up @@ -323,6 +335,7 @@ class _ObservableStreamController<T> {
T? initialValue, {
required this.cancelOnError,
required this.name,
EqualityComparer<dynamic>? equals,
}) : _initialStreamValue = origStream.isBroadcast ? null : initialValue,
_actions =
ActionController(context: context, name: '$name.ActionController'),
Expand All @@ -332,7 +345,8 @@ class _ObservableStreamController<T> {
name: '$name.status'),
_valueType = Observable(_ValueType.value,
context: context, name: '$name.valueType'),
_data = Observable(initialValue, context: context, name: '$name.data') {
_data = Observable<dynamic>(initialValue,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not Observable<T?>

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pavanpodila

Yes, I would like to use Observable<T?>, but Object type is assigned inside the _onError function.

  void _onError(Object error) {
    final actionInfo = _actions.startAction();
    try {
      _status.value = StreamStatus.active;
      _valueType.value = _ValueType.error;
      _data.value = error;
    } finally {
      _actions.endAction(actionInfo);
      _tryInsertInitialValue();
      _controller.addError(error);
    }
  }

context: context, name: '$name.data', equals: equals) {
_status
..onBecomeObserved(_listen)
..onBecomeUnobserved(_unsubscribe);
Expand Down Expand Up @@ -366,19 +380,23 @@ class _ObservableStreamController<T> {
final ActionController _actions;

final Observable<_ValueType> _valueType;

_ValueType get valueType => _valueType.value;

final Observable _data;

dynamic get data => _data.value;

final Observable<StreamStatus> _status;

StreamStatus get status => _status.value;

late final Stream<T> stream = _controller.stream;
late final StreamController<T> _controller;

int _listenCount = 0;
bool _isCancelled = false;

bool get isCancelled => _isCancelled;

Future<void> _onCancel() async {
Expand Down
2 changes: 1 addition & 1 deletion mobx/lib/version.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Generated via set_version.dart. !!!DO NOT MODIFY BY HAND!!!

/// The current version as per `pubspec.yaml`.
const version = '2.1.0';
const version = '2.1.1';
2 changes: 1 addition & 1 deletion mobx/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: mobx
version: 2.1.0
version: 2.1.1
description: "MobX is a library for reactively managing the state of your applications. Use the power of observables, actions, and reactions to supercharge your Dart and Flutter apps."

homepage: https://github.com/mobxjs/mobx.dart
Expand Down
18 changes: 18 additions & 0 deletions mobx/test/observable_stream_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,24 @@ void main() {
reason: 'sub on non-broadcast stream already cancelled');
});

test('can observe value with custom equals', () async {
final ctrl = StreamController<int>();
final stream = ObservableStream(ctrl.stream,
initialValue: 3, equals: (_, __) => false);

final subValues = <int>[];
final sub = stream.listen(subValues.add);

ctrl.add(3);
ctrl.add(3);
ctrl.add(3);
await pumpEventQueue();
expect(stream.value, equals(3), reason: 'with subs, with updates again');
expect(subValues, equals([3, 3, 3, 3]));

await sub.cancel();
});

<String, StreamTestBody>{
'asBroadcastStream': (s) {
final stream = s.asBroadcastStream();
Expand Down
2 changes: 1 addition & 1 deletion mobx_codegen/lib/version.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Generated via set_version.dart. !!!DO NOT MODIFY BY HAND!!!

/// The current version as per `pubspec.yaml`.
const version = '2.0.7+1';
const version = '2.0.7+2';