Skip to content

Commit

Permalink
stable
Browse files Browse the repository at this point in the history
  • Loading branch information
saurabh-prakash committed Mar 22, 2022
1 parent fd98d9c commit faa8377
Show file tree
Hide file tree
Showing 11 changed files with 172 additions and 41 deletions.
10 changes: 7 additions & 3 deletions cmd/nucleus/bin.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/LambdaTest/synapse/pkg/global"
"github.com/LambdaTest/synapse/pkg/lumber"
"github.com/LambdaTest/synapse/pkg/payloadmanager"
"github.com/LambdaTest/synapse/pkg/requestutils"
"github.com/LambdaTest/synapse/pkg/secret"
"github.com/LambdaTest/synapse/pkg/server"
"github.com/LambdaTest/synapse/pkg/service/coverage"
Expand Down Expand Up @@ -113,18 +114,21 @@ func run(cmd *cobra.Command, args []string) {
pm := payloadmanager.NewPayloadManger(azureClient, logger, cfg)
secretParser := secret.New(logger)
tcm := tasconfigmanager.NewTASConfigManager(logger)
requests := requestutils.New(logger)
execManager := command.NewExecutionManager(secretParser, azureClient, logger)
gm := gitmanager.NewGitManager(logger, execManager)
dm := diffmanager.NewDiffManager(cfg, logger)
tds := testdiscoveryservice.NewTestDiscoveryService(execManager, logger)

tdResChan := make(chan core.DiscoveryResult)
tds := testdiscoveryservice.NewTestDiscoveryService(ctx, tdResChan, execManager, requests, logger)
tes := testexecutionservice.NewTestExecutionService(execManager, azureClient, ts, logger)
tbs, err := blocktestservice.NewTestBlockTestService(cfg, logger)
if err != nil {
logger.Fatalf("failed to initialize test blocklist service: %v", err)
}
router := api.NewRouter(logger, ts)
router := api.NewRouter(logger, ts, tdResChan)

t, err := task.New(ctx, cfg, logger)
t, err := task.New(ctx, requests, logger)
if err != nil {
logger.Fatalf("failed to initialize task: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/google/uuid v1.2.0
github.com/gorilla/websocket v1.4.2
github.com/joho/godotenv v1.4.0
github.com/lestrrat-go/backoff v1.0.1
github.com/mholt/archiver/v3 v3.5.1
github.com/shirou/gopsutil/v3 v3.21.1
github.com/sirupsen/logrus v1.8.1
Expand Down Expand Up @@ -52,7 +53,6 @@ require (
github.com/klauspost/compress v1.11.13 // indirect
github.com/klauspost/pgzip v1.2.5 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/lestrrat-go/backoff v1.0.1 // indirect
github.com/magiconair/properties v1.8.5 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/mitchellh/mapstructure v1.4.3 // indirect
Expand Down
7 changes: 6 additions & 1 deletion pkg/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package api
import (
"github.com/LambdaTest/synapse/pkg/api/health"
"github.com/LambdaTest/synapse/pkg/api/results"
"github.com/LambdaTest/synapse/pkg/api/testlist"
"github.com/LambdaTest/synapse/pkg/core"
"github.com/LambdaTest/synapse/pkg/lumber"
"github.com/LambdaTest/synapse/pkg/service/teststats"
"github.com/gin-gonic/gin"
Expand All @@ -12,13 +14,15 @@ import (
type Router struct {
logger lumber.Logger
testStatsService *teststats.ProcStats
tdResChan chan core.DiscoveryResult
}

// NewRouter returns instance of Router
func NewRouter(logger lumber.Logger, ts *teststats.ProcStats) Router {
func NewRouter(logger lumber.Logger, ts *teststats.ProcStats, tdResChan chan core.DiscoveryResult) Router {
return Router{
logger: logger,
testStatsService: ts,
tdResChan: tdResChan,
}
}

Expand All @@ -33,6 +37,7 @@ func (r Router) Handler() *gin.Engine {
// router.Use(cors.New(corsConfig))
router.GET("/health", health.Handler)
router.POST("/results", results.Handler(r.logger, r.testStatsService))
router.POST("/test-list", testlist.Handler(r.logger, r.tdResChan))

return router

Expand Down
26 changes: 26 additions & 0 deletions pkg/api/testlist/testlist.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package testlist

import (
"net/http"

"github.com/LambdaTest/synapse/pkg/core"
"github.com/LambdaTest/synapse/pkg/lumber"
"github.com/gin-gonic/gin"
)

//Handler captures the test execution results from nucleus
func Handler(logger lumber.Logger, tdResChan chan core.DiscoveryResult) gin.HandlerFunc {
return func(c *gin.Context) {
request := core.DiscoveryResult{}
if err := c.ShouldBindJSON(&request); err != nil {
logger.Errorf("error while binding json %v", err)
c.JSON(http.StatusBadRequest, gin.H{"message": err.Error()})
return
}

go func() {
tdResChan <- request
}()
c.Data(http.StatusOK, gin.MIMEPlain, []byte(http.StatusText(http.StatusOK)))
}
}
4 changes: 4 additions & 0 deletions pkg/core/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,7 @@ type ExecutionManager interface {
// StoreCommandLogs stores the command logs in the azure.
StoreCommandLogs(ctx context.Context, blobPath string, reader io.Reader) <-chan error
}

type Requests interface {
MakeAPIRequest(ctx context.Context, httpMethod, endpoint string, body []byte) error
}
5 changes: 3 additions & 2 deletions pkg/core/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/http"
"os"
"path/filepath"
"runtime/debug"
"strconv"
"time"

Expand All @@ -21,6 +22,7 @@ import (

const (
endpointPostTestResults = "http://localhost:9876/results"
endpointPostTestList = "http://localhost:9876/test-list"
)

// NewPipeline creates and returns a new Pipeline instance
Expand All @@ -45,7 +47,6 @@ func (pl *Pipeline) Start(ctx context.Context) (err error) {
pl.Logger.Debugf("Starting pipeline.....")
pl.Logger.Debugf("Fetching config")

endpointPostTestList := global.NeuronHost + "/test-list"
// fetch configuration
payload, err := pl.PayloadManager.FetchPayload(ctx, pl.Cfg.PayloadAddress)
if err != nil {
Expand Down Expand Up @@ -100,7 +101,7 @@ func (pl *Pipeline) Start(ctx context.Context) (err error) {
defer func() {
taskPayload.EndTime = time.Now()
if p := recover(); p != nil {
pl.Logger.Errorf("panic stack trace: %v", p)
pl.Logger.Errorf("panic stack trace: %v\n%s", p, string(debug.Stack()))
taskPayload.Status = Error
taskPayload.Remark = errs.GenericErrRemark.Error()
} else if err != nil {
Expand Down
15 changes: 15 additions & 0 deletions pkg/core/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,21 @@ type Pipeline struct {
HttpClient http.Client
}

type DiscoveryResult struct {
Tests []TestPayload `json:"tests"`
ImpactedTests []string `json:"impactedTests"`
TestSuites []TestSuitePayload `json:"testSuites"`
ExecuteAllTests bool `json:"executeAllTests"`
Parallelism int `json:"parallelism"`
RepoID string `json:"repoID"`
BuildID string `json:"buildID"`
CommitID string `json:"commitID"`
TaskID string `json:"taskID"`
OrgID string `json:"orgID"`
Branch string `json:"branch"`
Tier Tier `json:"tier"`
}

// ExecutionResult represents the request body for test and test suite execution
type ExecutionResult struct {
TestPayload []TestPayload `json:"testResults"`
Expand Down
55 changes: 55 additions & 0 deletions pkg/requestutils/request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package requestutils

import (
"bytes"
"context"
"errors"
"io/ioutil"
"net/http"
"time"

"github.com/LambdaTest/synapse/pkg/core"
"github.com/LambdaTest/synapse/pkg/lumber"
)

type requests struct {
logger lumber.Logger
client http.Client
}

func New(logger lumber.Logger) core.Requests {
return &requests{
logger: logger,
client: http.Client{Timeout: 30 * time.Second},
}
}

func (r *requests) MakeAPIRequest(ctx context.Context, httpMethod, endpoint string, body []byte) error {
req, err := http.NewRequestWithContext(ctx, httpMethod, endpoint, bytes.NewBuffer(body))

if err != nil {
r.logger.Errorf("error while creating http request %v", err)
return err
}

resp, err := r.client.Do(req)
if err != nil {
r.logger.Errorf("error while sending http request %v", err)
return err
}

defer resp.Body.Close()

respBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
r.logger.Errorf("error while sending http response body %v", err)
return err
}

if resp.StatusCode != http.StatusOK {
r.logger.Errorf("non 200 status code %s", string(respBody))
return errors.New("non 200 status code")
}

return nil
}
11 changes: 11 additions & 0 deletions pkg/tasconfigmanager/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,18 @@ func (tc *tasConfigManager) LoadAndValidate(ctx context.Context,

tc.logger.Errorf("Error while validating yaml file, error %v", validateErr)
return nil, errors.New(errMsg)
}

if tasConfig.Cache == nil {
checksum, err := utils.ComputeChecksum(fmt.Sprintf("%s/%s", global.RepoDir, packageJSON))
if err != nil {
tc.logger.Errorf("Error while computing checksum, error %v", err)
return nil, err
}
tasConfig.Cache = &core.Cache{
Key: checksum,
Paths: []string{},
}
}

if tasConfig.CoverageThreshold == nil {
Expand Down
35 changes: 4 additions & 31 deletions pkg/task/task.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
package task

import (
"bytes"
"context"
"encoding/json"
"errors"
"io/ioutil"
"net/http"
"time"

"github.com/LambdaTest/synapse/config"
"github.com/LambdaTest/synapse/pkg/core"
"github.com/LambdaTest/synapse/pkg/global"
"github.com/LambdaTest/synapse/pkg/lumber"
Expand All @@ -18,16 +13,16 @@ import (
// task represents each instance of nucleus spawned by neuron
type task struct {
ctx context.Context
client http.Client
requests core.Requests
endpoint string
logger lumber.Logger
}

// New returns new task
func New(ctx context.Context, cfg *config.NucleusConfig, logger lumber.Logger) (core.Task, error) {
func New(ctx context.Context, requests core.Requests, logger lumber.Logger) (core.Task, error) {
return &task{
ctx: ctx,
client: http.Client{Timeout: 30 * time.Second},
requests: requests,
logger: logger,
endpoint: global.NeuronHost + "/task",
}, nil
Expand All @@ -42,32 +37,10 @@ func (t *task) UpdateStatus(payload *core.TaskPayload) error {
return err
}

req, err := http.NewRequestWithContext(t.ctx, http.MethodPut, t.endpoint, bytes.NewBuffer(reqBody))

if err != nil {
t.logger.Errorf("error while creating http request %v", err)
if err := t.requests.MakeAPIRequest(t.ctx, http.MethodPut, t.endpoint, reqBody); err != nil {
return err
}

resp, err := t.client.Do(req)
if err != nil {
t.logger.Errorf("error while sending http request %v", err)
return err
}

defer resp.Body.Close()

respBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.logger.Errorf("error while sending http response body %v", err)
return err
}

if resp.StatusCode != http.StatusOK {
t.logger.Errorf("non 200 status code %s", string(respBody))
return errors.New("non 200 status code")
}

return nil

}
43 changes: 40 additions & 3 deletions pkg/testdiscoveryservice/testdiscovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package testdiscoveryservice

import (
"context"
"encoding/json"
"net/http"
"os/exec"

"github.com/LambdaTest/synapse/pkg/core"
Expand All @@ -13,14 +15,28 @@ import (
)

type testDiscoveryService struct {
ctx context.Context
logger lumber.Logger
execManager core.ExecutionManager
tdResChan chan core.DiscoveryResult
requests core.Requests
endpoint string
}

// NewTestDiscoveryService creates and returns a new testDiscoveryService instance
func NewTestDiscoveryService(execManager core.ExecutionManager, logger lumber.Logger) core.TestDiscoveryService {
tds := testDiscoveryService{logger: logger, execManager: execManager}
return &tds
func NewTestDiscoveryService(ctx context.Context,
tdResChan chan core.DiscoveryResult,
execManager core.ExecutionManager,
requests core.Requests,
logger lumber.Logger) core.TestDiscoveryService {
return &testDiscoveryService{
ctx: ctx,
logger: logger,
execManager: execManager,
tdResChan: tdResChan,
requests: requests,
endpoint: global.NeuronHost + "/test-list",
}
}

func (tds *testDiscoveryService) Discover(ctx context.Context,
Expand Down Expand Up @@ -93,5 +109,26 @@ func (tds *testDiscoveryService) Discover(ctx context.Context,
return err
}

testDiscoveryResult := <-tds.tdResChan
testDiscoveryResult.Parallelism = tasConfig.Parallelism
testDiscoveryResult.Tier = tasConfig.Tier
if err := tds.updateResult(&testDiscoveryResult); err != nil {
return err
}
return nil
}

func (tds *testDiscoveryService) updateResult(testDiscoveryResult *core.DiscoveryResult) error {
reqBody, err := json.Marshal(testDiscoveryResult)
if err != nil {
tds.logger.Errorf("error while json marshal %v", err)
return err
}

if err := tds.requests.MakeAPIRequest(tds.ctx, http.MethodPost, tds.endpoint, reqBody); err != nil {
return err
}

return nil

}

0 comments on commit faa8377

Please sign in to comment.