fix: Bloc Concurrency with restartable transformer does not cancel previous event #3349

Closed
PhilipPurwoko opened this issue May 6, 2022 · 9 comments
Assignees
Labels
question Further information is requested

Comments

@PhilipPurwoko

Description
Bloc Concurrency with restartable transformer does not cancel previous event

Steps To Reproduce
The bloc in merchant_product_bloc.dart listens for the MerchantProductCartChanged event and emits the MerchantProductCartChange state:

class MerchantProductBloc extends Bloc<MerchantProductEvent, MerchantProductState> {
  MerchantProductBloc() : super(MerchantProductInitial()) {
    on<MerchantProductCartChanged>(_changeCart, transformer: restartable());
  }

  Future<void> _changeCart(
    MerchantProductCartChanged event,
    Emitter<MerchantProductState> emit,
  ) async {
    await Future.delayed(const Duration(milliseconds: 300));
    logInfo('Cart Changed | productId=${event.productId} qty=${event.qty}');
    emit(const MerchantProductCartChange.loading());
    try {
      await productRepository.changeCart(
        productId: event.productId,
        qty: event.qty,
        date: event.date,
      );
      emit(const MerchantProductCartChange.success());
    } catch (e) {
      emit(MerchantProductCartChange.error(msg: e.toString(), lastData: event));
    }
  }
}

The button callback I use to trigger the MerchantProductCartChanged event:

BlocProvider.of<MerchantProductBloc>(context).add(
  MerchantProductCartChanged(
    productId: product.id!,
    initialQty: 1,
  ),
);

When I press the button 4 times in quick succession, the handler logs 4 times and calls the API 4 times:

I/PLogger (30516): {PAX A920}  {Loket}  {Cart Changed | productId=104 qty=13}  {06 May 2022 08:35:58 PM}  {INFO}
I/PLogger (30516): {PAX A920}  {Loket}  {Cart Changed | productId=104 qty=14}  {06 May 2022 08:35:58 PM}  {INFO}
I/PLogger (30516): {PAX A920}  {Loket}  {Cart Changed | productId=104 qty=15}  {06 May 2022 08:35:58 PM}  {INFO}
I/PLogger (30516): {PAX A920}  {Loket}  {Cart Changed | productId=104 qty=16}  {06 May 2022 08:35:58 PM}  {INFO}

Expected Behavior
It should log only one line, for the last event:

I/PLogger (30516): {PAX A920}  {Loket}  {Cart Changed | productId=104 qty=16}  {06 May 2022 08:35:58 PM}  {INFO}
@PhilipPurwoko PhilipPurwoko added the bug Something isn't working label May 6, 2022
@felangel
Owner

felangel commented May 7, 2022

Hi @PhilipPurwoko 👋
Thanks for opening an issue!

Are you able to provide a minimal reproduction sample? Thanks 🙏

@felangel felangel self-assigned this May 7, 2022
@felangel felangel added question Further information is requested waiting for response Waiting for follow up and removed bug Something isn't working labels May 7, 2022
@YStepiuk-DOIT

Hi @felangel !
The issue is in the on<Event>() method

        void handleEvent() async {
          void onDone() {
            emitter.complete();
            _emitters.remove(emitter);
            if (!controller.isClosed) controller.close();
          }

          try {
            _emitters.add(emitter);
            await handler(event as E, emitter);
          } catch (error, stackTrace) {
            onError(error, stackTrace);
            rethrow;
          } finally {
            onDone();
          }
        }

handleEvent() is invoked in a fire-and-forget manner: nothing tracks or cancels its execution.
Only the emitter is closed.
As a result, every event handler that has already started keeps running to completion; it just loses the ability to emit new states.

Minimal code to reproduce issue:

import 'package:bloc/bloc.dart';
import 'package:bloc_concurrency/bloc_concurrency.dart';

class TestEvent {}

class TestBloc extends Bloc<TestEvent, Object> {
  TestBloc() : super(Object()) {
    on<TestEvent>(_onTestEvent, transformer: restartable());
  }

  var testCounter = 0;

  Future<void> _onTestEvent(event, emit) async {
    final counter = testCounter++;
    print('Starting test count: $counter, is emitter alive: ${!emit.isDone}');
    await Future.delayed(const Duration(seconds: 5));
    // We should reach here only for latest event
    print('Finished test count: $counter, is emitter alive: ${!emit.isDone}');
  }
}

void main() {
  final bloc = TestBloc();
  bloc.stream.listen(null);
  bloc.add(TestEvent());
  bloc.add(TestEvent());
  bloc.add(TestEvent());
}
// Expected to print:
// Starting test count: 0, is emitter alive: true
// Starting test count: 1, is emitter alive: true
// Starting test count: 2, is emitter alive: true
// Finished test count: 2, is emitter alive: true

// Actually prints:
// Starting test count: 0, is emitter alive: true
// Starting test count: 1, is emitter alive: true
// Starting test count: 2, is emitter alive: true
// Finished test count: 0, is emitter alive: false
// Finished test count: 1, is emitter alive: false
// Finished test count: 2, is emitter alive: true

@YStepiuk-DOIT

A possible fix would be to keep a StreamSubscription for the await handler(event as E, emitter); call and cancel it from the StreamController.

@felangel
Owner

Hi @YStepiuk
This is because Futures aren't truly cancelable. To get the behavior you're describing you can simply check if emit.isDone is true before performing any expensive computations:

import 'package:bloc/bloc.dart';
import 'package:bloc_concurrency/bloc_concurrency.dart';

class TestEvent {}

class TestBloc extends Bloc<TestEvent, Object> {
  TestBloc() : super(Object()) {
    on<TestEvent>(_onTestEvent, transformer: restartable());
  }

  var testCounter = 0;

  Future<void> _onTestEvent(event, emit) async {
    final counter = testCounter++;
    print('Starting test count: $counter, is emitter alive: ${!emit.isDone}');
    await Future.delayed(const Duration(seconds: 5));
    if (emit.isDone) return;
    // We should reach here only for latest event
    print('Finished test count: $counter, is emitter alive: ${!emit.isDone}');
  }
}

void main() {
  final bloc = TestBloc();
  bloc.stream.listen(null);
  bloc.add(TestEvent());
  bloc.add(TestEvent());
  bloc.add(TestEvent());
}
// Actually prints:
// Starting test count: 0, is emitter alive: true
// Starting test count: 1, is emitter alive: true
// Starting test count: 2, is emitter alive: true
// Finished test count: 2, is emitter alive: true

This is also related to #3069
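
The point about Futures not being cancelable can be seen without bloc at all. The following is a minimal sketch using only dart:async; slowWork and demo are hypothetical names introduced for illustration. It cancels the only subscription to a Future-backed stream and shows that the underlying work still runs to completion, while the delivery of its result is dropped.

```dart
import 'dart:async';

/// Hypothetical stand-in for an expensive call. It records when it
/// starts and when it actually finishes.
Future<void> slowWork(int id, List<String> log) async {
  log.add('start $id');
  await Future<void>.delayed(const Duration(milliseconds: 50));
  log.add('finish $id');
}

Future<List<String>> demo() async {
  final log = <String>[];

  // Wrap the Future in a stream and subscribe to it.
  final StreamSubscription<void> sub =
      Stream<void>.fromFuture(slowWork(0, log)).listen((_) {
    log.add('delivered 0');
  });

  // Cancelling the subscription stops delivery of the result,
  // but it does not stop the Future that is already running.
  await sub.cancel();

  // Wait long enough for slowWork to finish on its own.
  await Future<void>.delayed(const Duration(milliseconds: 100));
  return log;
}

Future<void> main() async {
  print(await demo()); // prints [start 0, finish 0]
}
```

The 'delivered 0' entry never appears because the subscription was cancelled, yet 'finish 0' is still logged. This is the same situation a restartable handler is in after a newer event arrives, which is why the emit.isDone check is needed to skip the remaining work.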

@felangel
Owner

felangel commented Jun 6, 2022

Closing for now since there don't appear to be any actionable next steps -- this is working as expected. Feel free to comment with any outstanding questions and I'm happy to continue the conversation 👍

@felangel felangel closed this as completed Jun 6, 2022
@felangel felangel removed the waiting for response Waiting for follow up label Jun 6, 2022
@politebarista

politebarista commented Sep 28, 2022

Hello @felangel!
First of all, thank you so much for everything you do for the Flutter community 💙
I recently ran into a similar problem and your solution helped, but your solution (checking emit.isDone) also works without specifying restartable. I'm sorry if the question seems trivial to you, but then why do we need a restartable in bloc_concurrency at all? How does it help?

@felangel
Owner

Thanks for the kind words! The advantage of using restartable is you can disregard the results of previous events. This is important when you don't have expensive operations and you want the user to always see the results of their most recent actions. Hope that helps 👍
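
As a sketch of how the two pieces fit together in a cart-style handler like the one in the original report: restartable() cancels the emitter of any handler that is superseded by a newer event, so stale results are never emitted, and an emit.isDone check after each await lets a superseded handler return before doing further work. CartChanged, CartBloc, and the injected save callback are simplified, hypothetical stand-ins for the reporter's classes and repository.

```dart
import 'package:bloc/bloc.dart';
import 'package:bloc_concurrency/bloc_concurrency.dart';

/// Hypothetical, simplified event carrying only the new quantity.
class CartChanged {
  const CartChanged(this.qty);
  final int qty;
}

/// Hypothetical bloc; states are plain strings to keep the sketch short.
class CartBloc extends Bloc<CartChanged, String> {
  CartBloc(this._save) : super('initial') {
    on<CartChanged>(_onCartChanged, transformer: restartable());
  }

  /// Stand-in for productRepository.changeCart (an assumption).
  final Future<void> Function(int qty) _save;

  Future<void> _onCartChanged(
    CartChanged event,
    Emitter<String> emit,
  ) async {
    // Short delay so rapid taps can supersede this handler.
    await Future<void>.delayed(const Duration(milliseconds: 300));

    // A newer event has cancelled this emitter: skip the API call.
    if (emit.isDone) return;

    emit('loading');
    await _save(event.qty);

    // Emitting on a cancelled emitter is ignored, but returning early
    // makes the intent explicit.
    if (emit.isDone) return;
    emit('saved ${event.qty}');
  }
}

Future<void> main() async {
  final saved = <int>[];
  final bloc = CartBloc((qty) async {
    saved.add(qty);
  });

  // Simulate four rapid button presses.
  bloc
    ..add(const CartChanged(13))
    ..add(const CartChanged(14))
    ..add(const CartChanged(15))
    ..add(const CartChanged(16));

  await Future<void>.delayed(const Duration(seconds: 1));
  print(saved);
  await bloc.close();
}
```

With four rapid events, the first three handlers are expected to return at the first isDone check, so only the last quantity should reach the save callback.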

@vasilich6107

vasilich6107 commented Jun 24, 2023

Hi @felangel
Could you clarify where to put the if (emit.isDone) return; check when using await emit.forEach?

@vasilich6107

I'm trying to implement the approach from this article:
https://verygood.ventures/blog/how-to-use-bloc-with-streams-and-concurrency

but restartable still processes all calls.
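
One possible arrangement for the emit.forEach case, offered as a sketch rather than the maintainer's answer: the Emitter documentation states that forEach completes when the event handler is cancelled or when the stream ends, so the subscription itself should stop once a newer event restarts the handler. Under that assumption, the isDone check mainly matters around any other awaits that run before or after forEach. TickerStarted and TickerBloc are hypothetical names, and the initial delay stands in for whatever asynchronous setup a real handler performs.

```dart
import 'package:bloc/bloc.dart';
import 'package:bloc_concurrency/bloc_concurrency.dart';

/// Hypothetical event that (re)starts a labelled ticker.
class TickerStarted {
  const TickerStarted(this.label);
  final String label;
}

/// Hypothetical bloc; states are plain strings to keep the sketch short.
class TickerBloc extends Bloc<TickerStarted, String> {
  TickerBloc() : super('idle') {
    on<TickerStarted>(_onStarted, transformer: restartable());
  }

  Future<void> _onStarted(
    TickerStarted event,
    Emitter<String> emit,
  ) async {
    // Stand-in for asynchronous setup that runs before subscribing.
    await Future<void>.delayed(const Duration(milliseconds: 100));

    // If a newer event arrived during setup, do not subscribe at all.
    if (emit.isDone) return;

    // The stream is finite so the sketch also terminates on its own.
    final ticks = Stream<int>.periodic(
      const Duration(milliseconds: 50),
      (i) => i,
    ).take(10);

    await emit.forEach<int>(
      ticks,
      onData: (tick) => '${event.label}: $tick',
    );
  }
}

Future<void> main() async {
  final bloc = TickerBloc();
  final sub = bloc.stream.listen(print);

  bloc.add(const TickerStarted('a'));
  await Future<void>.delayed(const Duration(milliseconds: 300));

  // Restarts the handler: ticks labelled 'a' should stop from here on.
  bloc.add(const TickerStarted('b'));
  await Future<void>.delayed(const Duration(milliseconds: 300));

  await bloc.close();
  await sub.cancel();
}
```

Each added event still invokes the handler, which matches what restartable does in the examples above: it cancels the previous handler's emitter instead of preventing that handler from starting.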
