Skip to content

Fix race in log (#16490) #16505

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 21, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 30 additions & 26 deletions modules/log/event.go
Original file line number Diff line number Diff line change
@@ -143,7 +143,7 @@ type MultiChannelledLog struct {
name string
bufferLength int64
queue chan *Event
mutex sync.Mutex
rwmutex sync.RWMutex
loggers map[string]EventLogger
flush chan bool
close chan bool
@@ -173,10 +173,10 @@ func NewMultiChannelledLog(name string, bufferLength int64) *MultiChannelledLog

// AddLogger adds a logger to this MultiChannelledLog
func (m *MultiChannelledLog) AddLogger(logger EventLogger) error {
m.mutex.Lock()
m.rwmutex.Lock()
name := logger.GetName()
if _, has := m.loggers[name]; has {
m.mutex.Unlock()
m.rwmutex.Unlock()
return ErrDuplicateName{name}
}
m.loggers[name] = logger
@@ -186,7 +186,7 @@ func (m *MultiChannelledLog) AddLogger(logger EventLogger) error {
if logger.GetStacktraceLevel() < m.stacktraceLevel {
m.stacktraceLevel = logger.GetStacktraceLevel()
}
m.mutex.Unlock()
m.rwmutex.Unlock()
go m.Start()
return nil
}
@@ -195,31 +195,31 @@ func (m *MultiChannelledLog) AddLogger(logger EventLogger) error {
// NB: If you delete the last sublogger this logger will simply drop
// log events
func (m *MultiChannelledLog) DelLogger(name string) bool {
m.mutex.Lock()
m.rwmutex.Lock()
logger, has := m.loggers[name]
if !has {
m.mutex.Unlock()
m.rwmutex.Unlock()
return false
}
delete(m.loggers, name)
m.internalResetLevel()
m.mutex.Unlock()
m.rwmutex.Unlock()
logger.Flush()
logger.Close()
return true
}

// GetEventLogger returns a sub logger from this MultiChannelledLog
func (m *MultiChannelledLog) GetEventLogger(name string) EventLogger {
m.mutex.Lock()
defer m.mutex.Unlock()
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
return m.loggers[name]
}

// GetEventLoggerNames returns a list of names
func (m *MultiChannelledLog) GetEventLoggerNames() []string {
m.mutex.Lock()
defer m.mutex.Unlock()
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
var keys []string
for k := range m.loggers {
keys = append(keys, k)
@@ -228,12 +228,12 @@ func (m *MultiChannelledLog) GetEventLoggerNames() []string {
}

func (m *MultiChannelledLog) closeLoggers() {
m.mutex.Lock()
m.rwmutex.Lock()
for _, logger := range m.loggers {
logger.Flush()
logger.Close()
}
m.mutex.Unlock()
m.rwmutex.Unlock()
m.closed <- true
}

@@ -249,8 +249,8 @@ func (m *MultiChannelledLog) Resume() {

// ReleaseReopen causes this logger to tell its subloggers to release and reopen
func (m *MultiChannelledLog) ReleaseReopen() error {
m.mutex.Lock()
defer m.mutex.Unlock()
m.rwmutex.Lock()
defer m.rwmutex.Unlock()
var accumulatedErr error
for _, logger := range m.loggers {
if err := logger.ReleaseReopen(); err != nil {
@@ -266,13 +266,13 @@ func (m *MultiChannelledLog) ReleaseReopen() error {

// Start processing the MultiChannelledLog
func (m *MultiChannelledLog) Start() {
m.mutex.Lock()
m.rwmutex.Lock()
if m.started {
m.mutex.Unlock()
m.rwmutex.Unlock()
return
}
m.started = true
m.mutex.Unlock()
m.rwmutex.Unlock()
paused := false
for {
if paused {
@@ -286,11 +286,11 @@ func (m *MultiChannelledLog) Start() {
m.closeLoggers()
return
}
m.mutex.Lock()
m.rwmutex.RLock()
for _, logger := range m.loggers {
logger.Flush()
}
m.mutex.Unlock()
m.rwmutex.RUnlock()
case <-m.close:
m.closeLoggers()
return
@@ -307,24 +307,24 @@ func (m *MultiChannelledLog) Start() {
m.closeLoggers()
return
}
m.mutex.Lock()
m.rwmutex.RLock()
for _, logger := range m.loggers {
err := logger.LogEvent(event)
if err != nil {
fmt.Println(err)
}
}
m.mutex.Unlock()
m.rwmutex.RUnlock()
case _, ok := <-m.flush:
if !ok {
m.closeLoggers()
return
}
m.mutex.Lock()
m.rwmutex.RLock()
for _, logger := range m.loggers {
logger.Flush()
}
m.mutex.Unlock()
m.rwmutex.RUnlock()
case <-m.close:
m.closeLoggers()
return
@@ -359,11 +359,15 @@ func (m *MultiChannelledLog) Flush() {

// GetLevel gets the level of this MultiChannelledLog
func (m *MultiChannelledLog) GetLevel() Level {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
return m.level
}

// GetStacktraceLevel gets the level of this MultiChannelledLog
func (m *MultiChannelledLog) GetStacktraceLevel() Level {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
return m.stacktraceLevel
}

@@ -384,8 +388,8 @@ func (m *MultiChannelledLog) internalResetLevel() Level {

// ResetLevel will reset the level of this MultiChannelledLog
func (m *MultiChannelledLog) ResetLevel() Level {
m.mutex.Lock()
defer m.mutex.Unlock()
m.rwmutex.Lock()
defer m.rwmutex.Unlock()
return m.internalResetLevel()
}

1 change: 0 additions & 1 deletion modules/queue/queue_disk_channel_test.go
Original file line number Diff line number Diff line change
@@ -15,7 +15,6 @@ import (
func TestPersistableChannelQueue(t *testing.T) {
handleChan := make(chan *testData)
handle := func(data ...Data) {
assert.True(t, len(data) == 2)
for _, datum := range data {
testDatum := datum.(*testData)
handleChan <- testDatum