Skip to content

Flutter 任务调度。支持运行中取消/暂停并保留状态/持续发送状态数据、更改优先级、自动持久化、根据标识符重用或取消、复用Isolate。

License

Notifications You must be signed in to change notification settings

cezres/task_manager

Repository files navigation

Task Manager

Task Manager is a tool for managing and scheduling task execution, designed to simplify and optimize the execution of asynchronous tasks.

Task Manager Example Web Page

Features

  • Cancel and pause tasks that are currently running
  • Pause the task and preserve its state
  • Change the priority of tasks
  • Automatically persist and restore task states
  • Send progress data while tasks are running
  • Reuse or cancel tasks with the same identifier
  • Reusable isolate

Getting started

Add the following dependency to the `dependencies` section of your pubspec.yaml file:

task_manager:
    git:
        url: https://github.com/cezres/task_manager.git
        ref: main

Usage

Run a task

/// A minimal operation that takes an `int` as task data and completes with a
/// greeting string.
class ExampleOperation extends Operation<int, String> {
  const ExampleOperation();

  /// Waits one second, then completes with `'Hello World - <data>'`, where
  /// `<data>` is the value read from [context].
  @override
  FutureOr<Result<int, String>> run(OperationContext<int, void> context) async {
    const delay = Duration(seconds: 1);
    await Future.delayed(delay);

    final greeting = 'Hello World - ${context.data}';
    return Result.completed(greeting);
  }
}

/// Runs a single [ExampleOperation] with the data `1` on a [Worker] and waits
/// for it to finish.
///
/// Returns a [Future] so that callers can await completion and observe errors.
Future<void> example() async {
  final worker = Worker()..maxConcurrencies = 2;
  final task = worker.run(const ExampleOperation(), 1);
  await task.wait(); // 'Hello World - 1'
}

Run a task in isolate

/// Variant of [ExampleOperation] that opts in to background execution.
class ExampleComputeOperation extends ExampleOperation {
  const ExampleComputeOperation();

  /// Whether this task is marked to be executed in the background.
  ///
  /// Always `true` for this operation.
  @override
  bool get compute {
    return true;
  }
}

Emit task progress

/// Counts down from the task's initial data towards zero, emitting each step.
class CountdownOperation extends Operation<int, void> {
  const CountdownOperation();

  /// Decrements the value read from [context] once every 200 ms and emits the
  /// new value after each decrement, completing once the value is no longer
  /// positive.
  @override
  FutureOr<Result<int, void>> run(OperationContext<int, void> context) async {
    const tick = Duration(milliseconds: 200);

    for (int remaining = context.data; remaining > 0;) {
      await Future.delayed(tick);
      remaining--;

      // Emit task progress data.
      context.emit(remaining);
    }
    return Result.completed();
  }
}

/// Runs a [CountdownOperation] starting at 10 and prints every distinct
/// progress value delivered on the task's stream, then waits for the task to
/// finish.
///
/// Returns a [Future] so that callers can await completion and observe errors.
Future<void> example() async {
  final worker = Worker();
  final task = worker.run(const CountdownOperation(), 10);
  task.stream.map((event) => event.data).distinct().listen((data) {
    debugPrint('Data: $data'); // 9 8 7 6 5 4 ....
  });
  await task.wait();
}

Pause task

/// A [CountdownOperation] that stops early when a pause has been requested.
class PauseableCountdownOperation extends CountdownOperation {
  const PauseableCountdownOperation();

  /// Counts down in 200 ms steps. After each step it either returns a paused
  /// result carrying the current value, or emits that value and keeps going.
  @override
  FutureOr<Result<int, void>> run(OperationContext<int, void> context) async {
    const tick = Duration(milliseconds: 200);
    int remaining = context.data;

    while (remaining > 0) {
      await Future.delayed(tick);
      remaining -= 1;

      // A pause was requested: stop here and preserve the current state.
      if (context.shouldPause) {
        return Result.paused(remaining);
      }
      context.emit(remaining);
    }
    return Result.completed();
  }
}

