Feat/abort listener #448

Open · wants to merge 44 commits into master

Conversation

joshLong145
Contributor

Adds an abortListener to workers, which allows async cleanup tasks to run when a task timeout or cancellation occurs, so that the worker can be reused instead of terminated.

connects PR #441
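
For context, a minimal usage sketch of the intended behavior (illustrative only; the exact registration call, e.g. this.worker.addAbortListener, is worked out over the course of this thread):

var workerpool = require('workerpool');
var pool = workerpool.pool();

function task() {
  var me = this;
  return new Promise(function (resolve) {
    var timer = setTimeout(resolve, 5000);

    // hypothetical registration call: run cleanup when the task times out or is cancelled,
    // so the worker can be reused instead of being terminated
    me.worker.addAbortListener(function () {
      clearTimeout(timer);
      resolve();
    });
  });
}

pool.exec(task, [])
  .timeout(1000)
  .catch(function (err) {
    console.error(String(err)); // TimeoutError, but the worker stays alive
  })
  .then(function () {
    return pool.terminate();
  });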

Owner

@josdejong josdejong left a comment

Thanks a lot, this looks good!

I made some inline comments, can you have a look at those?

Inline review comments (now resolved): test/workers/cleanup-abort.js, src/worker.js, test/Pool.test.js, src/index.js
@joshLong145
Contributor Author

joshLong145 commented Jun 17, 2024

@josdejong
After some testing of offloaded functions, I think there are issues with referencing variables that are defined outside of the offloaded function's scope:

var workerpool = require('../');
var pool = workerpool.pool();

function asyncTimeout() {
  return new Promise(function (resolve) {
    let timeout = setTimeout(function () {
        resolve();
    }, 5000);
    
    workerpool.addAbortListener(async function () {
        await new Promise((res, rej) => {
          setTimeout(res, 1000);
        });
        clearTimeout(timeout);
        resolve();
    });
  });
};

pool.exec(asyncTimeout, [])

The above errors with workerpool is not defined. It does not seem like we can reference a workerpool instance from the global scope when using offloaded functions. I think the only way to support it would be to pass the registration handler to the function.

UPDATE:

After playing around with scopes on the function wrapper in worker.methods.run, I was able to bind addAbortListener to the function itself so it can be accessed with this.addAbortListener:

worker.methods.run = function run(fn, args) {
  var f = new Function('return (' + fn + ').apply(this, arguments);');
  f.addAbortListener = function(listener) {
    worker.abortListeners.push(listener); 
  }

  return f.apply(f, args);
};

If we change the this value passed to the function from null to f, we can then extend the f object to provide the addAbortListener context. This is hacky, but it does allow a unique this context from within an offloaded function, while we still have access to the global scope through globalThis. Below is an example of an offloaded function which uses the modified run implementation above:

var workerpool = require('../');
var pool = workerpool.pool();

function asyncTimeout() {
  var me = this;
  return new Promise(function (resolve) {
    let timeout = setTimeout(function () {
        resolve();
    }, 5000);
    console.log(me.addAbortListener, globalThis);
    me.addAbortListener(async function () {
        console.log("adasd", clearTimeout);
        clearTimeout(timeout);
        resolve();
    });
  });
};

pool.exec(asyncTimeout, [])

@josdejong
Owner

josdejong commented Jun 19, 2024

It sounds like a good idea to attach this.addAbortListener to the function itself rather than a "magic" global variable workerpool.addAbortListener 👍 . Can you make sure that it is possible to use it as follows?

const addAbortListener = this.addAbortListener
// ...
addAbortListener(...)

EDIT: and then it makes sense to me to offer this as the only way to add an abort listener, for both offloaded and dedicated workers, right?

@joshLong145
Contributor Author

EDIT: and then it makes sense to me to offer this as the only way to add an abort listener, for both offloaded and dedicated workers, right?

Yes, this makes sense to me. I can make the updates and implement the tests/examples now that we have this worked out.

Since we can now extend the function created for the worker task, we can create a worker API with:

  • addEventListener
  • emit

So for example

this.worker.addEventListener
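
A hypothetical sketch of how that could look from inside an offloaded function (method names and signatures follow the proposal above and are not final):

function task() {
  var me = this;
  return new Promise(function (resolve) {
    // register cleanup to run on timeout/cancellation
    me.worker.addEventListener('abort', function () {
      resolve();
    });

    // report intermediate state back to the main thread
    me.worker.emit('status', { stage: 'started' });
  });
}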

@joshLong145
Contributor Author

@josdejong

A question on how errors are processed. Cancel and timeout produce unique error types, and an error received over the worker RPC bridge causes terminate to be invoked on the WorkerHandler instance (see exec on WorkerHandler, where terminateAndNotify is called). I do not think the current implementation is sufficient for keeping WorkerHandler instances preserved when attempting to reuse workers, since the handler will be deleted from the pool and a new worker will be created regardless of whether the worker is actually cleaned up. I have added some unit tests which show that the onCreateWorker handler gets invoked multiple times when it should only have created a single worker for the lifetime of the pool.

I think for this new feature to work as intended, the message protocol needs extending to account for cleanup, so that handlers can be notified that the worker survived the exception and does not need cleanup.

Owner

@josdejong josdejong left a comment

Thanks for the updates! I added a few inline comments.

About the WorkerHandler terminating the worker when receiving a cancel or timeout error: that is a very good point. Do you have ideas on how to solve this? Indeed both the worker side and the WorkerHandler side both need to know whether there is an abort listener in place or not, and adjust their behavior accordingly.

Inline review comments (now resolved): src/worker.js
@joshLong145
Contributor Author

joshLong145 commented Jul 12, 2024

Thanks for the updates! I added a few inline comments.

About the WorkerHandler terminating the worker when receiving a cancel or timeout error: that is a very good point. Do you have ideas on how to solve this? Indeed both the worker side and the WorkerHandler side both need to know whether there is an abort listener in place or not, and adjust their behavior accordingly.

@josdejong Sorry for the delayed response.

After giving it some thought, I see a possible communication flow which will allow for proper tracking of handlers based on whether a worker can be reused after the onAbort handlers are triggered:

sequenceDiagram
    participant Pool
    participant WorkerHandler
    participant Worker
    Pool->>WorkerHandler: TimeoutError/CancellationError occurs; move the task with its resolver to the `tracking` queue. Send a message to the worker to run cleanup with the task id
    WorkerHandler ->> Worker: Worker receives the message and executes the abort handlers.
    Worker ->> WorkerHandler: Send the result of the abort handler execution back to the worker handler with the task id
    WorkerHandler ->> Pool: Check the task id against the `tracking` queue and, if present, resolve or reject the resolver promise based on the data sent in the message from the worker. Clean up the task context
    

With the above model, the resolver Promise created when exec is first called on the WorkerHandler will either resolve or reject based on the result of the message sent back from the onAbort listener execution, which will be looked up from a tracking queue. The pool can now have a concept of tasks which need to be tracked for potential future cleanup. Since cleanup operations run in parallel and require resource tracking, a second queue seems like the most obvious way of managing this.
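
A rough sketch of that tracking queue, assuming a map keyed by task id (all names here are illustrative, not the PR code):

// Pool side (sketch): park the task's resolver instead of discarding it
var tracking = {};

function onTaskAborted(task, workerHandler) {
  tracking[task.id] = task.resolver;          // keep the resolver for later
  workerHandler.sendAbortRequest(task.id);    // hypothetical: ask the worker to run its abort listeners
}

// WorkerHandler -> Pool (sketch): settle the parked resolver when the worker replies
function onAbortResult(message) {
  var resolver = tracking[message.id];
  if (!resolver) return;
  delete tracking[message.id];                // clean up the task context
  if (message.error) {
    resolver.reject(message.error);           // abort failed: the worker gets terminated
  } else {
    resolver.resolve(message.result);         // abort succeeded: the worker can be reused
  }
}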

The other idea, although much more involved, is to rewrite how items are processed on the producer side. Instead of items only being processed in a single promise chain with a recursive call, we could use something like p-queue to handle assigning tasks to workers and managing WorkerHandlers when tasks cause a worker to be terminated.
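
For reference, a minimal sketch of what the p-queue approach could look like (not part of this PR; the helper names are hypothetical):

import PQueue from 'p-queue';                  // p-queue is ESM-only in recent versions

const queue = new PQueue({ concurrency: 4 }); // e.g. one slot per worker

function submit(task) {
  // every task goes through the queue instead of the recursive promise chain
  return queue.add(async () => {
    const handler = takeIdleWorkerHandler();   // hypothetical helper
    try {
      return await handler.exec(task.method, task.params);
    } finally {
      releaseWorkerHandler(handler);           // hypothetical helper; recreate the handler if it was terminated
    }
  });
}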

@josdejong
Owner

Ow nice I didn't know that you can draw a sequenceDiagram straight in GitHub issues 😎.

I'm not sure whether a queue is really needed since a WorkerHandler and a Worker only process a single task at a time, but at least both need to get the information on whether to abort or not.

I think we could do something like this (it's close to your diagram I think):

  1. In case of a timeout error or termination request, the WorkerHandler sends a message to the Worker to abort. Then we have three possible situations:
    1. The Worker responds with "abort successful". After that the WorkerHandler can terminate the Worker.
    2. The Worker responds with "no abort callback". After that the WorkerHandler can terminate the Worker.
    3. The Worker does not respond (is blocked by heavy CPU for example). After a limited timeout, the WorkerHandler will terminate the Worker anyway.
  2. In case of a cancel error, the WorkerHandler sends a message to the Worker to abort. Then we have three possible situations:
    1. The Worker responds with "abort successful". Then we're done.
    2. The Worker responds with "no abort callback". After that the WorkerHandler can terminate the Worker.
    3. The Worker does not respond (is blocked by heavy CPU for example). After a limited timeout, the WorkerHandler will terminate the Worker anyway.

What do you think?
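
For reference, a rough sketch of what the WorkerHandler side of that exchange could look like (illustrative names and message id; not the PR code):

// WorkerHandler (sketch): ask the worker to abort and race the reply against a timeout
WorkerHandler.prototype.requestAbort = function (taskId, abortTimeout) {
  var me = this;

  var reply = new Promise(function (resolve, reject) {
    me.onAbortReply = function (message) {                // hypothetical reply hook
      message.error ? reject(message.error) : resolve();
    };
    me.worker.send({ id: taskId, method: '__abort__' });  // hypothetical message id
  });

  var timeout = new Promise(function (_resolve, reject) {
    setTimeout(function () {
      reject(new Error('Worker did not respond to the abort request in time'));
    }, abortTimeout);
  });

  // resolved -> abort listeners ran successfully: reuse (cancel) or terminate (timeout/terminate)
  // rejected -> no abort callback, a listener threw, or no response: terminate the worker
  return Promise.race([reply, timeout]);
};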

@joshLong145
Contributor Author

Ow nice I didn't know that you can draw a sequenceDiagram straight in GitHub issues 😎.

I'm not sure whether a queue is really needed since a WorkerHandler and a Worker only process a single task at a time, but at least both need to get the information on whether to abort or not.

I think we could do something like this (it's close to your diagram I think):

  1. In case of a timeout error or termination request, the WorkerHandler sends a message to the Worker to abort. Then we have three possible situations:

    1. The Worker responds with "abort successful". After that the WorkerHandler can terminate the Worker.
    2. The Worker responds with "no abort callback". After that the WorkerHandler can terminate the Worker.
    3. The Worker does not respond (is blocked by heavy CPU for example). After a limited timeout, the WorkerHandler will terminate the Worker anyway.
  2. In case of a cancel error, the WorkerHandler sends a message to the Worker to abort. Then we have three possible situations:

    1. The Worker responds with "abort successful". Then we're done.
    2. The Worker responds with "no abort callback". After that the WorkerHandler can terminate the Worker.
    3. The Worker does not respond (is blocked by heavy CPU for example). After a limited timeout, the WorkerHandler will terminate the Worker anyway.

What do you think?

Your outline makes sense and aligns with the diagram, but with better definitions of the possible execution results. I think you have mapped out most of the remaining details. However, I think we might want an explicit case for when there are abort callbacks but they have thrown an error. While this will result in the same outcome as when the worker is stuck in some CPU-bound operation, we should handle it explicitly.

@josdejong
Owner

Thanks. That makes sense indeed, the abort handler can throw an error too.

@joshLong145
Contributor Author

joshLong145 commented Aug 5, 2024

Hey @josdejong

I think I have made good progress and was able to implement the feature set we have discussed above (rough sketch below), where:

  1. We are able to prevent the termination of a worker in the pool if the abortListeners resolve and no errors occur within the scope of a listener.
  2. We can time out the execution of abortListeners, so that if they run for too long / never leave scope we can short-circuit and terminate the worker.
  3. If there is an error within a listener, we can handle the rejection and terminate the worker.
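
A rough sketch of the worker-side logic behind points 1-3 (illustrative; the real implementation lives in src/worker.js of this PR):

// Worker side (sketch): run all registered abort listeners, but give up after a timeout
worker.tryCleanup = function (listenerTimeout) {
  if (worker.abortListeners.length === 0) {
    // no listeners registered: reject so the handler knows the worker must be terminated
    return Promise.reject(new Error('Worker terminating: no abort listeners registered'));
  }

  var listeners = worker.abortListeners.map(function (listener) {
    return listener();                      // each listener may return a promise
  });

  var timeout = new Promise(function (_resolve, reject) {
    setTimeout(function () {
      reject(new Error('Abort listeners did not finish in time'));
    }, listenerTimeout);
  });

  return Promise.race([Promise.all(listeners), timeout]).then(function () {
    worker.abortListeners = [];             // success: the worker can be reused
  });
};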

I have added tests for both timeout and cancel behaviors to confirm both behave as desired.

I still have to update the docs / examples but I think it is ready for review.

One thing I am noticing is that when I run the tests, the after all hook in Pool.test.js fails to fulfill in some cases and prevents the test run from exiting. When running the tests individually I am not able to get the hook to hang. I was wondering if you had any idea what could be the culprit? Below is the final test output:

  1) "after all" hook in "{root}"

  130 passing (17s)
  2 pending
  1 failing

  1) "after all" hook in "{root}":
     Error: Timeout of 10000ms exceeded. For async tests and hooks, ensure "done()" is called; if returning a Promise, ensure it resolves.
      at listOnTimeout (node:internal/timers:569:17)
      at process.processTimers (node:internal/timers:512:7)

@joshLong145
Contributor Author

I checked out the latest version of the PR, running on node 20.17.0, Windows 11. I expect though that that doesn't make a difference. When running npm test, it ends with the following error:

1) "after all" hook in "{root}":
     Error: Timeout of 10000ms exceeded. For async tests and hooks, ensure "done()" is called; 
       if returning a Promise, ensure it resolves.

This is the same error that we see in the tests running on Github Actions (running on Linux, node 18, 20, 22): https://github.com/josdejong/workerpool/actions/runs/10802599625/job/29964991022?pr=448

When only running this one test it.only('should timeout a task', function () { ... }) I get the exact same error message.

Interesting, on my system (OSX 13.2) I get a completed test run if I add it.only to should timeout a task. I added it to CI to see if the process would terminate on Ubuntu Linux, but it seems the pool.terminate call hangs in that environment as well.

@joshLong145
Contributor Author

joshLong145 commented Sep 14, 2024

@josdejong

After reading the Mocha hook docs, I was curious about the use of after, since it is a global cleanup hook that runs once per test suite. I have changed the createPool hook to use afterEach: we make a new pool in many test cases, so to me the cleanup belongs in afterEach, since we want a new pool for each test individually and each pool to be cleaned up after its test completes.

With this refactor all tests pass and terminate successfully: each test suite concludes and Mocha is able to terminate the process. Tests now pass and complete in CI as well. I do not have access to a machine running Windows at this time; could you re-run the suite and see if you get the same result?
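
A minimal illustration of the hook change (sketch only, not the actual Pool.test.js code):

var workerpool = require('..');

var pools = [];

function createPool(options) {
  var pool = workerpool.pool(options);
  pools.push(pool);
  return pool;
}

// before: a single global `after` hook cleaned up once per suite
// after: terminate the pools created by each test as soon as that test finishes
afterEach(function () {
  return Promise.all(pools.splice(0).map(function (pool) {
    return pool.terminate();
  }));
});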

@josdejong
Owner

Thanks for your updates. Running the latest version of this PR on my Windows machine indeed works and all tests pass.

However, I don't think that this is an issue related to Mocha but some other race condition introduced in the code of this PR. When reducing Pool.test.js to a single test that I posted at #448 (comment), the unit test fails. When extracting the code from this test in a Node.js script and running it without Mocha, it also still fails (never prints "terminated").

I'm afraid we really have to dig deeper to figure out where this issue is coming from before merging this PR.

@joshLong145
Contributor Author

joshLong145 commented Sep 18, 2024

Thanks for your updates. Running the latest version of this PR on my Windows machine indeed works and all tests pass.

However, I don't think that this is an issue related to Mocha but some other race condition introduced in the code of this PR. When reducing Pool.test.js to a single test that I posted at #448 (comment), the unit test fails. When extracting the code from this test in a Node.js script and running it without Mocha, it also still fails (never prints "terminated").

I'm afraid we really have to dig deeper to figure out where this issue is coming from before merging this PR.

Thanks for testing. I can reproduce the failure from #448 (comment) on my machine. I still think that the after hook should instead be afterEach, but this isolated test failure needs addressing.

Going to keep digging now that I can reproduce this error.

@josdejong
Owner

Thanks! If I have some time I'll also do some debugging to see if I can find anything.

@joshLong145
Contributor Author

@josdejong

After tweaking the finally implementation to not return itself on either resolve or reject of the internal promise wrapper, we now seem to have a passing test per #448 (comment). With the change I just pushed, the isolated test case now passes for me.

@josdejong
Owner

Wow, good find!!! 👏

I've done some more digging, and the issue originates in the method Promise.finally not handling promises returned by the callback correctly, like in our failing unit test .finally(() => pool.terminate());. So it looks like this issue is not introduced in this PR but we discovered an existing bug 😅 .

Just removing .then(res) from .finally is not a good solution though: it is there for a reason, namely propagating the original promise chain (and value or error). But there were no unit tests guarding this. I've worked out a PR to fix the issue with Promise.finally and add more unit tests: #470. Can you revert your change in Promise.finally? I'll merge this PR soon, and if you update your branch with the fix from #470 this issue will be solved.
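
For illustration, a generic finally that both waits for a promise returned by the callback and propagates the original value or error (a sketch of the semantics fixed in #470, not the actual src/Promise.js code):

function promiseFinally(promise, callback) {
  return promise.then(
    function (value) {
      // wait for the callback (which may return a promise), then re-emit the original value
      return Promise.resolve(callback()).then(function () { return value; });
    },
    function (error) {
      // wait for the callback, then re-throw the original error
      return Promise.resolve(callback()).then(function () { throw error; });
    }
  );
}

With these semantics, .finally(() => pool.terminate()) waits for the pool to terminate while the chain still settles with the original task result or error.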

Owner

@josdejong josdejong left a comment

A few small remarks, otherwise I think we're good to go.

We should first merge #470 though.

Inline review comments (now resolved): src/Promise.js, src/WorkerHandler.js, src/worker.js, test/Pool.test.js
@josdejong
Owner

OK, I've now merged #470. Can you update the PR to the latest version of the master branch, with the updated Promise.finally?

@joshLong145
Contributor Author

OK, I've now merged #470. Can you update the PR to the latest version of the master branch, with the updated Promise.finally?

Merged in. I was thinking the return in the initial finally would suffice for returning the promise, but you are correct that preserving the chain is still important. Good catch!

@josdejong
Owner

josdejong commented Oct 3, 2024

When changing after into afterEach right now, we get a timeout error again. Somehow we still get some race condition when creating multiple pools. I've done some debugging and found the following minimal code demonstrating the issue:

// file: terminatest.js
const { Promise } = require('./src/Promise');
const Pool = require('./src/Pool');

run()

async function run() {
  console.log('start')

  const pool1 = new Pool();
  await pool1.exec(sleep, [100])
    .timeout(50)
    .catch(err => { console.error(String(err)) })

  const pool2 = new Pool();
  const result = await pool2.exec(add, [3, 4])
  console.log({ result })

  await sleep(1000)
  
  console.log('pool1 workers:', pool1.workers.length) // <-- is 1 but should be 0

  await pool1.terminate() // <-- never finishes
  await pool2.terminate()

  console.log('done')
}

function sleep(delay) {
  return new Promise((resolve) => setTimeout(resolve, delay))
}

function add(a, b) {
  return a + b
}

I did some debugging but haven't yet found the cause of this. Some observations:

  1. This only happens when creating 2 pools. This suggests that somehow these pools are not fully isolated from each other. I did look at whether we mess up the context somewhere (this and me) but didn't see anything.

  2. The issue is gone when reverting the following piece of code to what it is in the master branch:
    // on cancellation, force the worker to terminate
    https://github.com/josdejong/workerpool/pull/448/files#diff-7e092d00b94d598e9df27965dd8a8157654881fcd56f5735e873d45c32c47146R418-R470
    (though this effectively disables the abort listener and all these tests fail).

  3. By adding console.log statements, I noticed that the .timeout(50) causes the worker of pool1 to be terminated twice; I think that should be once. And the callback of WorkerHandler.prototype.terminate is not invoked at all. When adding the following lines to WorkerHandler.prototype.terminate, the issue disappears:

    WorkerHandler.prototype.terminate = function (force, callback) {
      if (this.terminated && typeof callback === 'function') {
        return callback()
      }
    
      // ...

But (3) is not the right solution; we should probably look into why the terminate callback is not invoked in terminateAndNotify and/or why the WorkerHandler of pool1 is not fully terminated when the .timeout(50) is hit. Maybe we miss calling cleanup() somewhere in WorkerHandler.prototype.terminate? And I still don't see why this issue only triggers when there are 2 worker pools.

src/worker.js
]);
}
// if there are no listeners just reject in a promise and let the worker cleanup start
return new Promise(function(_resolve, reject) { reject(); });
Owner

Can you explain this to me? It feels odd to trigger a rejection when there is no error occurring, and it is also a bit odd to leave err undefined, i.e. it should be something like reject(new Error('...')).

Contributor Author

@joshLong145 joshLong145 Oct 3, 2024

Good spot, and yes, we should be throwing an instance of Error here. I have updated this and also removed the worker.kill() invocation from the tryCleanup method, as it will be handled through the WorkerHandler cleanup function. Now that the tracking promise for abort resolution rejects with an Error, the tracked task promise here is rejected correctly, which allows the tracking promise to fulfill and the timeout promise to fully resolve from the catch where we return the tracking promise for abort resolution.

With these changes I now get the following output from your isolated test implementation

start
TimeoutError: Promise timed out after 50 ms
{ result: 7 }
pool1 workers: 0
done

I have also reverted the afterEach hook back to after, see #448 (comment).

Owner

Thanks for the updates. It's indeed better to throw a meaningful error.

But can you explain why there is a need to reject there? Since worker.abortListeners.length === 0 is a valid case, it feels odd to throw an exception saying that cleaning up failed.

How about moving the following logic inside tryCleanup() and renaming the function to cleanup()? Would that simplify things? (I'm not sure, just thinking aloud here.)

    return worker.tryCleanup().then(function () {
      worker.send({
        id: request.id,
        method: CLEANUP_METHOD_ID,
        error: null,
      });
    }).catch(function(err) {
      worker.send({
        id: request.id,
        method: CLEANUP_METHOD_ID,
        error: err ? convertError(err) : null,
      });
    });

Contributor Author

I wanted to move this logic into tryCleanup, where we send the message with the cleanup result to the main thread. In the case of worker.abortListeners.length === 1 we return a promise tracking the result of the handlers, or a timeout promise; not returning a promise in the case of worker.abortListeners.length === 0 would make the return value of the function diverge depending on the length of the abortListeners array.

@josdejong
Owner

I tested the issue I mentioned here again and all works smoothly now 🎉. I guess the underlying bug is fixed via 35a08d2?

@joshLong145
Contributor Author

I tested the issue I mentioned here again and all works smoothly now 🎉. I guess the underlying bug is fixed via 35a08d2?

Yes 😄
Explained in this comment
