Skip to content

Commit

Permalink
add context to avoid locks
Browse files Browse the repository at this point in the history
  • Loading branch information
mlsmaycon committed Dec 8, 2024
1 parent e61728f commit 64c0590
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 8 deletions.
4 changes: 2 additions & 2 deletions client/internal/peer/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu
// It will try to establish a connection using ICE and in parallel with relay. The higher priority connection type will
// be used.
func (conn *Conn) Open() {
conn.semaphore.Add()
conn.semaphore.Add(conn.ctx)
conn.log.Debugf("open connection to peer")

conn.mu.Lock()
Expand All @@ -195,7 +195,7 @@ func (conn *Conn) Open() {
}

func (conn *Conn) startHandshakeAndReconnect(ctx context.Context) {
defer conn.semaphore.Done()
defer conn.semaphore.Done(conn.ctx)
conn.waitInitialRandomSleepTime(ctx)

err := conn.handshaker.sendOffer()
Expand Down
17 changes: 13 additions & 4 deletions util/semaphore-group/semaphore_group.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package semaphoregroup

import (
"context"
"sync"
)

Expand All @@ -18,19 +19,27 @@ func NewSemaphoreGroup(limit int) *SemaphoreGroup {
}

// Add increments the internal WaitGroup counter and acquires a semaphore slot.
func (sg *SemaphoreGroup) Add() {
func (sg *SemaphoreGroup) Add(ctx context.Context) {
sg.waitGroup.Add(1)

// Acquire semaphore slot
sg.semaphore <- struct{}{}
select {
case <-ctx.Done():
return
case sg.semaphore <- struct{}{}:
}
}

// Done decrements the internal WaitGroup counter and releases a semaphore slot.
func (sg *SemaphoreGroup) Done() {
func (sg *SemaphoreGroup) Done(ctx context.Context) {
sg.waitGroup.Done()

// Release semaphore slot
<-sg.semaphore
select {
case <-ctx.Done():
return
case <-sg.semaphore:
}
}

// Wait waits until the internal WaitGroup counter is zero.
Expand Down
37 changes: 35 additions & 2 deletions util/semaphore-group/semaphore_group_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package semaphoregroup

import (
"context"
"testing"
"time"
)
Expand All @@ -9,9 +10,9 @@ func TestSemaphoreGroup(t *testing.T) {
semGroup := NewSemaphoreGroup(2)

for i := 0; i < 5; i++ {
semGroup.Add()
semGroup.Add(context.Background())
go func(id int) {
defer semGroup.Done()
defer semGroup.Done(context.Background())

got := len(semGroup.semaphore)
if got == 0 {
Expand All @@ -31,3 +32,35 @@ func TestSemaphoreGroup(t *testing.T) {
t.Errorf("Expected semaphore length %d, got %d", want, got)
}
}

func TestSemaphoreGroupContext(t *testing.T) {
semGroup := NewSemaphoreGroup(1)
semGroup.Add(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
t.Cleanup(cancel)
rChan := make(chan struct{})

go func() {
semGroup.Add(ctx)
rChan <- struct{}{}
}()
select {
case <-rChan:
case <-time.NewTimer(2 * time.Second).C:
t.Error("Adding to semaphore group should not block when context is not done")
}

semGroup.Done(context.Background())

ctxDone, cancelDone := context.WithTimeout(context.Background(), 1*time.Second)
t.Cleanup(cancelDone)
go func() {
semGroup.Done(ctxDone)
rChan <- struct{}{}
}()
select {
case <-rChan:
case <-time.NewTimer(2 * time.Second).C:
t.Error("Releasing from semaphore group should not block when context is not done")
}
}

0 comments on commit 64c0590

Please sign in to comment.