Sending messages with ZStd compression enabled fails in multiple ways #1252

Closed
akrennmair opened this issue Jan 11, 2019 · 11 comments · Fixed by #1477

@akrennmair

Versions

Sarama Version: v1.20.1
Kafka Version: 2.1.0 (Scala version 2.12)
Go Version: 1.11.4

Configuration

What configuration values are you using for Sarama and Kafka?

Kafka configuration:
server.properties.txt

Logs
=== RUN   TestSaramaZSTD
SARAMA: Initializing new client
SARAMA: client/metadata fetching metadata for all topics from broker localhost:9092
SARAMA: Connected to broker at localhost:9092 (unregistered)
SARAMA: client/brokers registered new broker #1 at localhost:9092
SARAMA: Successfully initialized new client
SARAMA: producer/broker/1 starting up
SARAMA: producer/broker/1 state change to [open] on sarama-test/0
SARAMA: Connected to broker at localhost:9092 (registered as #1)
SARAMA: client/metadata fetching metadata for [sarama-test] from broker localhost:9092
SARAMA: producer/broker/1 state change to [retrying] on sarama-test/0 because kafka server: Message contents does not match its CRC.
SARAMA: Retrying batch for sarama-test-0 because of kafka server: Message contents does not match its CRC.
TEST:   sending message failed: kafka server: Message contents does not match its CRC.
SARAMA: Producer shutting down.
SARAMA: Closing Client
SARAMA: producer/broker/1 shut down
SARAMA: Closed connection to broker localhost:9092
SARAMA: Initializing new client
SARAMA: client/metadata fetching metadata for all topics from broker localhost:9092
SARAMA: Closed connection to broker localhost:9092
SARAMA: Connected to broker at localhost:9092 (unregistered)
SARAMA: client/brokers registered new broker #1 at localhost:9092
SARAMA: Successfully initialized new client
SARAMA: producer/broker/1 starting up
SARAMA: producer/broker/1 state change to [open] on sarama-test/0
SARAMA: Connected to broker at localhost:9092 (registered as #1)
SARAMA: producer/broker/1 state change to [closing] because EOF
SARAMA: Closed connection to broker localhost:9092
TEST:   sending message failed: EOF
SARAMA: Producer shutting down.
SARAMA: Closing Client
SARAMA: Closed connection to broker localhost:9092
SARAMA: producer/broker/1 shut down
--- PASS: TestSaramaZSTD (0.01s)
PASS
ok  	gitlab.n0q.eu/go/engine/sarama-zstd	0.016s
Problem Description

I created a minimal test case to reproduce the issue:

package main

import (
	"fmt"
	"log"
	"os"
	"testing"

	"github.com/Shopify/sarama"
)

func TestSaramaZSTD(t *testing.T) {
	for _, kafkaVersion := range []sarama.KafkaVersion{sarama.V0_10_2_0, sarama.V2_1_0_0} {

		sarama.Logger = log.New(os.Stdout, "SARAMA: ", 0)

		cfg := sarama.NewConfig()

		cfg.ClientID = "sarama-zstd-test"

		cfg.Producer.Return.Errors = true
		cfg.Producer.Return.Successes = true
		cfg.Producer.Retry.Max = 0

		cfg.Version = kafkaVersion

		if err := cfg.Validate(); err != nil {
			t.Fatalf("configuration validation failed: %v", err)
		}

		cfg.Producer.Compression = sarama.CompressionZSTD

		producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, cfg)
		if err != nil {
			t.Fatalf("sarama.NewAsyncProducer failed: %v", err)
		}

		producer.Input() <- &sarama.ProducerMessage{
			Topic: "sarama-test",
			Value: sarama.StringEncoder("hello world!"),
		}

		select {
		case <-producer.Successes():
			fmt.Println("TEST:   sending message was successful")
		case err := <-producer.Errors():
			fmt.Printf("TEST:   sending message failed: %v\n", err.Err)
		}

		if err := producer.Close(); err != nil {
			t.Errorf("producer.Close failed: %v", err)
		}
	}
}

The test program attempts to send a single message to Kafka using a sarama.AsyncProducer with ZStd compression enabled. It does so twice in a row, once for each configured Kafka version.

With version sarama.V0_10_2_0, sending fails with the error "kafka server: Message contents does not match its CRC." With version sarama.V2_1_0_0, it fails with the error "EOF". In the latter case, Kafka also logs an error:

[2019-01-11 16:35:36,679] ERROR Closing socket for 127.0.0.1:9092-127.0.0.1:32884-1 because of error (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Error getting request for apiKey: PRODUCE, apiVersion: 3, connectionId: 127.0.0.1:9092-127.0.0.1:32884-1, listenerName: ListenerName(PLAINTEXT), principal: User:ANONYMOUS
Caused by: org.apache.kafka.common.record.InvalidRecordException: Produce requests with version 3 are note allowed to use ZStandard compression
[2019-01-11 16:35:36,684] ERROR Exception while processing request from 127.0.0.1:9092-127.0.0.1:32884-1 (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Error getting request for apiKey: PRODUCE, apiVersion: 3, connectionId: 127.0.0.1:9092-127.0.0.1:32884-1, listenerName: ListenerName(PLAINTEXT), principal: User:ANONYMOUS
Caused by: org.apache.kafka.common.record.InvalidRecordException: Produce requests with version 3 are note allowed to use ZStandard compression

The expected behaviour for the first case is that sending the message works without causing an error.

The expected behaviour for the second case is that cfg.Validate indicates the invalid combination of Kafka version and compression codec.
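
The check requested for the second case could look roughly like the sketch below. It is an illustration only, not Sarama's actual implementation: kafkaVersion, codec, and validateCompression are hypothetical stand-ins for sarama.KafkaVersion, sarama.CompressionCodec, and the logic inside cfg.Validate, and the 2.1.0 threshold is an assumption taken from the broker version this issue was reported against.

```go
package main

import "fmt"

// kafkaVersion is a hypothetical stand-in for sarama.KafkaVersion:
// four numeric components, compared from most to least significant.
type kafkaVersion [4]uint

// isAtLeast reports whether v is the same as or newer than other.
func (v kafkaVersion) isAtLeast(other kafkaVersion) bool {
	for i := range v {
		if v[i] > other[i] {
			return true
		}
		if v[i] < other[i] {
			return false
		}
	}
	return true
}

// codec is a hypothetical stand-in for sarama.CompressionCodec.
type codec int

const (
	codecNone codec = iota
	codecGZIP
	codecSnappy
	codecLZ4
	codecZSTD
)

// minZstdVersion is the assumed first Kafka version that accepts zstd.
var minZstdVersion = kafkaVersion{2, 1, 0, 0}

// validateCompression rejects zstd when the configured Kafka version is
// too old, so the mismatch surfaces at validation time instead of as a
// CRC error or a dropped connection at send time.
func validateCompression(v kafkaVersion, c codec) error {
	if c == codecZSTD && !v.isAtLeast(minZstdVersion) {
		return fmt.Errorf("zstd compression requires Kafka version >= %v, got %v", minZstdVersion, v)
	}
	return nil
}
```

For a check like this to catch the problem in the test above, the compression codec would have to be set before validation runs.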

@varun06
Contributor

varun06 commented Jan 15, 2019

I am also getting 'InvalidRecordException: Produce requests with version 3 are note allowed to use ZStandard compression'.
I will have a look around tomorrow.

@varun06
Contributor

varun06 commented Jan 15, 2019

@bobrik you fixed something related to zstd earlier, any idea?

@bobrik
Contributor

bobrik commented Jan 15, 2019

I added support for zstd before Kafka added it. In later discussions, the Kafka developers apparently decided to bump the protocol version to allow zstd compression. See logic in this file:

I think Sarama needs to mimic the same behavior, probably somewhere here:

@varun06
Contributor

varun06 commented Jan 16, 2019

Ah, thanks a lot. I will get a PR up soon.

@jwatte

jwatte commented May 2, 2019

I would love to get this fixed. (Apparently the PR itself isn't good enough yet?)

@lizthegrey
Contributor

lizthegrey commented Aug 1, 2019

Correct, the PR doesn't appear to actually send Producer Protocol Version 7 nor does it send Consumer Protocol Version 10, thus causing the broker to reject requests to read and to write zstd encoded payloads.

I'm frustrated that Zstd support wound up getting advertised in 1.20.0 (2018-12-10) (in #1170) as a feature without it functioning correctly end to end against a version validating broker. Fortunately the immediate fixes aren't that hard and I'll follow up shortly with them, although I worry that it's dangerous to skip over the corresponding changes to the other attributes of the protocol that producer versions 4 to 6 and consumer versions 5 to 9 require.
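
The version gate described in this comment can be sketched as follows. This is a minimal illustration, not code from Sarama or Kafka: apiKey and allowsZstd are hypothetical names, and the minimum versions (7 for produce, 10 for fetch) are taken from the comment above. The API key numbers 0 and 1 are the Kafka protocol's identifiers for Produce and Fetch.

```go
package main

// apiKey identifies a Kafka request type (hypothetical type for this sketch).
type apiKey int16

const (
	apiProduce apiKey = 0 // Kafka protocol API key for Produce
	apiFetch   apiKey = 1 // Kafka protocol API key for Fetch
)

// Minimum request versions at which a version-validating broker accepts
// zstd-compressed payloads, as stated in the comment above.
const (
	minProduceVersionForZstd = 7
	minFetchVersionForZstd   = 10
)

// allowsZstd reports whether a request of the given type and version may
// carry zstd-compressed record batches. Other request types never do.
func allowsZstd(key apiKey, version int16) bool {
	switch key {
	case apiProduce:
		return version >= minProduceVersionForZstd
	case apiFetch:
		return version >= minFetchVersionForZstd
	default:
		return false
	}
}
```

With this gate, the produce request at version 3 from the broker log in the issue description is rejected, which matches the InvalidRecordException the broker reported.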

@lizthegrey
Contributor

Yeah, there are a number of breaking protocol changes that I see in terms of adding new expected fields to the request/response, so I don't think I should be the one to do this. I have no idea how Cloudflare made this work (unless they hacked their Kafka broker to not demand a minimum protocol version to use zstd), so... gurp.

klauspost added a commit to klauspost/sarama that referenced this issue Sep 3, 2019
Switch from cgo package to pure Go implementation. This will (when fixed) work even if no cgo is present.

https://github.com/klauspost/compress/tree/master/zstd#zstd

Compression level is removed. It could be made so the first request determines the compression level. But since there are currently only two levels (fast and default) I am not sure it is worth it.

This does NOT fix IBM#1252 - only updates the code used for compression/decompression.
@bai closed this as completed in #1477 Sep 7, 2019
@praseodym

This does NOT fix #1252 - only updates the code used for compression/decompression.

Could someone reopen this issue?

@d1egoaz
Contributor

d1egoaz commented Jan 17, 2020

Hey @akrennmair
I tested your gist with my branch and it worked.

#1574

Could you please double check that it works for you? Thanks

@d1egoaz
Contributor

d1egoaz commented Jan 24, 2020

closed by #1574

@d1egoaz closed this as completed Jan 24, 2020
@lizthegrey
Contributor

Fix confirmed working. Thank you!!!
