Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Commit

Permalink
Changes from PR review
Browse files Browse the repository at this point in the history
  • Loading branch information
eric-sap committed Jun 29, 2021
1 parent 0ef08f1 commit f487b46
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 44 deletions.
111 changes: 73 additions & 38 deletions pkg/common/consumer/consumer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,10 @@ func (m *kafkaConsumerGroupManagerImpl) StartConsumerGroup(groupId string, topic
}

ctx, cancel := context.WithCancel(context.Background())
// Add the Sarama ConsumerGroup we obtained from the factory to the managed group map,
// so that it can be stopped and started via control-protocol messages.
m.groupLock.Lock()
m.groups[groupId] = createManagedGroup(m.logger, group, cancel)
m.groupLock.Unlock()
managedGrp := createManagedGroup(m.logger, group, cancel)

// Begin listening on the group's Errors() channel and write them to the managedGroup's errors channel
m.groups[groupId].transferErrors(ctx)
managedGrp.transferErrors(ctx)

// consume is passed in to the KafkaConsumerGroupFactory so that it will call the manager's
// consume() function instead of the one on the internal sarama ConsumerGroup. This allows the
Expand All @@ -190,7 +186,11 @@ func (m *kafkaConsumerGroupManagerImpl) StartConsumerGroup(groupId string, topic

// The only thing we really want from the factory is the cancel function for the customConsumerGroup
customGroup := m.factory.startExistingConsumerGroup(group, consume, topics, logger, handler, options...)
m.groups[groupId].cancelConsume = customGroup.cancel
managedGrp.cancelConsume = customGroup.cancel

// Add the Sarama ConsumerGroup we obtained from the factory to the managed group map,
// so that it can be stopped and started via control-protocol messages.
m.setGroup(groupId, managedGrp)
return nil
}

Expand Down Expand Up @@ -223,9 +223,7 @@ func (m *kafkaConsumerGroupManagerImpl) CloseConsumerGroup(groupId string) error
}

// Remove this groupId from the map so that manager functions may not be called on it
m.groupLock.Lock()
delete(m.groups, groupId)
m.groupLock.Unlock()
m.removeGroup(groupId)

