Skip to content

Commit 5087383

Browse files
authored
prepare beta (#5)
* upadte orbs * update readme / composer * reneable cc coverage * fix job * move cc to end * move clover again
1 parent a3a67be commit 5087383

File tree

4 files changed

+285
-8
lines changed

4 files changed

+285
-8
lines changed

.circleci/config.yml

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
version: 2.1
22

33
orbs:
4-
ci-caching: jobcloud/ci-caching@0.5
5-
ci-php: jobcloud/ci-php@0.5
4+
ci-caching: jobcloud/ci-caching@0.12
5+
ci-php: jobcloud/ci-php@0.29
66

77
workflows:
88
test-php-kafka-lib:
@@ -14,9 +14,7 @@ workflows:
1414
dependencyCheckSumFile: "./composer.json"
1515
requires:
1616
- ci-caching/build-docker-images
17-
- ci-php/coverage:
18-
dockerComposeFile: "./docker/docker-compose.yml"
19-
dependencyCheckSumFile: "./composer.json"
17+
- coverage:
2018
requires:
2119
- ci-php/install-dependencies
2220
- ci-php/code-style:
@@ -29,3 +27,21 @@ workflows:
2927
dependencyCheckSumFile: "./composer.json"
3028
requires:
3129
- ci-php/install-dependencies
30+
31+
jobs:
32+
coverage:
33+
machine: true
34+
steps:
35+
- ci-php/coverage-command:
36+
dockerComposeFile: "./docker/docker-compose.yml"
37+
dependencyCheckSumFile: "./composer.json"
38+
- run:
39+
name: Download cc-test-reporter
40+
command: |
41+
mkdir -p tmp/
42+
curl -L https://codeclimate.com/downloads/test-reporter/test-reporter-latest-linux-amd64 > ./tmp/cc-test-reporter
43+
chmod +x ./tmp/cc-test-reporter
44+
- run:
45+
name: Upload coverage results to Code Climate
46+
command: |
47+
./tmp/cc-test-reporter after-build -p /var/www/html --coverage-input-type clover --exit-code $?

README.md

Lines changed: 262 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,263 @@
11
# php-kafka-lib
2-
WIP: to replace [messaging-lib](https://github.com/jobcloud/messaging-lib)
2+
3+
[![CircleCI](https://circleci.com/gh/jobcloud/php-kafka-lib.svg?style=shield)](https://circleci.com/gh/jobcloud/php-kafka-lib)
4+
[![Maintainability](https://api.codeclimate.com/v1/badges/beae5fe991d080cbad8c/maintainability)](https://codeclimate.com/github/jobcloud/php-kafka-lib/maintainability)
5+
[![Test Coverage](https://api.codeclimate.com/v1/badges/beae5fe991d080cbad8c/test_coverage)](https://codeclimate.com/github/jobcloud/php-kafka-lib/test_coverage)
6+
[![Latest Stable Version](https://poser.pugx.org/jobcloud/php-kafka-lib/v/stable)](https://packagist.org/packages/jobcloud/php-kafka-lib)
7+
[![Latest Unstable Version](https://poser.pugx.org/jobcloud/php-kafka-lib/v/unstable)](https://packagist.org/packages/jobcloud/php-kafka-lib)
8+
9+
## Description
10+
This is a library that makes it easier to use Kafka in your PHP project.
11+
12+
This library relies on [arnaud-lb/php-rdkafka](https://github.com/arnaud-lb/php-rdkafka)
13+
Avro support relies on [flix-tech/avro-serde-php](https://github.com/flix-tech/avro-serde-php)
14+
The [documentation](https://arnaud.le-blanc.net/php-rdkafka/phpdoc/book.rdkafka.html) of the php extension,
15+
can help out to understand the internals of this library.
16+
17+
18+
## Requirements
19+
- php: ^7.3
20+
- ext-rdkafka: ^4.0.0
21+
22+
## Installation
23+
```composer require jobcloud/php-kafka-lib "~1.0"```
24+
25+
## Usage
26+
27+
### Producer
28+
29+
#### Kafka
30+
31+
##### Simple example
32+
```php
33+
<?php
34+
35+
use Jobcloud\Kafka\Message\KafkaProducerMessage;
36+
use Jobcloud\Kafka\Producer\KafkaProducerBuilder;
37+
38+
$producer = KafkaProducerBuilder::create()
39+
->withAdditionalBroker('localhost:9092')
40+
->build();
41+
42+
$message = KafkaProducerMessage::create('test-topic', 0)
43+
->withKey('asdf-asdf-asfd-asdf')
44+
->withBody('some test message payload')
45+
->withHeaders([ 'key' => 'value' ]);
46+
47+
$producer->produce($message);
48+
```
49+
##### Avro Producer
50+
To create an avro prodcuer add the avro encoder.
51+
52+
```php
53+
<?php
54+
55+
use FlixTech\AvroSerializer\Objects\RecordSerializer;
56+
use Jobcloud\Kafka\Message\KafkaProducerMessage;
57+
use Jobcloud\Kafka\Message\Encoder\AvroEncoder;
58+
use Jobcloud\Kafka\Message\Registry\AvroSchemaRegistry;
59+
use Jobcloud\Kafka\Producer\KafkaProducerBuilder;
60+
use Jobcloud\Kafka\Message\KafkaAvroSchema;
61+
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
62+
use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
63+
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
64+
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
65+
use GuzzleHttp\Client;
66+
67+
$cachedRegistry = new CachedRegistry(
68+
new BlockingRegistry(
69+
new PromisingRegistry(
70+
new Client(['base_uri' => 'jobcloud-kafka-schema-registry:9081'])
71+
)
72+
),
73+
new AvroObjectCacheAdapter()
74+
);
75+
76+
$registry = new AvroSchemaRegistry($cachedRegistry);
77+
$recordSerializer = new RecordSerializer($cachedRegistry);
78+
79+
//if no version is defined, latest version will be used
80+
//if no schema definition is defined, the appropriate version will be fetched form the registry
81+
$registry->addSchemaMappingForTopic(
82+
'test-topic',
83+
new KafkaAvroSchema('schemaName' /*, int $version, AvroSchema $definition */)
84+
);
85+
86+
$encoder = new AvroEncoder($registry, $recordSerializer);
87+
88+
$producer = KafkaProducerBuilder::create()
89+
->withAdditionalBroker('kafka:9092')
90+
->withEncoder($encoder)
91+
->build();
92+
93+
$schemaName = 'testSchema';
94+
$version = 1;
95+
$message = KafkaProducerMessage::create('test-topic', 0)
96+
->withKey('asdf-asdf-asfd-asdf')
97+
->withBody(['name' => 'someName'])
98+
->withHeaders([ 'key' => 'value' ]);
99+
100+
$producer->produce($message);
101+
```
102+
103+
**NOTE:** To improve producer latency you can install the `pcntl` extension.
104+
The php-kafka-lib already has code in place, similarly described here:
105+
https://github.com/arnaud-lb/php-rdkafka#performance--low-latency-settings
106+
107+
### Consumer
108+
109+
#### Kafka High Level
110+
111+
```php
112+
<?php
113+
114+
use Jobcloud\Kafka\Consumer\KafkaConsumerBuilder;
115+
use Jobcloud\Kafka\Exception\KafkaConsumerConsumeException;
116+
use Jobcloud\Kafka\Exception\KafkaConsumerEndOfPartitionException;
117+
use Jobcloud\Kafka\Exception\KafkaConsumerTimeoutException;
118+
119+
$consumer = KafkaConsumerBuilder::create()
120+
->withAdditionalConfig(
121+
[
122+
'compression.codec' => 'lz4',
123+
'auto.commit.interval.ms' => 500
124+
]
125+
)
126+
->withAdditionalBroker('kafka:9092')
127+
->withConsumerGroup('testGroup')
128+
->withTimeout(120 * 10000)
129+
->withAdditionalSubscription('test-topic')
130+
->build();
131+
132+
$consumer->subscribe();
133+
134+
while (true) {
135+
try {
136+
$message = $consumer->consume();
137+
// your business logic
138+
$consumer->commit($message);
139+
} catch (KafkaConsumerTimeoutException $e) {
140+
//no messages were read in a given time
141+
} catch (KafkaConsumerEndOfPartitionException $e) {
142+
//only occurs if enable.partition.eof is true (default: false)
143+
} catch (KafkaConsumerConsumeException $e) {
144+
// Failed
145+
}
146+
}
147+
```
148+
149+
#### Kafka Low Level
150+
151+
```php
152+
<?php
153+
154+
use Jobcloud\Kafka\Consumer\KafkaConsumerBuilder;
155+
use Jobcloud\Kafka\Exception\KafkaConsumerConsumeException;
156+
use Jobcloud\Kafka\Exception\KafkaConsumerEndOfPartitionException;
157+
use Jobcloud\Kafka\Exception\KafkaConsumerTimeoutException;
158+
159+
$consumer = KafkaConsumerBuilder::create()
160+
->withAdditionalConfig(
161+
[
162+
'compression.codec' => 'lz4',
163+
'auto.commit.interval.ms' => 500
164+
]
165+
)
166+
->withAdditionalBroker('kafka:9092')
167+
->withConsumerGroup('testGroup')
168+
->withTimeout(120 * 10000)
169+
->withAdditionalSubscription('test-topic')
170+
->withConsumerType(KafkaConsumerBuilder::CONSUMER_TYPE_LOW_LEVEL)
171+
->build();
172+
173+
$consumer->subscribe();
174+
175+
while (true) {
176+
try {
177+
$message = $consumer->consume();
178+
// your business logic
179+
$consumer->commit($message);
180+
} catch (KafkaConsumerTimeoutException $e) {
181+
//no messages were read in a given time
182+
} catch (KafkaConsumerEndOfPartitionException $e) {
183+
//only occurs if enable.partition.eof is true (default: false)
184+
} catch (KafkaConsumerConsumeException $e) {
185+
// Failed
186+
}
187+
}
188+
```
189+
190+
#### Avro Consumer
191+
To create an avro consumer add the avro decoder.
192+
193+
```php
194+
<?php
195+
196+
use FlixTech\AvroSerializer\Objects\RecordSerializer;
197+
use \Jobcloud\Messaging\Kafka\Consumer\KafkaConsumerBuilder;
198+
use Jobcloud\Kafka\Exception\KafkaConsumerConsumeException;
199+
use Jobcloud\Kafka\Exception\KafkaConsumerEndOfPartitionException;
200+
use Jobcloud\Kafka\Exception\KafkaConsumerTimeoutException;
201+
use Jobcloud\Kafka\Message\Decoder\AvroDecoder;
202+
use Jobcloud\Kafka\Message\KafkaAvroSchema;
203+
use Jobcloud\Kafka\Message\Registry\AvroSchemaRegistry;
204+
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
205+
use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
206+
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
207+
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
208+
use GuzzleHttp\Client;
209+
210+
$cachedRegistry = new CachedRegistry(
211+
new BlockingRegistry(
212+
new PromisingRegistry(
213+
new Client(['base_uri' => 'jobcloud-kafka-schema-registry:9081'])
214+
)
215+
),
216+
new AvroObjectCacheAdapter()
217+
);
218+
219+
$registry = new AvroSchemaRegistry($cachedRegistry);
220+
$recordSerializer = new RecordSerializer($cachedRegistry);
221+
222+
//if no version is defined, latest version will be used
223+
//if no schema definition is defined, the appropriate version will be fetched form the registry
224+
$registry->addSchemaMappingForTopic(
225+
'test-topic',
226+
new KafkaAvroSchema('someSchema' , 9 /* , AvroSchema $definition */)
227+
);
228+
229+
$decoder = new AvroDecoder($registry, $recordSerializer);
230+
231+
$consumer = KafkaConsumerBuilder::create()
232+
->withAdditionalConfig(
233+
[
234+
'compression.codec' => 'lz4',
235+
'auto.commit.interval.ms' => 500
236+
]
237+
)
238+
->withDecoder($decoder)
239+
->withAdditionalBroker('kafka:9092')
240+
->withConsumerGroup('testGroup')
241+
->withTimeout(120 * 10000)
242+
->withAdditionalSubscription('test-topic')
243+
->build();
244+
245+
$consumer->subscribe();
246+
247+
while (true) {
248+
try {
249+
$message = $consumer->consume();
250+
// your business logic
251+
$consumer->commit($message);
252+
} catch (KafkaConsumerTimeoutException $e) {
253+
//no messages were read in a given time
254+
} catch (KafkaConsumerEndOfPartitionException $e) {
255+
//only occurs if enable.partition.eof is true (default: false)
256+
} catch (KafkaConsumerConsumeException $e) {
257+
// Failed
258+
}
259+
}
260+
```
261+
262+
## Additional information
263+
Replaces [messaging-lib](https://github.com/jobcloud/messaging-lib)

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
},
3939
"extra": {
4040
"branch-alias": {
41-
"dev-master": "4.0-dev"
41+
"dev-master": "1.0-dev"
4242
}
4343
}
4444
}

phpunit.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
<log type="coverage-text" target="php://stdout" showOnlySummary="true"/>
3232
<log type="coverage-html" target="build/logs/phpunit/coverage"/>
3333
<log type="coverage-xml" target="build/logs/phpunit/coverage/coverage-xml"/>
34-
<log type="coverage-clover" target="build/logs/phpunit/coverage/coverage.xml"/>
34+
<log type="coverage-clover" target="clover.xml"/>
3535
<log type="junit" target="build/logs/phpunit/junit.xml"/>
3636
</logging>
3737
<filter>

0 commit comments

Comments
 (0)