Skip to content
This repository has been archived by the owner on Jan 8, 2020. It is now read-only.

Commit

Permalink
Merge pull request #209 from bsm/feature/partitions-mark-offsets
Browse files Browse the repository at this point in the history
Expose offset methods on partition consumers
  • Loading branch information
dim authored Mar 7, 2018
2 parents e8d2b59 + 60c9f44 commit af1a350
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 86 deletions.
4 changes: 0 additions & 4 deletions balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,6 @@ func (r *balancer) Topic(name string, memberID string) error {
}

func (r *balancer) Perform(s Strategy) map[string]map[string][]int32 {
if r == nil {
return nil
}

res := make(map[string]map[string][]int32, 1)
for topic, info := range r.topics {
for memberID, partitions := range info.Perform(s) {
Expand Down
48 changes: 22 additions & 26 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,18 +140,16 @@ func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 { return c.consum
// your application crashes. This means that you may end up processing the same
// message twice, and your processing should ideally be idempotent.
func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
sub := c.subs.Fetch(msg.Topic, msg.Partition)
if sub != nil {
sub.MarkOffset(msg.Offset+1, metadata)
if sub := c.subs.Fetch(msg.Topic, msg.Partition); sub != nil {
sub.MarkOffset(msg.Offset, metadata)
}
}

// MarkPartitionOffset marks an offset of the provided topic/partition as processed.
// See MarkOffset for additional explanation.
func (c *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) {
sub := c.subs.Fetch(topic, partition)
if sub != nil {
sub.MarkOffset(offset+1, metadata)
if sub := c.subs.Fetch(topic, partition); sub != nil {
sub.MarkOffset(offset, metadata)
}
}

Expand All @@ -162,9 +160,8 @@ func (c *Consumer) MarkOffsets(s *OffsetStash) {
defer s.mu.Unlock()

for tp, info := range s.offsets {
sub := c.subs.Fetch(tp.Topic, tp.Partition)
if sub != nil {
sub.MarkOffset(info.Offset+1, info.Metadata)
if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil {
sub.MarkOffset(info.Offset, info.Metadata)
}
delete(s.offsets, tp)
}
Expand All @@ -177,9 +174,8 @@ func (c *Consumer) MarkOffsets(s *OffsetStash) {
//
// Difference between ResetOffset and MarkOffset is that it allows to rewind to an earlier offset
func (c *Consumer) ResetOffset(msg *sarama.ConsumerMessage, metadata string) {
sub := c.subs.Fetch(msg.Topic, msg.Partition)
if sub != nil {
sub.ResetOffset(msg.Offset+1, metadata)
if sub := c.subs.Fetch(msg.Topic, msg.Partition); sub != nil {
sub.ResetOffset(msg.Offset, metadata)
}
}

Expand All @@ -188,7 +184,7 @@ func (c *Consumer) ResetOffset(msg *sarama.ConsumerMessage, metadata string) {
func (c *Consumer) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) {
sub := c.subs.Fetch(topic, partition)
if sub != nil {
sub.ResetOffset(offset+1, metadata)
sub.ResetOffset(offset, metadata)
}
}

Expand All @@ -199,9 +195,8 @@ func (c *Consumer) ResetOffsets(s *OffsetStash) {
defer s.mu.Unlock()

for tp, info := range s.offsets {
sub := c.subs.Fetch(tp.Topic, tp.Partition)
if sub != nil {
sub.ResetOffset(info.Offset+1, info.Metadata)
if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil {
sub.ResetOffset(info.Offset, info.Metadata)
}
delete(s.offsets, tp)
}
Expand Down Expand Up @@ -264,9 +259,8 @@ func (c *Consumer) CommitOffsets() error {
if kerr != sarama.ErrNoError {
err = kerr
} else if state, ok := snap[topicPartition{topic, partition}]; ok {
sub := c.subs.Fetch(topic, partition)
if sub != nil {
sub.MarkCommitted(state.Info.Offset)
if sub := c.subs.Fetch(topic, partition); sub != nil {
sub.markCommitted(state.Info.Offset)
}
}
}
Expand Down Expand Up @@ -706,11 +700,13 @@ func (c *Consumer) syncGroup(strategy *balancer) (map[string][]int32, error) {
GenerationId: generationID,
}

for memberID, topics := range strategy.Perform(c.client.config.Group.PartitionStrategy) {
if err := req.AddGroupAssignmentMember(memberID, &sarama.ConsumerGroupMemberAssignment{
Topics: topics,
}); err != nil {
return nil, err
if strategy != nil {
for memberID, topics := range strategy.Perform(c.client.config.Group.PartitionStrategy) {
if err := req.AddGroupAssignmentMember(memberID, &sarama.ConsumerGroupMemberAssignment{
Topics: topics,
}); err != nil {
return nil, err
}
}
}

Expand Down Expand Up @@ -828,9 +824,9 @@ func (c *Consumer) createConsumer(tomb *loopTomb, topic string, partition int32,
// Start partition consumer goroutine
tomb.Go(func(stopper <-chan none) {
if c.client.config.Group.Mode == ConsumerModePartitions {
pc.WaitFor(stopper, c.errors)
pc.waitFor(stopper, c.errors)
} else {
pc.Multiplex(stopper, c.messages, c.errors)
pc.multiplex(stopper, c.messages, c.errors)
}
})

Expand Down
42 changes: 17 additions & 25 deletions partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ type PartitionConsumer interface {

// Partition returns the consumed partition
Partition() int32

// MarkOffset marks the offset of a message as preocessed.
MarkOffset(offset int64, metadata string)

// ResetOffset resets the offset to a previously processed message.
ResetOffset(offset int64, metadata string)
}

type partitionConsumer struct {
Expand Down Expand Up @@ -81,7 +87,7 @@ func (c *partitionConsumer) Close() error {
return c.closeErr
}

func (c *partitionConsumer) WaitFor(stopper <-chan none, errors chan<- error) {
func (c *partitionConsumer) waitFor(stopper <-chan none, errors chan<- error) {
defer close(c.dead)

for {
Expand All @@ -105,7 +111,7 @@ func (c *partitionConsumer) WaitFor(stopper <-chan none, errors chan<- error) {
}
}

func (c *partitionConsumer) Multiplex(stopper <-chan none, messages chan<- *sarama.ConsumerMessage, errors chan<- error) {
func (c *partitionConsumer) multiplex(stopper <-chan none, messages chan<- *sarama.ConsumerMessage, errors chan<- error) {
defer close(c.dead)

for {
Expand Down Expand Up @@ -140,52 +146,38 @@ func (c *partitionConsumer) Multiplex(stopper <-chan none, messages chan<- *sara
}
}

func (c *partitionConsumer) State() partitionState {
if c == nil {
return partitionState{}
}

func (c *partitionConsumer) getState() partitionState {
c.mu.Lock()
state := c.state
c.mu.Unlock()

return state
}

func (c *partitionConsumer) MarkCommitted(offset int64) {
if c == nil {
return
}

func (c *partitionConsumer) markCommitted(offset int64) {
c.mu.Lock()
if offset == c.state.Info.Offset {
c.state.Dirty = false
}
c.mu.Unlock()
}

// MarkOffset implements PartitionConsumer
func (c *partitionConsumer) MarkOffset(offset int64, metadata string) {
if c == nil {
return
}

c.mu.Lock()
if offset > c.state.Info.Offset {
c.state.Info.Offset = offset
if next := offset + 1; next > c.state.Info.Offset {
c.state.Info.Offset = next
c.state.Info.Metadata = metadata
c.state.Dirty = true
}
c.mu.Unlock()
}

// ResetOffset implements PartitionConsumer
func (c *partitionConsumer) ResetOffset(offset int64, metadata string) {
if c == nil {
return
}

c.mu.Lock()
if offset <= c.state.Info.Offset {
c.state.Info.Offset = offset
if next := offset + 1; next <= c.state.Info.Offset {
c.state.Info.Offset = next
c.state.Info.Metadata = metadata
c.state.Dirty = true
}
Expand Down Expand Up @@ -244,7 +236,7 @@ func (m *partitionMap) Snapshot() map[topicPartition]partitionState {

snap := make(map[topicPartition]partitionState, len(m.data))
for tp, pc := range m.data {
snap[tp] = pc.State()
snap[tp] = pc.getState()
}
return snap
}
Expand Down
53 changes: 22 additions & 31 deletions partitions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var _ = Describe("partitionConsumer", func() {
})

It("should set state", func() {
Expect(subject.State()).To(Equal(partitionState{
Expect(subject.getState()).To(Equal(partitionState{
Info: offsetInfo{2000, "m3ta"},
}))
})
Expand All @@ -32,67 +32,58 @@ var _ = Describe("partitionConsumer", func() {
defer pc.Close()
close(pc.dead)

state := pc.State()
state := pc.getState()
Expect(state.Info.Offset).To(Equal(int64(-1)))
Expect(state.Info.Metadata).To(Equal("m3ta"))
})

It("should update state", func() {
subject.MarkOffset(2001, "met@") // should set state
Expect(subject.State()).To(Equal(partitionState{
Info: offsetInfo{2001, "met@"},
Expect(subject.getState()).To(Equal(partitionState{
Info: offsetInfo{2002, "met@"},
Dirty: true,
}))

subject.MarkCommitted(2001) // should reset dirty status
Expect(subject.State()).To(Equal(partitionState{
Info: offsetInfo{2001, "met@"},
subject.markCommitted(2002) // should reset dirty status
Expect(subject.getState()).To(Equal(partitionState{
Info: offsetInfo{2002, "met@"},
}))

subject.MarkOffset(2001, "me7a") // should not update state
Expect(subject.State()).To(Equal(partitionState{
Info: offsetInfo{2001, "met@"},
Expect(subject.getState()).To(Equal(partitionState{
Info: offsetInfo{2002, "met@"},
}))

subject.MarkOffset(2002, "me7a") // should bump state
Expect(subject.State()).To(Equal(partitionState{
Info: offsetInfo{2002, "me7a"},
Expect(subject.getState()).To(Equal(partitionState{
Info: offsetInfo{2003, "me7a"},
Dirty: true,
}))

// After committing a later offset, try rewinding back to earlier offset with new metadata.
subject.ResetOffset(2001, "met@")
Expect(subject.State()).To(Equal(partitionState{
Info: offsetInfo{2001, "met@"},
Expect(subject.getState()).To(Equal(partitionState{
Info: offsetInfo{2002, "met@"},
Dirty: true,
}))

subject.MarkCommitted(2001) // should not unset state
Expect(subject.State()).To(Equal(partitionState{
Info: offsetInfo{2001, "met@"},
subject.markCommitted(2002) // should not unset state
Expect(subject.getState()).To(Equal(partitionState{
Info: offsetInfo{2002, "met@"},
}))

subject.MarkOffset(2002, "me7a") // should bump state
Expect(subject.State()).To(Equal(partitionState{
Info: offsetInfo{2002, "me7a"},
Expect(subject.getState()).To(Equal(partitionState{
Info: offsetInfo{2003, "me7a"},
Dirty: true,
}))

subject.MarkCommitted(2002)
Expect(subject.State()).To(Equal(partitionState{
Info: offsetInfo{2002, "me7a"},
subject.markCommitted(2003)
Expect(subject.getState()).To(Equal(partitionState{
Info: offsetInfo{2003, "me7a"},
}))
})

It("should not fail when nil", func() {
blank := (*partitionConsumer)(nil)
Expect(func() {
_ = blank.State()
blank.MarkOffset(2001, "met@")
blank.MarkCommitted(2001)
}).NotTo(Panic())
})

})

var _ = Describe("partitionMap", func() {
Expand Down Expand Up @@ -135,7 +126,7 @@ var _ = Describe("partitionMap", func() {

subject.Store("topic", 0, pc0)
subject.Store("topic", 1, pc1)
subject.Fetch("topic", 1).MarkOffset(2001, "met@")
subject.Fetch("topic", 1).MarkOffset(2000, "met@")

Expect(subject.Snapshot()).To(Equal(map[topicPartition]partitionState{
{"topic", 0}: {Info: offsetInfo{2000, "m3ta"}, Dirty: false},
Expand Down

0 comments on commit af1a350

Please sign in to comment.