How to set kafka message key with a defaulter being used? #1101

Open
MaerF0x0 opened this issue Oct 2, 2024 · 0 comments

MaerF0x0 commented Oct 2, 2024

Symptom: All my events are being produced to a single topic partition

Context:

  • I'm following the Kafka sender sample, which sets the producer message key from the event ID
  • I'm also using a defaulter to set the event ID to a KSUID
// defaultKSUIDs sets the ID to a new KSUID if none is set (recommended)
func defaultKSUIDs() client.EventDefaulter {
	return func(ctx context.Context, evt event.Event) event.Event {
		if evt.ID() == "" {
			evt.Context = evt.Context.Clone()
			kID := ksuid.New().String()
			evt.SetID(kID)
		}
		return evt
	}
}

// withKSUIDs wraps defaultKSUIDs as a client.Option for cloudevents.NewClient
func withKSUIDs() client.Option {
	return client.WithEventDefaulter(defaultKSUIDs())
}

// elsewhere

c, err := cloudevents.NewClient(sender,
	cloudevents.WithTimeNow(),
	withKSUIDs(),
)

Problem:

  • In my case the ID is still empty when the message key is built, because the defaulter has not set it yet

Is there a way I can either manually apply the defaulters to an event, or make the partition key lazily evaluated?

(Also note that the sender example explicitly sets the ID even though it configures the cloudevents.WithUUIDs() defaulter, which seems contradictory.)

Helper commands I used to debug:

# run the broker
docker run -d -p 9092:9092 --name broker apache/kafka:latest

# create topic w/ 4 partitions
docker exec --workdir /opt/kafka/bin/  broker ./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic --partitions 4

cd cloudevents/sdk-go/samples/kafka/sender
go run main.go

Observe with kafka-console-consumer or kcat

/opt/homebrew/bin/kafka-console-consumer --topic test-topic \
  --bootstrap-server :9092 \
  --from-beginning \
  --property print.key=true \
  --property print.partition=true
Partition:2	3db17c45-95a1-4389-bca9-377d09d23ddb	{"internal-id":9,"message":"Hello, World!"}
Partition:3	72e140a3-8947-4e2f-84e1-de8531e5162c	{"internal-id":1,"message":"Hello, World!"}
Partition:3	03131605-65df-4675-8153-a52ee17bf7a3	{"internal-id":2,"message":"Hello, World!"}
Partition:3	1cc9945b-c1ae-49d0-8558-057667604134	{"internal-id":3,"message":"Hello, World!"}
Partition:3	329f7d3f-e3e1-40ff-b14c-b4c20a336cc6	{"internal-id":6,"message":"Hello, World!"}
Partition:0	3b5993cd-2163-4a24-8479-2f69b8cc5ce2	{"internal-id":0,"message":"Hello, World!"}
Partition:0	f0eb03f4-e344-434d-b0a9-f0c43592f2d8	{"internal-id":4,"message":"Hello, World!"}
Partition:0	afde6b33-d91b-43b6-9a3a-9f7294c72a17	{"internal-id":5,"message":"Hello, World!"}
Partition:0	d55e4e69-92a0-47f6-890c-23856468b241	{"internal-id":7,"message":"Hello, World!"}
Partition:0	d4444993-40fc-432e-a3b8-4f9048f8d46b	{"internal-id":8,"message":"Hello, World!"}
kcat -b localhost:9092 -C -t test-topic -f '%t %p @ %o: %s\n'
test-topic 0 @ 0: {"internal-id":0,"message":"Hello, World!"}
test-topic 0 @ 1: {"internal-id":4,"message":"Hello, World!"}
test-topic 0 @ 2: {"internal-id":5,"message":"Hello, World!"}
test-topic 0 @ 3: {"internal-id":7,"message":"Hello, World!"}
test-topic 0 @ 4: {"internal-id":8,"message":"Hello, World!"}
% Reached end of topic test-topic [0] at offset 5
% Reached end of topic test-topic [1] at offset 0
test-topic 2 @ 0: {"internal-id":9,"message":"Hello, World!"}
test-topic 3 @ 0: {"internal-id":1,"message":"Hello, World!"}
test-topic 3 @ 1: {"internal-id":2,"message":"Hello, World!"}
test-topic 3 @ 2: {"internal-id":3,"message":"Hello, World!"}
test-topic 3 @ 3: {"internal-id":6,"message":"Hello, World!"}
% Reached end of topic test-topic [2] at offset 1
% Reached end of topic test-topic [3] at offset 4

Aside/nit: I'd recommend changing the sample message so that it has no "id" field, because that can confuse newcomers: which ID is used?
