Last updated:
0 purchases
async task
async_task #
This package brings asynchronous tasks and parallel executors (similar to classic thread pools) for all Dart platforms
(JS/Web, Flutter, VM/Native) through transparent internal implementations, based on dart:isolate or
only dart:async, without having to deal with the Isolate complexity.
Motivation #
Dart parallelism is based on asynchronous, non-blocking and thread-safe code. This creates a language that facilitates
the creation of safe concurrency code, but all running in the same thread, using only 1 thread/core of a device.
If you want to use more than 1 thread/core of a device, Dart VM/Native has Isolate, but the paradigm is not easy to
use like classical Thread Pools, or like environments that shares all the memory/objects between threads.
This package was created to facilitate the creation and execution of multiple tasks asynchronously in all Dart
Platforms, avoiding platform specific codes by the developer.
Usage #
import 'dart:async';
import 'package:async_task/async_task.dart';
void main() async {
// The tasks to execute:
var tasks = [
PrimeChecker(8779),
PrimeChecker(1046527),
PrimeChecker(3139581), // Not Prime
PrimeChecker(16769023),
];
// Instantiate the task executor:
var asyncExecutor = AsyncExecutor(
sequential: false, // Non-sequential tasks.
parallelism: 2, // Concurrency with 2 threads.
taskTypeRegister: _taskTypeRegister, // The top-level function to register the tasks types.
);
// Enable logging output:
asyncExecutor.logger.enabled = true ;
// Execute all tasks:
var executions = asyncExecutor.executeAll(tasks);
// Wait tasks executions:
await Future.wait(executions);
for (var task in tasks) {
var n = task.n; // Number to check for prime.
var prime = task.result; // Task result: true if is prime.
print('$n\t-> $prime \t $task');
}
}
// This top-level function returns the tasks types that will be registered
// for execution. Task instances are returned, but won't be executed and
// will be used only to identify the task type:
List<AsyncTask> _taskTypeRegister() => [PrimeChecker(0)];
// A task that checks if a number is prime:
class PrimeChecker extends AsyncTask<int, bool> {
// The number to check if is prime.
final int n;
PrimeChecker(this.n);
// Instantiates a `PrimeChecker` task with `parameters`.
@override
AsyncTask<int, bool> instantiate(int parameters) {
return PrimeChecker(parameters);
}
// The parameters of this task:
@override
int parameters() {
return n;
}
// Runs the task code:
@override
FutureOr<bool> run() {
return isPrime(n);
}
// A simple prime check function:
bool isPrime(int n) {
if (n < 2) return false;
var limit = n ~/ 2;
for (var p = 2; p <= limit; ++p) {
if (n % p == 0) return false;
}
return true;
}
}
copied to clipboard
Output:
[INFO] Starting AsyncExecutor{ sequential: false, parallelism: 2, executorThread: _AsyncExecutorMultiThread{ totalThreads: 2, queue: 0 } }
[INFO] Starting _AsyncExecutorMultiThread{ totalThreads: 2, queue: 0 }
[INFO] Created _IsolateThread{ id: 2 ; registeredTasksTypes: [PrimeChecker] }
[INFO] Created _IsolateThread{ id: 1 ; registeredTasksTypes: [PrimeChecker] }
8779 -> true PrimeChecker(8779)[finished]{ result: true ; executionTime: 2 ms }
1046527 -> true PrimeChecker(1046527)[finished]{ result: true ; executionTime: 2 ms }
3139581 -> false PrimeChecker(3139581)[finished]{ result: false ; executionTime: 0 ms }
16769023 -> true PrimeChecker(16769023)[finished]{ result: true ; executionTime: 35 ms }
copied to clipboard
parallelism #
Note that the parameter parallelism at AsyncExecutor shouldn't be greater than the CPU cores
(unless you really have a case that justifies a higher number).
The most common error is to put a high number for the parameter parallelism thinking that this will increase tasks
execution, but this actually can reduce your application performance. Note that each allocated Isolateneeds a CPU
core to execute, and a high number of Isolate instances running tasks will be a bottleneck in your system
(specially if you are demanding more Isolate than CPU cores).
SharedData #
The class SharedData facilitates and optimizes data shared between tasks.
The main advantage of data encapsulated with SharedData is to avoid multiple
messages, with the same data, between threads/isolates, avoiding concurrency
performance issues and multiple duplicated objects in memory (a GC bottleneck).
Here's an example of a task using SharedData:
// A task that checks if a number is prime:
class PrimeChecker extends AsyncTask<int, bool> {
// The number to check if is prime.
final int n;
// A list of known primes, shared between tasks.
final SharedData<List<int>, List<int>> knownPrimes;
PrimeChecker(this.n, this.knownPrimes);
// Instantiates a `PrimeChecker` task with `parameters` and `sharedData`.
@override
PrimeChecker instantiate(int parameters, [Map<String, SharedData>? sharedData]) {
return PrimeChecker(
parameters,
sharedData!['knownPrimes'] as SharedData<List<int>, List<int>>,
);
}
// The `SharedData` of this task.
@override
Map<String, SharedData> sharedData() => {'knownPrimes': knownPrimes};
// Loads the `SharedData` from `serial` for each key.
@override
SharedData<List<int>, List<int>> loadSharedData(String key, dynamic serial) {
switch (key) {
case 'knownPrimes':
return SharedData<List<int>, List<int>>(serial);
default:
throw StateError('Unknown key: $key');
}
}
// The parameters of this task:
@override
int parameters() {
return n;
}
// Runs the task code:
@override
FutureOr<bool> run() {
return isPrime(n);
}
// A simple prime check function:
bool isPrime(int n) {
if (n < 2) return false;
// The pre-computed primes, optimizing this checking algorithm:
if (knownPrimes.data.contains(n)) {
return true;
}
// It's sufficient to search for prime factors in the range [1,sqrt(N)]:
var limit = (sqrt(n) + 1).toInt();
for (var p = 2; p < limit; ++p) {
if (n % p == 0) return false;
}
return true;
}
}
copied to clipboard
The field knownPrimes above will be shared between tasks. In platforms with support
for dart:isolate knownPrimes will be sent through an Isolate port only once,
avoiding multiple copies and unnecessary memory allocations.
AsyncTaskChannel #
An AsyncTask can have a communication channel that can be used to send/received messages
during task execution.
To use a task channel just override channelInstantiator, than use channelResolved() inside
the task and await channel() outside it:
class YourTask extends AsyncTask<String, int> {
// ...
@override
AsyncTaskChannel? channelInstantiator() => AsyncTaskChannel(); // You can extend `AsyncTaskChannel` class if needed.
// ...
@override
FutureOr<int> run() async {
// ...
// get the resolved channel:
var channel = channelResolved()!; // The channel is always resolved inside `run()`.
// Send and wait a message in the channel:
channel.send('some message');
var result = await channel.waitMessage<String>();
// Send and wait in a single method:
var result = await channel.sendAndWaitResponse<String, String>('some message');
// Read a message if available in the queue (non-blocking):
var msgOptional = channel.readMessage<String>();
// ...
}
}
copied to clipboard
Outside communication with the task:
void main() async {
// ...
// Execute the task:
asyncExecutor.execute(task);
// ...
// Wait for the channel to be resolved:
var channel = (await task.channel())!;
// Wait for a message:
var msg = await channel.waitMessage();
// process msg...
// Send a response:
channel.send('Some response');
// ...
}
copied to clipboard
An AsyncTaskChannel is automatically closed when a task finishes (returns its result).
Source #
The official source code is hosted @ GitHub:
https://github.com/eneural-net/async_task
Features and bugs #
Please file feature requests and bugs at the issue tracker.
Package shared_map #
See also the package shared_map for a way to transparently share data/objects between Isolates.
Contribution #
Any help from the open-source community is always welcome and needed:
Have an issue?
Please fill a bug report 👍.
Feature?
Request with use cases 🤝.
Like the project?
Promote, post, or donate 😄.
Are you a developer?
Fix a bug, add a feature, or improve tests 🚀.
Already helped?
Many thanks from me, the contributors and all project users 👏👏👏!
Contribute an hour and inspire others to do the same.
Author #
Graciliano M. Passos: gmpassos@GitHub.
Sponsor #
Don't be shy, show some love, and become our GitHub Sponsor.
Your support means the world to us, and it keeps the code caffeinated! ☕✨
Thanks a million! 🚀😄
License #
Apache License - Version 2.0
For personal and professional use. You cannot resell or redistribute these repositories in their original state.
There are no reviews.