I produced 65536 messages but only hundreds of them were consumed #1034

Closed
nullne opened this issue Feb 5, 2018 · 4 comments


nullne commented Feb 5, 2018

Versions

Sarama Version: 1.15.0
Kafka Version: 1.0.0
Go Version: 1.9.1

Configuration

What configuration values are you using for Sarama and Kafka?

For Sarama:

config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0
config.Producer.Return.Successes = true
config.Producer.RequiredAcks = sarama.WaitForAll

For Kafka (the zookeeper, broker id, listeners, and log.dirs settings are omitted here):

delete.topic.enable=true
offsets.topic.num.partitions=50
offsets.topic.replication.factor=3
num.partitions=8
default.replication.factor=3
min.insync.replicas=1
unclean.leader.election.enable=true
log.retention.hours=168
auto.create.topics.enable=false
auto.leader.rebalance.enable=true

Logs

When filing an issue please provide logs from Sarama and Kafka if at all
possible. You can set sarama.Logger to a log.Logger to capture Sarama debug
output.

2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/client.go:115: [Initializing new client]
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/config.go:351: [ClientID is the default of 'sarama', you should consider setting it to something application-specific.]
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/config.go:351: [ClientID is the default of 'sarama', you should consider setting it to something application-specific.]
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/client.go:646: client/metadata fetching metadata for all topics from broker 10.189.2.25:9092
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/broker.go:146: Connected to broker at 10.189.2.25:9092 (unregistered)
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/client.go:429: client/brokers registered new broker #2 at 10.189.2.22:9092
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/client.go:429: client/brokers registered new broker #5 at 10.189.2.25:9092
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/client.go:429: client/brokers registered new broker #4 at 10.189.2.24:9092
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/client.go:429: client/brokers registered new broker #1 at 10.189.2.21:9092
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/client.go:429: client/brokers registered new broker #3 at 10.189.2.23:9092
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/client.go:429: client/brokers registered new broker #6 at 10.189.2.26:9092
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/client.go:161: [Successfully initialized new client]
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/config.go:351: [ClientID is the default of 'sarama', you should consider setting it to something application-specific.]
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:601: producer/broker/5 starting up
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/5 state change to [open] on fire-6/27
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/config.go:351: [ClientID is the default of 'sarama', you should consider setting it to something application-specific.]
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/config.go:351: [ClientID is the default of 'sarama', you should consider setting it to something application-specific.]
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:601: producer/broker/2 starting up
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/2 state change to [open] on fire-6/54
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/5 state change to [open] on fire-6/15
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/5 state change to [open] on fire-6/33
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/2 state change to [open] on fire-6/60
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:601: producer/broker/1 starting up
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/config.go:351: [ClientID is the default of 'sarama', you should consider setting it to something application-specific.]
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:601: producer/broker/3 starting up
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/5 state change to [open] on fire-6/51
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/config.go:351: [ClientID is the default of 'sarama', you should consider setting it to something application-specific.]
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/broker.go:144: Connected to broker at 10.189.2.22:9092 (registered as #2)
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/2 state change to [open] on fire-6/42
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/broker.go:144: Connected to broker at 10.189.2.25:9092 (registered as #5)
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:601: producer/broker/4 starting up
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/1 state change to [open] on fire-6/53
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/3 state change to [open] on fire-6/61
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/broker.go:144: Connected to broker at 10.189.2.21:9092 (registered as #1)
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/5 state change to [open] on fire-6/57
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/3 state change to [open] on fire-6/19
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/5 state change to [open] on fire-6/63
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/5 state change to [open] on fire-6/39
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/4 state change to [open] on fire-6/8
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/4 state change to [open] on fire-6/44
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/broker.go:144: Connected to broker at 10.189.2.24:9092 (registered as #4)
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/4 state change to [open] on fire-6/2
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:601: producer/broker/6 starting up
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/6 state change to [open] on fire-6/28
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/6 state change to [open] on fire-6/52
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/3 state change to [open] on fire-6/37
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/6 state change to [open] on fire-6/58
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/3 state change to [open] on fire-6/43
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/broker.go:144: Connected to broker at 10.189.2.26:9092 (registered as #6)
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/5 state change to [open] on fire-6/45
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/1 state change to [open] on fire-6/11
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/5 state change to [open] on fire-6/21
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/1 state change to [open] on fire-6/23
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/5 state change to [open] on fire-6/9
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/2 state change to [open] on fire-6/30
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/5 state change to [open] on fire-6/3
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/1 state change to [open] on fire-6/5
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/6 state change to [open] on fire-6/46
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/6 state change to [open] on fire-6/34
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/6 state change to [open] on fire-6/4
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/4 state change to [open] on fire-6/62
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/2 state change to [open] on fire-6/24
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/1 state change to [open] on fire-6/59
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/2 state change to [open] on fire-6/0
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/6 state change to [open] on fire-6/10
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/2 state change to [open] on fire-6/36
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/3 state change to [open] on fire-6/25
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/2 state change to [open] on fire-6/48
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/4 state change to [open] on fire-6/32
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/2 state change to [open] on fire-6/12
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/4 state change to [open] on fire-6/50
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/6 state change to [open] on fire-6/16
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/4 state change to [open] on fire-6/14
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/6 state change to [open] on fire-6/22
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/4 state change to [open] on fire-6/26
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/2 state change to [open] on fire-6/6
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/4 state change to [open] on fire-6/56
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/4 state change to [open] on fire-6/38
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/2 state change to [open] on fire-6/18
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/4 state change to [open] on fire-6/20
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/3 state change to [open] on fire-6/49
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/3 state change to [open] on fire-6/1
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/1 state change to [open] on fire-6/41
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/3 state change to [open] on fire-6/55
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/config.go:351: [ClientID is the default of 'sarama', you should consider setting it to something application-specific.]
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/1 state change to [open] on fire-6/29
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/6 state change to [open] on fire-6/40
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/3 state change to [open] on fire-6/7
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/3 state change to [open] on fire-6/31
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/1 state change to [open] on fire-6/35
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/3 state change to [open] on fire-6/13
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/1 state change to [open] on fire-6/17
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/1 state change to [open] on fire-6/47
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/broker.go:144: Connected to broker at 10.189.2.23:9092 (registered as #3)
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/async_producer.go:823: [Producer shutting down.]
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/client.go:194: [Closing Client]
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/broker.go:189: Closed connection to broker 10.189.2.25:9092
2018-02-05T13:52:34+08:00 INFO: github.com/Shopify/sarama/broker.go:189: Closed connection to broker 10.189.2.22:9092
2018/02/05 13:52:34 Successfully produced: 65536; errors: 0

My test code:
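package main

import (
	"flag"
	"fmt"
	"log"
	"os"
	"os/signal"
	"strings"
	"sync"
	"time"

	"github.com/Shopify/sarama"
)

// The flag names and defaults below are assumptions; the original report
// did not include these declarations.
var (
	brokers = flag.String("brokers", "localhost:9092", "comma-separated list of Kafka brokers")
	topics  = flag.String("topics", "fire-10", "topic to produce to")
	verbose = flag.Bool("verbose", false, "print a dot per enqueued message")
	sleep   = flag.Bool("sleep", false, "sleep 1ms between messages")
)

func main() {
	flag.Parse()

	config := sarama.NewConfig()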
	config.Version = sarama.V1_0_0_0
	config.Producer.Return.Successes = true
	config.Producer.RequiredAcks = sarama.WaitForAll
	// config.Producer.Flush.Frequency = 10 * time.Second
	// config.Producer.Flush.Bytes = 1024 * 1024
	// config.Producer.Flush.MaxMessages = 1024
	producer, err := sarama.NewAsyncProducer(strings.Split(*brokers, ","), config)
	if err != nil {
		panic(err)
	}

	// Trap SIGINT to trigger a graceful shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	var (
		wg                          sync.WaitGroup
		enqueued, successes, errors int
	)

	wg.Add(1)
	go func() {
		defer wg.Done()
		for range producer.Successes() {
			successes++
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		for err := range producer.Errors() {
			log.Println(err)
			errors++
		}
	}()
	counter := 0

ProducerLoop:
	for {
		if counter >= 65536 {
			producer.AsyncClose() // Trigger a shutdown of the producer.
			break ProducerLoop
		}
		message := &sarama.ProducerMessage{
			Topic: *topics,
			// Key:       sarama.StringEncoder(fmt.Sprintf("%d", counter)),
			// Partition: int32(counter),
			Value: sarama.StringEncoder(fmt.Sprintf("%d,%d", counter, time.Now().UnixNano())),
			// Timestamp: time.Now(),
		}
		select {
		case producer.Input() <- message:
			enqueued++

		case <-signals:
			producer.AsyncClose() // Trigger a shutdown of the producer.
			break ProducerLoop
		}
		if *verbose {
			fmt.Printf(".")
		}
		if *sleep {
			// fmt.Println(100 * time.Millisecond)
			time.Sleep(1 * time.Millisecond)
		}
		counter++
	}

	wg.Wait()

	log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
}

Problem Description

I used a program like the one above to send 65536 messages to Kafka. When I consumed from the topic, however, only a few hundred messages came back. What is wrong?

test procedure
  1. Create a topic named fire-10:
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper $ZK_HOSTS --replication-factor 3 --partitions 1 --topic fire-10
  2. Use code like the above to send 100000 messages to this topic:
     2018-02-07T15:25:24+08:00 INFO: github.com/Shopify/sarama/client.go:115: [Initializing new client]
     2018-02-07T15:25:24+08:00 INFO: github.com/Shopify/sarama/config.go:351: [ClientID is the default of 'sarama', you should consider setting it to something application-specific.]
     2018-02-07T15:25:24+08:00 INFO: github.com/Shopify/sarama/config.go:351: [ClientID is the default of 'sarama', you should consider setting it to something application-specific.]
     2018-02-07T15:25:24+08:00 INFO: github.com/Shopify/sarama/client.go:646: client/metadata fetching metadata for all topics from broker 10.189.2.24:9092
     2018-02-07T15:25:24+08:00 INFO: github.com/Shopify/sarama/broker.go:146: Connected to broker at 10.189.2.24:9092 (unregistered)
     2018-02-07T15:25:24+08:00 INFO: github.com/Shopify/sarama/client.go:429: client/brokers registered new broker #2 at 10.189.2.22:9092
     2018-02-07T15:25:24+08:00 INFO: github.com/Shopify/sarama/client.go:429: client/brokers registered new broker #5 at 10.189.2.25:9092
     2018-02-07T15:25:24+08:00 INFO: github.com/Shopify/sarama/client.go:429: client/brokers registered new broker #4 at 10.189.2.24:9092
     2018-02-07T15:25:24+08:00 INFO: github.com/Shopify/sarama/client.go:429: client/brokers registered new broker #1 at 10.189.2.21:9092
     2018-02-07T15:25:24+08:00 INFO: github.com/Shopify/sarama/client.go:429: client/brokers registered new broker #3 at 10.189.2.23:9092
     2018-02-07T15:25:24+08:00 INFO: github.com/Shopify/sarama/client.go:429: client/brokers registered new broker #6 at 10.189.2.26:9092
     2018-02-07T15:25:24+08:00 INFO: github.com/Shopify/sarama/client.go:161: [Successfully initialized new client]
     2018-02-07T15:25:24+08:00 INFO: github.com/Shopify/sarama/config.go:351: [ClientID is the default of 'sarama', you should consider setting it to something application-specific.]
     2018-02-07T15:25:24+08:00 INFO: github.com/Shopify/sarama/async_producer.go:601: producer/broker/2 starting up
     2018-02-07T15:25:24+08:00 INFO: github.com/Shopify/sarama/async_producer.go:612: producer/broker/2 state change to [open] on fire-10/0
     2018-02-07T15:25:24+08:00 INFO: github.com/Shopify/sarama/broker.go:144: Connected to broker at 10.189.2.22:9092 (registered as #2)
     2018-02-07T15:25:24+08:00 INFO: github.com/Shopify/sarama/async_producer.go:823: [Producer shutting down.]
     2018-02-07T15:25:24+08:00 INFO: github.com/Shopify/sarama/client.go:194: [Closing Client]
     2018-02-07T15:25:24+08:00 INFO: github.com/Shopify/sarama/broker.go:189: Closed connection to broker 10.189.2.22:9092
     2018/02/07 15:25:24 Successfully produced: 100000; errors: 0
  3. Then use the official console consumer to read all messages from the beginning:

    $ $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server $KAFKA_BROKERS --topic fire-10  --from-beginning
    0,1517988324567854348
    1,1517988324567856670
    5255,1517988324573265310
    12267,1517988324582062306
    21119,1517988324599318970
    32962,1517988324620456108
    48323,1517988324647772463
    68646,1517988324687259774
    93783,1517988324733413847
    ^CProcessed a total of 9 messages
    

    Yes, only 9 messages.

  4. I then checked the consumer offset:


TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
fire-10                        0          100000          100000          0          -                                                 -                              -
  5. I also checked Kafka's raw log files. They seem to contain more than 9 messages, but I am not sure because the log files are not human-readable.
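
To make the size of the loss in step 3 concrete, here is a minimal sketch (not part of my original test program) that parses the console-consumer output above and reports how many counters are missing between consecutive consumed records. The helper names `parseCounters` and `gapsBetween` are hypothetical, and the sketch assumes each record has the `counter,timestamp` shape produced by the test code and that records arrive in ascending counter order.

```go
package main

import (
	"bufio"
	"fmt"
	"strconv"
	"strings"
)

// parseCounters extracts the leading counter from each "counter,timestamp"
// line and skips lines that do not have that shape, such as the
// "Processed a total of ..." footer printed by the console consumer.
func parseCounters(output string) []int {
	var counters []int
	scanner := bufio.NewScanner(strings.NewReader(output))
	for scanner.Scan() {
		fields := strings.SplitN(strings.TrimSpace(scanner.Text()), ",", 2)
		if len(fields) != 2 {
			continue
		}
		n, err := strconv.Atoi(fields[0])
		if err != nil {
			continue
		}
		counters = append(counters, n)
	}
	return counters
}

// gapsBetween returns, for each pair of consecutive counters, how many
// counters in between were never consumed. It assumes ascending order.
func gapsBetween(counters []int) []int {
	var gaps []int
	for i := 1; i < len(counters); i++ {
		gaps = append(gaps, counters[i]-counters[i-1]-1)
	}
	return gaps
}

func main() {
	// Console-consumer output copied from step 3 of the report.
	output := `0,1517988324567854348
1,1517988324567856670
5255,1517988324573265310
12267,1517988324582062306
21119,1517988324599318970
32962,1517988324620456108
48323,1517988324647772463
68646,1517988324687259774
93783,1517988324733413847
^CProcessed a total of 9 messages`

	const produced = 100000 // number of messages sent in step 2
	counters := parseCounters(output)
	fmt.Printf("consumed %d of %d messages\n", len(counters), produced)
	fmt.Printf("missing between consecutive records: %v\n", gapsBetween(counters))
}
```

The gaps grow steadily over the run, so the visible records are spread across the whole sequence rather than clustered at the start.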

getachew commented Feb 6, 2018

Could you try to remove the config.Version and retest?

#1032 reported inconsistencies in the number of messages produced when config.Version is set. Removing it seems to produce all messages.

Author

nullne commented Feb 7, 2018

@getachew great, when I remove config.Version it works!
This is a serious bug, though.
After looking through #1032, I found some differences between the two reports:

  1. I used the async producer.
  2. I found some of the sent data in the Kafka data dir, but I cannot consume it (I will check this again).

Contributor

eapache commented Feb 7, 2018

The sync producer uses the async producer under the hood, so this is not a relevant difference.

It sounds a lot like the same issue to me. Hopefully it is easy to fix, but I have not had a lot of time recently to look into it.
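
As a rough picture of why the sync/async distinction does not matter here, the following is a toy sketch of a synchronous wrapper around a channel-based asynchronous producer. It uses only the standard library and is not Sarama's actual implementation; the `message`, `asyncProducer`, and `syncProducer` types are hypothetical names chosen for illustration. The point is only that every synchronous send travels through the same asynchronous path, so a bug in that path would affect both.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for a producer message. The reply
// channel carries the per-message outcome back to the caller.
type message struct {
	value string
	reply chan error
}

// asyncProducer is a toy asynchronous producer: callers push messages onto
// input and a background goroutine "delivers" each one.
type asyncProducer struct {
	input chan *message
	done  chan struct{}
}

func newAsyncProducer() *asyncProducer {
	p := &asyncProducer{
		input: make(chan *message),
		done:  make(chan struct{}),
	}
	go func() {
		defer close(p.done)
		for m := range p.input {
			// Pretend delivery: empty values fail, everything else succeeds.
			if m.value == "" {
				m.reply <- errors.New("empty message")
			} else {
				m.reply <- nil
			}
		}
	}()
	return p
}

// Close stops accepting messages and waits for the background goroutine.
func (p *asyncProducer) Close() {
	close(p.input)
	<-p.done
}

// syncProducer adds no delivery logic of its own; it only enqueues on the
// async producer and blocks until that message's outcome comes back.
type syncProducer struct {
	async *asyncProducer
}

func (s *syncProducer) Send(value string) error {
	// Buffered so the background goroutine never blocks on the reply.
	m := &message{value: value, reply: make(chan error, 1)}
	s.async.input <- m
	return <-m.reply
}

func main() {
	async := newAsyncProducer()
	sp := &syncProducer{async: async}

	fmt.Println("send hello:", sp.Send("hello"))
	fmt.Println("send empty:", sp.Send(""))

	async.Close()
}
```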

Contributor

eapache commented Feb 9, 2018

Marking this as a duplicate of #1032 since all the conversation is happening there.

eapache closed this as completed Feb 9, 2018