/// Starts a [PauseableCountdownOperation] in the paused state, then resumes,
/// pauses and resumes it again, checking the task status after each step.
///
/// [PauseableCountdownOperation] is used because it is the operation that
/// checks `context.shouldPause`; a plain [CountdownOperation] never does.
///
/// Returns a [Future] so that callers can await completion and observe errors.
Future<void> example() async {
  final worker = Worker();
  final task = worker.run(
    const PauseableCountdownOperation(),
    10,
    isPaused: true,
  );
  expect(task.status, TaskStatus.paused);

  task.resume();
  expect(task.status, TaskStatus.running);
  await Future.delayed(const Duration(milliseconds: 400));

  task.pause();
  // Give the operation time to reach its next pause check (one per 200 ms).
  await Future.delayed(const Duration(milliseconds: 400));
  expect(task.data < 10, true);
  expect(task.status, TaskStatus.paused);

  task.resume();
  expect(task.status, TaskStatus.running);

  await task.wait();
  expect(task.status, TaskStatus.completed);
}

Cancel task

/// A [CountdownOperation] that stops early when cancellation has been
/// requested.
class CancellableCountdownOperation extends CountdownOperation {
  const CancellableCountdownOperation();

  /// Counts down in 200 ms steps. After each step it either returns a canceled
  /// result, or emits the current value and keeps going.
  @override
  FutureOr<Result<int, void>> run(OperationContext<int, void> context) async {
    const tick = Duration(milliseconds: 200);

    for (int remaining = context.data; remaining > 0;) {
      await Future.delayed(tick);
      remaining -= 1;

      // Cancellation was requested: stop without emitting further progress.
      if (context.shouldCancel) {
        return Result.canceled();
      }
      context.emit(remaining);
    }
    return Result.completed();
  }
}

/// Starts a [CancellableCountdownOperation], cancels it after 400 ms and
/// checks that the task ends up in the canceled state.
///
/// [CancellableCountdownOperation] is used because it is the operation that
/// checks `context.shouldCancel`; a plain [CountdownOperation] never does.
///
/// Returns a [Future] so that callers can await completion and observe errors.
Future<void> example() async {
  final worker = Worker();
  final task = worker.run(const CancellableCountdownOperation(), 10);

  await Future.delayed(const Duration(milliseconds: 400));
  task.cancel();

  try {
    await task.wait();
  } catch (error) {
    // Any error from waiting on the task is logged rather than propagated.
    debugPrint('Error: $error');
  } finally {
    expect(task.status, TaskStatus.canceled);
  }
}

Run a hydrated task

/// A [CountdownOperation] that also implements [HydratedOperation], converting
/// its `int` data to and from a JSON value.
class CountdownHydratedOperation extends CountdownOperation
    implements HydratedOperation<int, void> {
  const CountdownHydratedOperation();

  /// Returns [json] unchanged as the task data.
  @override
  int fromJson(json) => json;

  /// Returns [data] unchanged as its JSON representation.
  @override
  toJson(int data) => data;
}

/// Runs a [CountdownHydratedOperation] starting at 10 on a [HydratedWorker]
/// backed by a `CustomStorage`, and waits for it to finish.
///
/// Marked `async` because the body awaits the task; returns a [Future] so that
/// callers can await completion and observe errors.
Future<void> example() async {
  final storage = CustomStorage();
  final worker = HydratedWorker(storage: storage, identifier: 'test');
  // NOTE(review): presumably registration lets the worker recreate the
  // operation when restoring persisted tasks — confirm against HydratedWorker.
  worker.register(() => const CountdownHydratedOperation());

  final task = worker.run(const CountdownHydratedOperation(), 10);
  await task.wait();
}

About

Flutter 任务调度。支持运行中取消/暂停并保留状态/持续发送状态数据、更改优先级、自动持久化、根据标识符重用或取消、复用Isolate。

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published