return nil
}
Expand All @@ -242,12 +240,7 @@ func (m *kafkaConsumerGroupManagerImpl) Errors(groupId string) <-chan error {

// IsManaged returns true if the given groupId corresponds to a managed ConsumerGroup
func (m *kafkaConsumerGroupManagerImpl) IsManaged(groupId string) bool {
m.groupLock.RLock()
defer m.groupLock.RUnlock()
if _, ok := m.groups[groupId]; ok {
return true
}
return false
return m.getGroup(groupId) != nil
}

// Consume calls the Consume method of a managed consumer group, using a loop to call it again if that
Expand Down Expand Up @@ -279,11 +272,14 @@ func (m *kafkaConsumerGroupManagerImpl) consume(ctx context.Context, groupId str
// stopConsumerGroups closes the managed ConsumerGroup identified by the provided groupId, and marks it
// as "stopped" (that is, "able to be restarted" as opposed to being closed by something outside the manager)
func (m *kafkaConsumerGroupManagerImpl) stopConsumerGroup(lock *commands.CommandLock, groupId string) error {
if err := m.processLock(lock, groupId, true); err != nil {
groupLogger := m.logger.With(zap.String("GroupId", groupId))

// Lock the managedGroup before stopping it, if lock.LockBefore is true
if err := m.lockBefore(lock, groupId); err != nil {
groupLogger.Error("Failed to lock consumer group prior to stopping", zap.Error(err))
return err
}

groupLogger := m.logger.With(zap.String("GroupId", groupId))
groupLogger.Info("Stopping Managed ConsumerGroup")

managedGrp := m.getGroup(groupId)
Expand All @@ -306,16 +302,24 @@ func (m *kafkaConsumerGroupManagerImpl) stopConsumerGroup(lock *commands.Command
return err
}

return m.processLock(lock, groupId, false)
// Unlock the managedGroup after stopping it, if lock.UnlockAfter is true
if err = m.unlockAfter(lock, groupId); err != nil {
groupLogger.Error("Failed to unlock consumer group after stopping", zap.Error(err))
return err
}
return nil
}

// startConsumerGroups creates a new Consumer Group based on the groupId provided
func (m *kafkaConsumerGroupManagerImpl) startConsumerGroup(lock *commands.CommandLock, groupId string) error {
if err := m.processLock(lock, groupId, true); err != nil {
groupLogger := m.logger.With(zap.String("GroupId", groupId))

// Lock the managedGroup before starting it, if lock.LockBefore is true
if err := m.lockBefore(lock, groupId); err != nil {
groupLogger.Error("Failed to lock consumer group prior to starting", zap.Error(err))
return err
}

groupLogger := m.logger.With(zap.String("GroupId", groupId))
groupLogger.Info("Starting Managed ConsumerGroup")
if !m.IsManaged(groupId) {
groupLogger.Info("ConsumerGroup Not Managed - Ignoring Start Request")
Expand All @@ -328,12 +332,16 @@ func (m *kafkaConsumerGroupManagerImpl) startConsumerGroup(lock *commands.Comman
return err
}

m.groupLock.Lock()
m.groups[groupId].group = group
m.groups[groupId].closeRestartChannel() // Closing this allows the waitForStart function to finish
m.groupLock.Unlock()
managedGrp := m.getGroup(groupId)
managedGrp.group = group
managedGrp.closeRestartChannel() // Closing this allows the waitForStart function to finish

return m.processLock(lock, groupId, false)
// Unlock the managedGroup after starting it, if lock.UnlockAfter is true
if err = m.unlockAfter(lock, groupId); err != nil {
groupLogger.Error("Failed to unlock consumer group after starting", zap.Error(err))
return err
}
return nil
}

// getGroup returns a group from the groups map using the groupLock mutex
Expand All @@ -343,15 +351,38 @@ func (m *kafkaConsumerGroupManagerImpl) getGroup(groupId string) *managedGroup {
return m.groups[groupId]
}

// getGroup associates a group with a groupId in the groups map using the groupLock mutex
func (m *kafkaConsumerGroupManagerImpl) setGroup(groupId string, group *managedGroup) {
m.groupLock.Lock()
m.groups[groupId] = group
m.groupLock.Unlock()
}

// getGroup removes a group from the groups map by groupId, using the groupLock mutex
func (m *kafkaConsumerGroupManagerImpl) removeGroup(groupId string) {
m.groupLock.Lock()
delete(m.groups, groupId)
m.groupLock.Unlock()
}

// lockBefore will lock the managedGroup corresponding to the groupId, if lock.LockBefore is true
func (m *kafkaConsumerGroupManagerImpl) lockBefore(lock *commands.CommandLock, groupId string) error {
return m.processLock(lock, groupId, true)
}

// unlockAfter will unlock the managedGroup corresponding to the groupId, if lock.UnlockAfter is true
func (m *kafkaConsumerGroupManagerImpl) unlockAfter(lock *commands.CommandLock, groupId string) error {
return m.processLock(lock, groupId, false)
}

// processLock handles setting and removing the managedGroup's lock status.
// For the managedGroup with the given groupId, if that group exists and the provided CommandLock fields warrant it,
// this function will:
// - if "before" is true, set the lockedBy field of the group
// - if "before" is false, remove the lockedBy field of the group
// For the managedGroup with the given groupId, if that group exists, this function will:
// - Lock the group, if "lock" is true and cmdLock.LockBefore is true
// - Unlock the group, if "lock" is false and cmdLock.UnlockAfter is true
// Returns an error if the lock.Token field doesn't match an existing token in the managedGroup (for either case), as
// this indicates that the group is locked by a different sender
func (m *kafkaConsumerGroupManagerImpl) processLock(lock *commands.CommandLock, groupId string, before bool) error {
if lock == nil {
func (m *kafkaConsumerGroupManagerImpl) processLock(cmdLock *commands.CommandLock, groupId string, lock bool) error {
if cmdLock == nil {
return nil // No lock processing was requested
}

Expand All @@ -360,19 +391,23 @@ func (m *kafkaConsumerGroupManagerImpl) processLock(lock *commands.CommandLock,
return nil // Can't lock a nonexistent group
}

if !group.canUnlock(lock.Token) {
if (lock && !cmdLock.LockBefore) || (!lock && !cmdLock.UnlockAfter) {
return nil // If neither a lock nor unlock were requested, no need to go any further
}

if !group.canUnlock(cmdLock.Token) {
m.logger.Info("Managed group access denied; already locked with a different token",
zap.String("Token", lock.Token), zap.String("GroupId", groupId), zap.Bool("Before", before))
zap.String("Token", cmdLock.Token), zap.String("GroupId", groupId))
return GroupLockedError // Already locked by a different command token
}

if before && lock.LockBefore {
timeout := lock.Timeout
if lock && cmdLock.LockBefore {
timeout := cmdLock.Timeout
if timeout == 0 {
timeout = defaultLockTimeout
}
group.resetLockTimer(lock.Token, timeout)
} else if !before && lock.UnlockAfter {
group.resetLock(cmdLock.Token, timeout)
} else if cmdLock.UnlockAfter {
group.removeLock()
}

Expand Down
19 changes: 19 additions & 0 deletions pkg/common/consumer/consumer_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,25 @@ func TestNotifications(t *testing.T) {
lock: &commands.CommandLock{LockBefore: true, Token: "token"},
lockFail: true,
},
{
name: "Start Group OpCode, Unlock Failure",
opcode: commands.StartConsumerGroupOpCode,
groupId: "test-group-id",
version: 1,
lock: &commands.CommandLock{UnlockAfter: true, Token: "token"},
initialStop: true,
lockFail: true,
},
{
name: "Stop Group OpCode, Unlock Failure",
opcode: commands.StopConsumerGroupOpCode,
groupId: "test-group-id",
version: 1,
lock: &commands.CommandLock{UnlockAfter: true, Token: "token"},
expectClose: true,
lockFail: true,
expectStop: true,
},
{
name: "Start Group OpCode, Lock Success",
opcode: commands.StartConsumerGroupOpCode,
Expand Down
6 changes: 3 additions & 3 deletions pkg/common/consumer/managed_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ func createManagedGroup(logger *zap.Logger, group sarama.ConsumerGroup, cancel f
return &managedGrp
}

// resetLockTimer will set the "lockedBy" field to the given lockToken (need not be the same as the
// resetLock will set the "lockedBy" field to the given lockToken (need not be the same as the
// existing one) and reset the timer to the provided timeout. After the timer expires, the lockToken
// will be set to an empty string (representing "unlocked")
func (m *managedGroup) resetLockTimer(lockToken string, timeout time.Duration) {
func (m *managedGroup) resetLock(lockToken string, timeout time.Duration) {

// Stop any existing timer (without releasing the lock) so that it won't inadvertently do an unlock later
if m.cancelLockTimeout != nil {
Expand All @@ -71,7 +71,7 @@ func (m *managedGroup) resetLockTimer(lockToken string, timeout time.Duration) {
if lockToken != "" && timeout != 0 {
// Use the provided lockToken, which will prevent any caller with a different token from executing commands
lockTimer := time.NewTimer(timeout)
// Create a cancel function that will be used by further calls to resetLockTimer, or to removeLock(),
// Create a cancel function that will be used by further calls to resetLock, or to removeLock(),
// for the purpose of stopping the lockTimer without clearing the token
ctx, cancel := context.WithCancel(context.Background())
m.cancelLockTimeout = cancel
Expand Down
6 changes: 3 additions & 3 deletions pkg/common/consumer/managed_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,11 @@ func TestResetLockTimer(t *testing.T) {
managedGrp.cancelLockTimeout = testCase.cancelTimer
}
if testCase.existingTimer {
managedGrp.resetLockTimer("existing-token", time.Second) // Long enough not to expire during the test
managedGrp.resetLock("existing-token", time.Second) // Long enough not to expire during the test
assert.False(t, managedGrp.canUnlock(testCase.token))
time.Sleep(time.Millisecond) // Let the timer loop start executing
} else if testCase.expiredTimer {
managedGrp.resetLockTimer("existing-token", time.Microsecond) // Expire the timer immediately
managedGrp.resetLock("existing-token", time.Microsecond) // Expire the timer immediately
time.Sleep(shortTimeout / 2)
} else {
assert.True(t, managedGrp.canUnlock(testCase.token))
Expand All @@ -162,7 +162,7 @@ func TestResetLockTimer(t *testing.T) {
managedGrp.lockedBy.Store(testCase.existingToken)
assert.Equal(t, testCase.existingToken == testCase.token, managedGrp.canUnlock(testCase.token))
}
managedGrp.resetLockTimer(testCase.token, testCase.timeout)
managedGrp.resetLock(testCase.token, testCase.timeout)
if testCase.existingTimer || testCase.expiredTimer {
time.Sleep(time.Millisecond) // Let the timer loop finish executing
}
Expand Down

0 comments on commit f487b46

Please sign in to comment.