-
Notifications
You must be signed in to change notification settings - Fork 1
/
Worker.php
58 lines (50 loc) · 1.26 KB
/
Worker.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
<?php
namespace core;
use Pheanstalk\Pheanstalk;
class Worker
{
public static function listen($channel, $fn, $expect="json")
{
global $_CLI;
$queue = new Pheanstalk("127.0.0.1");
$queue->watch($channel)->ignore("default");
msg("Process $channel(127.0.0.1)");
if ($_CLI["test"]) {
// Process 1 cmd in testing mode
$job = $queue->reserve(10);
if ($job === false) {
user_error("No job in queue, timed-out after 10sec");
}
$input = $job->getData();
msg(sprintf("Processing job (%d)", $job->getId()), [$input]);
if ($expect === "json") {
$input = json_decode($input, true);
} elseif ($expect === "string") {
// nothing to do
} else {
$queue->bury($job);
user_error("Invalid expect=$expect");
}
$fn($input);
$queue->delete($job);
return;
}
while (true) {
$job = $queue->reserve();
$input = $job->getData();
msg(sprintf("Processing job (%d)", $job->getId()), [$input]);
// Bury by default (no retry on failure)
// On success the job is deleted
$queue->bury($job);
if ($expect === "json") {
$input = json_decode($input, true);
} elseif ($expect === "string") {
// nothing to do
} else {
user_error("Invalid expect=$expect");
}
$fn($input);
$queue->delete($job);
}
}
}