From 13c2d5071c5e1e6789635d5f205e685ae92ece08 Mon Sep 17 00:00:00 2001 From: yuhan6665 <1588741+yuhan6665@users.noreply.github.com> Date: Sat, 13 Jul 2024 17:09:48 -0400 Subject: [PATCH 1/3] Try to fix infinite conn read in SH tests It seems SH can't receive a short message too fast --- transport/internet/splithttp/splithttp_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/transport/internet/splithttp/splithttp_test.go b/transport/internet/splithttp/splithttp_test.go index 7e22c9adba29..28489aeb9951 100644 --- a/transport/internet/splithttp/splithttp_test.go +++ b/transport/internet/splithttp/splithttp_test.go @@ -50,6 +50,7 @@ func Test_listenSHAndDial(t *testing.T) { } conn, err := Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), listenPort), streamSettings) + <-time.After(time.Millisecond * 100) // SH can't receive a short message too fast ATM common.Must(err) _, err = conn.Write([]byte("Test connection 1")) common.Must(err) @@ -65,6 +66,8 @@ func Test_listenSHAndDial(t *testing.T) { common.Must(conn.Close()) <-time.After(time.Second * 5) conn, err = Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), listenPort), streamSettings) + + <-time.After(time.Millisecond * 100) // SH can't receive a short message too fast ATM common.Must(err) _, err = conn.Write([]byte("Test connection 2")) common.Must(err) @@ -107,6 +110,7 @@ func TestDialWithRemoteAddr(t *testing.T) { ProtocolSettings: &Config{Path: "sh", Header: map[string]string{"X-Forwarded-For": "1.1.1.1"}}, }) + <-time.After(time.Millisecond * 100) // SH can't receive a short message too fast ATM common.Must(err) _, err = conn.Write([]byte("Test connection 1")) common.Must(err) From f04d22181af0c7ba5046e12c54c2ac997b825758 Mon Sep 17 00:00:00 2001 From: nobody Date: Wed, 17 Jul 2024 01:20:36 +0800 Subject: [PATCH 2/3] fix SplitHTTP unit test fails bug --- transport/internet/splithttp/hub.go | 12 ++++++++++++ transport/internet/splithttp/splithttp_test.go | 2 +- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/transport/internet/splithttp/hub.go b/transport/internet/splithttp/hub.go index 27fab68d45bc..b31ec707976a 100644 --- a/transport/internet/splithttp/hub.go +++ b/transport/internet/splithttp/hub.go @@ -27,6 +27,7 @@ type requestHandler struct { host string path string ln *Listener + sessionMu *sync.Mutex sessions sync.Map localAddr gonet.TCPAddr } @@ -56,11 +57,21 @@ func (h *requestHandler) maybeReapSession(isFullyConnected *done.Instance, sessi } func (h *requestHandler) upsertSession(sessionId string) *httpSession { + // fast path currentSessionAny, ok := h.sessions.Load(sessionId) if ok { return currentSessionAny.(*httpSession) } + // slow path + h.sessionMu.Lock() + defer h.sessionMu.Unlock() + + currentSessionAny, ok = h.sessions.Load(sessionId) + if ok { + return currentSessionAny.(*httpSession) + } + s := &httpSession{ uploadQueue: NewUploadQueue(int(2 * h.ln.config.GetNormalizedMaxConcurrentUploads())), isFullyConnected: done.New(), @@ -277,6 +288,7 @@ func ListenSH(ctx context.Context, address net.Address, port net.Port, streamSet host: shSettings.Host, path: shSettings.GetNormalizedPath(), ln: l, + sessionMu: &sync.Mutex{}, sessions: sync.Map{}, localAddr: localAddr, } diff --git a/transport/internet/splithttp/splithttp_test.go b/transport/internet/splithttp/splithttp_test.go index 28489aeb9951..e4e1254f912c 100644 --- a/transport/internet/splithttp/splithttp_test.go +++ b/transport/internet/splithttp/splithttp_test.go @@ -64,7 +64,7 @@ func Test_listenSHAndDial(t *testing.T) { } common.Must(conn.Close()) - <-time.After(time.Second * 5) + // <-time.After(time.Second * 5) conn, err = Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), listenPort), streamSettings) <-time.After(time.Millisecond * 100) // SH can't receive a short message too fast ATM From 51b8e036530e37a8814359cf9d75dcd25acbff4c Mon Sep 17 00:00:00 2001 From: mmmray <142015632+mmmray@users.noreply.github.com> Date: Wed, 17 Jul 2024 06:09:13 -0500 Subject: [PATCH 3/3] remove sleeps from tests --- transport/internet/splithttp/splithttp_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/transport/internet/splithttp/splithttp_test.go b/transport/internet/splithttp/splithttp_test.go index e4e1254f912c..db15745f722f 100644 --- a/transport/internet/splithttp/splithttp_test.go +++ b/transport/internet/splithttp/splithttp_test.go @@ -50,7 +50,6 @@ func Test_listenSHAndDial(t *testing.T) { } conn, err := Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), listenPort), streamSettings) - <-time.After(time.Millisecond * 100) // SH can't receive a short message too fast ATM common.Must(err) _, err = conn.Write([]byte("Test connection 1")) common.Must(err) @@ -64,10 +63,8 @@ func Test_listenSHAndDial(t *testing.T) { } common.Must(conn.Close()) - // <-time.After(time.Second * 5) conn, err = Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), listenPort), streamSettings) - <-time.After(time.Millisecond * 100) // SH can't receive a short message too fast ATM common.Must(err) _, err = conn.Write([]byte("Test connection 2")) common.Must(err) @@ -110,7 +107,6 @@ func TestDialWithRemoteAddr(t *testing.T) { ProtocolSettings: &Config{Path: "sh", Header: map[string]string{"X-Forwarded-For": "1.1.1.1"}}, }) - <-time.After(time.Millisecond * 100) // SH can't receive a short message too fast ATM common.Must(err) _, err = conn.Write([]byte("Test connection 1")) common.Must(err)