Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions pkg/notification/manager.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/****************************************************************************
* Copyright 2019, Optimizely, Inc. and contributors *
* Copyright 2019-2020, Optimizely, Inc. and contributors *
* *
* Licensed under the Apache License, Version 2.0 (the "License"); *
* you may not use this file except in compliance with the License. *
Expand All @@ -19,6 +19,7 @@ package notification

import (
"fmt"
"sync"
"sync/atomic"

"github.com/optimizely/go-sdk/pkg/logging"
Expand All @@ -37,6 +38,7 @@ type Manager interface {
type AtomicManager struct {
handlers map[uint32]func(interface{})
counter uint32
lock sync.RWMutex
}

// NewAtomicManager creates a new instance of the atomic manager
Expand All @@ -48,13 +50,19 @@ func NewAtomicManager() *AtomicManager {

// Add adds the given handler
func (am *AtomicManager) Add(newHandler func(interface{})) (int, error) {
am.lock.Lock()
defer am.lock.Unlock()

atomic.AddUint32(&am.counter, 1)
am.handlers[am.counter] = newHandler
return int(am.counter), nil
}

// Remove removes handler with the given id
func (am *AtomicManager) Remove(id int) {
am.lock.Lock()
defer am.lock.Unlock()

handlerID := uint32(id)
if _, ok := am.handlers[handlerID]; ok {
delete(am.handlers, handlerID)
Expand All @@ -66,7 +74,19 @@ func (am *AtomicManager) Remove(id int) {

// Send sends the notification to the registered handlers
func (am *AtomicManager) Send(notification interface{}) {
for _, handler := range am.handlers {
// copying handler to avoid race condition
handlers := am.copyHandlers()
for _, handler := range handlers {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of copying everything at once, can we just do handler := handler inside the for loop ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tried didn't work.

handler(notification)
}
}

// Return a copy of the given handlers
func (am *AtomicManager) copyHandlers() (handlers []func(interface{})) {
am.lock.RLock()
defer am.lock.RUnlock()
for _, v := range am.handlers {
handlers = append(handlers, v)
}
return handlers
}
53 changes: 53 additions & 0 deletions pkg/notification/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,56 @@ func TestAtomicManager(t *testing.T) {
// Sanity check by calling remove with a incorrect handler id
atomicManager.Remove(55)
}

func TestSendRaceCondition(t *testing.T) {
sync := make(chan interface{})
payload := map[string]interface{}{
"key": "test",
}
atomicManager := NewAtomicManager()
result1, result2 := 0, 0
listenerCalled := false

listener1 := func(interface{}) {
}

listener2 := func(interface{}) {
// Add listener2 internally to assert deadlock
result2, _ = atomicManager.Add(listener1)
// Remove all added listeners
atomicManager.Remove(result1)
atomicManager.Remove(result2)
listenerCalled = true
}
result1, _ = atomicManager.Add(listener2)

go func() {
atomicManager.Send(payload)
// notifying that notification is sent.
sync <- ""
}()

atomicManager.Add(listener1)
<-sync

assert.Equal(t, 1, result1)
assert.Equal(t, len(atomicManager.handlers), 1)
assert.Equal(t, true, listenerCalled)
}

func TestAddRaceCondition(t *testing.T) {
sync := make(chan interface{})
atomicManager := NewAtomicManager()

listener1 := func(interface{}) {

}
result1, _ := atomicManager.Add(listener1)
go func() {
atomicManager.Remove(result1)
sync <- ""
}()

<-sync
assert.Equal(t, len(atomicManager.handlers), 0)
}