Skip to content

Commit 9af252e

Browse files
author
Phalcon
committed
Merge pull request #1650 from sjinks/beanstalk
Phalcon\Queue\Beanstalk enhancements
2 parents 056440f + 113aaef commit 9af252e

File tree

6 files changed

+300
-19
lines changed

6 files changed

+300
-19
lines changed

CHANGELOG

+3-1
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,10 @@
190190
- Optimized Phalcon\Paginator\Adapter\NativeArray (#1653)
191191
- Phalcon\Queue:
192192
- Fixed bug in Phalcon\Queue\Beanstalk::read() (#1348, #1612)
193-
- Bug fixes in beanstalkd protocol implementation
193+
- Bug fixes in beanstalkd protocol implementation (#1650)
194194
- Optimizations (#1621)
195+
- Added peekDelayed() and peekburied() to Phalcon\Queue\Beanstalk (#1650)
196+
- Added kick(), bury(), release(), touch() to Phalcon\Queue\Beanstalk\Job (#1650)
195197
- Phalcon\Security:
196198
- Phalcon\Security\Exception inherits from Phalcon\Exception, not from \Phalcon\DI\Exception
197199
- Added Phalcon\Security::computeHmac() (#1347)

ext/queue/beanstalk.c

+87-17
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ PHP_METHOD(Phalcon_Queue_Beanstalk, watch){
369369

370370
PHALCON_OBS_VAR(status);
371371
phalcon_array_fetch_long(&status, response, 0, PH_NOISY);
372-
if (PHALCON_IS_STRING(status, "WATCH")) {
372+
if (PHALCON_IS_STRING(status, "WATCHING")) {
373373
PHALCON_OBS_VAR(watching_tube);
374374
phalcon_array_fetch_long(&watching_tube, response, 1, PH_NOISY);
375375
RETURN_CCTOR(watching_tube);
@@ -378,15 +378,42 @@ PHP_METHOD(Phalcon_Queue_Beanstalk, watch){
378378
RETURN_MM_FALSE;
379379
}
380380

381+
static void phalcon_queue_beanstalk_peek_common(zval *return_value, zval *this_ptr, zval *response TSRMLS_DC)
382+
{
383+
zval *job_id, *length, *serialized = NULL, *body;
384+
385+
if (!phalcon_array_isset_long_fetch(&job_id, response, 1)) {
386+
job_id = PHALCON_GLOBAL(z_null);
387+
}
388+
389+
if (!phalcon_array_isset_long_fetch(&length, response, 2)) {
390+
length = PHALCON_GLOBAL(z_null);
391+
}
392+
393+
phalcon_call_method_params(serialized, &serialized, this_ptr, SL("read"), zend_inline_hash_func(SS("read")) TSRMLS_CC, 1, length);
394+
if (EG(exception)) {
395+
return;
396+
}
397+
398+
MAKE_STD_ZVAL(body);
399+
phalcon_unserialize(body, serialized TSRMLS_CC);
400+
zval_ptr_dtor(&serialized);
401+
if (Z_REFCOUNT_P(body) >= 1) {
402+
Z_DELREF_P(body);
403+
}
404+
405+
object_init_ex(return_value, phalcon_queue_beanstalk_job_ce);
406+
phalcon_call_method_params(NULL, NULL, return_value, SL("__construct"), zend_inline_hash_func(SS("__construct")) TSRMLS_CC, 3, this_ptr, job_id, body);
407+
}
408+
381409
/**
382410
* Inspect the next ready job.
383411
*
384412
* @return boolean|Phalcon\Queue\Beanstalk\Job
385413
*/
386414
PHP_METHOD(Phalcon_Queue_Beanstalk, peekReady){
387415

388-
zval *command, *response, *status, *job_id, *length;
389-
zval *serialized_body, *body;
416+
zval *command, *response, *status;
390417

391418
PHALCON_MM_GROW();
392419

@@ -400,26 +427,69 @@ PHP_METHOD(Phalcon_Queue_Beanstalk, peekReady){
400427
PHALCON_OBS_VAR(status);
401428
phalcon_array_fetch_long(&status, response, 0, PH_NOISY);
402429
if (PHALCON_IS_STRING(status, "FOUND")) {
403-
PHALCON_OBS_VAR(job_id);
404-
phalcon_array_fetch_long(&job_id, response, 1, PH_NOISY);
405-
406-
PHALCON_OBS_VAR(length);
407-
phalcon_array_fetch_long(&length, response, 2, PH_NOISY);
408-
409-
PHALCON_INIT_VAR(serialized_body);
410-
phalcon_call_method_p1(serialized_body, this_ptr, "read", length);
411-
412-
PHALCON_INIT_VAR(body);
413-
phalcon_unserialize(body, serialized_body TSRMLS_CC);
414-
object_init_ex(return_value, phalcon_queue_beanstalk_job_ce);
415-
phalcon_call_method_p3_noret(return_value, "__construct", this_ptr, job_id, body);
416-
430+
phalcon_queue_beanstalk_peek_common(return_value, getThis(), response TSRMLS_CC);
417431
RETURN_MM();
418432
}
419433

420434
RETURN_MM_FALSE;
421435
}
422436

437+
/**
438+
* Return the delayed job with the shortest delay left
439+
*
440+
* @return boolean|Phalcon\Queue\Beanstalk\Job
441+
*/
442+
PHP_METHOD(Phalcon_Queue_Beanstalk, peekDelayed){
443+
444+
zval *command, *response, *status;
445+
446+
PHALCON_MM_GROW();
447+
448+
PHALCON_INIT_VAR(command);
449+
ZVAL_STRING(command, "peek-delayed", 1);
450+
phalcon_call_method_p1_noret(this_ptr, "write", command);
451+
452+
PHALCON_INIT_VAR(response);
453+
phalcon_call_method(response, this_ptr, "readstatus");
454+
455+
PHALCON_OBS_VAR(status);
456+
phalcon_array_fetch_long(&status, response, 0, PH_NOISY);
457+
if (PHALCON_IS_STRING(status, "FOUND")) {
458+
phalcon_queue_beanstalk_peek_common(return_value, getThis(), response TSRMLS_CC);
459+
RETURN_MM();
460+
}
461+
462+
RETURN_MM_FALSE;
463+
}
464+
465+
/**
466+
* Return the next job in the list of buried jobs
467+
*
468+
* @return boolean|Phalcon\Queue\Beanstalk\Job
469+
*/
470+
PHP_METHOD(Phalcon_Queue_Beanstalk, peekBuried){
471+
472+
zval *command, *response, *status;
473+
474+
PHALCON_MM_GROW();
475+
476+
PHALCON_INIT_VAR(command);
477+
ZVAL_STRING(command, "peek-buried", 1);
478+
phalcon_call_method_p1_noret(this_ptr, "write", command);
479+
480+
PHALCON_INIT_VAR(response);
481+
phalcon_call_method(response, this_ptr, "readstatus");
482+
483+
PHALCON_OBS_VAR(status);
484+
phalcon_array_fetch_long(&status, response, 0, PH_NOISY);
485+
if (PHALCON_IS_STRING(status, "FOUND")) {
486+
phalcon_queue_beanstalk_peek_common(return_value, getThis(), response TSRMLS_CC);
487+
RETURN_MM();
488+
}
489+
490+
RETURN_MM_FALSE;
491+
}
492+
423493
/**
424494
* Reads the latest status from the Beanstalkd server
425495
*

ext/queue/beanstalk.h

+4
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ PHP_METHOD(Phalcon_Queue_Beanstalk, reserve);
2828
PHP_METHOD(Phalcon_Queue_Beanstalk, choose);
2929
PHP_METHOD(Phalcon_Queue_Beanstalk, watch);
3030
PHP_METHOD(Phalcon_Queue_Beanstalk, peekReady);
31+
PHP_METHOD(Phalcon_Queue_Beanstalk, peekDelayed);
32+
PHP_METHOD(Phalcon_Queue_Beanstalk, peekBuried);
3133
PHP_METHOD(Phalcon_Queue_Beanstalk, readStatus);
3234
PHP_METHOD(Phalcon_Queue_Beanstalk, read);
3335
PHP_METHOD(Phalcon_Queue_Beanstalk, write);
@@ -68,6 +70,8 @@ PHALCON_INIT_FUNCS(phalcon_queue_beanstalk_method_entry){
6870
PHP_ME(Phalcon_Queue_Beanstalk, choose, arginfo_phalcon_queue_beanstalk_choose, ZEND_ACC_PUBLIC)
6971
PHP_ME(Phalcon_Queue_Beanstalk, watch, arginfo_phalcon_queue_beanstalk_watch, ZEND_ACC_PUBLIC)
7072
PHP_ME(Phalcon_Queue_Beanstalk, peekReady, NULL, ZEND_ACC_PUBLIC)
73+
PHP_ME(Phalcon_Queue_Beanstalk, peekDelayed, NULL, ZEND_ACC_PUBLIC)
74+
PHP_ME(Phalcon_Queue_Beanstalk, peekBuried, NULL, ZEND_ACC_PUBLIC)
7175
PHP_ME(Phalcon_Queue_Beanstalk, readStatus, NULL, ZEND_ACC_PROTECTED)
7276
PHP_ME(Phalcon_Queue_Beanstalk, read, arginfo_phalcon_queue_beanstalk_read, ZEND_ACC_PUBLIC)
7377
PHP_ME(Phalcon_Queue_Beanstalk, write, NULL, ZEND_ACC_PROTECTED)

ext/queue/beanstalk/job.c

+146-1
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,6 @@ PHP_METHOD(Phalcon_Queue_Beanstalk_Job, getBody){
104104
/**
105105
* Removes a job from the server entirely
106106
*
107-
* @param string $id
108107
* @return boolean
109108
*/
110109
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, delete){
@@ -132,6 +131,152 @@ PHP_METHOD(Phalcon_Queue_Beanstalk_Job, delete){
132131
RETURN_MM_FALSE;
133132
}
134133

134+
/**
135+
* The release command puts a reserved job back into the ready queue (and marks
136+
* its state as "ready") to be run by any client. It is normally used when the job
137+
* fails because of a transitory error.
138+
*
139+
* @return boolean
140+
*/
141+
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, release){
142+
143+
zval *priority = NULL, *delay = NULL;
144+
zval *id, *command, *queue, *response, *status;
145+
146+
phalcon_fetch_params(0, 0, 2, &priority, &delay);
147+
148+
PHALCON_MM_GROW();
149+
150+
if (!priority) {
151+
PHALCON_INIT_VAR(priority);
152+
ZVAL_LONG(priority, 100);
153+
}
154+
155+
if (!delay) {
156+
delay = PHALCON_GLOBAL(z_zero);
157+
}
158+
159+
id = phalcon_fetch_nproperty_this(this_ptr, SL("_id"), PH_NOISY_CC);
160+
queue = phalcon_fetch_nproperty_this(this_ptr, SL("_queue"), PH_NOISY_CC);
161+
162+
PHALCON_ALLOC_GHOST_ZVAL(command);
163+
PHALCON_CONCAT_SVSVSV(command, "release ", id, " ", priority, " ", delay);
164+
phalcon_call_method_p1_noret(queue, "write", command);
165+
166+
PHALCON_INIT_VAR(response);
167+
phalcon_call_method(response, queue, "readstatus");
168+
169+
PHALCON_OBS_VAR(status);
170+
phalcon_array_fetch_long(&status, response, 0, PH_NOISY);
171+
if (PHALCON_IS_STRING(status, "RELEASED")) {
172+
RETURN_MM_TRUE;
173+
}
174+
175+
RETURN_MM_FALSE;
176+
}
177+
178+
/**
179+
* The bury command puts a job into the "buried" state. Buried jobs are put into
180+
* a FIFO linked list and will not be touched by the server again until a client
181+
* kicks them with the "kick" command.
182+
*
183+
* @return boolean
184+
*/
185+
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, bury){
186+
187+
zval *priority = NULL;
188+
zval *id, *command, *queue, *response, *status;
189+
190+
phalcon_fetch_params(0, 0, 1, &priority);
191+
192+
PHALCON_MM_GROW();
193+
194+
if (!priority) {
195+
PHALCON_INIT_VAR(priority);
196+
ZVAL_LONG(priority, 100);
197+
}
198+
199+
id = phalcon_fetch_nproperty_this(this_ptr, SL("_id"), PH_NOISY_CC);
200+
queue = phalcon_fetch_nproperty_this(this_ptr, SL("_queue"), PH_NOISY_CC);
201+
202+
PHALCON_ALLOC_GHOST_ZVAL(command);
203+
PHALCON_CONCAT_SVSV(command, "bury ", id, " ", priority);
204+
phalcon_call_method_p1_noret(queue, "write", command);
205+
206+
PHALCON_INIT_VAR(response);
207+
phalcon_call_method(response, queue, "readstatus");
208+
209+
PHALCON_OBS_VAR(status);
210+
phalcon_array_fetch_long(&status, response, 0, PH_NOISY);
211+
if (PHALCON_IS_STRING(status, "BURIED")) {
212+
RETURN_MM_TRUE;
213+
}
214+
215+
RETURN_MM_FALSE;
216+
}
217+
218+
/**
219+
* The bury command puts a job into the "buried" state. Buried jobs are put into
220+
* a FIFO linked list and will not be touched by the server again until a client
221+
* kicks them with the "kick" command.
222+
*
223+
* @return boolean
224+
*/
225+
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, touch){
226+
227+
zval *id, *command, *queue, *response, *status;
228+
229+
PHALCON_MM_GROW();
230+
231+
id = phalcon_fetch_nproperty_this(this_ptr, SL("_id"), PH_NOISY_CC);
232+
queue = phalcon_fetch_nproperty_this(this_ptr, SL("_queue"), PH_NOISY_CC);
233+
234+
PHALCON_ALLOC_GHOST_ZVAL(command);
235+
PHALCON_CONCAT_SV(command, "touch ", id);
236+
phalcon_call_method_p1_noret(queue, "write", command);
237+
238+
PHALCON_INIT_VAR(response);
239+
phalcon_call_method(response, queue, "readstatus");
240+
241+
PHALCON_OBS_VAR(status);
242+
phalcon_array_fetch_long(&status, response, 0, PH_NOISY);
243+
if (PHALCON_IS_STRING(status, "TOUCHED")) {
244+
RETURN_MM_TRUE;
245+
}
246+
247+
RETURN_MM_FALSE;
248+
}
249+
250+
/**
251+
* Move the job to the ready queue if it is delayed or buried.
252+
*
253+
* @return boolean
254+
*/
255+
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, kick){
256+
257+
zval *id, *command, *queue, *response, *status;
258+
259+
PHALCON_MM_GROW();
260+
261+
id = phalcon_fetch_nproperty_this(this_ptr, SL("_id"), PH_NOISY_CC);
262+
queue = phalcon_fetch_nproperty_this(this_ptr, SL("_queue"), PH_NOISY_CC);
263+
264+
PHALCON_ALLOC_GHOST_ZVAL(command);
265+
PHALCON_CONCAT_SV(command, "kick-job ", id);
266+
phalcon_call_method_p1_noret(queue, "write", command);
267+
268+
PHALCON_INIT_VAR(response);
269+
phalcon_call_method(response, queue, "readstatus");
270+
271+
PHALCON_OBS_VAR(status);
272+
phalcon_array_fetch_long(&status, response, 0, PH_NOISY);
273+
if (PHALCON_IS_STRING(status, "KICKED")) {
274+
RETURN_MM_TRUE;
275+
}
276+
277+
RETURN_MM_FALSE;
278+
}
279+
135280
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, __wakeup) {
136281

137282
zval *id = phalcon_fetch_nproperty_this(this_ptr, SL("_id"), PH_NOISY_CC);

ext/queue/beanstalk/job.h

+8
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ PHP_METHOD(Phalcon_Queue_Beanstalk_Job, __construct);
2525
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, getId);
2626
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, getBody);
2727
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, delete);
28+
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, release);
29+
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, bury);
30+
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, touch);
31+
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, kick);
2832
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, __wakeup);
2933

