Skip to content

Commit

Permalink
Added EventLoop.Terminate() which cancels all active timers and preve…
Browse files Browse the repository at this point in the history
…nts further job submission. Closes #81
  • Loading branch information
dop251 committed Jul 26, 2024
1 parent 2aae10d commit 5de51f3
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 33 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ jobs:
test:
strategy:
matrix:
go-version: [1.18.x, 1.x]
go-version: [1.20.x, 1.x]
os: [ubuntu-latest, windows-latest]
arch: ["", "386"]
fail-fast: false
Expand Down
166 changes: 135 additions & 31 deletions eventloop/eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ import (
)

type job struct {
cancel func() bool
fn func()
idx int

cancelled bool
fn func()
}

type Timer struct {
Expand All @@ -33,6 +36,7 @@ type Immediate struct {
type EventLoop struct {
vm *goja.Runtime
jobChan chan func()
jobs []*job
jobCount int32
canRun int32

Expand All @@ -41,9 +45,10 @@ type EventLoop struct {

auxJobsSpare, auxJobs []func()

stopLock sync.Mutex
stopCond *sync.Cond
running bool
stopLock sync.Mutex
stopCond *sync.Cond
running bool
terminated bool

enableConsole bool
registry *require.Registry
Expand Down Expand Up @@ -107,11 +112,22 @@ func (loop *EventLoop) schedule(call goja.FunctionCall, repeating bool) goja.Val
}
f := func() { fn(nil, args...) }
loop.jobCount++
var job *job
var ret goja.Value
if repeating {
return loop.vm.ToValue(loop.addInterval(f, time.Duration(delay)*time.Millisecond))
interval := loop.newInterval(f)
interval.start(loop, time.Duration(delay)*time.Millisecond)
job = &interval.job
ret = loop.vm.ToValue(interval)
} else {
return loop.vm.ToValue(loop.addTimeout(f, time.Duration(delay)*time.Millisecond))
timeout := loop.newTimeout(f)
timeout.start(loop, time.Duration(delay)*time.Millisecond)
job = &timeout.job
ret = loop.vm.ToValue(timeout)
}
job.idx = len(loop.jobs)
loop.jobs = append(loop.jobs, job)
return ret
}
return nil
}
Expand Down Expand Up @@ -143,12 +159,18 @@ func (loop *EventLoop) setImmediate(call goja.FunctionCall) goja.Value {
// The instance of goja.Runtime that is passed to the function and any Values derived
// from it must not be used outside the function. SetTimeout is
// safe to call inside or outside the loop.
// If the loop is terminated (see Terminate()) returns nil.
func (loop *EventLoop) SetTimeout(fn func(*goja.Runtime), timeout time.Duration) *Timer {
t := loop.addTimeout(func() { fn(loop.vm) }, timeout)
loop.addAuxJob(func() {
t := loop.newTimeout(func() { fn(loop.vm) })
if loop.addAuxJob(func() {
loop.jobCount++
})
return t
t.idx = len(loop.jobs)
loop.jobs = append(loop.jobs, &t.job)
}) {
t.start(loop, timeout)
return t
}
return nil
}

// ClearTimeout cancels a Timer returned by SetTimeout if it has not run yet.
Expand All @@ -166,12 +188,18 @@ func (loop *EventLoop) ClearTimeout(t *Timer) {
// function and any Values derived from it must not be used outside
// the function. SetInterval is safe to call inside or outside the
// loop.
// If the loop is terminated (see Terminate()) returns nil.
func (loop *EventLoop) SetInterval(fn func(*goja.Runtime), timeout time.Duration) *Interval {
i := loop.addInterval(func() { fn(loop.vm) }, timeout)
loop.addAuxJob(func() {
i := loop.newInterval(func() { fn(loop.vm) })
if loop.addAuxJob(func() {
loop.jobCount++
})
return i
i.idx = len(loop.jobs)
loop.jobs = append(loop.jobs, &i.job)
}) {
i.start(loop, timeout)
return i
}
return nil
}

// ClearInterval cancels an Interval returned by SetInterval.
Expand All @@ -190,6 +218,9 @@ func (loop *EventLoop) setRunning() {
}
loop.running = true
atomic.StoreInt32(&loop.canRun, 1)
loop.auxJobsLock.Lock()
loop.terminated = false
loop.auxJobsLock.Unlock()
}

// Run calls the specified function, starts the event loop and waits until there are no more delayed jobs to run
Expand Down Expand Up @@ -222,7 +253,7 @@ func (loop *EventLoop) StartInForeground() {

// Stop the loop that was started with Start(). After this function returns there will be no more jobs executed
// by the loop. It is possible to call Start() or Run() again after this to resume the execution.
// Note, it does not cancel active timeouts.
// Note, it does not cancel active timeouts (use Terminate() instead if you want this).
// It is not allowed to run Start() (or Run()) and Stop() concurrently.
// Calling Stop() on a non-running loop has no effect.
// It is not allowed to call Stop() from the loop, because it is synchronous and cannot complete until the loop
Expand Down Expand Up @@ -250,12 +281,44 @@ func (loop *EventLoop) StopNoWait() {
loop.stopLock.Unlock()
}

// Terminate stops the loop and clears all active timeouts and interval. After it returns there are no
// active timers or goroutines associated with the loop. Any attempt to submit a task (by using RunOnLoop(),
// SetTimeout() or SetInterval()) will not succeed.
// After being terminated the loop can be restarted again by using Start() or Run().
func (loop *EventLoop) Terminate() {
loop.Stop()
for i := 0; i < len(loop.jobs); i++ {
job := loop.jobs[i]
if !job.cancelled {
job.cancelled = true
if job.cancel() {
loop.removeJob(job)
i--
}
}
}

loop.auxJobsLock.Lock()
loop.terminated = true
loop.auxJobsLock.Unlock()

for len(loop.jobs) > 0 {
select {
case job := <-loop.jobChan:
job()
case <-loop.wakeupChan:
loop.runAux()
}
}
}

// RunOnLoop schedules to run the specified function in the context of the loop as soon as possible.
// The order of the runs is preserved (i.e. the functions will be called in the same order as calls to RunOnLoop())
// The instance of goja.Runtime that is passed to the function and any Values derived from it must not be used
// outside the function. It is safe to call inside or outside the loop.
func (loop *EventLoop) RunOnLoop(fn func(*goja.Runtime)) {
loop.addAuxJob(func() { fn(loop.vm) })
// Returns true on success or false if the loop is terminated (see Terminate()).
func (loop *EventLoop) RunOnLoop(fn func(*goja.Runtime)) bool {
return loop.addAuxJob(func() { fn(loop.vm) })
}

func (loop *EventLoop) runAux() {
Expand Down Expand Up @@ -304,42 +367,54 @@ func (loop *EventLoop) wakeup() {
}
}

func (loop *EventLoop) addAuxJob(fn func()) {
func (loop *EventLoop) addAuxJob(fn func()) bool {
loop.auxJobsLock.Lock()
if loop.terminated {
loop.auxJobsLock.Unlock()
return false
}
loop.auxJobs = append(loop.auxJobs, fn)
loop.auxJobsLock.Unlock()
loop.wakeup()
return true
}

func (loop *EventLoop) addTimeout(f func(), timeout time.Duration) *Timer {
func (loop *EventLoop) newTimeout(f func()) *Timer {
t := &Timer{
job: job{fn: f},
}
t.cancel = t.doCancel

return t
}

func (t *Timer) start(loop *EventLoop, timeout time.Duration) {
t.timer = time.AfterFunc(timeout, func() {
loop.jobChan <- func() {
loop.doTimeout(t)
}
})

return t
}

func (loop *EventLoop) addInterval(f func(), timeout time.Duration) *Interval {
// https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args
if timeout <= 0 {
timeout = time.Millisecond
}

func (loop *EventLoop) newInterval(f func()) *Interval {
i := &Interval{
job: job{fn: f},
ticker: time.NewTicker(timeout),
stopChan: make(chan struct{}),
}
i.cancel = i.doCancel

go i.run(loop)
return i
}

func (i *Interval) start(loop *EventLoop, timeout time.Duration) {
// https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args
if timeout <= 0 {
timeout = time.Millisecond
}
i.ticker = time.NewTicker(timeout)
go i.run(loop)
}

func (loop *EventLoop) addImmediate(f func()) *Immediate {
i := &Immediate{
job: job{fn: f},
Expand All @@ -351,6 +426,7 @@ func (loop *EventLoop) addImmediate(f func()) *Immediate {
}

func (loop *EventLoop) doTimeout(t *Timer) {
loop.removeJob(&t.job)
if !t.cancelled {
t.cancelled = true
loop.jobCount--
Expand All @@ -374,18 +450,34 @@ func (loop *EventLoop) doImmediate(i *Immediate) {

func (loop *EventLoop) clearTimeout(t *Timer) {
if t != nil && !t.cancelled {
t.timer.Stop()
t.cancelled = true
loop.jobCount--
if t.doCancel() {
loop.removeJob(&t.job)
}
}
}

func (loop *EventLoop) clearInterval(i *Interval) {
if i != nil && !i.cancelled {
i.cancelled = true
close(i.stopChan)
loop.jobCount--
i.doCancel()
}
}

func (loop *EventLoop) removeJob(job *job) {
idx := job.idx
if idx < 0 {
return
}
if idx < len(loop.jobs)-1 {
loop.jobs[idx] = loop.jobs[len(loop.jobs)-1]
loop.jobs[idx].idx = idx
}
loop.jobs[len(loop.jobs)-1] = nil
loop.jobs = loop.jobs[:len(loop.jobs)-1]
job.idx = -1
}

func (loop *EventLoop) clearImmediate(i *Immediate) {
Expand All @@ -395,6 +487,15 @@ func (loop *EventLoop) clearImmediate(i *Immediate) {
}
}

func (i *Interval) doCancel() bool {
close(i.stopChan)
return false
}

func (t *Timer) doCancel() bool {
return t.timer.Stop()
}

func (i *Interval) run(loop *EventLoop) {
L:
for {
Expand All @@ -408,4 +509,7 @@ L:
}
}
}
loop.jobChan <- func() {
loop.removeJob(&i.job)
}
}
33 changes: 33 additions & 0 deletions eventloop/eventloop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"time"

"github.com/dop251/goja"

"go.uber.org/goleak"
)

func TestRun(t *testing.T) {
Expand Down Expand Up @@ -594,3 +596,34 @@ func TestEventLoop_ClearRunningTimeout(t *testing.T) {
t.Fatal(called)
}
}

func TestEventLoop_Terminate(t *testing.T) {
defer goleak.VerifyNone(t)

loop := NewEventLoop()
loop.Start()
interval := loop.SetInterval(func(vm *goja.Runtime) {}, 10*time.Millisecond)
time.Sleep(500 * time.Millisecond)
loop.ClearInterval(interval)
loop.Terminate()

if loop.SetTimeout(func(*goja.Runtime) {}, time.Millisecond) != nil {
t.Fatal("was able to SetTimeout()")
}
if loop.SetInterval(func(*goja.Runtime) {}, time.Millisecond) != nil {
t.Fatal("was able to SetInterval()")
}
if loop.RunOnLoop(func(*goja.Runtime) {}) {
t.Fatal("was able to RunOnLoop()")
}

ch := make(chan struct{})
loop.Start()
if !loop.RunOnLoop(func(runtime *goja.Runtime) {
close(ch)
}) {
t.Fatal("RunOnLoop() has failed after restart")
}
<-ch
loop.Terminate()
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
module github.com/dop251/goja_nodejs

go 1.18
go 1.20

require (
github.com/dop251/base64dec v0.0.0-20231022112746-c6c9f9a96217
github.com/dop251/goja v0.0.0-20231014103939-873a1496dc8e
go.uber.org/goleak v1.3.0
golang.org/x/net v0.17.0
golang.org/x/text v0.13.0
)
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ github.com/chzyer/logex v1.2.0/go.mod h1:9+9sk7u7pGNWYMkh0hdiL++6OeibzJccyQU4p4M
github.com/chzyer/readline v1.5.0/go.mod h1:x22KAscuvRqlLoK9CsoYsmxoXZMMFVyOl86cAH8qUic=
github.com/chzyer/test v0.0.0-20210722231415-061457976a23/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc=
github.com/dlclark/regexp2 v1.7.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8=
github.com/dlclark/regexp2 v1.10.0 h1:+/GIL799phkJqYW+3YbOd8LCcbHzT0Pbo8zl70MHsq0=
Expand All @@ -25,8 +26,12 @@ github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NB
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
Expand Down Expand Up @@ -62,3 +67,4 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EV
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

0 comments on commit 5de51f3

Please sign in to comment.