Skip to content

Commit

Permalink
Merge pull request #303 from alicebob/fixrace
Browse files Browse the repository at this point in the history
fix a race in the stream code
  • Loading branch information
alicebob authored Nov 6, 2022
2 parents 1bcf722 + 08c2456 commit f3ff91a
Showing 1 changed file with 12 additions and 4 deletions.
16 changes: 12 additions & 4 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (s *streamKey) generateID(now time.Time) string {
next = fmt.Sprintf("%d-%d", last[0], last[1]+1)
}

lastID := s.lastID()
lastID := s.lastIDUnlocked()
if streamCmp(lastID, next) >= 0 {
last, _ := parseStreamID(lastID)
next = fmt.Sprintf("%d-%d", last[0], last[1]+1)
Expand All @@ -74,8 +74,16 @@ func (s *streamKey) generateID(now time.Time) string {
return next
}

// lastID doesn't lock the mutex
// lastID locks the mutex
func (s *streamKey) lastID() string {
s.mu.Lock()
defer s.mu.Unlock()

return s.lastIDUnlocked()
}

// lastID doesn't lock the mutex
func (s *streamKey) lastIDUnlocked() string {
if len(s.entries) == 0 {
return "0-0"
}
Expand Down Expand Up @@ -209,7 +217,7 @@ func (s *streamKey) createGroup(group, id string) error {
}

if id == "$" {
id = s.lastID()
id = s.lastIDUnlocked()
}
s.groups[group] = &streamGroup{
stream: s,
Expand Down Expand Up @@ -237,7 +245,7 @@ func (s *streamKey) add(entryID string, values []string, now time.Time) (string,
if entryID == "0-0" {
return "", errors.New(msgStreamIDZero)
}
if streamCmp(s.lastID(), entryID) != -1 {
if streamCmp(s.lastIDUnlocked(), entryID) != -1 {
return "", errors.New(msgStreamIDTooSmall)
}

Expand Down

0 comments on commit f3ff91a

Please sign in to comment.