diff --git a/cmd/synapse/bin.go b/cmd/synapse/bin.go index 5bce183..804f032 100644 --- a/cmd/synapse/bin.go +++ b/cmd/synapse/bin.go @@ -95,8 +95,11 @@ func run(cmd *cobra.Command, args []string) { logger.Fatalf("Could not instantiate proxyhandler %v", err) } + // All attempts to connect to lambdatest server failed + connectionFailed := make(chan struct{}) + wg.Add(1) - go synapse.InitiateConnection(ctx, &wg) + go synapse.InitiateConnection(ctx, &wg, connectionFailed) wg.Add(1) go func() { @@ -138,6 +141,21 @@ func run(cmd *cobra.Command, args []string) { case <-time.After(global.GracefulTimeout): logger.Errorf("Graceful timeout exceeded. Brutally killing the application") } + } + + case <-connectionFailed: + { + logger.Debugf("main: all attempts to connect to lamdatest server failed ....") + // tell the goroutines to stop + logger.Debugf("main: telling goroutines to stop") + cancel() + select { + case <-done: + logger.Debugf("Go routines exited within timeout") + case <-time.After(global.GracefulTimeout): + logger.Errorf("Graceful timeout exceeded. Brutally killing the application") + } + os.Exit(0) } case <-done: diff --git a/docker-compose.yml b/docker-compose.yml index 1d3e08c..2dea48c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,7 +3,7 @@ services: synapse: image: lambdatest/synapse:latest stop_signal: SIGINT - restart: always + restart: on-failure networks: - test-at-scale hostname: synapse diff --git a/go.mod b/go.mod index 5f733d9..d062734 100644 --- a/go.mod +++ b/go.mod @@ -52,6 +52,7 @@ require ( github.com/klauspost/compress v1.11.13 // indirect github.com/klauspost/pgzip v1.2.5 // indirect github.com/leodido/go-urn v1.2.1 // indirect + github.com/lestrrat-go/backoff v1.0.1 // indirect github.com/magiconair/properties v1.8.5 // indirect github.com/mattn/go-isatty v0.0.14 // indirect github.com/mitchellh/mapstructure v1.4.3 // indirect diff --git a/go.sum b/go.sum index c593572..f9d97f0 100644 --- a/go.sum +++ b/go.sum @@ -579,6 +579,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w= github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= +github.com/lestrrat-go/backoff v1.0.1 h1:Gphaach0QvvtaHmR9U8hwXNHXWckPyD8V6S+V+D184c= +github.com/lestrrat-go/backoff v1.0.1/go.mod h1:5QVJKC49Q5yQvCrpup0ZqzDGHEO/O4H82cnDuYdumkw= github.com/lyft/protoc-gen-star v0.5.3/go.mod h1:V0xaHgaf5oCCqmcxYcWiDfTiKsZsRc87/1qhoTACD8w= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.5 h1:b6kJs+EmPFMYGkow9GiUyCyOvIwYetYJ3fSaWak/Gls= diff --git a/pkg/core/synapse.go b/pkg/core/synapse.go index 5c5fb11..7f1ad72 100644 --- a/pkg/core/synapse.go +++ b/pkg/core/synapse.go @@ -8,5 +8,5 @@ import ( // SynapseManager denfines operations for synapse client type SynapseManager interface { // InitiateConnection initiates the connection with LT cloud - InitiateConnection(ctx context.Context, wg *sync.WaitGroup) + InitiateConnection(ctx context.Context, wg *sync.WaitGroup, connectionFailed chan struct{}) } diff --git a/pkg/global/synapseconstants.go b/pkg/global/synapseconstants.go index 5b6347c..deabfab 100644 --- a/pkg/global/synapseconstants.go +++ b/pkg/global/synapseconstants.go @@ -6,20 +6,21 @@ import ( // all constant related to synapse const ( - GracefulTimeout = 100000 * time.Millisecond - ProxyServerPort = "8000" - DirectoryPermissions = 0755 - FilePermissions = 0755 - GitConfigFileName = "oauth" - RepoSecretsFileName = "reposecrets" - SynapseContainerURL = "http://synapse:8000" - NetworkEnvName = "NetworkName" - AutoRemoveEnv = "AutoRemove" - SynapseHostEnv = "synapsehost" - LocalEnv = "local" - NetworkName = "test-at-scale" - AutoRemove = true - Local = true + GracefulTimeout = 100 * time.Second + ProxyServerPort = "8000" + DirectoryPermissions = 0755 + FilePermissions = 0755 + GitConfigFileName = "oauth" + RepoSecretsFileName = "reposecrets" + SynapseContainerURL = "http://synapse:8000" + NetworkEnvName = "NetworkName" + AutoRemoveEnv = "AutoRemove" + SynapseHostEnv = "synapsehost" + LocalEnv = "local" + NetworkName = "test-at-scale" + AutoRemove = true + Local = true + MaxConnectionAttempts = 10 ) // SocketURL lambdatest url for synapse socket diff --git a/pkg/synapse/synapse.go b/pkg/synapse/synapse.go index 589cd8d..5e92e02 100644 --- a/pkg/synapse/synapse.go +++ b/pkg/synapse/synapse.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "sync" + "time" "github.com/LambdaTest/synapse/pkg/core" "github.com/LambdaTest/synapse/pkg/global" @@ -12,23 +13,31 @@ import ( "github.com/LambdaTest/synapse/pkg/utils" "github.com/denisbrodbeck/machineid" "github.com/gorilla/websocket" + "github.com/lestrrat-go/backoff" "github.com/spf13/viper" ) // All constant related to synapse const ( - Repo = "repo" - BuildID = "build-id" - JobID = "job-id" - Mode = "mode" - ID = "id" + Repo = "repo" + BuildID = "build-id" + JobID = "job-id" + Mode = "mode" + ID = "id" + DuplicateConnectionErr = "Synapse already has an open connection" + AuthenticationFailed = "Synapse authentication failed" ) type synapse struct { - conn *websocket.Conn - runner core.DockerRunner - secretsManager core.SecretsManager - logger lumber.Logger + conn *websocket.Conn + runner core.DockerRunner + secretsManager core.SecretsManager + logger lumber.Logger + MsgErrChan chan struct{} + MsgChan chan []byte + ConnectionAborted chan struct{} + InvalidConnectionRequest chan struct{} + LogoutRequired bool } // New returns new instance of synapse @@ -37,59 +46,144 @@ func New( logger lumber.Logger, secretsManager core.SecretsManager, ) core.SynapseManager { + return &synapse{ - runner: runner, - logger: logger, - secretsManager: secretsManager, + runner: runner, + logger: logger, + secretsManager: secretsManager, + MsgErrChan: make(chan struct{}), + InvalidConnectionRequest: make(chan struct{}), + MsgChan: make(chan []byte, 1024), + ConnectionAborted: make(chan struct{}, 10), + LogoutRequired: true, } } func (s *synapse) InitiateConnection( ctx context.Context, wg *sync.WaitGroup, -) { + connectionFailed chan struct{}) { defer wg.Done() - - s.logger.Debugf("starting socket connection") - s.logger.Errorf("starting socket connection at URL %s", global.SocketURL[viper.GetString("env")]) - conn, _, err := websocket.DefaultDialer.Dial(global.SocketURL[viper.GetString("env")], nil) - if err != nil { - s.logger.Fatalf("error connecting synapse to lambdatest %+v", err) - } - s.conn = conn - defer conn.Close() - - s.logger.Debugf("synapse connected to lambdatest server") - go s.handleIncomingMessage() - s.login() + go s.openAndMaintainConnection(ctx, connectionFailed) <-ctx.Done() - s.logout() + if s.LogoutRequired { + s.logout() + } s.runner.KillRunningDocker(context.TODO()) s.logger.Debugf("exiting synapse") } -func (s *synapse) handleIncomingMessage() { +/* +openAndMaintainConnection tries to create and mantain connection with +exponential backoff factor +*/ +func (s *synapse) openAndMaintainConnection(ctx context.Context, connectionFailed chan struct{}) { + // setup exponential backoff for retrying control websocket connection + var policy = backoff.NewExponential( + backoff.WithInterval(500*time.Millisecond), // base interval + backoff.WithJitterFactor(0.05), // 5% jitter + backoff.WithMaxRetries(global.MaxConnectionAttempts), // If not specified, default number of retries is 10 + ) + + b, cancel := policy.Start(context.Background()) + defer cancel() + s.logger.Debugf("starting socket connection at URL %s", global.SocketURL[viper.GetString("env")]) + for backoff.Continue(b) { + s.logger.Debugf("trying to connect to lamdatest server") + select { + case <-ctx.Done(): + return + default: + conn, _, err := websocket.DefaultDialer.Dial(global.SocketURL[viper.GetString("env")], nil) + if err != nil { + s.logger.Errorf("error connecting synapse to lambdatest %+v", err) + continue + } + s.conn = conn + s.logger.Debugf("synapse connected to lambdatest server") + s.login() + if !s.connectionHandler(ctx, conn, connectionFailed) { + return + } + s.MsgErrChan = make(chan struct{}) + go s.openAndMaintainConnection(ctx, connectionFailed) + return + + } + } + s.logger.Errorf("Unable to establish connection with lambdatest server. exiting...") + connectionFailed <- struct{}{} + s.LogoutRequired = false +} + +/* + connectionHandler handles the connection by listening to any connection closer + also it returns boolean value which repersents whether we can retry to connect +*/ +func (s *synapse) connectionHandler(ctx context.Context, conn *websocket.Conn, connectionFailed chan struct{}) bool { + normalCloser := make(chan struct{}) + ctxDone := false + defer func() { + // if gracefully terminated, wait for logout message to be sent + if !ctxDone { + conn.Close() + } + s.ConnectionAborted <- struct{}{} + }() + + go s.messageReader(normalCloser, conn) + go s.messageWriter(conn) + select { + case <-ctx.Done(): + ctxDone = true + return false + case <-normalCloser: + return false + case <-s.InvalidConnectionRequest: + connectionFailed <- struct{}{} + s.LogoutRequired = false + return false + case <-s.MsgErrChan: + s.logger.Errorf("Connection between synpase and lambdatest break") + return true + } +} + +/* +messageReader reads websocket messages and acts upon it +*/ +func (s *synapse) messageReader(normalCloser chan struct{}, conn *websocket.Conn) { // s.conn.SetReadLimit(maxMessageSize) // s.conn.SetReadDeadline(time.Now().Add(pingWait)) - s.conn.SetPingHandler(func(string) error { - if err := s.conn.WriteMessage(websocket.PongMessage, nil); err != nil { + conn.SetPingHandler(func(string) error { + if err := conn.WriteMessage(websocket.PongMessage, nil); err != nil { s.logger.Errorf("Error in writing pong msg %s", err.Error()) + s.MsgErrChan <- struct{}{} + close(s.MsgErrChan) return err } return nil }) for { - _, msg, err := s.conn.ReadMessage() + _, msg, err := conn.ReadMessage() if err != nil { + if websocket.IsCloseError(err, websocket.CloseNormalClosure) { + s.logger.Debugf("Normal closure occurred...........") + normalCloser <- struct{}{} + return + } s.logger.Errorf("disconnecting from lambdatest server. error in reading message %v", err) + s.MsgErrChan <- struct{}{} + close(s.MsgErrChan) return } s.processMessage(msg) } } +// processMessage process messages recieved via websocket func (s *synapse) processMessage(msg []byte) { var message core.Message err := json.Unmarshal(msg, &message) @@ -100,6 +194,7 @@ func (s *synapse) processMessage(msg []byte) { switch message.Type { case core.MsgError: s.logger.Debugf("error message received from server") + go s.processErrorMessage(message) case core.MsgInfo: s.logger.Debugf("info message received from server") case core.MsgTask: @@ -110,6 +205,17 @@ func (s *synapse) processMessage(msg []byte) { } } +// processErrorMessage handles error messages +func (s *synapse) processErrorMessage(message core.Message) { + errMsg := string(message.Content) + s.logger.Errorf("error message received from server, error %s ", errMsg) + if errMsg == DuplicateConnectionErr || errMsg == AuthenticationFailed { + s.InvalidConnectionRequest <- struct{}{} + } + +} + +// processTask handles task type message func (s *synapse) processTask(message core.Message) { var runnerOpts core.RunnerOptions err := json.Unmarshal(message.Content, &runnerOpts) @@ -122,7 +228,7 @@ func (s *synapse) processTask(message core.Message) { jobInfo := CreateJobInfo(core.JobStarted, &runnerOpts) s.logger.Infof("Sending update to neuron %+v", jobInfo) resourceStatsMessage := CreateJobUpdateMessage(jobInfo) - s.SendMessage(&resourceStatsMessage) + s.writeMessageToBuffer(&resourceStatsMessage) } // mounting secrets to container runnerOpts.HostVolumePath = fmt.Sprintf("/tmp/synapse/data/%s", runnerOpts.ContainerName) @@ -141,6 +247,7 @@ func (s *synapse) processTask(message core.Message) { } +// runAndUpdateJobStatus intiate and sends jobs status func (s *synapse) runAndUpdateJobStatus(runnerOpts core.RunnerOptions) { // starting container statusChan := make(chan core.ContainerStatus) @@ -160,9 +267,10 @@ func (s *synapse) runAndUpdateJobStatus(runnerOpts core.RunnerOptions) { jobInfo := CreateJobInfo(jobStatus, &runnerOpts) s.logger.Infof("Sending update to neuron %+v", jobInfo) resourceStatsMessage := CreateJobUpdateMessage(jobInfo) - s.SendMessage(&resourceStatsMessage) + s.writeMessageToBuffer(&resourceStatsMessage) } +// login write login message to lambdatest server func (s *synapse) login() { cpu, ram := s.runner.GetInfo(context.TODO()) id, err := machineid.ProtectedID("synapaseMeta") @@ -179,14 +287,26 @@ func (s *synapse) login() { s.logger.Infof("Login synapse with id %s", loginDetails.SynapseID) loginMessage := CreateLoginMessage(loginDetails) - s.SendMessage(&loginMessage) + s.writeMessageToBuffer(&loginMessage) } +// logout writes logout message to lambdatest server func (s *synapse) logout() { + s.logger.Infof("Logging out from lambdatest server") logoutMessage := CreateLogoutMessage() - s.SendMessage(&logoutMessage) + messageJson, err := json.Marshal(logoutMessage) + + if err != nil { + s.logger.Errorf("error marshaling message") + return + } + if err := s.conn.WriteMessage(websocket.TextMessage, messageJson); err != nil { + s.logger.Errorf("error sending message to the server, error %v", err) + + } } +// sendResourceUpdates sends resource status of synapse func (s *synapse) sendResourceUpdates( status core.StatType, runnerOpts core.RunnerOptions, @@ -198,18 +318,33 @@ func (s *synapse) sendResourceUpdates( RAM: specs.RAM, } resourceStatsMessage := CreateResourceStatsMessage(resourceStats) - s.SendMessage(&resourceStatsMessage) + s.writeMessageToBuffer(&resourceStatsMessage) } -func (s *synapse) SendMessage(message *core.Message) { +// writeMessageToBuffer writes all message to buffer channel +func (s *synapse) writeMessageToBuffer(message *core.Message) { messageJson, err := json.Marshal(message) if err != nil { s.logger.Errorf("error marshaling message") return } - err = s.conn.WriteMessage(websocket.TextMessage, messageJson) - if err != nil { - s.logger.Errorf("error sending message to the server") - return + s.MsgChan <- messageJson +} + +// messageWriter writes the messages to open websocket +func (s *synapse) messageWriter(conn *websocket.Conn) { + for { + select { + case <-s.ConnectionAborted: + return + case messageJson := <-s.MsgChan: + if err := conn.WriteMessage(websocket.TextMessage, messageJson); err != nil { + s.logger.Errorf("error sending message to the server error %v", err) + s.MsgChan <- messageJson + s.MsgErrChan <- struct{}{} + close(s.MsgErrChan) + return + } + } } } diff --git a/pkg/synapse/utils.go b/pkg/synapse/utils.go index ea8f7ce..4650feb 100644 --- a/pkg/synapse/utils.go +++ b/pkg/synapse/utils.go @@ -4,7 +4,6 @@ import ( "encoding/json" "github.com/LambdaTest/synapse/pkg/core" - "github.com/gorilla/websocket" ) // CreateLoginMessage creates message of type login @@ -68,19 +67,6 @@ func CreateResourceStatsMessage(resourceStats core.ResourceStats) core.Message { } } -// SendMessage sends message to the server -func SendMessage(ws *websocket.Conn, message core.Message) error { - messageJson, err := json.Marshal(message) - if err != nil { - return err - } - err = ws.WriteMessage(websocket.TextMessage, messageJson) - if err != nil { - return err - } - return nil -} - // GetResources returns dummy resources based on pod type func GetResources(tierOpts core.Tier) core.Specs { if val, ok := core.TierOpts[tierOpts]; ok {