
Fix Redpanda Migrator consumer group offset migration logic #3182

Merged · 8 commits · Feb 20, 2025

Conversation


@mihaitodor mihaitodor commented Feb 12, 2025

  • Field `is_high_watermark` added to the `redpanda_migrator_offsets` output.
  • Metadata field `kafka_is_high_watermark` added to the `redpanda_migrator_offsets` input.
  • Fixed a bug in the `redpanda_migrator_offsets` input and output where the timestamp-based consumer group offset migration logic could skip ahead in the destination cluster; at-least-once delivery guarantees are now enforced.
  • The `redpanda_migrator_bundle` output no longer drops messages if either the `redpanda_migrator` or the `redpanda_migrator_offsets` child output throws an error. Connect will keep retrying to write the messages and apply backpressure to the input.

TODO:

  • Push extra integration tests

@mihaitodor mihaitodor force-pushed the mihaitodor-fix-migrator-cg-offset-translation-logic branch 3 times, most recently from 2e64fd5 to 9688466 Compare February 13, 2025 01:10
Comment on lines -90 to -112

```yaml
fallback:
  - label: %s_redpanda_migrator_output
    redpanda_migrator: %s
    processors:
      - mapping: |
          meta input_label = deleted()
  # TODO: Use a DLQ
  - drop: {}
    processors:
      - log:
          message: |
            Dropping message: ${! content() } / ${! metadata() }
label: %s_redpanda_migrator_output
redpanda_migrator: %s
processors:
  - mapping: |
      meta input_label = deleted()
- check: metadata("input_label") == "redpanda_migrator_offsets_input"
  output:
    fallback:
      - label: %s_redpanda_migrator_offsets_output
        redpanda_migrator_offsets: %s
      # TODO: Use a DLQ
      - drop: {}
        processors:
          - log:
              message: |
                Dropping message: ${! content() } / ${! metadata() }
```
mihaitodor (Collaborator, Author) commented:

We really shouldn't be dropping messages here... I left this as a TODO initially, thinking that we might want to configure DLQs, but it's best to just block if we can't write data. If it's a transient issue, it will resume when the broker becomes responsive again. If not, an operator will have to look into it. Restarting Migrator should be a safe operation (even if it potentially introduces duplicate records).

Comment on lines -137 to -159
fallback:
- label: %s_redpanda_migrator_output
redpanda_migrator: %s
processors:
- mapping: |
meta input_label = deleted()
# TODO: Use a DLQ
- drop: {}
processors:
- log:
message: |
Dropping message: ${! content() } / ${! metadata() }
label: %s_redpanda_migrator_output
redpanda_migrator: %s
processors:
- mapping: |
meta input_label = deleted()
- check: metadata("input_label") == "redpanda_migrator_offsets_input"
output:
fallback:
- label: %s_redpanda_migrator_offsets_output
redpanda_migrator_offsets: %s
# TODO: Use a DLQ
- drop: {}
processors:
- log:
message: |
Dropping message: ${! content() } / ${! metadata() }
mihaitodor (Collaborator, Author) commented Feb 13, 2025:

Same logic here, just repeated for the case when we don't use schemas.

@mihaitodor mihaitodor force-pushed the mihaitodor-fix-migrator-cg-offset-translation-logic branch 3 times, most recently from 56673b2 to 62c0a1b Compare February 13, 2025 02:43
@mihaitodor mihaitodor marked this pull request as ready for review February 13, 2025 02:44
rockwotj (Collaborator) left a comment:

Generally looks great! Is it possible to write any tests for this case? I will take a deeper look at the PR on Monday. Thanks Mihai

mihaitodor (Collaborator, Author):

Thank you for having a look Tyler! By the way, I think we won't have to decrement the timestamps in any way, since ListOffsetsAfterMilli() is inclusive, as you mentioned in twmb/franz-go#897. I'll think it through more as I clean up the tests...
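To illustrate why no decrement is needed, here is a toy model of the inclusive lookup semantics attributed to `ListOffsetsAfterMilli` above (`offsetAfterMilli` is a hypothetical helper for illustration, not the kadm implementation): if the lookup returns the first offset whose timestamp is >= the requested millisecond, then querying with a committed record's own timestamp lands on that record itself.

```go
package main

import "fmt"

// offsetAfterMilli models the inclusive lookup semantics described above:
// it returns the offset of the first record whose timestamp is >= t, or
// the end offset if no record qualifies. Hypothetical helper, not kadm code.
func offsetAfterMilli(timestamps []int64, t int64) int64 {
	for i, ts := range timestamps {
		if ts >= t {
			return int64(i)
		}
	}
	return int64(len(timestamps))
}

func main() {
	// Records at offsets 0..2 with these timestamps.
	ts := []int64{100, 200, 300}
	// Looking up the timestamp of the record at offset 1 yields offset 1
	// itself because the lookup is inclusive, so no decrement is required.
	fmt.Println(offsetAfterMilli(ts, 200)) // prints 1
}
```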

@mihaitodor mihaitodor force-pushed the mihaitodor-fix-migrator-cg-offset-translation-logic branch 3 times, most recently from 851ff84 to 407f84a Compare February 17, 2025 19:54
```go
	return 0, fmt.Errorf("couldn't find the last record for topic %q partition %q: %s", topic, partition, err)
}

return it.Next().Offset, nil
```
rockwotj (Collaborator) commented:

I believe we can replace this function with a `ListEndOffsets` call, right? It should be `max(ListEndOffsets - 1, 0)`, I think, because if the topic is empty you'll get back a high watermark that matches the log start position:

```
PARTITIONS
==========
PARTITION  LEADER  EPOCH  REPLICAS  LOG-START-OFFSET  HIGH-WATERMARK
0          0       1      [0]       0                 0
```

Also possible after retention kicks in:

```
PARTITIONS
==========
PARTITION  LEADER  EPOCH  REPLICAS  LOG-START-OFFSET  HIGH-WATERMARK
0          0       1      [0]       1                 1
```

mihaitodor (Collaborator, Author) replied:

> I believe we can replace this function with a ListEndOffsets call right?

That's what I wanted to do... However, `ListEndOffsets` returns the offset after the offset of the last message (which I denoted as the "last offset"; not sure what's a better name for it) in the topic. Here's an experiment:

```shell
docker run --rm -it --name=source -p 8081:8081 -p 9092:9092 -p 9644:9644 redpandadata/redpanda redpanda start --node-id 0 --mode dev-container --set "rpk.additional_start_flags=[--reactor-backend=epoll]" --kafka-addr 0.0.0.0:9092 --advertise-kafka-addr host.docker.internal:9092 --schema-registry-addr 0.0.0.0:8081
```

```shell
$ rpk topic create test -p1 -X brokers=localhost:9092
TOPIC  STATUS
test   OK
$ echo foobar | rpk topic produce test -X brokers=localhost:9092
Produced to partition 0 at offset 0 with timestamp 1739834099974.
```

getLastRecordOffset():

```
Offset:  0
```

ListEndOffsets() / ListCommittedOffsets():

```
Offset_end:  1
Offset_comm:  1
```
Click this to reveal the messy code if you wish to play with it:

getLastRecordOffset():

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	seeds := []string{"localhost:9092"}
	consumeStart := map[string]map[int32]kgo.Offset{
		"test": {
			// The default offset begins at the end.
			0: kgo.NewOffset().Relative(-1),
		},
	}

	for {
		client, err := kgo.NewClient([]kgo.Opt{kgo.SeedBrokers(seeds...), kgo.ConsumePartitions(consumeStart)}...)
		if err != nil {
			log.Fatalf("failed to create Kafka client: %s", err)
		}

		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		fetches := client.PollFetches(ctx)
		cancel()
		if fetches.IsClientClosed() {
			log.Fatalf("failed to read message: client closed")
		}

		if err := fetches.Err(); err != nil {
			log.Fatalf("failed to read message: %s", err)
		}

		it := fetches.RecordIter()
		if it.Done() {
			log.Fatal("couldn't find message")
		}

		rec := it.Next()

		fmt.Println("Offset: ", rec.Offset)
		fmt.Println("Timestamp: ", rec.Timestamp)

		client.Close()

		time.Sleep(200 * time.Millisecond)
	}
}
```

ListEndOffsets() / ListCommittedOffsets():

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	seeds := []string{"localhost:9092"}

	client, err := kgo.NewClient([]kgo.Opt{kgo.SeedBrokers(seeds...)}...)
	if err != nil {
		log.Fatalf("failed to create Kafka client: %s", err)
	}
	defer client.Close()
	adm := kadm.NewClient(client)
	for {
		offsets, err := adm.ListEndOffsets(context.Background(), "test")
		if err != nil {
			log.Fatalf("failed to read offsets: %s", err)
		}

		o, ok := offsets.Lookup("test", 0)
		if !ok {
			log.Fatal("failed to find offset for test-0")
		}

		fmt.Println("Offset_end: ", o.Offset)
		fmt.Println("Timestamp_end: ", o.Timestamp)

		offsets, err = adm.ListCommittedOffsets(context.Background(), "test")
		if err != nil {
			log.Fatalf("failed to read offsets: %s", err)
		}

		o, ok = offsets.Lookup("test", 0)
		if !ok {
			log.Fatal("failed to find offset for test-0")
		}

		fmt.Println("Offset_comm: ", o.Offset)
		fmt.Println("Timestamp_comm: ", o.Timestamp)

		time.Sleep(200 * time.Millisecond)
	}
}
```

rockwotj (Collaborator) replied:

> That's what I wanted to do... However, ListEndOffsets returns the offset after the offset of the last message (which I denoted as "last offset"; not sure what's a better name for it) in the topic. Here's an experiment:

Yeah, I think we just do the end offset (known as the high watermark, or HWM) minus one.
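That rule can be sketched as a small pure helper (an illustrative sketch of the suggestion above; `lastRecordOffset` is a hypothetical name, not the PR's actual code), clamping so that an empty partition, including one emptied by retention as in the rpk output earlier, reports no last record:

```go
package main

import "fmt"

// lastRecordOffset derives the last record's offset from a partition's
// log-start offset and high watermark (the HWM is one past the last record).
// ok is false when the partition holds no records, which covers both a fresh
// topic (start == HWM == 0) and one emptied by retention (start == HWM == 1).
// Hypothetical helper for illustration.
func lastRecordOffset(logStart, highWatermark int64) (offset int64, ok bool) {
	if highWatermark <= logStart {
		return 0, false
	}
	return highWatermark - 1, true
}

func main() {
	fmt.Println(lastRecordOffset(0, 1)) // one record at offset 0: "0 true"
	fmt.Println(lastRecordOffset(1, 1)) // emptied by retention: "0 false"
	fmt.Println(lastRecordOffset(0, 0)) // freshly created topic: "0 false"
}
```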

@rockwotj rockwotj self-requested a review February 18, 2025 20:46
- Field `is_last_stable_offset` added to the `redpanda_migrator_offsets` output.
- Metadata field `is_last_stable_offset` added to the `redpanda_migrator_offsets` input.
- Fixed a bug in the `redpanda_migrator_offsets` input and output where the timestamp-based consumer group offset migration logic could skip ahead in the destination cluster; at-least-once delivery guarantees are now enforced.
- The `redpanda_migrator_bundle` output no longer drops messages if either the `redpanda_migrator` or the `redpanda_migrator_offsets` child output throws an error. Connect will keep retrying to write the messages and apply backpressure to the input.

Signed-off-by: Mihai Todor <todormihai@gmail.com>
@mihaitodor mihaitodor force-pushed the mihaitodor-fix-migrator-cg-offset-translation-logic branch from 1d6ed07 to 742f029 Compare February 18, 2025 21:10
@mihaitodor mihaitodor force-pushed the mihaitodor-fix-migrator-cg-offset-translation-logic branch from 93053fa to e8b2097 Compare February 19, 2025 02:32
rockwotj (Collaborator) left a comment:

Just one question about retries and then I think it LGTM

Comment on lines +709 to +711

```go
// TODO: Since `read_until` can't guarantee that we will read `count` messages, `topic` needs to have exactly `count`
// messages in it. We should add some mechanism to the Kafka inputs to allow us to read a range of offsets if possible
// (or up to a certain offset).
```
rockwotj (Collaborator) commented:

It's more about when we commit, not what we read, right? So do we ack the messages after `check` returns true?

mihaitodor (Collaborator, Author) replied:

> So do we ack the messages after check returns true?

Yep. If I use `read_until` there, instead of reading 5 messages out of 10 it reads about 6-7... `read_until` should tell the child input to shut down at that point, but that isn't immediate, so I think it gets a bit of time to ack before `kafka_franz` tears down the franz-go client. I'm not entirely sure how it's wired under the hood, but I recall bumping into issues with `read_until` before. For example, this generates 10 messages:

```yaml
input:
  read_until:
    check: counter() == 2
    input:
      generate:
        count: 20
        batch_size: 5
        interval: 0s
        mapping: root = counter()
```
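The arithmetic behind the "10 messages" can be modelled with a toy simulation (an assumption-laden sketch of the semantics described in this thread, not Connect's actual code): the check runs once per batch, on its first message, and the whole batch passes through regardless.

```go
package main

import "fmt"

// messagesEmitted simulates the read_until behaviour discussed above, under
// the assumption that the check only runs on the first message of each batch
// and the entire batch is still passed through.
func messagesEmitted(total, batchSize, stopAtCheck int) int {
	emitted, checks := 0, 0
	for produced := 0; produced < total; produced += batchSize {
		checks++            // check fires on the batch's first message
		emitted += batchSize // the whole batch is passed through regardless
		if checks == stopAtCheck {
			break // read_until shuts the child input down after this batch
		}
	}
	return emitted
}

func main() {
	// generate: count 20, batch_size 5; read_until stops when its counter hits 2,
	// i.e. on the second batch, so two full batches of 5 get through.
	fmt.Println(messagesEmitted(20, 5, 2)) // prints 10
}
```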

rockwotj (Collaborator) replied:

Yeah, I was reading the code for `read_until` and was surprised that `check` only runs on the first message of every batch while the whole batch is still passed through.

Thank you Tyler for the suggestion! <3

@mihaitodor mihaitodor force-pushed the mihaitodor-fix-migrator-cg-offset-translation-logic branch from c13626d to 71d120e Compare February 19, 2025 23:56
@mihaitodor mihaitodor requested a review from rockwotj February 19, 2025 23:57
mihaitodor (Collaborator, Author):

I did the cleanup & I'll merge it later tomorrow in case you think of other stuff I should cover in there. Thank you again!

@mihaitodor mihaitodor merged commit b498a31 into main Feb 20, 2025
4 checks passed
@mihaitodor mihaitodor deleted the mihaitodor-fix-migrator-cg-offset-translation-logic branch February 20, 2025 16:30