From 64c05900ba012ac5d7c4b25593c553ab98436479 Mon Sep 17 00:00:00 2001 From: Maycon Santos Date: Sun, 8 Dec 2024 10:47:11 +0100 Subject: [PATCH] add context to avoid locks --- client/internal/peer/conn.go | 4 +-- util/semaphore-group/semaphore_group.go | 17 ++++++--- util/semaphore-group/semaphore_group_test.go | 37 ++++++++++++++++++-- 3 files changed, 50 insertions(+), 8 deletions(-) diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index 0c35d1aa810..5c2e2cb60b6 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -172,7 +172,7 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu // It will try to establish a connection using ICE and in parallel with relay. The higher priority connection type will // be used. func (conn *Conn) Open() { - conn.semaphore.Add() + conn.semaphore.Add(conn.ctx) conn.log.Debugf("open connection to peer") conn.mu.Lock() @@ -195,7 +195,7 @@ func (conn *Conn) Open() { } func (conn *Conn) startHandshakeAndReconnect(ctx context.Context) { - defer conn.semaphore.Done() + defer conn.semaphore.Done(conn.ctx) conn.waitInitialRandomSleepTime(ctx) err := conn.handshaker.sendOffer() diff --git a/util/semaphore-group/semaphore_group.go b/util/semaphore-group/semaphore_group.go index d2dd559cd93..ad74e1bfc81 100644 --- a/util/semaphore-group/semaphore_group.go +++ b/util/semaphore-group/semaphore_group.go @@ -1,6 +1,7 @@ package semaphoregroup import ( + "context" "sync" ) @@ -18,19 +19,27 @@ func NewSemaphoreGroup(limit int) *SemaphoreGroup { } // Add increments the internal WaitGroup counter and acquires a semaphore slot. -func (sg *SemaphoreGroup) Add() { +func (sg *SemaphoreGroup) Add(ctx context.Context) { sg.waitGroup.Add(1) // Acquire semaphore slot - sg.semaphore <- struct{}{} + select { + case <-ctx.Done(): + return + case sg.semaphore <- struct{}{}: + } } // Done decrements the internal WaitGroup counter and releases a semaphore slot. -func (sg *SemaphoreGroup) Done() { +func (sg *SemaphoreGroup) Done(ctx context.Context) { sg.waitGroup.Done() // Release semaphore slot - <-sg.semaphore + select { + case <-ctx.Done(): + return + case <-sg.semaphore: + } } // Wait waits until the internal WaitGroup counter is zero. diff --git a/util/semaphore-group/semaphore_group_test.go b/util/semaphore-group/semaphore_group_test.go index 5731e987323..d4491cf772e 100644 --- a/util/semaphore-group/semaphore_group_test.go +++ b/util/semaphore-group/semaphore_group_test.go @@ -1,6 +1,7 @@ package semaphoregroup import ( + "context" "testing" "time" ) @@ -9,9 +10,9 @@ func TestSemaphoreGroup(t *testing.T) { semGroup := NewSemaphoreGroup(2) for i := 0; i < 5; i++ { - semGroup.Add() + semGroup.Add(context.Background()) go func(id int) { - defer semGroup.Done() + defer semGroup.Done(context.Background()) got := len(semGroup.semaphore) if got == 0 { @@ -31,3 +32,35 @@ func TestSemaphoreGroup(t *testing.T) { t.Errorf("Expected semaphore length %d, got %d", want, got) } } + +func TestSemaphoreGroupContext(t *testing.T) { + semGroup := NewSemaphoreGroup(1) + semGroup.Add(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + t.Cleanup(cancel) + rChan := make(chan struct{}) + + go func() { + semGroup.Add(ctx) + rChan <- struct{}{} + }() + select { + case <-rChan: + case <-time.NewTimer(2 * time.Second).C: + t.Error("Adding to semaphore group should not block when context is not done") + } + + semGroup.Done(context.Background()) + + ctxDone, cancelDone := context.WithTimeout(context.Background(), 1*time.Second) + t.Cleanup(cancelDone) + go func() { + semGroup.Done(ctxDone) + rChan <- struct{}{} + }() + select { + case <-rChan: + case <-time.NewTimer(2 * time.Second).C: + t.Error("Releasing from semaphore group should not block when context is not done") + } +}