This module is a bridge of koco/messenger-kafka for Magento 2.
It is built to use Magento 2 native queue system with some adjustments to the settings.
It also handles reading with Avro Schemes.
Only the reading part is done from now. You can't write to a queue.
composer require adexos/m2-kafka-connector
To do so, you can simply include in your app/code/Namespace/Module/etc/adminhtml/system.xml
the kafka configuration
form :
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="urn:magento:module:Magento_Config:etc/system_file.xsd">
<system>
<section id="kafka" translate="label" type="text" sortOrder="300" showInDefault="1">
<group id="warehouse" translate="" type="text" sortOrder="20" showInDefault="1">
<label>Warehouse</label>
<include path="Adexos_KafkaConnector::includes/kafka_conf_included.xml"/>
</group>
</section>
</system>
</config>
Please note the group id
you set, it will be used to connect the queue runner to the Kafka broker
You can find the configuration here : Stores -> Configuration -> Services -> Kafka
app/code/Namespace/Module/etc/communication.xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
<topic name="warehouse.update.stock" request="Namespace\Module\Model\StockMessage"/>
</config>
The request
field will be used as the model of the message you are receiving.
This class must be full typed with PhpDoc because of Magento 2 requirements
Please note that the
topic name
DO NOT HAVE to be the same as the Kafka queue you are looking for. Since most of Kafka queues are in the same broker but have different names depending on the environment, we cannot define them in .xml as we do for other connection types likedb
queuesInstead, the real Kafka topic name must be defined in the system you have set earlier.**
app/code/Namespace/Module/etc/queue_consumer.xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
<consumer name="warehouse"
queue="warehouse.stock.update"
connection="kafka.warehouse"
handler="Namespace\Module\Handler\StockMessageHandler::handle"
onlySpawnWhenMessageAvailable="0"
/>
</config>
queue
: as per Magento doc, it must be identical to the topic name defined in thecommunication.xml
fileconnection
: please note that the Kafka connection must starts withkafka.
, for example :kafka.warehouse
. This is done to detect all kafka connection types and to retrieve them in the configuration defined.
If connection is
kafka.warehouse
, the group id defined in the system.xml file but bewarehouse
. This allow us to map through thecore_config_data
table automatically
handler
: The handler that will take your message and process it. The parameter type must be as same type as the one defined incommunication.xml
inside therequest
fieldonlySpawnWhenMessageAvailable
: this flag must be set to zero. Since natively Magento only spawns a consumer when there is a message available, the Kafka consumer will be spawned and despawned endlessly. A Kafka consumer only commits its offset when a message is read. However, if no message is read and no offset is committed yet, the next time the consumer will spawn, it'll ready from the very end of the queue. Adding this flag ensure that a message will be read at least once and the offset will be commited. In fact, you can remove it after the offset is commit (manually or automatically). It is not advised to do so.