diff --git a/cmd/nucleus/bin.go b/cmd/nucleus/bin.go index 079e0d5b..70a34359 100644 --- a/cmd/nucleus/bin.go +++ b/cmd/nucleus/bin.go @@ -25,6 +25,7 @@ import ( "github.com/LambdaTest/synapse/pkg/global" "github.com/LambdaTest/synapse/pkg/lumber" "github.com/LambdaTest/synapse/pkg/payloadmanager" + "github.com/LambdaTest/synapse/pkg/requestutils" "github.com/LambdaTest/synapse/pkg/secret" "github.com/LambdaTest/synapse/pkg/server" "github.com/LambdaTest/synapse/pkg/service/coverage" @@ -113,18 +114,21 @@ func run(cmd *cobra.Command, args []string) { pm := payloadmanager.NewPayloadManger(azureClient, logger, cfg) secretParser := secret.New(logger) tcm := tasconfigmanager.NewTASConfigManager(logger) + requests := requestutils.New(logger) execManager := command.NewExecutionManager(secretParser, azureClient, logger) gm := gitmanager.NewGitManager(logger, execManager) dm := diffmanager.NewDiffManager(cfg, logger) - tds := testdiscoveryservice.NewTestDiscoveryService(execManager, logger) + + tdResChan := make(chan core.DiscoveryResult) + tds := testdiscoveryservice.NewTestDiscoveryService(ctx, tdResChan, execManager, requests, logger) tes := testexecutionservice.NewTestExecutionService(execManager, azureClient, ts, logger) tbs, err := blocktestservice.NewTestBlockTestService(cfg, logger) if err != nil { logger.Fatalf("failed to initialize test blocklist service: %v", err) } - router := api.NewRouter(logger, ts) + router := api.NewRouter(logger, ts, tdResChan) - t, err := task.New(ctx, cfg, logger) + t, err := task.New(ctx, requests, logger) if err != nil { logger.Fatalf("failed to initialize task: %v", err) } diff --git a/go.mod b/go.mod index d062734a..c4cdf8dc 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/google/uuid v1.2.0 github.com/gorilla/websocket v1.4.2 github.com/joho/godotenv v1.4.0 + github.com/lestrrat-go/backoff v1.0.1 github.com/mholt/archiver/v3 v3.5.1 github.com/shirou/gopsutil/v3 v3.21.1 github.com/sirupsen/logrus v1.8.1 @@ -52,7 +53,6 @@ require ( github.com/klauspost/compress v1.11.13 // indirect github.com/klauspost/pgzip v1.2.5 // indirect github.com/leodido/go-urn v1.2.1 // indirect - github.com/lestrrat-go/backoff v1.0.1 // indirect github.com/magiconair/properties v1.8.5 // indirect github.com/mattn/go-isatty v0.0.14 // indirect github.com/mitchellh/mapstructure v1.4.3 // indirect diff --git a/pkg/api/router.go b/pkg/api/router.go index 2fbe7821..9e8f57d5 100644 --- a/pkg/api/router.go +++ b/pkg/api/router.go @@ -3,6 +3,8 @@ package api import ( "github.com/LambdaTest/synapse/pkg/api/health" "github.com/LambdaTest/synapse/pkg/api/results" + "github.com/LambdaTest/synapse/pkg/api/testlist" + "github.com/LambdaTest/synapse/pkg/core" "github.com/LambdaTest/synapse/pkg/lumber" "github.com/LambdaTest/synapse/pkg/service/teststats" "github.com/gin-gonic/gin" @@ -12,13 +14,15 @@ import ( type Router struct { logger lumber.Logger testStatsService *teststats.ProcStats + tdResChan chan core.DiscoveryResult } // NewRouter returns instance of Router -func NewRouter(logger lumber.Logger, ts *teststats.ProcStats) Router { +func NewRouter(logger lumber.Logger, ts *teststats.ProcStats, tdResChan chan core.DiscoveryResult) Router { return Router{ logger: logger, testStatsService: ts, + tdResChan: tdResChan, } } @@ -33,6 +37,7 @@ func (r Router) Handler() *gin.Engine { // router.Use(cors.New(corsConfig)) router.GET("/health", health.Handler) router.POST("/results", results.Handler(r.logger, r.testStatsService)) + router.POST("/test-list", testlist.Handler(r.logger, r.tdResChan)) return router diff --git a/pkg/api/testlist/testlist.go b/pkg/api/testlist/testlist.go new file mode 100644 index 00000000..2bca29fb --- /dev/null +++ b/pkg/api/testlist/testlist.go @@ -0,0 +1,26 @@ +package testlist + +import ( + "net/http" + + "github.com/LambdaTest/synapse/pkg/core" + "github.com/LambdaTest/synapse/pkg/lumber" + "github.com/gin-gonic/gin" +) + +//Handler captures the test execution results from nucleus +func Handler(logger lumber.Logger, tdResChan chan core.DiscoveryResult) gin.HandlerFunc { + return func(c *gin.Context) { + request := core.DiscoveryResult{} + if err := c.ShouldBindJSON(&request); err != nil { + logger.Errorf("error while binding json %v", err) + c.JSON(http.StatusBadRequest, gin.H{"message": err.Error()}) + return + } + + go func() { + tdResChan <- request + }() + c.Data(http.StatusOK, gin.MIMEPlain, []byte(http.StatusText(http.StatusOK))) + } +} diff --git a/pkg/core/interfaces.go b/pkg/core/interfaces.go index 6b5d96f4..e4ef4168 100644 --- a/pkg/core/interfaces.go +++ b/pkg/core/interfaces.go @@ -117,3 +117,7 @@ type ExecutionManager interface { // StoreCommandLogs stores the command logs in the azure. StoreCommandLogs(ctx context.Context, blobPath string, reader io.Reader) <-chan error } + +type Requests interface { + MakeAPIRequest(ctx context.Context, httpMethod, endpoint string, body []byte) error +} diff --git a/pkg/core/lifecycle.go b/pkg/core/lifecycle.go index 227b2f79..8e735845 100644 --- a/pkg/core/lifecycle.go +++ b/pkg/core/lifecycle.go @@ -9,6 +9,7 @@ import ( "net/http" "os" "path/filepath" + "runtime/debug" "strconv" "time" @@ -21,6 +22,7 @@ import ( const ( endpointPostTestResults = "http://localhost:9876/results" + endpointPostTestList = "http://localhost:9876/test-list" ) // NewPipeline creates and returns a new Pipeline instance @@ -45,7 +47,6 @@ func (pl *Pipeline) Start(ctx context.Context) (err error) { pl.Logger.Debugf("Starting pipeline.....") pl.Logger.Debugf("Fetching config") - endpointPostTestList := global.NeuronHost + "/test-list" // fetch configuration payload, err := pl.PayloadManager.FetchPayload(ctx, pl.Cfg.PayloadAddress) if err != nil { @@ -100,7 +101,7 @@ func (pl *Pipeline) Start(ctx context.Context) (err error) { defer func() { taskPayload.EndTime = time.Now() if p := recover(); p != nil { - pl.Logger.Errorf("panic stack trace: %v", p) + pl.Logger.Errorf("panic stack trace: %v\n%s", p, string(debug.Stack())) taskPayload.Status = Error taskPayload.Remark = errs.GenericErrRemark.Error() } else if err != nil { diff --git a/pkg/core/models.go b/pkg/core/models.go index ca381e36..8f483ea6 100644 --- a/pkg/core/models.go +++ b/pkg/core/models.go @@ -129,6 +129,21 @@ type Pipeline struct { HttpClient http.Client } +type DiscoveryResult struct { + Tests []TestPayload `json:"tests"` + ImpactedTests []string `json:"impactedTests"` + TestSuites []TestSuitePayload `json:"testSuites"` + ExecuteAllTests bool `json:"executeAllTests"` + Parallelism int `json:"parallelism"` + RepoID string `json:"repoID"` + BuildID string `json:"buildID"` + CommitID string `json:"commitID"` + TaskID string `json:"taskID"` + OrgID string `json:"orgID"` + Branch string `json:"branch"` + Tier Tier `json:"tier"` +} + // ExecutionResult represents the request body for test and test suite execution type ExecutionResult struct { TestPayload []TestPayload `json:"testResults"` diff --git a/pkg/requestutils/request.go b/pkg/requestutils/request.go new file mode 100644 index 00000000..90bd05f5 --- /dev/null +++ b/pkg/requestutils/request.go @@ -0,0 +1,55 @@ +package requestutils + +import ( + "bytes" + "context" + "errors" + "io/ioutil" + "net/http" + "time" + + "github.com/LambdaTest/synapse/pkg/core" + "github.com/LambdaTest/synapse/pkg/lumber" +) + +type requests struct { + logger lumber.Logger + client http.Client +} + +func New(logger lumber.Logger) core.Requests { + return &requests{ + logger: logger, + client: http.Client{Timeout: 30 * time.Second}, + } +} + +func (r *requests) MakeAPIRequest(ctx context.Context, httpMethod, endpoint string, body []byte) error { + req, err := http.NewRequestWithContext(ctx, httpMethod, endpoint, bytes.NewBuffer(body)) + + if err != nil { + r.logger.Errorf("error while creating http request %v", err) + return err + } + + resp, err := r.client.Do(req) + if err != nil { + r.logger.Errorf("error while sending http request %v", err) + return err + } + + defer resp.Body.Close() + + respBody, err := ioutil.ReadAll(resp.Body) + if err != nil { + r.logger.Errorf("error while sending http response body %v", err) + return err + } + + if resp.StatusCode != http.StatusOK { + r.logger.Errorf("non 200 status code %s", string(respBody)) + return errors.New("non 200 status code") + } + + return nil +} diff --git a/pkg/tasconfigmanager/setup.go b/pkg/tasconfigmanager/setup.go index 68f014cf..6c6a54c5 100644 --- a/pkg/tasconfigmanager/setup.go +++ b/pkg/tasconfigmanager/setup.go @@ -98,7 +98,18 @@ func (tc *tasConfigManager) LoadAndValidate(ctx context.Context, tc.logger.Errorf("Error while validating yaml file, error %v", validateErr) return nil, errors.New(errMsg) + } + if tasConfig.Cache == nil { + checksum, err := utils.ComputeChecksum(fmt.Sprintf("%s/%s", global.RepoDir, packageJSON)) + if err != nil { + tc.logger.Errorf("Error while computing checksum, error %v", err) + return nil, err + } + tasConfig.Cache = &core.Cache{ + Key: checksum, + Paths: []string{}, + } } if tasConfig.CoverageThreshold == nil { diff --git a/pkg/task/task.go b/pkg/task/task.go index 4f16bb2b..eb9cb9f2 100644 --- a/pkg/task/task.go +++ b/pkg/task/task.go @@ -1,15 +1,10 @@ package task import ( - "bytes" "context" "encoding/json" - "errors" - "io/ioutil" "net/http" - "time" - "github.com/LambdaTest/synapse/config" "github.com/LambdaTest/synapse/pkg/core" "github.com/LambdaTest/synapse/pkg/global" "github.com/LambdaTest/synapse/pkg/lumber" @@ -18,16 +13,16 @@ import ( // task represents each instance of nucleus spawned by neuron type task struct { ctx context.Context - client http.Client + requests core.Requests endpoint string logger lumber.Logger } // New returns new task -func New(ctx context.Context, cfg *config.NucleusConfig, logger lumber.Logger) (core.Task, error) { +func New(ctx context.Context, requests core.Requests, logger lumber.Logger) (core.Task, error) { return &task{ ctx: ctx, - client: http.Client{Timeout: 30 * time.Second}, + requests: requests, logger: logger, endpoint: global.NeuronHost + "/task", }, nil @@ -42,32 +37,10 @@ func (t *task) UpdateStatus(payload *core.TaskPayload) error { return err } - req, err := http.NewRequestWithContext(t.ctx, http.MethodPut, t.endpoint, bytes.NewBuffer(reqBody)) - - if err != nil { - t.logger.Errorf("error while creating http request %v", err) + if err := t.requests.MakeAPIRequest(t.ctx, http.MethodPut, t.endpoint, reqBody); err != nil { return err } - resp, err := t.client.Do(req) - if err != nil { - t.logger.Errorf("error while sending http request %v", err) - return err - } - - defer resp.Body.Close() - - respBody, err := ioutil.ReadAll(resp.Body) - if err != nil { - t.logger.Errorf("error while sending http response body %v", err) - return err - } - - if resp.StatusCode != http.StatusOK { - t.logger.Errorf("non 200 status code %s", string(respBody)) - return errors.New("non 200 status code") - } - return nil } diff --git a/pkg/testdiscoveryservice/testdiscovery.go b/pkg/testdiscoveryservice/testdiscovery.go index d8031986..1748e9d9 100644 --- a/pkg/testdiscoveryservice/testdiscovery.go +++ b/pkg/testdiscoveryservice/testdiscovery.go @@ -3,6 +3,8 @@ package testdiscoveryservice import ( "context" + "encoding/json" + "net/http" "os/exec" "github.com/LambdaTest/synapse/pkg/core" @@ -13,14 +15,28 @@ import ( ) type testDiscoveryService struct { + ctx context.Context logger lumber.Logger execManager core.ExecutionManager + tdResChan chan core.DiscoveryResult + requests core.Requests + endpoint string } // NewTestDiscoveryService creates and returns a new testDiscoveryService instance -func NewTestDiscoveryService(execManager core.ExecutionManager, logger lumber.Logger) core.TestDiscoveryService { - tds := testDiscoveryService{logger: logger, execManager: execManager} - return &tds +func NewTestDiscoveryService(ctx context.Context, + tdResChan chan core.DiscoveryResult, + execManager core.ExecutionManager, + requests core.Requests, + logger lumber.Logger) core.TestDiscoveryService { + return &testDiscoveryService{ + ctx: ctx, + logger: logger, + execManager: execManager, + tdResChan: tdResChan, + requests: requests, + endpoint: global.NeuronHost + "/test-list", + } } func (tds *testDiscoveryService) Discover(ctx context.Context, @@ -93,5 +109,26 @@ func (tds *testDiscoveryService) Discover(ctx context.Context, return err } + testDiscoveryResult := <-tds.tdResChan + testDiscoveryResult.Parallelism = tasConfig.Parallelism + testDiscoveryResult.Tier = tasConfig.Tier + if err := tds.updateResult(&testDiscoveryResult); err != nil { + return err + } + return nil +} + +func (tds *testDiscoveryService) updateResult(testDiscoveryResult *core.DiscoveryResult) error { + reqBody, err := json.Marshal(testDiscoveryResult) + if err != nil { + tds.logger.Errorf("error while json marshal %v", err) + return err + } + + if err := tds.requests.MakeAPIRequest(tds.ctx, http.MethodPost, tds.endpoint, reqBody); err != nil { + return err + } + return nil + }