3034
ZEND_BEGIN_ARG_INFO_EX(arginfo_phalcon_queue_beanstalk_job___construct, 0, 0, 3)
@@ -38,6 +42,10 @@ PHALCON_INIT_FUNCS(phalcon_queue_beanstalk_job_method_entry){
3842
PHP_ME(Phalcon_Queue_Beanstalk_Job, getId, NULL, ZEND_ACC_PUBLIC)
3943
PHP_ME(Phalcon_Queue_Beanstalk_Job, getBody, NULL, ZEND_ACC_PUBLIC)
4044
PHP_ME(Phalcon_Queue_Beanstalk_Job, delete, NULL, ZEND_ACC_PUBLIC)
45+
PHP_ME(Phalcon_Queue_Beanstalk_Job, release, NULL, ZEND_ACC_PUBLIC)
46+
PHP_ME(Phalcon_Queue_Beanstalk_Job, bury, NULL, ZEND_ACC_PUBLIC)
47+
PHP_ME(Phalcon_Queue_Beanstalk_Job, touch, NULL, ZEND_ACC_PUBLIC)
48+
PHP_ME(Phalcon_Queue_Beanstalk_Job, kick, NULL, ZEND_ACC_PUBLIC)
4149
PHP_ME(Phalcon_Queue_Beanstalk_Job, __wakeup, NULL, ZEND_ACC_PUBLIC)
4250
PHP_FE_END
4351
};

0 commit comments

Comments
 (0)