Skip to content

Commit fb8c09a

Browse files
committed
cmd/coordinator: finish the scheduler code, at least mostly
Optimizations and tuning remain, but this should be tons better than what we had before (random). Updates golang/go#19178 Change-Id: Idb483a4c4209a012814322cc8b37b966ee4681de Reviewed-on: https://go-review.googlesource.com/c/build/+/205078 Reviewed-by: Bryan C. Mills <bcmills@google.com>
1 parent f993d8f commit fb8c09a

File tree

10 files changed

+369
-246
lines changed

10 files changed

+369
-246
lines changed

cmd/coordinator/coordinator.go

Lines changed: 82 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -325,12 +325,9 @@ func main() {
325325
}
326326
}()
327327

328-
workc := make(chan buildgo.BuilderRev)
329-
330328
if *mode == "dev" {
331329
// TODO(crawshaw): do more in dev mode
332330
gcePool.SetEnabled(*devEnableGCE)
333-
http.HandleFunc("/dosomework/", handleDoSomeWork(workc))
334331
} else {
335332
go gcePool.cleanUpOldVMs()
336333
if kubeErr == nil {
@@ -342,7 +339,7 @@ func main() {
342339
}
343340

344341
go listenAndServeInternalModuleProxy()
345-
go findWorkLoop(workc)
342+
go findWorkLoop()
346343
go findTryWorkLoop()
347344
go reportMetrics(context.Background())
348345
// TODO(cmang): gccgo will need its own findWorkLoop
@@ -351,22 +348,38 @@ func main() {
351348
go listenAndServeTLS()
352349
go listenAndServeSSH() // ssh proxy to remote buildlets; remote.go
353350

354-
for {
355-
work := <-workc
356-
if !mayBuildRev(work) {
357-
if inStaging {
358-
if _, ok := dashboard.Builders[work.Name]; ok && logCantBuildStaging.Allow() {
359-
log.Printf("may not build %v; skipping", work)
360-
}
351+
select {}
352+
}
353+
354+
// ignoreAllNewWork, when true, prevents addWork from doing anything.
355+
// It's sometimes set in staging mode when people are debugging
356+
// certain paths.
357+
var ignoreAllNewWork bool
358+
359+
// addWorkTestHook is optionally set by tests.
360+
var addWorkTestHook func(work buildgo.BuilderRev)
361+
362+
func addWork(work buildgo.BuilderRev) {
363+
if f := addWorkTestHook; f != nil {
364+
f(work)
365+
return
366+
}
367+
if ignoreAllNewWork || isBuilding(work) {
368+
return
369+
}
370+
if !mayBuildRev(work) {
371+
if inStaging {
372+
if _, ok := dashboard.Builders[work.Name]; ok && logCantBuildStaging.Allow() {
373+
log.Printf("may not build %v; skipping", work)
361374
}
362-
continue
363-
}
364-
st, err := newBuild(work)
365-
if err != nil {
366-
log.Printf("Bad build work params %v: %v", work, err)
367-
} else {
368-
st.start()
369375
}
376+
return
377+
}
378+
st, err := newBuild(work)
379+
if err != nil {
380+
log.Printf("Bad build work params %v: %v", work, err)
381+
} else {
382+
st.start()
370383
}
371384
}
372385

@@ -811,19 +824,21 @@ func workaroundFlush(w http.ResponseWriter) {
811824
w.(http.Flusher).Flush()
812825
}
813826

814-
// findWorkLoop polls https://build.golang.org/?mode=json looking for new work
815-
// for the main dashboard. It does not support gccgo.
816-
func findWorkLoop(work chan<- buildgo.BuilderRev) {
827+
// findWorkLoop polls https://build.golang.org/?mode=json looking for
828+
// new post-submit work for the main dashboard. It does not support
829+
// gccgo. This is separate from trybots, which populates its work from
830+
// findTryWorkLoop.
831+
func findWorkLoop() {
817832
// Useful for debugging a single run:
818833
if inStaging && false {
819834
const debugSubrepo = false
820835
if debugSubrepo {
821-
work <- buildgo.BuilderRev{
836+
addWork(buildgo.BuilderRev{
822837
Name: "linux-arm",
823838
Rev: "c9778ec302b2e0e0d6027e1e0fca892e428d9657",
824839
SubName: "tools",
825840
SubRev: "ac303766f5f240c1796eeea3dc9bf34f1261aa35",
826-
}
841+
})
827842
}
828843
const debugArm = false
829844
if debugArm {
@@ -832,31 +847,29 @@ func findWorkLoop(work chan<- buildgo.BuilderRev) {
832847
time.Sleep(time.Second)
833848
}
834849
log.Printf("ARM machine(s) registered.")
835-
work <- buildgo.BuilderRev{Name: "linux-arm", Rev: "3129c67db76bc8ee13a1edc38a6c25f9eddcbc6c"}
850+
addWork(buildgo.BuilderRev{Name: "linux-arm", Rev: "3129c67db76bc8ee13a1edc38a6c25f9eddcbc6c"})
836851
} else {
837-
work <- buildgo.BuilderRev{Name: "linux-amd64", Rev: "9b16b9c7f95562bb290f5015324a345be855894d"}
838-
work <- buildgo.BuilderRev{Name: "linux-amd64-sid", Rev: "9b16b9c7f95562bb290f5015324a345be855894d"}
839-
work <- buildgo.BuilderRev{Name: "linux-amd64-clang", Rev: "9b16b9c7f95562bb290f5015324a345be855894d"}
852+
addWork(buildgo.BuilderRev{Name: "linux-amd64", Rev: "9b16b9c7f95562bb290f5015324a345be855894d"})
853+
addWork(buildgo.BuilderRev{Name: "linux-amd64-sid", Rev: "9b16b9c7f95562bb290f5015324a345be855894d"})
854+
addWork(buildgo.BuilderRev{Name: "linux-amd64-clang", Rev: "9b16b9c7f95562bb290f5015324a345be855894d"})
840855
}
841-
842-
// Still run findWork but ignore what it does.
843-
ignore := make(chan buildgo.BuilderRev)
844-
go func() {
845-
for range ignore {
846-
}
847-
}()
848-
work = ignore
856+
ignoreAllNewWork = true
849857
}
858+
// TODO: remove this hard-coded 15 second ticker and instead
859+
// do some new streaming gRPC call to maintnerd to subscribe
860+
// to new commits.
850861
ticker := time.NewTicker(15 * time.Second)
851862
for {
852-
if err := findWork(work); err != nil {
863+
if err := findWork(); err != nil {
853864
log.Printf("failed to find new work: %v", err)
854865
}
855866
<-ticker.C
856867
}
857868
}
858869

859-
func findWork(work chan<- buildgo.BuilderRev) error {
870+
// findWork polls the https://build.golang.org/ dashboard once to find
871+
// post-submit work to do. It's called in a loop by findWorkLoop.
872+
func findWork() error {
860873
var bs types.BuildStatus
861874
if err := dash("GET", "", url.Values{"mode": {"json"}}, nil, &bs); err != nil {
862875
return err
@@ -881,6 +894,7 @@ func findWork(work chan<- buildgo.BuilderRev) error {
881894
// 15 seconds, but they should be skewed toward new work.
882895
// This depends on the build dashboard sending back the list
883896
// of empty slots newest first (matching the order on the main screen).
897+
// TODO: delete this code when the scheduler is on by default.
884898
sent := map[string]bool{}
885899

886900
var goRevisions []string // revisions of repo "go", branch "master" revisions
@@ -951,12 +965,18 @@ func findWork(work chan<- buildgo.BuilderRev) error {
951965
}
952966
}
953967

954-
// The !sent[builder] here is a clumsy attempt at priority scheduling
955-
// and probably should be replaced at some point with a better solution.
956-
// See golang.org/issue/19178 and the long comment above.
957-
if !isBuilding(rev) && !sent[builder] {
958-
sent[builder] = true
959-
work <- rev
968+
if useScheduler {
969+
addWork(rev)
970+
} else {
971+
// The !sent[builder] here is a clumsy attempt at priority scheduling
972+
// and probably should be replaced at some point with a better solution.
973+
// See golang.org/issue/19178 and the long comment above.
974+
// TODO: delete all this code and the sent map above when the
975+
// useScheduler const is removed.
976+
if !sent[builder] {
977+
sent[builder] = true
978+
addWork(rev)
979+
}
960980
}
961981
}
962982
}
@@ -969,10 +989,7 @@ func findWork(work chan<- buildgo.BuilderRev) error {
969989
continue
970990
}
971991
for _, rev := range goRevisions {
972-
br := buildgo.BuilderRev{Name: b, Rev: rev}
973-
if !isBuilding(br) {
974-
work <- br
975-
}
992+
addWork(buildgo.BuilderRev{Name: b, Rev: rev})
976993
}
977994
}
978995
return nil
@@ -1530,30 +1547,26 @@ type BuildletPool interface {
15301547
// and highPriorityOpt.
15311548
GetBuildlet(ctx context.Context, hostType string, lg logger) (*buildlet.Client, error)
15321549

1533-
// HasCapacity reports whether the buildlet pool has
1534-
// quota/capacity to create a buildlet of the provided host
1535-
// type. This should return as fast as possible and err on
1536-
// the side of returning false.
1537-
HasCapacity(hostType string) bool
1538-
15391550
String() string // TODO(bradfitz): more status stuff
15401551
}
15411552

1542-
// GetBuildlets creates up to n buildlets and sends them on the returned channel
1553+
// getBuildlets creates up to n buildlets and sends them on the returned channel
15431554
// before closing the channel.
1544-
func GetBuildlets(ctx context.Context, pool BuildletPool, n int, hostType string, lg logger) <-chan *buildlet.Client {
1555+
func getBuildlets(ctx context.Context, n int, schedTmpl *SchedItem, lg logger) <-chan *buildlet.Client {
15451556
ch := make(chan *buildlet.Client) // NOT buffered
15461557
var wg sync.WaitGroup
15471558
wg.Add(n)
15481559
for i := 0; i < n; i++ {
15491560
go func(i int) {
15501561
defer wg.Done()
15511562
sp := lg.CreateSpan("get_helper", fmt.Sprintf("helper %d/%d", i+1, n))
1552-
bc, err := pool.GetBuildlet(ctx, hostType, lg)
1563+
schedItem := *schedTmpl // copy; GetBuildlet takes ownership
1564+
schedItem.IsHelper = i > 0
1565+
bc, err := sched.GetBuildlet(ctx, lg, &schedItem)
15531566
sp.Done(err)
15541567
if err != nil {
15551568
if err != context.Canceled {
1556-
log.Printf("failed to get a %s buildlet: %v", hostType, err)
1569+
log.Printf("failed to get a %s buildlet: %v", schedItem.HostType, err)
15571570
}
15581571
return
15591572
}
@@ -1574,9 +1587,9 @@ func GetBuildlets(ctx context.Context, pool BuildletPool, n int, hostType string
15741587
return ch
15751588
}
15761589

1577-
var testPoolHook func(*dashboard.BuildConfig) BuildletPool
1590+
var testPoolHook func(*dashboard.HostConfig) BuildletPool
15781591

1579-
func poolForConf(conf *dashboard.BuildConfig) BuildletPool {
1592+
func poolForConf(conf *dashboard.HostConfig) BuildletPool {
15801593
if testPoolHook != nil {
15811594
return testPoolHook(conf)
15821595
}
@@ -1589,10 +1602,10 @@ func poolForConf(conf *dashboard.BuildConfig) BuildletPool {
15891602
} else {
15901603
return kubePool
15911604
}
1592-
case conf.IsReverse():
1605+
case conf.IsReverse:
15931606
return reversePool
15941607
default:
1595-
panic(fmt.Sprintf("no buildlet pool for builder type %q", conf.Name))
1608+
panic(fmt.Sprintf("no buildlet pool for host type %q", conf.HostType))
15961609
}
15971610
}
15981611

@@ -1641,7 +1654,7 @@ func (st *buildStatus) start() {
16411654
}
16421655

16431656
func (st *buildStatus) buildletPool() BuildletPool {
1644-
return poolForConf(st.conf)
1657+
return poolForConf(st.conf.HostConfig())
16451658
}
16461659

16471660
// parentRev returns the parent of this build's commit (but only if this build comes from a trySet).
@@ -1721,8 +1734,12 @@ func (st *buildStatus) getHelpers() <-chan *buildlet.Client {
17211734
}
17221735

17231736
func (st *buildStatus) onceInitHelpersFunc() {
1724-
pool := st.buildletPool()
1725-
st.helpers = GetBuildlets(st.ctx, pool, st.conf.NumTestHelpers(st.isTry()), st.conf.HostType, st)
1737+
schedTmpl := &SchedItem{
1738+
BuilderRev: st.BuilderRev,
1739+
HostType: st.conf.HostType,
1740+
IsTry: st.isTry(),
1741+
}
1742+
st.helpers = getBuildlets(st.ctx, st.conf.NumTestHelpers(st.isTry()), schedTmpl, st)
17261743
}
17271744

17281745
// useSnapshot reports whether this type of build uses a snapshot of
@@ -1835,11 +1852,9 @@ func (st *buildStatus) build() error {
18351852
}
18361853

18371854
sp = st.CreateSpan("get_buildlet")
1838-
pool := st.buildletPool()
18391855
bc, err := sched.GetBuildlet(st.ctx, st, &SchedItem{
18401856
HostType: st.conf.HostType,
18411857
IsTry: st.trySet != nil,
1842-
Pool: pool,
18431858
BuilderRev: st.BuilderRev,
18441859
})
18451860
sp.Done(err)
@@ -1980,7 +1995,7 @@ func (st *buildStatus) buildRecord() *types.BuildRecord {
19801995

19811996
// Log whether we used COS, so we can do queries to analyze
19821997
// Kubernetes vs COS performance for containers.
1983-
if st.conf.IsContainer() && poolForConf(st.conf) == gcePool {
1998+
if st.conf.IsContainer() && poolForConf(st.conf.HostConfig()) == gcePool {
19841999
rec.ContainerHost = "cos"
19852000
}
19862001

@@ -2099,7 +2114,6 @@ func (st *buildStatus) crossCompileMakeAndSnapshot(config *dashboard.CrossCompil
20992114
kubeBC, err := sched.GetBuildlet(ctx, st, &SchedItem{
21002115
HostType: config.CompileHostType,
21012116
IsTry: st.trySet != nil,
2102-
Pool: kubePool,
21032117
BuilderRev: st.BuilderRev,
21042118
})
21052119
sp.Done(err)

cmd/coordinator/coordinator_test.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -189,16 +189,14 @@ func TestFindWork(t *testing.T) {
189189
return false
190190
}
191191

192-
c := make(chan buildgo.BuilderRev, 1000)
193-
go func() {
194-
defer close(c)
195-
err := findWork(c)
196-
if err != nil {
197-
t.Error(err)
198-
}
199-
}()
200-
for br := range c {
201-
t.Logf("Got: %v", br)
192+
addWorkTestHook = func(work buildgo.BuilderRev) {
193+
t.Logf("Got: %v", work)
194+
}
195+
defer func() { addWorkTestHook = nil }()
196+
197+
err := findWork()
198+
if err != nil {
199+
t.Error(err)
202200
}
203201
}
204202

cmd/coordinator/gce.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -423,17 +423,6 @@ func (p *gceBuildletPool) awaitVMCountQuota(ctx context.Context, numCPU int) err
423423
}
424424
}
425425

426-
func (p *gceBuildletPool) HasCapacity(hostType string) bool {
427-
hconf, ok := dashboard.Hosts[hostType]
428-
if !ok {
429-
return false
430-
}
431-
numCPU := hconf.GCENumCPU()
432-
p.mu.Lock()
433-
defer p.mu.Unlock()
434-
return p.haveQuotaLocked(numCPU)
435-
}
436-
437426
// haveQuotaLocked reports whether the current GCE quota permits
438427
// starting numCPU more CPUs.
439428
//

cmd/coordinator/kube.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -209,16 +209,6 @@ func (p *kubeBuildletPool) pollCapacity(ctx context.Context) {
209209

210210
}
211211

212-
func (p *kubeBuildletPool) HasCapacity(hostType string) bool {
213-
// TODO: implement. But for now we don't care because we only
214-
// use the kubePool for the cross-compiled builds and we have
215-
// very few hostTypes for those, and only one (ARM) that's
216-
// used day-to-day. So it's okay if we lie here and always try
217-
// to create buildlets. The scheduler will still give created
218-
// buildlets to the highest priority waiter.
219-
return true
220-
}
221-
222212
func (p *kubeBuildletPool) GetBuildlet(ctx context.Context, hostType string, lg logger) (*buildlet.Client, error) {
223213
hconf, ok := dashboard.Hosts[hostType]
224214
if !ok || !hconf.IsContainer() {

cmd/coordinator/remote.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,6 @@ func handleBuildletCreate(w http.ResponseWriter, r *http.Request) {
139139
return
140140
}
141141
user, _, _ := r.BasicAuth()
142-
pool := poolForConf(bconf)
143142

144143
var closeNotify <-chan bool
145144
if cn, ok := w.(http.CloseNotifier); ok {
@@ -160,13 +159,17 @@ func handleBuildletCreate(w http.ResponseWriter, r *http.Request) {
160159
resc := make(chan *buildlet.Client)
161160
errc := make(chan error)
162161
go func() {
163-
bc, err := pool.GetBuildlet(ctx, bconf.HostType, loggerFunc(func(event string, optText ...string) {
162+
lgFunc := loggerFunc(func(event string, optText ...string) {
164163
var extra string
165164
if len(optText) > 0 {
166165
extra = " " + optText[0]
167166
}
168167
log.Printf("creating buildlet %s for %s: %s%s", bconf.HostType, user, event, extra)
169-
}))
168+
})
169+
bc, err := sched.GetBuildlet(ctx, lgFunc, &SchedItem{
170+
HostType: bconf.HostType,
171+
IsGomote: true,
172+
})
170173
if bc != nil {
171174
resc <- bc
172175
return

0 commit comments

Comments
 (0)