forked from glycerine/goq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
wkto_test.go
91 lines (72 loc) · 2.46 KB
/
wkto_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package main
import (
"fmt"
"os"
"testing"
"time"
cv "github.com/glycerine/goconvey/convey"
)
// worker timeout test
//
func TestWorkerTimeout(t *testing.T) {
cv.Convey("remotely, over nanomsg, if a goq worker doesn't accept a job after a timeout, the job server should note this", t, func() {
cv.Convey("and return the job to the waitq to be run by someone else", func() {
// try to let previous sockets clear out
//time.Sleep(1000 * time.Millisecond)
// *** universal test cfg setup
skipbye := false
cfg := NewTestConfig()
defer cfg.ByeTestConfig(&skipbye)
// *** end universal test setup
// we'll see results much faster if the sender times out faster
cfg.SendTimeoutMsec = 1000
//os.Setenv("GOQ_SENDTIMEOUT_MSEC", "1")
//setSendTimeoutDefaultFromEnv()
jobserv, err := NewJobServ(cfg) // use a local jobserv that listens for external worker
if err != nil {
panic(err)
}
defer CleanupServer(cfg, 0, jobserv, false, &skipbye)
defer CleanupOutdir(cfg)
fmt.Printf("\n[pid %d] spawned a new local JobServ, listening at '%s'.\n", os.Getpid(), cfg.JservAddr())
j := NewJob()
j.Cmd = "bin/good.sh"
sub, err := NewSubmitter(GenAddress(), cfg, false)
if err != nil {
panic(err)
}
sub.SubmitJob(j)
// deaf, the key difference.
worker := HelperNewWorkerDeaf(cfg)
_, err = worker.DoOneJobTimeout(1 * time.Second)
if err != nil {
// we expect a timeout here, because we are playing deaf and we closed our listening socket.
fmt.Printf("\n expected timeout err, err we see is: %s\n", err)
}
fmt.Printf("\n before worker.Destroy()\n")
worker.Destroy()
fmt.Printf("\n after worker.Destroy()\n")
// have to poll until everything gets done. Give ourselves 5 seconds.
to := time.Duration(cfg.SendTimeoutMsec) * 30 * time.Millisecond
timeout := time.After(to)
var deafcount int
OuterFor:
for {
fmt.Printf("wkto_test: just before blocking on deafcount request.\n")
select {
case deafcount = <-jobserv.DeafChan:
if deafcount > 0 {
fmt.Printf("wkto_test *success! excellent*: done blocking on deafcount request, deafcount = %d\n", deafcount)
break OuterFor
} else {
fmt.Printf("wkto_test: *ugh, still waiting* done blocking on deafcount request, deafcount = %d\n", deafcount)
}
case <-timeout:
cv.So(deafcount, cv.ShouldEqual, 1)
fmt.Printf("\nfailing test, no DeafChan 1 after... %v\n", to)
break OuterFor
}
}
})
})
}