From 3973898e10cd0c89053ce5e91a95ecc674975c0b Mon Sep 17 00:00:00 2001 From: VikrantKS <96419531+VikrantKS@users.noreply.github.com> Date: Wed, 2 Mar 2022 13:10:10 +0530 Subject: [PATCH 1/6] added retry websocket connection logic --- pkg/global/synapseconstants.go | 29 ++++----- pkg/synapse/synapse.go | 111 ++++++++++++++++++++++++++------- 2 files changed, 105 insertions(+), 35 deletions(-) diff --git a/pkg/global/synapseconstants.go b/pkg/global/synapseconstants.go index 5b6347c..44fb67e 100644 --- a/pkg/global/synapseconstants.go +++ b/pkg/global/synapseconstants.go @@ -6,20 +6,21 @@ import ( // all constant related to synapse const ( - GracefulTimeout = 100000 * time.Millisecond - ProxyServerPort = "8000" - DirectoryPermissions = 0755 - FilePermissions = 0755 - GitConfigFileName = "oauth" - RepoSecretsFileName = "reposecrets" - SynapseContainerURL = "http://synapse:8000" - NetworkEnvName = "NetworkName" - AutoRemoveEnv = "AutoRemove" - SynapseHostEnv = "synapsehost" - LocalEnv = "local" - NetworkName = "test-at-scale" - AutoRemove = true - Local = true + GracefulTimeout = 100000 * time.Millisecond + ProxyServerPort = "8000" + DirectoryPermissions = 0755 + FilePermissions = 0755 + GitConfigFileName = "oauth" + RepoSecretsFileName = "reposecrets" + SynapseContainerURL = "http://synapse:8000" + NetworkEnvName = "NetworkName" + AutoRemoveEnv = "AutoRemove" + SynapseHostEnv = "synapsehost" + LocalEnv = "local" + NetworkName = "test-at-scale" + AutoRemove = true + Local = true + MaxConnectionAttempts = 10 ) // SocketURL lambdatest url for synapse socket diff --git a/pkg/synapse/synapse.go b/pkg/synapse/synapse.go index 589cd8d..ca2da2e 100644 --- a/pkg/synapse/synapse.go +++ b/pkg/synapse/synapse.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "sync" + "time" "github.com/LambdaTest/synapse/pkg/core" "github.com/LambdaTest/synapse/pkg/global" @@ -12,6 +13,7 @@ import ( "github.com/LambdaTest/synapse/pkg/utils" "github.com/denisbrodbeck/machineid" "github.com/gorilla/websocket" + "github.com/lestrrat-go/backoff" "github.com/spf13/viper" ) @@ -25,10 +27,13 @@ const ( ) type synapse struct { - conn *websocket.Conn - runner core.DockerRunner - secretsManager core.SecretsManager - logger lumber.Logger + conn *websocket.Conn + runner core.DockerRunner + secretsManager core.SecretsManager + logger lumber.Logger + MsgErrChan chan struct{} + MsgChan chan []byte + ConnectionAborted chan struct{} } // New returns new instance of synapse @@ -37,10 +42,12 @@ func New( logger lumber.Logger, secretsManager core.SecretsManager, ) core.SynapseManager { + return &synapse{ runner: runner, logger: logger, secretsManager: secretsManager, + MsgChan: make(chan []byte, 1024), } } @@ -49,18 +56,7 @@ func (s *synapse) InitiateConnection( wg *sync.WaitGroup, ) { defer wg.Done() - - s.logger.Debugf("starting socket connection") - s.logger.Errorf("starting socket connection at URL %s", global.SocketURL[viper.GetString("env")]) - conn, _, err := websocket.DefaultDialer.Dial(global.SocketURL[viper.GetString("env")], nil) - if err != nil { - s.logger.Fatalf("error connecting synapse to lambdatest %+v", err) - } - s.conn = conn - defer conn.Close() - - s.logger.Debugf("synapse connected to lambdatest server") - go s.handleIncomingMessage() + go s.openAndMaintainConnection(ctx) s.login() <-ctx.Done() s.logout() @@ -68,13 +64,68 @@ func (s *synapse) InitiateConnection( s.logger.Debugf("exiting synapse") } -func (s *synapse) handleIncomingMessage() { +func (s *synapse) openAndMaintainConnection(ctx context.Context) { + // setup exponential backoff for retrying control websocket connection + var policy = backoff.NewExponential( + backoff.WithInterval(500*time.Millisecond), // base interval + backoff.WithJitterFactor(0.05), // 5% jitter + backoff.WithMaxRetries(global.MaxConnectionAttempts), // If not specified, default number of retries is 10 + ) + + normalCloser := make(chan struct{}) + + b, cancel := policy.Start(context.Background()) + defer cancel() + s.logger.Debugf("starting socket connection") + s.logger.Errorf("starting socket connection at URL %s", global.SocketURL[viper.GetString("env")]) + for backoff.Continue(b) { + + select { + case <-ctx.Done(): + return + default: + conn, _, err := websocket.DefaultDialer.Dial(global.SocketURL[viper.GetString("env")], nil) + if err != nil { + s.logger.Errorf("error connecting synapse to lambdatest %+v", err) + s.logger.Warnf("Retrying ...") + s.ConnectionAborted <- struct{}{} + continue + + } + s.conn = conn + s.logger.Debugf("synapse connected to lambdatest server") + go s.handleIncomingMessage(normalCloser) + go s.WriteMessage() + select { + case <-ctx.Done(): + conn.Close() + s.ConnectionAborted <- struct{}{} + return + case <-normalCloser: + conn.Close() + s.ConnectionAborted <- struct{}{} + return + case <-s.MsgErrChan: + s.logger.Errorf("Connection between synpase and lambdatest break") + s.logger.Warnf("Retrying ...") + s.ConnectionAborted <- struct{}{} + conn.Close() + + } + + } + } + +} + +func (s *synapse) handleIncomingMessage(normalCloser chan struct{}) { // s.conn.SetReadLimit(maxMessageSize) // s.conn.SetReadDeadline(time.Now().Add(pingWait)) s.conn.SetPingHandler(func(string) error { if err := s.conn.WriteMessage(websocket.PongMessage, nil); err != nil { s.logger.Errorf("Error in writing pong msg %s", err.Error()) + s.MsgErrChan <- struct{}{} return err } return nil @@ -83,7 +134,13 @@ func (s *synapse) handleIncomingMessage() { for { _, msg, err := s.conn.ReadMessage() if err != nil { + if websocket.IsCloseError(err, websocket.CloseNormalClosure) { + s.logger.Debugf("Normal closure occurred...........") + normalCloser <- struct{}{} + return + } s.logger.Errorf("disconnecting from lambdatest server. error in reading message %v", err) + s.MsgErrChan <- struct{}{} return } s.processMessage(msg) @@ -207,9 +264,21 @@ func (s *synapse) SendMessage(message *core.Message) { s.logger.Errorf("error marshaling message") return } - err = s.conn.WriteMessage(websocket.TextMessage, messageJson) - if err != nil { - s.logger.Errorf("error sending message to the server") - return + s.MsgChan <- messageJson +} + +func (s *synapse) WriteMessage() { + for { + select { + case <-s.ConnectionAborted: + return + case messageJson := <-s.MsgChan: + if err := s.conn.WriteMessage(websocket.TextMessage, messageJson); err != nil { + s.logger.Errorf("error sending message to the server") + s.MsgChan <- messageJson + s.MsgErrChan <- struct{}{} + return + } + } } } From 97d5148217a37daba0fa6e6b674268f8dbeec46a Mon Sep 17 00:00:00 2001 From: VikrantKS <96419531+VikrantKS@users.noreply.github.com> Date: Fri, 4 Mar 2022 16:01:16 +0530 Subject: [PATCH 2/6] added connection abort functionality if duplicate connection requested --- cmd/synapse/bin.go | 20 ++++++++- pkg/core/synapse.go | 2 +- pkg/synapse/synapse.go | 100 ++++++++++++++++++++++++++++------------- 3 files changed, 89 insertions(+), 33 deletions(-) diff --git a/cmd/synapse/bin.go b/cmd/synapse/bin.go index 5bce183..804f032 100644 --- a/cmd/synapse/bin.go +++ b/cmd/synapse/bin.go @@ -95,8 +95,11 @@ func run(cmd *cobra.Command, args []string) { logger.Fatalf("Could not instantiate proxyhandler %v", err) } + // All attempts to connect to lambdatest server failed + connectionFailed := make(chan struct{}) + wg.Add(1) - go synapse.InitiateConnection(ctx, &wg) + go synapse.InitiateConnection(ctx, &wg, connectionFailed) wg.Add(1) go func() { @@ -138,6 +141,21 @@ func run(cmd *cobra.Command, args []string) { case <-time.After(global.GracefulTimeout): logger.Errorf("Graceful timeout exceeded. Brutally killing the application") } + } + + case <-connectionFailed: + { + logger.Debugf("main: all attempts to connect to lamdatest server failed ....") + // tell the goroutines to stop + logger.Debugf("main: telling goroutines to stop") + cancel() + select { + case <-done: + logger.Debugf("Go routines exited within timeout") + case <-time.After(global.GracefulTimeout): + logger.Errorf("Graceful timeout exceeded. Brutally killing the application") + } + os.Exit(0) } case <-done: diff --git a/pkg/core/synapse.go b/pkg/core/synapse.go index 5c5fb11..7f1ad72 100644 --- a/pkg/core/synapse.go +++ b/pkg/core/synapse.go @@ -8,5 +8,5 @@ import ( // SynapseManager denfines operations for synapse client type SynapseManager interface { // InitiateConnection initiates the connection with LT cloud - InitiateConnection(ctx context.Context, wg *sync.WaitGroup) + InitiateConnection(ctx context.Context, wg *sync.WaitGroup, connectionFailed chan struct{}) } diff --git a/pkg/synapse/synapse.go b/pkg/synapse/synapse.go index ca2da2e..4a71601 100644 --- a/pkg/synapse/synapse.go +++ b/pkg/synapse/synapse.go @@ -19,21 +19,25 @@ import ( // All constant related to synapse const ( - Repo = "repo" - BuildID = "build-id" - JobID = "job-id" - Mode = "mode" - ID = "id" + Repo = "repo" + BuildID = "build-id" + JobID = "job-id" + Mode = "mode" + ID = "id" + DuplicateConnectionErr = "Duplicate connection" + AuthenticationFailed = "Authentication failed" ) type synapse struct { - conn *websocket.Conn - runner core.DockerRunner - secretsManager core.SecretsManager - logger lumber.Logger - MsgErrChan chan struct{} - MsgChan chan []byte - ConnectionAborted chan struct{} + conn *websocket.Conn + runner core.DockerRunner + secretsManager core.SecretsManager + logger lumber.Logger + MsgErrChan chan struct{} + MsgChan chan []byte + ConnectionAborted chan struct{} + InvalidConnectionRequest chan struct{} + LogoutRequired bool } // New returns new instance of synapse @@ -44,27 +48,32 @@ func New( ) core.SynapseManager { return &synapse{ - runner: runner, - logger: logger, - secretsManager: secretsManager, - MsgChan: make(chan []byte, 1024), + runner: runner, + logger: logger, + secretsManager: secretsManager, + MsgErrChan: make(chan struct{}), + InvalidConnectionRequest: make(chan struct{}), + MsgChan: make(chan []byte, 1024), + ConnectionAborted: make(chan struct{}, 10), + LogoutRequired: true, } } func (s *synapse) InitiateConnection( ctx context.Context, wg *sync.WaitGroup, -) { + connectionFailed chan struct{}) { defer wg.Done() - go s.openAndMaintainConnection(ctx) - s.login() + go s.openAndMaintainConnection(ctx, connectionFailed) <-ctx.Done() - s.logout() + if !s.LogoutRequired { + s.logout() + } s.runner.KillRunningDocker(context.TODO()) s.logger.Debugf("exiting synapse") } -func (s *synapse) openAndMaintainConnection(ctx context.Context) { +func (s *synapse) openAndMaintainConnection(ctx context.Context, connectionFailed chan struct{}) { // setup exponential backoff for retrying control websocket connection var policy = backoff.NewExponential( backoff.WithInterval(500*time.Millisecond), // base interval @@ -79,7 +88,7 @@ func (s *synapse) openAndMaintainConnection(ctx context.Context) { s.logger.Debugf("starting socket connection") s.logger.Errorf("starting socket connection at URL %s", global.SocketURL[viper.GetString("env")]) for backoff.Continue(b) { - + s.logger.Debugf("trying to connect to lamdatest server") select { case <-ctx.Done(): return @@ -87,35 +96,41 @@ func (s *synapse) openAndMaintainConnection(ctx context.Context) { conn, _, err := websocket.DefaultDialer.Dial(global.SocketURL[viper.GetString("env")], nil) if err != nil { s.logger.Errorf("error connecting synapse to lambdatest %+v", err) - s.logger.Warnf("Retrying ...") - s.ConnectionAborted <- struct{}{} continue - } s.conn = conn s.logger.Debugf("synapse connected to lambdatest server") + s.login() go s.handleIncomingMessage(normalCloser) go s.WriteMessage() select { case <-ctx.Done(): - conn.Close() s.ConnectionAborted <- struct{}{} return case <-normalCloser: conn.Close() s.ConnectionAborted <- struct{}{} return + case <-s.InvalidConnectionRequest: + conn.Close() + s.ConnectionAborted <- struct{}{} + connectionFailed <- struct{}{} + s.LogoutRequired = false + return case <-s.MsgErrChan: s.logger.Errorf("Connection between synpase and lambdatest break") - s.logger.Warnf("Retrying ...") s.ConnectionAborted <- struct{}{} conn.Close() - } + s.MsgErrChan = make(chan struct{}) + go s.openAndMaintainConnection(ctx, connectionFailed) + return } } - + s.logger.Errorf("Unable to establish connection with lambdatest server. exiting...") + connectionFailed <- struct{}{} + s.LogoutRequired = false } func (s *synapse) handleIncomingMessage(normalCloser chan struct{}) { @@ -126,6 +141,7 @@ func (s *synapse) handleIncomingMessage(normalCloser chan struct{}) { if err := s.conn.WriteMessage(websocket.PongMessage, nil); err != nil { s.logger.Errorf("Error in writing pong msg %s", err.Error()) s.MsgErrChan <- struct{}{} + close(s.MsgErrChan) return err } return nil @@ -141,6 +157,7 @@ func (s *synapse) handleIncomingMessage(normalCloser chan struct{}) { } s.logger.Errorf("disconnecting from lambdatest server. error in reading message %v", err) s.MsgErrChan <- struct{}{} + close(s.MsgErrChan) return } s.processMessage(msg) @@ -157,6 +174,7 @@ func (s *synapse) processMessage(msg []byte) { switch message.Type { case core.MsgError: s.logger.Debugf("error message received from server") + go s.processErrorMessage(message) case core.MsgInfo: s.logger.Debugf("info message received from server") case core.MsgTask: @@ -167,6 +185,15 @@ func (s *synapse) processMessage(msg []byte) { } } +func (s *synapse) processErrorMessage(message core.Message) { + errMsg := string(message.Content) + s.logger.Errorf("error message received from server, error %s ", errMsg) + if errMsg == DuplicateConnectionErr || errMsg == AuthenticationFailed { + s.InvalidConnectionRequest <- struct{}{} + } + +} + func (s *synapse) processTask(message core.Message) { var runnerOpts core.RunnerOptions err := json.Unmarshal(message.Content, &runnerOpts) @@ -240,8 +267,18 @@ func (s *synapse) login() { } func (s *synapse) logout() { + s.logger.Infof("Logging out from lambdatest server") logoutMessage := CreateLogoutMessage() - s.SendMessage(&logoutMessage) + messageJson, err := json.Marshal(logoutMessage) + + if err != nil { + s.logger.Errorf("error marshaling message") + return + } + if err := s.conn.WriteMessage(websocket.TextMessage, messageJson); err != nil { + s.logger.Errorf("error sending message to the server, error %v", err) + + } } func (s *synapse) sendResourceUpdates( @@ -274,9 +311,10 @@ func (s *synapse) WriteMessage() { return case messageJson := <-s.MsgChan: if err := s.conn.WriteMessage(websocket.TextMessage, messageJson); err != nil { - s.logger.Errorf("error sending message to the server") + s.logger.Errorf("error sending message to the server error %v", err) s.MsgChan <- messageJson s.MsgErrChan <- struct{}{} + close(s.MsgErrChan) return } } From cfb81599324f96157875dfb54394fe4bd0da91db Mon Sep 17 00:00:00 2001 From: VikrantKS <96419531+VikrantKS@users.noreply.github.com> Date: Fri, 4 Mar 2022 16:19:05 +0530 Subject: [PATCH 3/6] added entry in go.mod and go.sum , change restart policy in docker-compose --- docker-compose.yml | 2 +- go.mod | 1 + go.sum | 2 ++ 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/docker-compose.yml b/docker-compose.yml index 1d3e08c..2dea48c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,7 +3,7 @@ services: synapse: image: lambdatest/synapse:latest stop_signal: SIGINT - restart: always + restart: on-failure networks: - test-at-scale hostname: synapse diff --git a/go.mod b/go.mod index 5f733d9..d062734 100644 --- a/go.mod +++ b/go.mod @@ -52,6 +52,7 @@ require ( github.com/klauspost/compress v1.11.13 // indirect github.com/klauspost/pgzip v1.2.5 // indirect github.com/leodido/go-urn v1.2.1 // indirect + github.com/lestrrat-go/backoff v1.0.1 // indirect github.com/magiconair/properties v1.8.5 // indirect github.com/mattn/go-isatty v0.0.14 // indirect github.com/mitchellh/mapstructure v1.4.3 // indirect diff --git a/go.sum b/go.sum index c593572..f9d97f0 100644 --- a/go.sum +++ b/go.sum @@ -579,6 +579,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w= github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= +github.com/lestrrat-go/backoff v1.0.1 h1:Gphaach0QvvtaHmR9U8hwXNHXWckPyD8V6S+V+D184c= +github.com/lestrrat-go/backoff v1.0.1/go.mod h1:5QVJKC49Q5yQvCrpup0ZqzDGHEO/O4H82cnDuYdumkw= github.com/lyft/protoc-gen-star v0.5.3/go.mod h1:V0xaHgaf5oCCqmcxYcWiDfTiKsZsRc87/1qhoTACD8w= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.5 h1:b6kJs+EmPFMYGkow9GiUyCyOvIwYetYJ3fSaWak/Gls= From e3493e0e26bf376e96c4469d39d871d6c46f85aa Mon Sep 17 00:00:00 2001 From: VikrantKS <96419531+VikrantKS@users.noreply.github.com> Date: Fri, 4 Mar 2022 17:48:18 +0530 Subject: [PATCH 4/6] fix logout when graceful shutdown --- pkg/synapse/synapse.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/synapse/synapse.go b/pkg/synapse/synapse.go index 4a71601..f3039ca 100644 --- a/pkg/synapse/synapse.go +++ b/pkg/synapse/synapse.go @@ -66,7 +66,7 @@ func (s *synapse) InitiateConnection( defer wg.Done() go s.openAndMaintainConnection(ctx, connectionFailed) <-ctx.Done() - if !s.LogoutRequired { + if s.LogoutRequired { s.logout() } s.runner.KillRunningDocker(context.TODO()) From 4ce0bccf98a2b9fb83f7521cbd21c106505e060e Mon Sep 17 00:00:00 2001 From: VikrantKS <96419531+VikrantKS@users.noreply.github.com> Date: Wed, 16 Mar 2022 19:52:57 +0530 Subject: [PATCH 5/6] refactor code and resolve review comments --- pkg/synapse/synapse.go | 100 ++++++++++++++++++++++++++--------------- pkg/synapse/utils.go | 14 ------ 2 files changed, 64 insertions(+), 50 deletions(-) diff --git a/pkg/synapse/synapse.go b/pkg/synapse/synapse.go index f3039ca..5e92e02 100644 --- a/pkg/synapse/synapse.go +++ b/pkg/synapse/synapse.go @@ -24,8 +24,8 @@ const ( JobID = "job-id" Mode = "mode" ID = "id" - DuplicateConnectionErr = "Duplicate connection" - AuthenticationFailed = "Authentication failed" + DuplicateConnectionErr = "Synapse already has an open connection" + AuthenticationFailed = "Synapse authentication failed" ) type synapse struct { @@ -73,6 +73,10 @@ func (s *synapse) InitiateConnection( s.logger.Debugf("exiting synapse") } +/* +openAndMaintainConnection tries to create and mantain connection with +exponential backoff factor +*/ func (s *synapse) openAndMaintainConnection(ctx context.Context, connectionFailed chan struct{}) { // setup exponential backoff for retrying control websocket connection var policy = backoff.NewExponential( @@ -81,12 +85,9 @@ func (s *synapse) openAndMaintainConnection(ctx context.Context, connectionFaile backoff.WithMaxRetries(global.MaxConnectionAttempts), // If not specified, default number of retries is 10 ) - normalCloser := make(chan struct{}) - b, cancel := policy.Start(context.Background()) defer cancel() - s.logger.Debugf("starting socket connection") - s.logger.Errorf("starting socket connection at URL %s", global.SocketURL[viper.GetString("env")]) + s.logger.Debugf("starting socket connection at URL %s", global.SocketURL[viper.GetString("env")]) for backoff.Continue(b) { s.logger.Debugf("trying to connect to lamdatest server") select { @@ -101,26 +102,8 @@ func (s *synapse) openAndMaintainConnection(ctx context.Context, connectionFaile s.conn = conn s.logger.Debugf("synapse connected to lambdatest server") s.login() - go s.handleIncomingMessage(normalCloser) - go s.WriteMessage() - select { - case <-ctx.Done(): - s.ConnectionAborted <- struct{}{} - return - case <-normalCloser: - conn.Close() - s.ConnectionAborted <- struct{}{} + if !s.connectionHandler(ctx, conn, connectionFailed) { return - case <-s.InvalidConnectionRequest: - conn.Close() - s.ConnectionAborted <- struct{}{} - connectionFailed <- struct{}{} - s.LogoutRequired = false - return - case <-s.MsgErrChan: - s.logger.Errorf("Connection between synpase and lambdatest break") - s.ConnectionAborted <- struct{}{} - conn.Close() } s.MsgErrChan = make(chan struct{}) go s.openAndMaintainConnection(ctx, connectionFailed) @@ -133,12 +116,48 @@ func (s *synapse) openAndMaintainConnection(ctx context.Context, connectionFaile s.LogoutRequired = false } -func (s *synapse) handleIncomingMessage(normalCloser chan struct{}) { +/* + connectionHandler handles the connection by listening to any connection closer + also it returns boolean value which repersents whether we can retry to connect +*/ +func (s *synapse) connectionHandler(ctx context.Context, conn *websocket.Conn, connectionFailed chan struct{}) bool { + normalCloser := make(chan struct{}) + ctxDone := false + defer func() { + // if gracefully terminated, wait for logout message to be sent + if !ctxDone { + conn.Close() + } + s.ConnectionAborted <- struct{}{} + }() + + go s.messageReader(normalCloser, conn) + go s.messageWriter(conn) + select { + case <-ctx.Done(): + ctxDone = true + return false + case <-normalCloser: + return false + case <-s.InvalidConnectionRequest: + connectionFailed <- struct{}{} + s.LogoutRequired = false + return false + case <-s.MsgErrChan: + s.logger.Errorf("Connection between synpase and lambdatest break") + return true + } +} + +/* +messageReader reads websocket messages and acts upon it +*/ +func (s *synapse) messageReader(normalCloser chan struct{}, conn *websocket.Conn) { // s.conn.SetReadLimit(maxMessageSize) // s.conn.SetReadDeadline(time.Now().Add(pingWait)) - s.conn.SetPingHandler(func(string) error { - if err := s.conn.WriteMessage(websocket.PongMessage, nil); err != nil { + conn.SetPingHandler(func(string) error { + if err := conn.WriteMessage(websocket.PongMessage, nil); err != nil { s.logger.Errorf("Error in writing pong msg %s", err.Error()) s.MsgErrChan <- struct{}{} close(s.MsgErrChan) @@ -148,7 +167,7 @@ func (s *synapse) handleIncomingMessage(normalCloser chan struct{}) { }) for { - _, msg, err := s.conn.ReadMessage() + _, msg, err := conn.ReadMessage() if err != nil { if websocket.IsCloseError(err, websocket.CloseNormalClosure) { s.logger.Debugf("Normal closure occurred...........") @@ -164,6 +183,7 @@ func (s *synapse) handleIncomingMessage(normalCloser chan struct{}) { } } +// processMessage process messages recieved via websocket func (s *synapse) processMessage(msg []byte) { var message core.Message err := json.Unmarshal(msg, &message) @@ -185,6 +205,7 @@ func (s *synapse) processMessage(msg []byte) { } } +// processErrorMessage handles error messages func (s *synapse) processErrorMessage(message core.Message) { errMsg := string(message.Content) s.logger.Errorf("error message received from server, error %s ", errMsg) @@ -194,6 +215,7 @@ func (s *synapse) processErrorMessage(message core.Message) { } +// processTask handles task type message func (s *synapse) processTask(message core.Message) { var runnerOpts core.RunnerOptions err := json.Unmarshal(message.Content, &runnerOpts) @@ -206,7 +228,7 @@ func (s *synapse) processTask(message core.Message) { jobInfo := CreateJobInfo(core.JobStarted, &runnerOpts) s.logger.Infof("Sending update to neuron %+v", jobInfo) resourceStatsMessage := CreateJobUpdateMessage(jobInfo) - s.SendMessage(&resourceStatsMessage) + s.writeMessageToBuffer(&resourceStatsMessage) } // mounting secrets to container runnerOpts.HostVolumePath = fmt.Sprintf("/tmp/synapse/data/%s", runnerOpts.ContainerName) @@ -225,6 +247,7 @@ func (s *synapse) processTask(message core.Message) { } +// runAndUpdateJobStatus intiate and sends jobs status func (s *synapse) runAndUpdateJobStatus(runnerOpts core.RunnerOptions) { // starting container statusChan := make(chan core.ContainerStatus) @@ -244,9 +267,10 @@ func (s *synapse) runAndUpdateJobStatus(runnerOpts core.RunnerOptions) { jobInfo := CreateJobInfo(jobStatus, &runnerOpts) s.logger.Infof("Sending update to neuron %+v", jobInfo) resourceStatsMessage := CreateJobUpdateMessage(jobInfo) - s.SendMessage(&resourceStatsMessage) + s.writeMessageToBuffer(&resourceStatsMessage) } +// login write login message to lambdatest server func (s *synapse) login() { cpu, ram := s.runner.GetInfo(context.TODO()) id, err := machineid.ProtectedID("synapaseMeta") @@ -263,9 +287,10 @@ func (s *synapse) login() { s.logger.Infof("Login synapse with id %s", loginDetails.SynapseID) loginMessage := CreateLoginMessage(loginDetails) - s.SendMessage(&loginMessage) + s.writeMessageToBuffer(&loginMessage) } +// logout writes logout message to lambdatest server func (s *synapse) logout() { s.logger.Infof("Logging out from lambdatest server") logoutMessage := CreateLogoutMessage() @@ -281,6 +306,7 @@ func (s *synapse) logout() { } } +// sendResourceUpdates sends resource status of synapse func (s *synapse) sendResourceUpdates( status core.StatType, runnerOpts core.RunnerOptions, @@ -292,10 +318,11 @@ func (s *synapse) sendResourceUpdates( RAM: specs.RAM, } resourceStatsMessage := CreateResourceStatsMessage(resourceStats) - s.SendMessage(&resourceStatsMessage) + s.writeMessageToBuffer(&resourceStatsMessage) } -func (s *synapse) SendMessage(message *core.Message) { +// writeMessageToBuffer writes all message to buffer channel +func (s *synapse) writeMessageToBuffer(message *core.Message) { messageJson, err := json.Marshal(message) if err != nil { s.logger.Errorf("error marshaling message") @@ -304,13 +331,14 @@ func (s *synapse) SendMessage(message *core.Message) { s.MsgChan <- messageJson } -func (s *synapse) WriteMessage() { +// messageWriter writes the messages to open websocket +func (s *synapse) messageWriter(conn *websocket.Conn) { for { select { case <-s.ConnectionAborted: return case messageJson := <-s.MsgChan: - if err := s.conn.WriteMessage(websocket.TextMessage, messageJson); err != nil { + if err := conn.WriteMessage(websocket.TextMessage, messageJson); err != nil { s.logger.Errorf("error sending message to the server error %v", err) s.MsgChan <- messageJson s.MsgErrChan <- struct{}{} diff --git a/pkg/synapse/utils.go b/pkg/synapse/utils.go index ea8f7ce..4650feb 100644 --- a/pkg/synapse/utils.go +++ b/pkg/synapse/utils.go @@ -4,7 +4,6 @@ import ( "encoding/json" "github.com/LambdaTest/synapse/pkg/core" - "github.com/gorilla/websocket" ) // CreateLoginMessage creates message of type login @@ -68,19 +67,6 @@ func CreateResourceStatsMessage(resourceStats core.ResourceStats) core.Message { } } -// SendMessage sends message to the server -func SendMessage(ws *websocket.Conn, message core.Message) error { - messageJson, err := json.Marshal(message) - if err != nil { - return err - } - err = ws.WriteMessage(websocket.TextMessage, messageJson) - if err != nil { - return err - } - return nil -} - // GetResources returns dummy resources based on pod type func GetResources(tierOpts core.Tier) core.Specs { if val, ok := core.TierOpts[tierOpts]; ok { From 869920334a960a8c166bd07ed68260d9001adf96 Mon Sep 17 00:00:00 2001 From: VikrantKS <96419531+VikrantKS@users.noreply.github.com> Date: Thu, 17 Mar 2022 11:04:54 +0530 Subject: [PATCH 6/6] change gracefultimeout for synapse from ms to s --- pkg/global/synapseconstants.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/global/synapseconstants.go b/pkg/global/synapseconstants.go index 44fb67e..deabfab 100644 --- a/pkg/global/synapseconstants.go +++ b/pkg/global/synapseconstants.go @@ -6,7 +6,7 @@ import ( // all constant related to synapse const ( - GracefulTimeout = 100000 * time.Millisecond + GracefulTimeout = 100 * time.Second ProxyServerPort = "8000" DirectoryPermissions = 0755 FilePermissions = 0755