An Asynchronous Queue Implementation with Synchronous / Asynchronous Enqueueing of Synchronous / Asynchronous Jobs for JS / TS (Node / Deno)
AQ is a lightweight asynchronous queue implementation with no dependencies. You may enqueue
synchronous or asynchronous items either synchronously or asynchronously.
In its current state (v0.4.1) AQ is still under development and contains many console.log() statements that display resolutions and rejections. At this phase of development there is also no guarantee that a new version will be backward compatible. So it may not be safe to use AQ in production code unless you are willing to delete all console.log() statements yourself and stick with a certain version. Also please keep in mind that AQ relies on modern ECMAScript features such as Private Class Fields and Async Iterators, so make sure that you have the right environment. AQ is tested with Deno 1.92+ and should also be fine with Node v12+, Chrome 74+, Edge 79+.
- AQ is a "kind of relaxed" FIFO queue structure which can take both asynchronous and synchronous items at the same time. Sync or async, all enqueued items are wrapped by a promise (outer promise). A resolution of the inner promise (the enqueued item) triggers the previous outer promise in the queue to be resolved. Since the previous inner promise is doing the same thing, this interlaced async chaining mechanism forms the basis of an uninterrupted continuum. So when the queue becomes empty (when all inner promises are depleted) there is still one outer promise yielded at the tip of the queue, awaiting a resolution or rejection. AQ will remain there, keeping the queue alive until you .kill() AQ abruptly. In the meantime you may safely enqueue new items asynchronously regardless of whether the queue has become empty or not.
- The item at the head of the queue gets automatically dequeued once it resolves, or instantly if it's already in the resolved state. Under normal operation all other items in the queue must wait until they become the head to be dequeued. So there is no dequeue method in AQ at all.
- AQ can also be used as a race machine. In the raceMode = true case all pending items up to the first resolving promise get wiped out to allow the winner item to dequeue. However, unlike Promise.race(), the items enqueued after the winner will remain in the queue. This is because the ones coming after the winner might have been asynchronously enqueued at a later time. The late ones should be given their chance.
- In the basic operation rejections are handled inside the queue silently. This also means that while consuming from AQ instances you don't need to deploy try & catch functionality. However, in order to capture the rejections you can watch them by registering an eventlistener function to the "error" event to see which promise was rejected and why.
- There are four events that can be listened to by eventlistener functions: "next", "error", "reply" and "empty". The eventlistener functions can be added / removed freely at any time. Every event can hold multiple eventlistener functions. The eventlistener functions can also be anonymous and they can still be removed because all eventlistener functions are assigned a unique id.
- Once an AQ instance is initiated it remains available for asynchronous enqueueing even if in time it becomes empty. So you may keep it alive indefinitely or just .kill() it if it's no longer needed.
- Being an async iterator, AQ instances are ideally consumed by a for await of loop.
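The keep-alive behaviour described in the list above can be illustrated with a minimal sketch. TinyQueue is a hypothetical stand-in written only for this illustration, not AQ's actual implementation: it has no timeouts, events, panels or race mode. It only shows how an async iterator can dequeue the head automatically, skip rejections silently, and stay alive while empty until it is killed.

```javascript
// Hypothetical stand-in, NOT AQ's source: a FIFO async iterator that stays
// alive while empty and skips rejected items silently.
class TinyQueue {
  #items = [];     // enqueued items, each wrapped in a promise
  #wake  = null;   // resolver that wakes a consumer waiting on an empty queue
  #dead  = false;  // set by kill()

  enqueue(item) {
    const p = Promise.resolve(item);  // sync values and promises are treated alike
    p.catch(() => {});                // mark rejections as handled while they wait in the queue
    this.#items.push(p);
    this.#wake?.();
    return this;
  }

  kill() {
    this.#dead = true;
    this.#wake?.();
  }

  async *[Symbol.asyncIterator]() {
    while (!this.#dead) {
      if (this.#items.length === 0) {
        // Empty queue: park here until enqueue() or kill() wakes us up
        await new Promise(resolve => (this.#wake = resolve));
        continue;
      }
      try   { yield await this.#items.shift(); }  // dequeue the head once it resolves
      catch { /* the head rejected: skip it and move on */ }
    }
  }
}

async function demo() {
  const q = new TinyQueue(), seen = [];
  q.enqueue(1)
   .enqueue(Promise.resolve(2))
   .enqueue(Promise.reject(new Error("boom")))
   .enqueue(3);
  setTimeout(() => q.enqueue(4), 10);  // enqueued after the queue has become empty
  setTimeout(() => q.kill(), 30);      // ends the for await loop
  for await (const value of q) seen.push(value);
  return seen;                         // resolves to [1, 2, 3, 4]
}
```

Note that the late item (4) is still delivered even though the queue was empty when it arrived, which is the property that the standard Promise methods do not offer.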
Just add
import {AQ} from "https://gitlab.com/Redu/aq/-/raw/v0.4.1/mod.ts";
to your script. To test AQ in your projects please follow the instructions below.
The AQ constructor takes an options argument and is simply called as
var aq = new AQ(opts);
and consumed like
async function getAsyncValues(aq){
  for await (let item of aq){
    console.log(`The Promise at endpoint resolved with a value of "${item}"`);
  }
}
The options object argument (opts above) could take the following shape
var opts = { timeout : 200
, clearMode: "soft"
, raceMode : false
};
- timeout: Since AQ instances get dequeued automatically, one thing that we should avoid is having an indefinitely pending promise in the queue. The timeout property is of Number type and defines a duration in ms. timeout should always be provided unless you are sure that all enqueued items are either synchronous or promises that will certainly resolve or reject within a reasonable time.
- clearMode: AQ instances have a .clear(clearMode) method used to clear the queue at any time. clearMode can take only two values: "hard" | "soft". "hard" clears all the items in the queue regardless of their state. "soft" clears all the items in the queue except for the ones in resolved state.
- raceMode: A Boolean (true | false) option used to switch the queue into race mode. In race mode the queue is cleared only up to the first resolving item in the queue. Once the first resolving item is dequeued the queue contains only the recent items that were enqueued after the resolving item and remains available for further enqueueing operations.
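To make the purpose of the timeout option concrete, here is a minimal sketch of how a never-settling promise can be turned into a rejection after a given number of milliseconds. withTimeout is a hypothetical helper, not part of AQ's API, and AQ may implement its timeout differently. The rejection object only mimics the name / message / pid shape that this README describes for the "error" event, and the "demo-id" default is an assumption made for the example.

```javascript
// Hypothetical helper, NOT part of AQ: reject if `item` has not settled within `ms`.
function withTimeout(item, ms, pid = "demo-id") {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(
      () => reject({ name: "Timeout", message: `Promise timed out after ${ms}ms`, pid }),
      ms
    );
  });
  // Whichever settles first wins; the timer is cleared either way
  return Promise.race([Promise.resolve(item), timeout])
                .finally(() => clearTimeout(timer));
}

// Usage: a promise that never settles is rejected after 20ms instead of blocking forever
withTimeout(new Promise(() => {}), 20, "D8VJQ7ZMIDA")
  .catch(e => console.log(`${e.name} for item ${e.pid}`));
```

Without such a guard, a single promise that never settles at the head of an automatically dequeued FIFO queue would block every item behind it.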
As of v0.4.1 the following methods are available
- .enqueue(item): Inserts an item at the end of the queue and increments the size of the queue. The return value is a panel object. The panel object has three properties and a method as follows:
  - item: A reference to the enqueued item itself.
  - state: Shows the current state of the item in the queue such as "pending", "resolved", "rejected" or "aborted".
  - id: An ID for the item which can be checked against the pid property of the Error object caught at the "error" eventlistener function. This can be useful to retry a particular async call.
  - .abort(): Used to manually abort a pending item prematurely whenever needed. An aborted item will not be dequeued.
- .clear("hard" | "soft" | "upto", [targetId]): Clears the queue according to the provided argument:
  - "hard": Clears the queue completely.
  - "soft": Clears the queue but leaves the already resolved items.
  - "upto": Clears the queue up to the item with the id matching the provided optional targetId argument. The items coming after are kept.
  The return value is the current AQ instance.
- .flush(): Similar to .clear("hard") but returns an array of the items that are in resolved or pending states. This can be used to prematurely clear the queue and apply the remaining resolved or pending items to standard Promise methods like .all(), .race() or .any() etc.
- .on("event"): Adds or removes eventlisteners. You can add multiple eventlisteners per event. AQ instances can take four event types:
  - "next" event is fired per successful yielding of a pending or an already resolved item at the head of the queue. Some pending items at the head might of course get rejected and the "next" event won't fire for rejections. The unique id of the resolving promise is passed to the event handler.
  - "error" event is fired once an item in the queue gets rejected. An error object is passed to the event handler. The error object takes the following shape:
    { name   : "Timeout"                               // name of the error
    , message: "Promise timed out before fullfillment" // description
    , pid    : "D8VJQ7ZMIDA"                           // the ID of the rejected or aborted promise
    }
  - "empty" event is fired whenever the queue becomes empty.
  - "reply" event is fired whenever a promise in the queue gets resolved and a reply object is passed to the event handler. The reply object has two properties: "value", which holds the resolved value, and "pid", which holds the unique id of the resolving promise.
  When invoked, .on("event") returns an object with two methods, .do(f) and .forget(id), where f is a function and id is a unique id string.
  - .on("event").do(f): Such as var id = aq.on("error").do(e => doSomethingWith(e)); The return value will be a unique id string like "4E34SIO5X56" even if you have provided an anonymous function. This unique id string can be saved to remove a particular eventlistener at a later time.
  - .on("event").forget(id): Such as aq.on("error").forget("4E34SIO5X56"); which will, if it exists, remove the eventlistener function with the corresponding id string from the eventlisteners list of that particular event. The return value is either true or false depending on the outcome.
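The reason an anonymous listener can still be removed is that the handle is the id, not the function reference. A minimal sketch of that bookkeeping follows. Listeners is a hypothetical stand-in, not AQ's source; the counter based id format and the emit method are assumptions made for the example, and only the .on(event).do(f) / .forget(id) shape is taken from the description above.

```javascript
// Hypothetical stand-in for the listener bookkeeping, NOT AQ's source.
class Listeners {
  #byEvent = new Map();  // event name -> Map(id -> listener function)
  #nextId  = 0;          // assumption: a simple counter stands in for AQ's id strings

  on(event) {
    const registry = this.#byEvent.get(event) ?? new Map();
    this.#byEvent.set(event, registry);
    return {
      do    : f  => { const id = "L" + (++this.#nextId);  // unique per instance
                      registry.set(id, f);
                      return id;
                    },
      forget: id => registry.delete(id)                   // true if a listener was removed
    };
  }

  emit(event, payload) {
    this.#byEvent.get(event)?.forEach(f => f(payload));
  }
}

const events = new Listeners(), errors = [];
const id = events.on("error").do(e => errors.push(e.pid));  // anonymous listener
events.emit("error", { name: "Timeout", pid: "A1" });       // listener runs
const removed = events.on("error").forget(id);              // true
events.emit("error", { name: "Timeout", pid: "B2" });       // no listener left
console.log(errors, removed);
```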
As of v0.4.1, AQ instances have only one read only property, .size, which gives you the number of items in the queue.
AQ is a very lightweight tool but at the same time, like a Swiss Army Knife, it will allow you to perform many interesting tasks easily. It doesn't offer a big API just because I want to keep it as simple as possible while being functional. This means you may easily extend AQ's functionalities by using its available methods cleverly. Having said that, there already exist many built-in asynchronous capabilities in the JS/TS language so you should consider using them in the first place. Only when some exceptional cases arise where the naked Promises are not sufficient should you consider using AQ. The point being, all Promise methods are supplied with asynchronous tasks synchronously, while AQ can always be enqueued with asynchronous tasks asynchronously whenever you have something to enqueue.
Let us start with the case where you provide your asynchronous tasks synchronously.
- Sync Input - Async Output Task Queue
  This is the basic operation where we enqueue the async tasks synchronously. In this case the main differences compared to Promise methods are:
  - The whole system won't collapse all at once when a Promise fails.
  - You have full control over aborting at a certain timeout value.
  - You have full control over taking an action such as retrying by using the "error" event.
  In other words you simply enqueue the Promises sequentially and then:
  - Some of them may time out and you may not care.
  - You may register an "error" eventlistener to take the necessary action such as retrying in case a Promise in AQ fails.
  - You may register a "next" eventlistener to take the necessary action once a specific Promise resolves. Perhaps you may like to do an aq.clear("hard") in the eventlistener function and abort the remaining promises.
Assuming that we already have an array of multiple urls to fetch from an API, a simple error handler that retries up to 5 times may look like:
var aq     = new AQ({timeout: 80}),
    tryCnt = 5,
    panels = [],
    retry  = e => { const ix = panels.findIndex(p => p.id === e.pid),
                          tc = ix >= 0 ? panels[ix].tryCnt : 0;   // unknown pid: nothing to retry
                    tc && ( panels[ix] = aq.enqueue(fetch(urls[ix]))   // re-enqueue the failed request
                          , panels[ix].tryCnt = tc - 1                 // carry over the remaining tries
                          , console.log(`Retry #${(tryCnt-panels[ix].tryCnt).toString().padStart(2," ")} for "${urls[ix]}"`)
                          );
                  };
getAsyncValues(aq);
aq.on("error").do(retry);   // register the retry handler before enqueueing
panels = urls.map(url => { const panel = aq.enqueue(fetch(url));
                           panel.tryCnt = tryCnt;
                           return panel;
                         });
In the above retry attempt we are utilizing the AQ functionalities at hand such as the "error" eventlistener and the panel object; however, in future releases AQ may employ a built-in retry functionality. This is also a good place to point out some important fetch() API rejection cases. Keep in mind that fetch() rejects only on network errors; server side errors still resolve, with the failure reported in the response object. You should check the ok property of the received response object in the consuming for await loop, in a manner similar to the following:
async function getAsyncValues(aq){
  for await (const res of aq){
    res.ok ? res.json()
                .then(json => doSomethingWith(json))
           : handleServerError(res);
  }
  console.log("The stream is finalized");
}
Also not all JS/TS asynchronous functionalities serve you with Promises. One example could be the Workers API. You may easily promisify your worker tasks like
var wp = new Promise((v,x) => ( firstWorker.onmessage      = e => v(e.data)   // resolve with the posted data
                              , firstWorker.onmessageerror = x                // reject if the message can't be deserialized
                              ));
- Async Input - Async Output Task Queue
  Say you want to make Short Polling requests to an API once every 500ms and you would like to use the first resolving response. Imagine that the request you made @0ms happens to resolve @1000ms and for some reason the one that you made @500ms resolves @900ms. You are interested in the second one since it gives you the freshest state from the API. At this point you no longer need the first request and it's best to get rid of it. Then just add {raceMode: true} to your options object and keep enqueueing your requests (polling) indefinitely every 500ms. You will get the freshest resolutions continuously and the previously enqueued slower ones will be wiped out. Awesome..!
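The race mode semantics described above can be sketched without AQ. raceUpTo and delay are hypothetical helpers written for this illustration, not AQ's source, and the sketch takes all requests at once, whereas a real polling loop would enqueue them over time. It shows the difference from Promise.race(): everything queued before the winner is dropped, everything queued after it is kept. It relies on the standard Promise.any(), which rejects with an AggregateError if every item rejects.

```javascript
// Hypothetical illustration of race mode semantics, NOT AQ's source.
// `queued` is an array of promises in the order they were enqueued.
async function raceUpTo(queued) {
  const { index, value } = await Promise.any(
    queued.map((p, index) => Promise.resolve(p).then(value => ({ index, value })))
  );
  return { winner   : value                      // value of the first resolving item
         , wiped    : index                      // number of slower items queued before the winner
         , remaining: queued.slice(index + 1)    // later items keep their chance
         };
}

// Resolves with `value` after `ms` milliseconds (stands in for a network request)
const delay = (ms, value) => new Promise(resolve => setTimeout(resolve, ms, value));

// The first request is slow, the second one overtakes it, the third is still in flight
raceUpTo([delay(100, "stale"), delay(40, "fresh"), delay(70, "later")])
  .then(({ winner, wiped, remaining }) =>
    console.log(`winner: ${winner}, wiped: ${wiped}, still queued: ${remaining.length}`));
```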
AQ is being developed on Deno. So I would advise you to install and use Deno, at least for your experiments. For Deno projects, Denon is the tool that does what Nodemon does for Node. So go ahead and simply install Denon too as shown on its page. The following is the scripts.json file used by Denon in this project.
{
  "$schema": "https://deno.land/x/denon@2.4.7/schema.json",
  "scripts": {
    "start": {
      "cmd": "deno run app.js",
      "desc": "run my app.js file"
    },
    "test": {
      "cmd": "deno run --inspect ./test/test-async-queue.js",
      "desc": "run the test file"
    },
    "race": {
      "cmd": "deno run ./test/test-AQ-race.js",
      "desc": "run the race test"
    },
    "retry": {
      "cmd": "deno run --allow-net ./test/test-AQ-retry.js",
      "desc": "run the retry test"
    }
  }
}
Now in the project folder run the tests like
# to run test-async-queue.js
$ denon test
# to run test-AQ-race.js
$ denon race
# to run test-AQ-retry.js
$ denon retry
and play with their source code as you like.
- Implementing an AQ native interface for Fetch API to include auto retry functionality etc.
- Implementing an AQ native interface for Web Workers API.
- Implementing an AQ native interface for Broadcast Channel API.
I hope to have PRs in line with the fancy wide indenting of this code or I will have to rephrase them and it will make me a dull boy. Also, if you may, please pay attention to the following:
- We are not using any if clauses unless it is essential, as in the case of throwing errors. Please try to use ternaries with proper indenting instead.
- For single choice conditionals please try to use short circuits.
- If you have multiple instructions to do after the conditional then use the comma operator to group them like
  ifTrue && ( firstDoThis
            , thenDoThis
            , andReturnThis
            );
- Use arrow functions whenever possible.
- Any bleeding edge JS/TS functionalities are welcome if need be. AQ is not thought to be backward compatible.
Copyright © 2021, Redu. Released under GNU General Public License v3.0