Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix for the issue (#121) #124

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Release Note

## v0.10.0

- Allows parallel processing to use the main thread `ExecutionContext`. ([#121](https://github.com/batch-dart/batch.dart/issues/121))

## v0.9.0

- Improved schedule checking process. ([#118](https://github.com/batch-dart/batch.dart/issues/118))
Expand Down
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ The above example is a very simple, and so you should refer to other documents a

`Batch.dart` supports powerful parallel processing and is easy to define.

When defining parallel processing, all you have to do is just inherit from `ParallelTask` and describe the process you want to parallelize in the `invoke` method.
When defining parallel processing, all you have to do is just inherit from `ParallelTask` and describe the process you want to parallelize in the `execute` method.

`SharedParameters` and `JobParameters` set in the main thread can be referenced through `ExecutionContext`. However, note that under the current specification, changes to the `ExecutionContext` value during parallel processing are not reflected in the main thread's `ExecutionContext`.

**_Example_**

Expand Down Expand Up @@ -179,7 +181,7 @@ void main() => BatchApplication()

class DoHeavyTask extends ParallelTask<DoHeavyTask> {
@override
FutureOr<void> invoke() {
FutureOr<void> execute(ExecutionContext context) {
int i = 0;
while (i < 10000000000) {
i++;
Expand Down
2 changes: 1 addition & 1 deletion example/example.dart
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ class RetryTask extends Task<RetryTask> {

class TestParallelTask extends ParallelTask<TestParallelTask> {
@override
FutureOr<void> invoke() {
FutureOr<void> execute(ExecutionContext context) {
int i = 0;
while (i < 10000000000) {
i++;
Expand Down
2 changes: 1 addition & 1 deletion lib/src/banner/default_banner.dart
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class DefaultBanner implements Banner {
║ ╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╰━━╯ ║
║ ║
╠═════════════════════════════════════════════════════════════════════════╣
║ Version : ${Version.current}
║ Version : ${Version.current} ║
║ License : BSD 3-Clause ║
║ Author : Kato Shinya (https://github.com/myConsciousness) ║
╚═════════════════════════════════════════════════════════════════════════╝
Expand Down
18 changes: 7 additions & 11 deletions lib/src/job/event/parallel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import 'dart:async';
// Project imports:
import 'package:batch/src/job/context/execution_context.dart';
import 'package:batch/src/job/event/event.dart';
import 'package:batch/src/job/parallel/parallel_executor.dart';
import 'package:batch/src/job/task/parallel_task.dart';

class Parallel extends Event<Parallel> {
Expand All @@ -22,24 +21,21 @@ class Parallel extends Event<Parallel> {
Function(ExecutionContext context, dynamic error, StackTrace stackTrace)?
onError,
Function(ExecutionContext context)? onCompleted,
}) : super(
}) : _tasks = tasks,
super(
name: name,
precondition: precondition,
onStarted: onStarted,
onSucceeded: onSucceeded,
onError: onError,
onCompleted: onCompleted,
) {
for (final task in tasks) {
_executors.add(ParallelExecutor(parallelTask: task));
}
}
);

/// The parallel executors
final List<ParallelExecutor> _executors = [];
/// The parallel tasks
final List<ParallelTask> _tasks;

/// Returns the copied executors.
List<ParallelExecutor> get executors => List.from(_executors);
/// Returns the copied tasks.
List<ParallelTask> get tasks => List.from(_tasks);

@override
@Deprecated('not supported operation and always UnsupportedError throws')
Expand Down
25 changes: 21 additions & 4 deletions lib/src/job/launcher/parallel_launcher.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import 'package:async_task/async_task.dart';
// Project imports:
import 'package:batch/batch.dart';
import 'package:batch/src/job/launcher/launcher.dart';
import 'package:batch/src/job/parallel/parallel_executor.dart';

class ParallelLauncher extends Launcher<Parallel> {
/// Returns the new instance of [ParallelLauncher].
Expand All @@ -27,9 +28,11 @@ class ParallelLauncher extends Launcher<Parallel> {
Future<void> run() async => await super.executeRecursively(
event: _parallel,
execute: (parallel) async {
final executors = _buildExecutor(parallel.tasks);

final asyncExecutor = AsyncExecutor(
parallelism: parallel.executors.length,
taskTypeRegister: () => parallel.executors,
parallelism: executors.length,
taskTypeRegister: () => executors,
logger: (String type, dynamic message,
[dynamic error, dynamic stackTrace]) =>
info(message),
Expand All @@ -38,10 +41,10 @@ class ParallelLauncher extends Launcher<Parallel> {
asyncExecutor.logger.enabled = true;

try {
final executions = asyncExecutor.executeAll(parallel.executors);
final executions = asyncExecutor.executeAll(executors);
await Future.wait(executions);

for (final executor in parallel.executors) {
for (final executor in executors) {
for (final isolatedMessage in executor.result!) {
isolatedMessage.output();
}
Expand All @@ -51,4 +54,18 @@ class ParallelLauncher extends Launcher<Parallel> {
}
},
);

List<ParallelExecutor> _buildExecutor(final List<ParallelTask> tasks) {
final executors = <ParallelExecutor>[];
for (final task in tasks) {
executors.add(
ParallelExecutor(
parallelTask: task,
context: context,
),
);
}

return executors;
}
}
14 changes: 9 additions & 5 deletions lib/src/job/parallel/parallel_executor.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@ import 'package:batch/src/job/task/parallel_task.dart';

class ParallelExecutor extends AsyncTask<String, List<IsolatedLogMessage>> {
/// Returns the new instance of [ParallelExecutor].
ParallelExecutor({required this.parallelTask});
ParallelExecutor({
required this.parallelTask,
required this.context,
});

/// The parallel task
final ParallelTask parallelTask;

/// The context from main thread.
final ExecutionContext context;

@override
AsyncTask<String, List<IsolatedLogMessage>> instantiate(String parameters,
[Map<String, SharedData>? sharedData]) {
Expand All @@ -33,14 +39,12 @@ class ParallelExecutor extends AsyncTask<String, List<IsolatedLogMessage>> {

@override
FutureOr<List<IsolatedLogMessage>> run() async {
final context = ExecutionContext();

try {
await parallelTask.execute(context);
await parallelTask.invoke(context);
} catch (e) {
rethrow;
}

return context.stepParameters['isolatedLogMessages'];
return context.jobParameters['_internalIsolatedLogMessages'];
}
}
20 changes: 0 additions & 20 deletions lib/src/job/parameter/parameter.dart

This file was deleted.

39 changes: 11 additions & 28 deletions lib/src/job/parameter/parameters.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,27 @@
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided the conditions.

// Project imports:
import 'package:batch/src/job/parameter/parameter.dart';

class Parameters {
/// The objects
final _objects = <Parameter>[];
final _objects = <String, dynamic>{};

/// Returns the parameter value associated with [key].
dynamic operator [](final String key) {
if (!contains(key)) {
throw ArgumentError('There is no parameter associated with [key=$key].');
}

for (final parameter in _objects) {
if (parameter.key == key) {
return parameter.value;
}
}
}
dynamic operator [](final String key) => _objects[key];

/// Adds [value] as a parameter associated with [key].
void operator []=(final String key, final dynamic value) => _objects
..removeWhere((parameter) => parameter.key == key)
..add(Parameter(key: key, value: value));
void operator []=(final String key, final dynamic value) =>
_objects[key] = value;

/// Removes all parameters.
void removeAll() => _objects.removeRange(0, _objects.length);
void removeAll() => _objects.clear();

/// Returns true if this object has [key] passed as an argument, otherwise false.
bool contains(final String key) {
for (final parameter in _objects) {
if (parameter.key == key) {
return true;
}
}

return false;
}
bool contains(final String key) => _objects.containsKey(key);

/// Applies [action] to each key/value pair of the map.
/// Calling action must not add or remove keys from the map.
void forEach(void Function(String key, dynamic value) action) =>
_objects.forEach((key, value) => action(key, value));

// Returns true if this object has no parameter, otherwise false.
bool get isEmpty => _objects.isEmpty;
Expand Down
10 changes: 3 additions & 7 deletions lib/src/job/task/parallel_task.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,14 @@ import 'package:batch/src/job/parallel/parallel_task_support.dart';

abstract class ParallelTask<T extends Task<T>> extends Task<T>
with ParallelTaskSupport {
@override
FutureOr<void> execute(ExecutionContext context) async {
Future<void> invoke(ExecutionContext context) async {
try {
await invoke();
await execute(context);
} catch (e) {
rethrow;
}

// ignore: invalid_use_of_visible_for_overriding_member
context.stepParameters['isolatedLogMessages'] = isolatedMessages;
context.jobParameters['_internalIsolatedLogMessages'] = isolatedMessages;
}

/// Invokes piece of parallel processing.
FutureOr<void> invoke();
}
5 changes: 2 additions & 3 deletions lib/src/log/logger.dart
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ import 'package:batch/src/log/printer/log_printer.dart';

class Logger {
/// Returns the new instance of [Logger].
Logger.loadFrom({
required LogConfiguration config,
}) : _filter = config.filter ?? DefaultLogFilter(),
Logger.loadFrom({required LogConfiguration config})
: _filter = config.filter ?? DefaultLogFilter(),
_printer = config.printer ?? DefaultLogPrinter(),
_output = config.output ?? ConsoleLogOutput(),
_printLog = config.printLog {
Expand Down
2 changes: 1 addition & 1 deletion lib/src/version/version.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ abstract class Version {
factory Version() => _Version();

/// The current version
static const current = '0.9.0';
static const current = '0.10.0';

/// Returns the version status.
Future<VersionStatus> get status;
Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: batch
description: A lightweight and powerful Job Scheduling Framework written in Dart. With this framework, you can easily develop a job scheduling and batch program in Dart.
version: 0.9.0
version: 0.10.0
homepage: https://github.com/batch-dart/batch.dart

environment:
Expand Down
2 changes: 1 addition & 1 deletion test/src/banner/default_banner_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const _builtBanner = '''
║ ╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╱╰━━╯ ║
║ ║
╠═════════════════════════════════════════════════════════════════════════╣
║ Version : 0.9.0
║ Version : 0.10.0
║ License : BSD 3-Clause ║
║ Author : Kato Shinya (https://github.com/myConsciousness) ║
╚═════════════════════════════════════════════════════════════════════════╝
Expand Down
9 changes: 5 additions & 4 deletions test/src/job/event/parallel_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import 'dart:async';
import 'package:test/test.dart';

// Project imports:
import 'package:batch/src/job/context/execution_context.dart';
import 'package:batch/src/job/event/parallel.dart';
import 'package:batch/src/job/task/parallel_task.dart';

Expand All @@ -21,10 +22,10 @@ void main() {
);

expect(parallel.name, 'Test Parallel');
expect(parallel.executors.length, 4);
expect(parallel.tasks.length, 4);

for (final executor in parallel.executors) {
expect(executor.parallelTask, task);
for (final task in parallel.tasks) {
expect(task, task);
}
});

Expand Down Expand Up @@ -65,5 +66,5 @@ void main() {

class _ParallelTask extends ParallelTask<_ParallelTask> {
@override
FutureOr<void> invoke() {}
FutureOr<void> execute(ExecutionContext context) {}
}
11 changes: 7 additions & 4 deletions test/src/job/parallel/parallel_executor_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import 'dart:async';
import 'package:test/test.dart';

// Project imports:
import 'package:batch/src/job/context/execution_context.dart';
import 'package:batch/src/job/parallel/parallel_executor.dart';
import 'package:batch/src/job/task/parallel_task.dart';

void main() {
test('Test ParallelExecutor', () {
final task = _ParallelTask();
final executor = ParallelExecutor(parallelTask: task);
final executor =
ParallelExecutor(parallelTask: task, context: ExecutionContext());

expect(executor.parallelTask, task);
expect(executor.instantiate(''), executor);
Expand All @@ -25,7 +27,8 @@ void main() {

test('Test ParallelExecutor with error', () {
final task = _ParallelTaskWithError();
final executor = ParallelExecutor(parallelTask: task);
final executor =
ParallelExecutor(parallelTask: task, context: ExecutionContext());

expect(executor.parallelTask, task);
expect(executor.instantiate(''), executor);
Expand All @@ -39,12 +42,12 @@ void main() {

class _ParallelTask extends ParallelTask<_ParallelTask> {
@override
FutureOr<void> invoke() {}
FutureOr<void> execute(ExecutionContext context) {}
}

class _ParallelTaskWithError extends ParallelTask<_ParallelTaskWithError> {
@override
FutureOr<void> invoke() {
FutureOr<void> execute(ExecutionContext context) {
throw UnimplementedError('success');
}
}
Loading