Skip to content

Commit 602846b

Browse files
authored
Merge pull request #123 from php-enqueue/pheanstalk-transport
[WIP][beanstalk] Add transport for beanstalkd
2 parents 961d99a + 23c7c4f commit 602846b

File tree

50 files changed

+2454
-3
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+2454
-3
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ Features:
99

1010
* [Feature rich](docs/quick_tour.md).
1111
* [JMS](https://docs.oracle.com/javaee/7/api/javax/jms/package-summary.html) like transport [abstraction](https://github.com/php-enqueue/psr-queue).
12-
* Supports transports [AMQP (RabbitMQ, ActiveMQ)](docs/transport/amqp.md), [STOMP](docs/transport/stomp.md), [Amazon SQS](docs/transport/sqs.md), [Redis](docs/transport/redis.md), [Doctrine DBAL](docs/transport/dbal.md), [Filesystem](docs/transport/filesystem.md), [Null](docs/transport/null.md).
12+
* Supports transports [AMQP (RabbitMQ, ActiveMQ)](docs/transport/amqp.md), [Beanstalk (Pheanstalk)](docs/transport/pheanstalk.md) [STOMP](docs/transport/stomp.md), [Amazon SQS](docs/transport/sqs.md), [Redis](docs/transport/redis.md), [Doctrine DBAL](docs/transport/dbal.md), [Filesystem](docs/transport/filesystem.md), [Null](docs/transport/null.md).
1313
* [Symfony bundle](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/bundle/quick_tour.md)
1414
* [Magento1 extension](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/magento/quick_tour.md)
1515
* [Message bus](http://www.enterpriseintegrationpatterns.com/patterns/messaging/MessageBus.html) support.

bin/test

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ function waitForService()
2323
waitForService rabbitmq 5672 50
2424
waitForService mysql 3306 50
2525
waitForService redis 6379 50
26+
waitForService beanstalkd 11300
2627

2728
php pkg/job-queue/Tests/Functional/app/console doctrine:database:create
2829
php pkg/job-queue/Tests/Functional/app/console doctrine:schema:update --force

composer.json

+5
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
"enqueue/null": "*@dev",
1414
"enqueue/dbal": "*@dev",
1515
"enqueue/sqs": "*@dev",
16+
"enqueue/pheanstalk": "*@dev",
1617
"enqueue/enqueue-bundle": "*@dev",
1718
"enqueue/job-queue": "*@dev",
1819
"enqueue/simple-client": "*@dev",
@@ -85,6 +86,10 @@
8586
"type": "path",
8687
"url": "pkg/sqs"
8788
},
89+
{
90+
"type": "path",
91+
"url": "pkg/pheanstalk"
92+
},
8893
{
8994
"type": "path",
9095
"url": "pkg/simple-client"

docker-compose.yml

+11
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
version: '2'
2+
23
services:
34
dev:
45
image: enqueue/dev:latest
@@ -7,6 +8,7 @@ services:
78
- rabbitmq
89
- mysql
910
- redis
11+
- beanstalkd
1012
volumes:
1113
- './:/mqdev'
1214
environment:
@@ -29,6 +31,9 @@ services:
2931
- AWS__SQS__KEY=$ENQUEUE_AWS__SQS__KEY
3032
- AWS__SQS__SECRET=$ENQUEUE_AWS__SQS__SECRET
3133
- AWS__SQS__REGION=$ENQUEUE_AWS__SQS__REGION
34+
- BEANSTALKD_HOST=beanstalkd
35+
- BEANSTALKD_PORT=11300
36+
- BEANSTALKD_DSN=beanstalk://beanstalkd:11300
3237

3338
rabbitmq:
3439
image: enqueue/rabbitmq:latest
@@ -37,6 +42,12 @@ services:
3742
- RABBITMQ_DEFAULT_USER=guest
3843
- RABBITMQ_DEFAULT_PASS=guest
3944
- RABBITMQ_DEFAULT_VHOST=mqdev
45+
ports:
46+
- "15677:15672"
47+
48+
beanstalkd:
49+
image: 'schickling/beanstalkd'
50+
4051
redis:
4152
image: 'redis:3'
4253
ports:

docs/index.md

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
* Transports
55
- [Amqp (RabbitMQ, ActiveMQ)](transport/amqp.md)
66
- [Amazon SQS](transport/sqs.md)
7+
- [Beanstalk (Pheanstalk)](transport/pheanstalk.md)
78
- [Stomp](transport/stomp.md)
89
- [Redis](transport/redis.md)
910
- [Doctrine DBAL](transport/dbal.md)

docs/transport/pheanstalk.md

+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
# Beanstalk (Pheanstalk) transport
2+
3+
The transport uses [Beanstalkd](http://kr.github.io/beanstalkd/) job manager.
4+
The transport uses [Pheanstalk](https://github.com/pda/pheanstalk) library internally.
5+
6+
* [Installation](#installation)
7+
* [Create context](#create-context)
8+
* [Send message to topic](#send-message-to-topic)
9+
* [Send message to queue](#send-message-to-queue)
10+
* [Consume message](#consume-message)
11+
12+
## Installation
13+
14+
```bash
15+
$ composer require enqueue/pheanstalk
16+
```
17+
18+
19+
## Create context
20+
21+
```php
22+
<?php
23+
use Enqueue\Pheanstalk\PheanstalkConnectionFactory;
24+
25+
// connects to localhost:11300
26+
$factory = new PheanstalkConnectionFactory();
27+
28+
// same as above
29+
$factory = new PheanstalkConnectionFactory('beanstalk://');
30+
31+
// connects to example host and port 5555
32+
$factory = new PheanstalkConnectionFactory('beanstalk://example:5555');
33+
34+
// same as above but configured by array
35+
$factory = new PheanstalkConnectionFactory([
36+
'host' => 'example',
37+
'port' => 5555
38+
]);
39+
```
40+
41+
## Send message to topic
42+
43+
```php
44+
<?php
45+
/** @var \Enqueue\Pheanstalk\PheanstalkContext $psrContext */
46+
47+
$fooTopic = $psrContext->createTopic('aTopic');
48+
$message = $psrContext->createMessage('Hello world!');
49+
50+
$psrContext->createProducer()->send($fooTopic, $message);
51+
```
52+
53+
## Send message to queue
54+
55+
```php
56+
<?php
57+
/** @var \Enqueue\Pheanstalk\PheanstalkContext $psrContext */
58+
59+
$fooQueue = $psrContext->createQueue('aQueue');
60+
$message = $psrContext->createMessage('Hello world!');
61+
62+
$psrContext->createProducer()->send($fooQueue, $message);
63+
```
64+
65+
## Consume message:
66+
67+
```php
68+
<?php
69+
/** @var \Enqueue\Pheanstalk\PheanstalkContext $psrContext */
70+
71+
$fooQueue = $psrContext->createQueue('aQueue');
72+
$consumer = $psrContext->createConsumer($fooQueue);
73+
74+
$message = $consumer->receive(2000); // wait for 2 seconds
75+
76+
$message = $consumer->receiveNoWait(); // fetch message or return null immediately
77+
78+
// process a message
79+
80+
$consumer->acknowledge($message);
81+
// $consumer->reject($message);
82+
```
83+
84+
[back to index](../index.md)

phpunit.xml.dist

+4
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@
4949
<directory>pkg/sqs/Tests</directory>
5050
</testsuite>
5151

52+
<testsuite name="pheanstalk transport">
53+
<directory>pkg/pheanstalk/Tests</directory>
54+
</testsuite>
55+
5256
<testsuite name="enqueue-bundle">
5357
<directory>pkg/enqueue-bundle/Tests</directory>
5458
</testsuite>

pkg/amqp-ext/AmqpConnectionFactory.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ private function parseDsn($dsn)
150150
], $dsnConfig);
151151

152152
if ('amqp' !== $dsnConfig['scheme']) {
153-
throw new \LogicException('The given DSN scheme "%s" is not supported. Could be "amqp" only.');
153+
throw new \LogicException(sprintf('The given DSN scheme "%s" is not supported. Could be "amqp" only.', $dsnConfig['scheme']));
154154
}
155155

156156
if ($dsnConfig['query']) {

pkg/amqp-ext/Tests/AmqpConnectionFactoryConfigTest.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public function testThrowNeitherArrayStringNorNullGivenAsConfig()
2424
public function testThrowIfSchemeIsNotAmqp()
2525
{
2626
$this->expectException(\LogicException::class);
27-
$this->expectExceptionMessage('The given DSN scheme "%s" is not supported. Could be "amqp" only.');
27+
$this->expectExceptionMessage('The given DSN scheme "http" is not supported. Could be "amqp" only.');
2828

2929
new AmqpConnectionFactory('http://example.com');
3030
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpExt\Tests\Spec;
4+
5+
use Enqueue\AmqpExt\AmqpConnectionFactory;
6+
use Enqueue\AmqpExt\AmqpContext;
7+
use Enqueue\Psr\PsrContext;
8+
use Enqueue\Psr\Spec\SendToAndReceiveFromQueueSpec;
9+
10+
/**
11+
* @group functional
12+
*/
13+
class AmqpSendToAndReceiveFromQueueTest extends SendToAndReceiveFromQueueSpec
14+
{
15+
/**
16+
* {@inheritdoc}
17+
*/
18+
protected function createContext()
19+
{
20+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
21+
22+
return $factory->createContext();
23+
}
24+
25+
/**
26+
* {@inheritdoc}
27+
*
28+
* @param AmqpContext $context
29+
*/
30+
protected function createQueue(PsrContext $context, $queueName)
31+
{
32+
$queue = $context->createQueue($queueName);
33+
$context->declareQueue($queue);
34+
$context->purge($queue);
35+
36+
return $queue;
37+
}
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpExt\Tests\Spec;
4+
5+
use Enqueue\AmqpExt\AmqpConnectionFactory;
6+
use Enqueue\AmqpExt\AmqpContext;
7+
use Enqueue\Psr\PsrContext;
8+
use Enqueue\Psr\Spec\SendToAndReceiveFromTopicSpec;
9+
10+
/**
11+
* @group functional
12+
*/
13+
class AmqpSendToAndReceiveFromTopicTest extends SendToAndReceiveFromTopicSpec
14+
{
15+
/**
16+
* {@inheritdoc}
17+
*/
18+
protected function createContext()
19+
{
20+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
21+
22+
return $factory->createContext();
23+
}
24+
25+
/**
26+
* {@inheritdoc}
27+
*
28+
* @param AmqpContext $context
29+
*/
30+
protected function createTopic(PsrContext $context, $topicName)
31+
{
32+
$topic = $context->createTopic($topicName);
33+
$topic->setType(\AMQP_EX_TYPE_FANOUT);
34+
$topic->addFlag(\AMQP_DURABLE);
35+
$context->declareTopic($topic);
36+
37+
return $topic;
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpExt\Tests\Spec;
4+
5+
use Enqueue\AmqpExt\AmqpConnectionFactory;
6+
use Enqueue\AmqpExt\AmqpContext;
7+
use Enqueue\Psr\PsrContext;
8+
use Enqueue\Psr\Spec\SendToAndReceiveNoWaitFromQueueSpec;
9+
10+
/**
11+
* @group functional
12+
*/
13+
class AmqpSendToAndReceiveNoWaitFromQueueTest extends SendToAndReceiveNoWaitFromQueueSpec
14+
{
15+
/**
16+
* {@inheritdoc}
17+
*/
18+
protected function createContext()
19+
{
20+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
21+
22+
return $factory->createContext();
23+
}
24+
25+
/**
26+
* {@inheritdoc}
27+
*
28+
* @param AmqpContext $context
29+
*/
30+
protected function createQueue(PsrContext $context, $queueName)
31+
{
32+
$queue = $context->createQueue($queueName);
33+
$context->declareQueue($queue);
34+
$context->purge($queue);
35+
36+
return $queue;
37+
}
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpExt\Tests\Spec;
4+
5+
use Enqueue\AmqpExt\AmqpConnectionFactory;
6+
use Enqueue\AmqpExt\AmqpContext;
7+
use Enqueue\Psr\PsrContext;
8+
use Enqueue\Psr\Spec\SendToAndReceiveNoWaitFromTopicSpec;
9+
10+
/**
11+
* @group functional
12+
*/
13+
class AmqpSendToAndReceiveNoWaitFromTopicTest extends SendToAndReceiveNoWaitFromTopicSpec
14+
{
15+
/**
16+
* {@inheritdoc}
17+
*/
18+
protected function createContext()
19+
{
20+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
21+
22+
return $factory->createContext();
23+
}
24+
25+
/**
26+
* {@inheritdoc}
27+
*
28+
* @param AmqpContext $context
29+
*/
30+
protected function createTopic(PsrContext $context, $topicName)
31+
{
32+
$topic = $context->createTopic($topicName);
33+
$topic->setType(\AMQP_EX_TYPE_FANOUT);
34+
$topic->addFlag(\AMQP_DURABLE);
35+
$context->declareTopic($topic);
36+
37+
return $topic;
38+
}
39+
}

0 commit comments

Comments
 (0)