Skip to content

Commit 4fa5870

Browse files
authored
Merge pull request #167 from php-enqueue/gps
Google Pub/Sub
2 parents 6721dd0 + ddcf39a commit 4fa5870

35 files changed

+2489
-0
lines changed

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ Features:
1414
* [Beanstalk](docs/transport/pheanstalk.md)
1515
* [STOMP](docs/transport/stomp.md)
1616
* [Amazon SQS](docs/transport/sqs.md)
17+
* [Google PubSub](docs/transport/gps.md)
1718
* [Kafka](docs/transport/kafka.md)
1819
* [Redis](docs/transport/redis.md)
1920
* [Gearman](docs/transport/gearman.md)

composer.json

+5
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
"enqueue/gearman": "*@dev",
2121
"enqueue/rdkafka": "*@dev",
2222
"kwn/php-rdkafka-stubs": "^1.0.2",
23+
"enqueue/gps": "*@dev",
2324
"enqueue/enqueue-bundle": "*@dev",
2425
"enqueue/job-queue": "*@dev",
2526
"enqueue/simple-client": "*@dev",
@@ -129,6 +130,10 @@
129130
"type": "path",
130131
"url": "pkg/rdkafka"
131132
},
133+
{
134+
"type": "path",
135+
"url": "pkg/gps"
136+
},
132137
{
133138
"type": "path",
134139
"url": "pkg/simple-client"

docker-compose.yml

+7
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ services:
1212
- gearmand
1313
- kafka
1414
- zookeeper
15+
- google-pubsub
1516
volumes:
1617
- './:/mqdev'
1718
environment:
@@ -40,6 +41,8 @@ services:
4041
- GEARMAN_DSN=gearman://gearmand:4730
4142
- RDKAFKA_HOST=kafka
4243
- RDKAFKA_PORT=9092
44+
- PUBSUB_EMULATOR_HOST=http://google-pubsub:8085
45+
- GCLOUD_PROJECT=mqdev
4346

4447
rabbitmq:
4548
image: enqueue/rabbitmq:latest
@@ -89,6 +92,10 @@ services:
8992
volumes:
9093
- '/var/run/docker.sock:/var/run/docker.sock'
9194

95+
google-pubsub:
96+
image: 'google/cloud-sdk:latest'
97+
entrypoint: 'gcloud beta emulators pubsub start --host-port=0.0.0.0:8085'
98+
9299
volumes:
93100
mysql-data:
94101
driver: local

docs/index.md

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
* [Transports](#transports)
55
- Amqp based on [the ext](transport/amqp.md), [bunny](transport/amqp_bunny.md), [the lib](transport/amqp_lib.md)
66
- [Amazon SQS](transport/sqs.md)
7+
- [Google PubSub](transport/gps.md)
78
- [Beanstalk (Pheanstalk)](transport/pheanstalk.md)
89
- [Gearman](transport/gearman.md)
910
- [Kafka](transport/kafka.md)

docs/transport/gps.md

+74
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
# Google Pub Sub transport
2+
3+
A transport for [Google Pub Sub](https://cloud.google.com/pubsub/docs/) cloud MQ.
4+
It uses internally official google sdk library [google/cloud-pubsub](https://packagist.org/packages/google/cloud-pubsub)
5+
6+
* [Installation](#installation)
7+
* [Create context](#create-context)
8+
* [Send message to topic](#send-message-to-topic)
9+
* [Consume message](#consume-message)
10+
11+
## Installation
12+
13+
```bash
14+
$ composer require enqueue/gps
15+
```
16+
17+
## Create context
18+
19+
To enable the Google Cloud Pub/Sub Emulator, set the `PUBSUB_EMULATOR_HOST` environment variable.
20+
There is a handy docker container [google/cloud-sdk](https://hub.docker.com/r/google/cloud-sdk/).
21+
22+
```php
23+
<?php
24+
use Enqueue\Gps\GpsConnectionFactory;
25+
26+
putenv('PUBSUB_EMULATOR_HOST=http://localhost:8900');
27+
28+
$connectionFactory = new GpsConnectionFactory();
29+
30+
$psrContext = $connectionFactory->createContext();
31+
```
32+
33+
## Send message to topic
34+
35+
Before you can send message you have to declare a topic.
36+
The operation creates a topic on a broker side.
37+
Google allows messages to be sent only to topic.
38+
39+
```php
40+
<?php
41+
/** @var \Enqueue\Gps\GpsContext $psrContext */
42+
43+
$fooTopic = $psrContext->createTopic('foo');
44+
$message = $psrContext->createMessage('Hello world!');
45+
46+
$psrContext->declareTopic($fooTopic);
47+
48+
$psrContext->createProducer()->send($fooTopic, $message);
49+
```
50+
51+
## Consume message:
52+
53+
Before you can consume message you have to subscribe a queue to the topic.
54+
Google does not allow consuming message from the topic directly.
55+
56+
```php
57+
<?php
58+
/** @var \Enqueue\Gps\GpsContext $psrContext */
59+
60+
$fooTopic = $psrContext->createTopic('foo');
61+
$fooQueue = $psrContext->createQueue('foo');
62+
63+
$psrContext->subscribe($fooTopic, $fooQueue);
64+
65+
$consumer = $psrContext->createConsumer($fooQueue);
66+
$message = $consumer->receive();
67+
68+
// process a message
69+
70+
$consumer->acknowledge($message);
71+
// $consumer->reject($message);
72+
```
73+
74+
[back to index](../index.md)

phpunit.xml.dist

+4
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@
7373
<directory>pkg/rdkafka/Tests</directory>
7474
</testsuite>
7575

76+
<testsuite name="Google Pub/Sub transport">
77+
<directory>pkg/gps/Tests</directory>
78+
</testsuite>
79+
7680
<testsuite name="enqueue-bundle">
7781
<directory>pkg/enqueue-bundle/Tests</directory>
7882
</testsuite>

pkg/gps/.gitignore

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
*~
2+
/composer.lock
3+
/composer.phar
4+
/phpunit.xml
5+
/vendor/
6+
/.idea/

pkg/gps/.travis.yml

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
sudo: false
2+
3+
git:
4+
depth: 10
5+
6+
language: php
7+
8+
php:
9+
- '5.6'
10+
- '7.0'
11+
12+
cache:
13+
directories:
14+
- $HOME/.composer/cache
15+
16+
install:
17+
- composer self-update
18+
- composer install --prefer-source
19+
20+
script:
21+
- vendor/bin/phpunit --exclude-group=functional

pkg/gps/Client/GpsDriver.php

+182
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
<?php
2+
3+
namespace Enqueue\Gps\Client;
4+
5+
use Enqueue\Client\Config;
6+
use Enqueue\Client\DriverInterface;
7+
use Enqueue\Client\Message;
8+
use Enqueue\Client\Meta\QueueMetaRegistry;
9+
use Enqueue\Gps\GpsContext;
10+
use Enqueue\Gps\GpsMessage;
11+
use Enqueue\Gps\GpsQueue;
12+
use Enqueue\Gps\GpsTopic;
13+
use Interop\Queue\PsrMessage;
14+
use Psr\Log\LoggerInterface;
15+
use Psr\Log\NullLogger;
16+
17+
class GpsDriver implements DriverInterface
18+
{
19+
/**
20+
* @var GpsContext
21+
*/
22+
private $context;
23+
24+
/**
25+
* @var Config
26+
*/
27+
private $config;
28+
29+
/**
30+
* @var QueueMetaRegistry
31+
*/
32+
private $queueMetaRegistry;
33+
34+
/**
35+
* @param GpsContext $context
36+
* @param Config $config
37+
* @param QueueMetaRegistry $queueMetaRegistry
38+
*/
39+
public function __construct(GpsContext $context, Config $config, QueueMetaRegistry $queueMetaRegistry)
40+
{
41+
$this->context = $context;
42+
$this->config = $config;
43+
$this->queueMetaRegistry = $queueMetaRegistry;
44+
}
45+
46+
/**
47+
* {@inheritdoc}
48+
*/
49+
public function sendToRouter(Message $message)
50+
{
51+
if (false == $message->getProperty(Config::PARAMETER_TOPIC_NAME)) {
52+
throw new \LogicException('Topic name parameter is required but is not set');
53+
}
54+
55+
$topic = $this->createRouterTopic();
56+
$transportMessage = $this->createTransportMessage($message);
57+
58+
$this->context->createProducer()->send($topic, $transportMessage);
59+
}
60+
61+
/**
62+
* {@inheritdoc}
63+
*/
64+
public function sendToProcessor(Message $message)
65+
{
66+
if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
67+
throw new \LogicException('Processor name parameter is required but is not set');
68+
}
69+
70+
if (false == $queueName = $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) {
71+
throw new \LogicException('Queue name parameter is required but is not set');
72+
}
73+
74+
$transportMessage = $this->createTransportMessage($message);
75+
$destination = $this->context->createTopic(
76+
$this->queueMetaRegistry->getQueueMeta($queueName)->getTransportName())
77+
;
78+
79+
$this->context->createProducer()->send($destination, $transportMessage);
80+
}
81+
82+
/**
83+
* {@inheritdoc}
84+
*/
85+
public function setupBroker(LoggerInterface $logger = null)
86+
{
87+
$logger = $logger ?: new NullLogger();
88+
$log = function ($text, ...$args) use ($logger) {
89+
$logger->debug(sprintf('[GpsDriver] '.$text, ...$args));
90+
};
91+
92+
// setup router
93+
$routerTopic = $this->createRouterTopic();
94+
$routerQueue = $this->createQueue($this->config->getRouterQueueName());
95+
96+
$log('Subscribe router topic to queue: %s -> %s', $routerTopic->getTopicName(), $routerQueue->getQueueName());
97+
$this->context->subscribe($routerTopic, $routerQueue);
98+
99+
// setup queues
100+
foreach ($this->queueMetaRegistry->getQueuesMeta() as $meta) {
101+
$topic = $this->context->createTopic($meta->getTransportName());
102+
$queue = $this->context->createQueue($meta->getTransportName());
103+
104+
$log('Subscribe processor topic to queue: %s -> %s', $topic->getTopicName(), $queue->getQueueName());
105+
$this->context->subscribe($topic, $queue);
106+
}
107+
}
108+
109+
/**
110+
* {@inheritdoc}
111+
*
112+
* @return GpsQueue
113+
*/
114+
public function createQueue($queueName)
115+
{
116+
$transportName = $this->queueMetaRegistry->getQueueMeta($queueName)->getTransportName();
117+
118+
return $this->context->createQueue($transportName);
119+
}
120+
121+
/**
122+
* {@inheritdoc}
123+
*
124+
* @return GpsMessage
125+
*/
126+
public function createTransportMessage(Message $message)
127+
{
128+
$headers = $message->getHeaders();
129+
$properties = $message->getProperties();
130+
131+
$transportMessage = $this->context->createMessage();
132+
$transportMessage->setBody($message->getBody());
133+
$transportMessage->setHeaders($headers);
134+
$transportMessage->setProperties($properties);
135+
$transportMessage->setMessageId($message->getMessageId());
136+
$transportMessage->setTimestamp($message->getTimestamp());
137+
$transportMessage->setReplyTo($message->getReplyTo());
138+
$transportMessage->setCorrelationId($message->getCorrelationId());
139+
140+
return $transportMessage;
141+
}
142+
143+
/**
144+
* @param GpsMessage $message
145+
*
146+
* {@inheritdoc}
147+
*/
148+
public function createClientMessage(PsrMessage $message)
149+
{
150+
$clientMessage = new Message();
151+
152+
$clientMessage->setBody($message->getBody());
153+
$clientMessage->setHeaders($message->getHeaders());
154+
$clientMessage->setProperties($message->getProperties());
155+
$clientMessage->setMessageId($message->getMessageId());
156+
$clientMessage->setTimestamp($message->getTimestamp());
157+
$clientMessage->setReplyTo($message->getReplyTo());
158+
$clientMessage->setCorrelationId($message->getCorrelationId());
159+
160+
return $clientMessage;
161+
}
162+
163+
/**
164+
* @return Config
165+
*/
166+
public function getConfig()
167+
{
168+
return $this->config;
169+
}
170+
171+
/**
172+
* @return GpsTopic
173+
*/
174+
private function createRouterTopic()
175+
{
176+
$topic = $this->context->createTopic(
177+
$this->config->createTransportRouterTopicName($this->config->getRouterTopicName())
178+
);
179+
180+
return $topic;
181+
}
182+
}

0 commit comments

Comments
 (0)