OffsetManager, ability to mark previous offset #554

Closed
yulrizka opened this issue Oct 20, 2015 · 6 comments

@yulrizka

SHA: a23cf43

In https://github.com/Shopify/sarama/blob/master/offset_manager.go#L321-L325
the offset manager only commits an offset if it is larger than the current offset.

This causes a problem in one particular use case: my program accepts an option to read Kafka from the beginning. If, for example, the currently recorded offset is 10000 and I start from the beginning but stop at 500, the next run starts at 10000 instead of 500.

What do you think about adding an extra function or parameter to skip this check?

@wvanbergen
Contributor

Interesting. It's a legitimate use case to "reset" the offset, but that currently cannot be done with the offset manager. You will have to hand-craft a CommitOffset request to do this (for an example, see here).

I am not necessarily against removing the if statement. Maybe adding a ResetOffset method would be a better idea, to make it explicit that you're trying to commit a lower offset? Unsure.

@hasnickl

Another option is a SetOffset method, which makes it explicit that you're committing an arbitrary offset, instead of doing a ResetOffset followed by a MarkOffset.
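To make the difference between these proposals concrete, here is a rough sketch. `partitionOffsets` and its three methods are hypothetical and only illustrate the semantics being discussed; they assume ResetOffset would only move backward and SetOffset would have no guard at all. None of this is the offset manager's actual API.

```go
package main

import "fmt"

// partitionOffsets is a hypothetical stand-in for a partition offset manager.
type partitionOffsets struct {
	offset int64
}

// MarkOffset keeps the existing guard: it only moves the offset forward.
func (p *partitionOffsets) MarkOffset(offset int64) {
	if offset > p.offset {
		p.offset = offset
	}
}

// ResetOffset is the explicit counterpart: it only moves the offset backward.
func (p *partitionOffsets) ResetOffset(offset int64) {
	if offset < p.offset {
		p.offset = offset
	}
}

// SetOffset stores any offset, forward or backward, with no guard.
func (p *partitionOffsets) SetOffset(offset int64) {
	p.offset = offset
}

func main() {
	p := &partitionOffsets{offset: 10000}
	p.MarkOffset(500)
	fmt.Println(p.offset) // 10000: MarkOffset ignores the lower value
	p.ResetOffset(500)
	fmt.Println(p.offset) // 500: explicit rewind
	p.SetOffset(7000)
	fmt.Println(p.offset) // 7000: arbitrary offset in one call
}
```

With separate methods, a caller who rewinds does so deliberately, and MarkOffset keeps its forward-only behaviour for everyone else.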

@amitech74

amitech74 commented Oct 4, 2016

Can someone confirm this: to "hand-craft a CommitOffset request" (to reset an offset), you need access to the latest generationID. Without it, resetting an offset doesn't seem to work if consumers are already running, and the generationID is not currently exposed.

@Xaelias

Xaelias commented Apr 21, 2017

Hi guys. I'm trying to do the same thing.

I tried to remove the if just to test, and the program just blocked...
So now I'm trying to manually commit the offset but it doesn't look like it's working reliably either.

Here is an example of what I'm trying to achieve:

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.ClientID, _ = os.Hostname()
	config.Version = sarama.V0_10_1_0
	kafkaClient, err := sarama.NewClient([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		log.Fatalf("creating client: %v", err)
	}
	defer kafkaClient.Close()

	// Hand-crafted commit for group "blah": topic "blah", partition 0, offset 5.
	request := &sarama.OffsetCommitRequest{}
	request.ConsumerGroup = "blah"
	request.ConsumerGroupGeneration = sarama.GroupGenerationUndefined
	request.RetentionTime = 1000 // not sure what to put here
	request.Version = 2
	request.AddBlock("blah", 0, 5, 0, "Resetting offset to 5")

	broker, err := kafkaClient.Leader("blah", 0)
	if err != nil {
		log.Fatalf("finding partition leader: %v", err)
	}
	defer broker.Close()

	response, err := broker.CommitOffset(request)
	if err != nil {
		log.Fatalf("committing offset: %v", err)
	}
	fmt.Println(response)
}

And I end up with this error:

&{map[blah:map[0:kafka server: The provided member is not known in the current generation.]]}

Is there any way to commit a completely arbitrary offset without having to create a consumer?
Thanks!

PS: And I agree that ultimately, the PartitionManager should be able to handle that case. Plus, unless you look at the code, it's really not clear that committing an offset that's earlier than the current one doesn't do anything. A log message would be welcome I think :-)
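As a rough illustration of the log message suggested here: `markOffset` below is a hypothetical helper, not part of sarama, and it assumes the guard rejects any offset that is not greater than the current one. It reports the rejection instead of dropping it silently.

```go
package main

import "log"

// markOffset is a hypothetical helper that returns the offset to keep and
// whether the requested offset was accepted. A rejected request is logged
// so the caller can see that the commit was ignored.
func markOffset(current, requested int64) (int64, bool) {
	if requested <= current {
		log.Printf("ignoring offset %d: not greater than current offset %d", requested, current)
		return current, false
	}
	return requested, true
}

func main() {
	next, ok := markOffset(10000, 500)
	log.Printf("offset=%d accepted=%t", next, ok)
}
```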

@fgeller
Contributor

fgeller commented Aug 18, 2017

Would gladly submit a PR to work on a solution for this. The two that I can think of are:

  1. Dropping the guard that makes sure that we only increment offsets.
  2. Adding another func on the partition offset manager to allow for resetting to lower values.

Would you be interested in either of these, or is there another solution you would prefer?

PS. sorry for the noise my references cause.

@eapache
Contributor

eapache commented Aug 18, 2017

I would be happy to take a PR for a separate function; I agree with Willem that it's better to be explicit in this kind of case.

I thought there was some sort of reason I hadn't looked at this, that it was more complicated than it seemed, but honestly I can't find anything too scary-looking here.
