11# Basic Producer Example
22
33``` javascript
4- const { Kafka } = require (' ../.. ' ).KafkaJS
4+ const { Kafka } = require (' @confluentinc/kafka-javascript ' ).KafkaJS ;
55
66async function producerStart () {
77 const producer = new Kafka ().producer ({
@@ -27,38 +27,40 @@ producerStart();
2727# Basic Consumer Example
2828
2929``` javascript
30- const { Kafka } = require (' ../.. ' ).KafkaJS
30+ const { Kafka } = require (' @confluentinc/kafka-javascript ' ).KafkaJS ;
3131
3232async function consumerStart () {
33- const consumer = new Kafka ().consumer ({
33+ let consumer;
34+ let stopped = false ;
35+
36+ // Initialization
37+ consumer = new Kafka ().consumer ({
3438 ' bootstrap.servers' : ' <fill>' ,
3539 ' group.id' : ' test' ,
3640 ' auto.offset.reset' : ' earliest' ,
3741 });
3842
3943 await consumer .connect ();
40-
41- await consumer .subscribe ({ topics: [ " topic" ] });
42-
43- let stopped = false ;
44- while (! stopped) {
45- const message = await consumer .consume (1000 );
46- if (! message) {
47- continue ;
44+ await consumer .subscribe ({ topics: [" topic" ] });
45+
46+ consumer .run ({
47+ eachMessage: async ({ topic, partition, message }) => {
48+ console .log ({
49+ topic,
50+ partition,
51+ offset: message .offset ,
52+ key: message .key ? .toString (),
53+ value: message .value .toString (),
54+ });
4855 }
49- console .log ({
50- topic: message .topic ,
51- partition: message .partition ,
52- offset: message .offset ,
53- key: message .key ? .toString (),
54- value: message .value .toString (),
55- });
56+ });
5657
57- // Update stopped whenever we're done consuming.
58- // stopped = true;
58+ // Update stopped whenever we're done consuming.
59+ // The update can be in another async function or scheduled with setTimeout etc.
60+ while (! stopped) {
61+ await new Promise (resolve => setTimeout (resolve, 1000 ));
5962 }
6063
61- // Disconnect and clean up.
6264 await consumer .disconnect ();
6365}
6466
0 commit comments