-
Notifications
You must be signed in to change notification settings - Fork 281
/
Copy pathPheanstalk.php
192 lines (159 loc) · 5.15 KB
/
Pheanstalk.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
<?php
declare(strict_types=1);
namespace Pheanstalk;
use Pheanstalk\Contract\JobIdInterface;
use Pheanstalk\Contract\PheanstalkManagerInterface;
use Pheanstalk\Contract\PheanstalkPublisherInterface;
use Pheanstalk\Contract\PheanstalkSubscriberInterface;
use Pheanstalk\Contract\SocketFactoryInterface;
use Pheanstalk\Values\Job;
use Pheanstalk\Values\JobStats;
use Pheanstalk\Values\ServerStats;
use Pheanstalk\Values\Timeout;
use Pheanstalk\Values\TubeList;
use Pheanstalk\Values\TubeName;
use Pheanstalk\Values\TubeStats;
/**
* Pheanstalk is a PHP client for the beanstalkd work queue.
* This class implements all functionality in one big object.
* It is recommended to instead inject instances of the more specific interface implementations.
* For example, your frontend is unlikely to subscribe to requests so probably does not need `PheanstalkPublisherInterface`
* or `PheanstalkManagerInterface`.
*/
final class Pheanstalk implements PheanstalkManagerInterface, PheanstalkPublisherInterface, PheanstalkSubscriberInterface
{
private PheanstalkManagerInterface $manager;
private PheanstalkPublisherInterface $publisher;
private PheanstalkSubscriberInterface $subscriber;
public function __construct(private readonly Connection $connection)
{
$this->manager = new PheanstalkManager($this->connection);
$this->subscriber = new PheanstalkSubscriber($this->connection);
$this->publisher = new PheanstalkPublisher($this->connection);
}
/**
* Static constructor that uses auto-detection to choose an underlying socket implementation
*/
public static function create(
string $host,
int $port = 11300,
?Timeout $connectTimeout = null,
?Timeout $receiveTimeout = null
): static {
return static::createWithFactory(new SocketFactory($host, $port, null, $connectTimeout, $receiveTimeout));
}
/**
* Static constructor that uses a given socket factory for underlying connections
*/
public static function createWithFactory(SocketFactoryInterface $factory): static
{
return new static(new Connection($factory));
}
public function kick(int $max): int
{
return $this->manager->kick($max);
}
public function kickJob(JobIdInterface $job): void
{
$this->manager->kickJob($job);
}
public function listTubes(): TubeList
{
return $this->manager->listTubes();
}
public function pauseTube(TubeName $tube, int $delay): void
{
$this->manager->pauseTube($tube, $delay);
}
public function resumeTube(TubeName $tube): void
{
$this->manager->resumeTube($tube);
}
public function peek(JobIdInterface $job): Job
{
return $this->manager->peek($job);
}
public function peekReady(): ?Job
{
return $this->manager->peekReady();
}
public function peekDelayed(): ?Job
{
return $this->manager->peekDelayed();
}
public function peekBuried(): ?Job
{
return $this->manager->peekBuried();
}
public function statsJob(JobIdInterface $job): JobStats
{
return $this->manager->statsJob($job);
}
public function statsTube(TubeName $tube): TubeStats
{
return $this->manager->statsTube($tube);
}
public function stats(): ServerStats
{
return $this->manager->stats();
}
public function bury(JobIdInterface $job, int $priority = self::DEFAULT_PRIORITY): void
{
$this->subscriber->bury($job, $priority);
}
public function listTubeUsed(): TubeName
{
return $this->publisher->listTubeUsed();
}
public function put(
string $data,
int $priority = self::DEFAULT_PRIORITY,
int $delay = self::DEFAULT_DELAY,
int $timeToRelease = self::DEFAULT_TTR
): JobIdInterface {
return $this->publisher->put($data, $priority, $delay, $timeToRelease);
}
public function useTube(TubeName $tube): void
{
$this->publisher->useTube($tube);
}
public function delete(JobIdInterface $job): void
{
$this->subscriber->delete($job);
}
public function ignore(TubeName $tube): int
{
return $this->subscriber->ignore($tube);
}
public function listTubesWatched(): TubeList
{
return $this->subscriber->listTubesWatched();
}
public function release(
JobIdInterface $job,
int $priority = PheanstalkPublisherInterface::DEFAULT_PRIORITY,
int $delay = PheanstalkPublisherInterface::DEFAULT_DELAY
): void {
$this->subscriber->release($job, $priority, $delay);
}
public function reserve(): Job
{
return $this->subscriber->reserve();
}
public function reserveJob(JobIdInterface $job): Job
{
return $this->subscriber->reserveJob($job);
}
public function reserveWithTimeout(int $timeout): ?Job
{
return $this->subscriber->reserveWithTimeout($timeout);
}
public function touch(JobIdInterface $job): void
{
$this->subscriber->touch($job);
}
public function watch(TubeName $tube): int
{
return $this->subscriber->watch($tube);
}
}