Skip to content

Commit

Permalink
Scheduler reorganization (#458)
Browse files Browse the repository at this point in the history
  • Loading branch information
ryancouto authored Oct 23, 2019
1 parent 4b61102 commit 9b02ac8
Show file tree
Hide file tree
Showing 118 changed files with 639 additions and 626 deletions.
35 changes: 18 additions & 17 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

NAME := scoot
DESC := distributed build tools
GOVERSION := $(shell go version)
Expand Down Expand Up @@ -29,8 +30,7 @@ vet:
go vet ./...

install:
go install ./binaries/...
go install ./perftests/...
go install ./...

# Cleans go.mod and go.sum of unused dependencies
tidy:
Expand Down Expand Up @@ -100,19 +100,19 @@ test-all: test-unit-property-integration coverage

############## standalone binary & integration tests

swarmtest: install
smoketest:
# Setup a local schedule against local workers (--strategy local.local)
# Then run (with go run) scootapi run_smoke_test with 10 jobs, wait 1m
# We build the binaries because 'go run' won't consistently pass signals to our program.
$(FIRSTGOPATH)/bin/setup-cloud-scoot --strategy local.local run scootapi run_smoke_test --num_jobs 10 --timeout 1m $(TRAVIS_FILTER)

recoverytest: install
# Some overlap with swarmtest but focuses on sagalog recovery vs worker/checkout correctness.
# We build the binaries because 'go run' won't consistently pass signals to our program.
# Ignore output here to reduce travis log size. Swarmtest is more important and that still logs.
# Then run (with go run) scootcl smoketest with 10 jobs, wait 1m
# We build the binaries because 'go run' won't consistently pass signals to our program.
$(FIRSTGOPATH)/bin/setup-cloud-scoot --strategy local.local run scootcl smoketest --num_jobs 10 --timeout 1m $(TRAVIS_FILTER)

recoverytest:
# Some overlap with smoketest but focuses on sagalog recovery vs worker/checkout correctness.
# We build the binaries because 'go run' won't consistently pass signals to our program.
# Ignore output here to reduce travis log size. Smoketest is more important and that still logs.
$(FIRSTGOPATH)/bin/recoverytest &>/dev/null

integrationtest: install
integrationtest:
# Integration test with some overlap with other standalone tests, but utilizes client binaries
$(FIRSTGOPATH)/bin/scoot-integration &>/dev/null
$(FIRSTGOPATH)/bin/bazel-integration &>/dev/null
Expand Down Expand Up @@ -142,12 +142,13 @@ thrift-worker-go:

thrift-sched-go:
# Create generated code in github.com/twitter/scoot/sched/gen-go/... from sched.thrift
cd sched && rm -rf gen-go && thrift -I ../bazel/execution/bazelapi/ --gen go:package_prefix=github.com/twitter/scoot/bazel/execution/bazelapi/gen-go/,thrift_import=github.com/apache/thrift/lib/go/thrift sched.thrift && cd ..
cd scheduler/domain && rm -rf gen-go && thrift -I ../../bazel/execution/bazelapi/ --gen go:package_prefix=github.com/twitter/scoot/bazel/execution/bazelapi/gen-go/,thrift_import=github.com/apache/thrift/lib/go/thrift sched.thrift && cd ../..
rm -rf scheduler/domain/gen-go/sched/sched-remote/

thrift-scoot-go:
# Create generated code in github.com/twitter/scoot/scootapi/gen-go/... from scoot.thrift
cd scootapi && rm -rf gen-go && thrift -I ../bazel/execution/bazelapi/ --gen go:package_prefix=github.com/twitter/scoot/bazel/execution/bazelapi/gen-go/,thrift_import=github.com/apache/thrift/lib/go/thrift scoot.thrift && cd ..
rm -rf scootapi/gen-go/scoot/cloud_scoot-remote/
# Create generated code in github.com/twitter/scoot/scheduler/api/thrift/gen-go/... from scoot.thrift
cd scheduler/api/thrift && rm -rf gen-go && thrift -I ../../../bazel/execution/bazelapi/ --gen go:package_prefix=github.com/twitter/scoot/bazel/execution/bazelapi/gen-go/,thrift_import=github.com/apache/thrift/lib/go/thrift scoot.thrift && cd ../../..
rm -rf scheduler/api/thrift/gen-go/scoot/cloud_scoot-remote/

thrift-bazel-go:
# Create generated code in github.com/twitter/scoot/bazel/execution/bazelapi/gen-go/... from bazel.thrift
Expand All @@ -164,4 +165,4 @@ bazel-proto:

dev-fullbuild: dev-dependencies generate test-all

travis: fs_util recoverytest swarmtest integrationtest test-all clean-data
travis: clean-data fs_util install recoverytest smoketest integrationtest test-all clean-data
21 changes: 13 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,34 +43,39 @@ go run ./binaries/setup-cloud-scoot/main.go --strategy local.local
Run a series of randomly generated tests against the local scheduler and workers:

```sh
go run ./binaries/scootapi/main.go run_smoke_test
go run ./scheduler/client/scootcl/main.go smoke_test
```
## Scoot Integration Tests
Scoot has a few tests that exercise varying levels of common usages and workflows.

### Smoketest/Swarmtest
Invokes a scootapi client directly to run jobs against a local cluster and waits for the
### Smoketest
Invokes a scoot client directly to run jobs against a local cluster and waits for the
scheduled jobs to complete.

(./scootapi/client/smoke_test_cmd.go)
(./scheduler/client/cli/smoketest.go)

### Recoverytest
Invokes a scootapi client directly to run jobs against a local cluster, kills the cluster,
Invokes a scoot client directly to run jobs against a local cluster, kills the cluster,
attempts to spin up a new one, and waits for the originally scheduled jobs to complete.

(./binaries/recoverytest/main.go)

### Integration
Invokes a scootapi and scoot-snapshot-db client via CLI to run a job against a local cluster
Invokes a scoot and scoot-snapshot-db client via CLI to run a job against a local cluster
and waits for the job to complete.

(./tests/integration_test.go)
(./binaries/scoot-integration/main.go)

Invokes a scoot bazel client (bzutil & fs_util) via CLI to run a job against a local cluster
using the bazel API grpc endpoints

(./binaries/bazel-integration/main.go)

## Scoot Thrift Code
__Thrift Prerequisites__
Install the Thrift tool and golang thrift repository locally using the following section.

__Generating thrift files (scootapi used as an example)__
__Generating thrift files__
See documentation in thrift definition files for specific generation instructions, or `make thrift`.

# Development Installation Instructions
Expand Down
16 changes: 8 additions & 8 deletions bazel/execution/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,22 @@ import (
loghelpers "github.com/twitter/scoot/common/log/helpers"
"github.com/twitter/scoot/common/stats"
"github.com/twitter/scoot/saga"
"github.com/twitter/scoot/sched"
"github.com/twitter/scoot/sched/scheduler"
"github.com/twitter/scoot/scootapi/server/api"
"github.com/twitter/scoot/scheduler/api/thrift"
"github.com/twitter/scoot/scheduler/domain"
"github.com/twitter/scoot/scheduler/server"
)

// Implements GRPCServer, remoteexecution.ExecutionServer, and longrunning.OperationsServer interfaces
type executionServer struct {
listener net.Listener
sagaCoord saga.SagaCoordinator
server *grpc.Server
scheduler scheduler.Scheduler
scheduler server.Scheduler
stat stats.StatsReceiver
}

// Creates a new GRPCServer (executionServer) based on a GRPC config, scheduler, and stats, and preregisters the service
func MakeExecutionServer(gc *bazel.GRPCConfig, s scheduler.Scheduler, stat stats.StatsReceiver) *executionServer {
func MakeExecutionServer(gc *bazel.GRPCConfig, s server.Scheduler, stat stats.StatsReceiver) *executionServer {
if gc == nil {
return nil
}
Expand Down Expand Up @@ -107,7 +107,7 @@ func (s *executionServer) Execute(
return status.Error(codes.InvalidArgument, fmt.Sprintf("Error converting request to internal definition: %s", err))
}

err = sched.ValidateJob(job)
err = domain.ValidateJob(job)
if err != nil {
log.Errorf("Scoot Job generated from request invalid: %s", err)
return status.Error(codes.Internal, fmt.Sprintf("Internal job definition invalid: %s", err))
Expand Down Expand Up @@ -280,7 +280,7 @@ func (s *executionServer) CancelOperation(_ context.Context, req *longrunning.Ca
// Internal functions

func (s *executionServer) getRunStatusAndValidate(jobID string) (*runStatus, error) {
js, err := api.GetJobStatus(jobID, s.sagaCoord)
js, err := thrift.GetJobStatus(jobID, s.sagaCoord)
if err != nil {
return nil, err
}
Expand All @@ -300,7 +300,7 @@ func (s *executionServer) getRunStatusAndValidate(jobID string) (*runStatus, err
}

func (s *executionServer) killJobAndValidate(jobID string) error {
js, err := api.KillJob(jobID, s.scheduler, s.sagaCoord)
js, err := thrift.KillJob(jobID, s.scheduler, s.sagaCoord)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions bazel/execution/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ import (
scootproto "github.com/twitter/scoot/common/proto"
"github.com/twitter/scoot/common/stats"
"github.com/twitter/scoot/saga"
"github.com/twitter/scoot/sched/scheduler"
"github.com/twitter/scoot/scheduler/server"
)

// Determine that Execute can accept a well-formed request and returns a well-formed response
func TestExecute(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
sc := scheduler.NewMockScheduler(mockCtrl)
sc := server.NewMockScheduler(mockCtrl)
sc.EXPECT().ScheduleJob(gomock.Any()).Return("testJobID", nil)

s := executionServer{scheduler: sc, stat: stats.NilStatsReceiver()}
Expand Down Expand Up @@ -53,7 +53,7 @@ func TestExecute(t *testing.T) {
func TestGetOperation(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
sc := scheduler.NewMockScheduler(mockCtrl)
sc := server.NewMockScheduler(mockCtrl)
mockSagaLog := saga.NewMockSagaLog(mockCtrl)
sagaC := saga.MakeSagaCoordinator(mockSagaLog)
mockSagaLog.EXPECT().GetMessages(gomock.Any()).Return([]saga.SagaMessage{}, nil)
Expand Down Expand Up @@ -97,7 +97,7 @@ func TestGetOperation(t *testing.T) {
func TestCancelOperation(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
sc := scheduler.NewMockScheduler(mockCtrl)
sc := server.NewMockScheduler(mockCtrl)
mockSagaLog := saga.NewMockSagaLog(mockCtrl)
sagaC := saga.MakeSagaCoordinator(mockSagaLog)
var wg sync.WaitGroup
Expand Down
15 changes: 7 additions & 8 deletions bazel/execution/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
bazelthrift "github.com/twitter/scoot/bazel/execution/bazelapi/gen-go/bazel"
remoteexecution "github.com/twitter/scoot/bazel/remoteexecution"
scootproto "github.com/twitter/scoot/common/proto"
"github.com/twitter/scoot/sched"
"github.com/twitter/scoot/scootapi/gen-go/scoot"
"github.com/twitter/scoot/scheduler/api/thrift/gen-go/scoot"
"github.com/twitter/scoot/scheduler/domain"
)

func marshalAny(pb proto.Message) (*any.Any, error) {
Expand All @@ -43,25 +43,24 @@ func validateExecRequest(req *remoteexecution.ExecuteRequest) error {

// Extract Scoot-related job fields from request to populate a JobDef, and pass through bazel request
func execReqToScoot(req *remoteexecution.ExecuteRequest) (
result sched.JobDefinition, err error) {
result domain.JobDefinition, err error) {
if err := validateExecRequest(req); err != nil {
return result, err
}

// NOTE fixed to lowest priority in early stages of Bazel support
// ExecuteRequests do not have priority values, but the Action portion
// contains Platform Properties which can be used to specify arbitrary server-side behavior.
result.Priority = sched.P0
result.Tasks = []sched.TaskDefinition{}

// contains Platform Properties which can be used to specify arbitrary server-side behavior.
result.Priority = domain.P0
result.Tasks = []domain.TaskDefinition{}
// Populate TaskDef and Command. Note that Argv and EnvVars are set with placeholders for these requests,
// per Bazel API this data must be made available by the client in the CAS before submitting this request.
// To prevent increasing load and complexity in the Scheduler, this lookup is done at run time on the Worker
// which is required to support CAS interactions.
// ActionDigest is added for convenience and universal availability
// ExecutionMetadata is seeded with current time of queueing
now := time.Now()
var task sched.TaskDefinition
var task domain.TaskDefinition
task.TaskID = fmt.Sprintf("%s_%s_%d", TaskIDPrefix, req.GetActionDigest(), now.Unix())
task.Command.Argv = []string{CommandDefault}
task.Command.EnvVars = make(map[string]string)
Expand Down
2 changes: 1 addition & 1 deletion bazel/execution/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

"github.com/twitter/scoot/bazel"
remoteexecution "github.com/twitter/scoot/bazel/remoteexecution"
"github.com/twitter/scoot/scootapi/gen-go/scoot"
"github.com/twitter/scoot/scheduler/api/thrift/gen-go/scoot"
)

var rs *runStatus
Expand Down
13 changes: 7 additions & 6 deletions binaries/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ A location for site-specific binaries that inject dependencies.

Scoot is built with many libraries which will get assembled into several binaries. These binaries should be common, but each Scoot site may want to link in custom code to integrate with their own infrastructure. This directory holds binaries that do the dependency injection before calling libraries. Copy these and modify them to include your own implementations of interfaces.

* __apiserver__ - the Scoot apiserver service
* __bazel-integration__ - an integration test for scoot's grpc endpoints
* __bzutil__ - CLI client for scoot's grpc endpoints
* __recoverytest__ - an integration test for sagalog recovery
* __scoot-integration__ - an integration test for scoot's thrift endpoints
* __scoot-snapshot-db__ - CLI client for snapshot store (via apiserver)
* __setup-cloud-scoot__ - sets up local Scoot components (scheduler and worker), or sets up connection to remote ones
* __scheduler__ - the Scoot scheduler
* __workserver__ - the Scoot worker
* __daemon__ - local process that can act as a worker or scheduler proxy
* __scootapi__ - CLI client for Cloud Scoot API (scheduler)
* __workercl__ - CLI client for workers
* __scootcl__ - CLI client for daemon
* __minfs__ - TODO
* __workserver__ - the Scoot worker service
5 changes: 2 additions & 3 deletions binaries/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/twitter/scoot/common/stats"
"github.com/twitter/scoot/config/jsonconfig"
"github.com/twitter/scoot/os/temp"
"github.com/twitter/scoot/scootapi"
"github.com/twitter/scoot/snapshot/bundlestore"
"github.com/twitter/scoot/snapshot/git/gitdb"
"github.com/twitter/scoot/snapshot/snapshots"
Expand All @@ -27,8 +26,8 @@ import (
func main() {
log.AddHook(hooks.NewContextHook())

httpAddr := flag.String("http_addr", scootapi.DefaultApiBundlestore_HTTP, "'host:port' addr to serve http on")
grpcAddr := flag.String("grpc_addr", scootapi.DefaultApiBundlestore_GRPC, "Bind address for grpc server")
httpAddr := flag.String("http_addr", bundlestore.DefaultApiBundlestore_HTTP, "'host:port' addr to serve http on")
grpcAddr := flag.String("grpc_addr", bundlestore.DefaultApiBundlestore_GRPC, "Bind address for grpc server")
configFlag := flag.String("config", "{}", "API Server Config (either a filename like local.local or JSON text")
logLevelFlag := flag.String("log_level", "info", "Log everything at this level and above (error|info|debug)")
cacheSize := flag.Int64("cache_size", 2*1024*1024*1024, "In-memory bundle cache size in bytes")
Expand Down
11 changes: 7 additions & 4 deletions binaries/bazel-integration/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"github.com/twitter/scoot/common"
"github.com/twitter/scoot/common/log/hooks"
"github.com/twitter/scoot/os/temp"
"github.com/twitter/scoot/scootapi"
"github.com/twitter/scoot/scootapi/setup"
"github.com/twitter/scoot/scheduler"
"github.com/twitter/scoot/scheduler/setup"
"github.com/twitter/scoot/tests/testhelpers"
)

Expand All @@ -44,7 +44,7 @@ func main() {

// Initialize Local Cluster
log.Info("Creating test cluster")
scootClient := testhelpers.CreateScootClient(scootapi.DefaultSched_Thrift)
scootClient := testhelpers.CreateScootClient(scheduler.DefaultSched_Thrift)
clusterCmds, err := testhelpers.CreateLocalTestCluster()
if err != nil {
testhelpers.KillAndExit1(clusterCmds, fmt.Errorf("Unexpected Error while Setting up Local Cluster %v", err))
Expand Down Expand Up @@ -72,7 +72,10 @@ func main() {
}

func installBinaries() error {
testhelpers.InstallBinary("bzutil")
err := testhelpers.InstallBinaries()
if err != nil {
return err
}
b, err := exec.Command("sh", "scripts/get_fs_util.sh").CombinedOutput()
if err != nil {
log.Error(string(b))
Expand Down
Loading

0 comments on commit 9b02ac8

Please sign in to comment.