Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev tming #141

Merged
merged 6 commits into from
Nov 8, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -12,10 +12,12 @@ package basic
import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/booster/command"
dcSDK "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/sdk"
"github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/controller/pkg/types"
"github.com/TencentBlueKing/bk-turbo/src/backend/booster/common/blog"
@@ -157,6 +159,12 @@ func (m *Mgr) Register(config *types.WorkRegisterConfig) error {
m.info.SetProjectID(config.Apply.ProjectID)
m.info.SetScene(config.Apply.Scene)

// 新加bazelNoLauncher状态
bazelNoLauncher := strings.Contains(config.Apply.Extra, command.FlagBazelNoLauncher)
m.info.SetBazelNoLauncher(bazelNoLauncher)
blog.Infof("basic: work(%s) project(%s) scene(%s) set bazelNoLauncher to %v",
m.work.ID(), config.Apply.ProjectID, config.Apply.Scene, bazelNoLauncher)

m.work.Resource().SetServerHost(config.ServerHost)
if len(config.SpecificHostList) > 0 {
m.work.Resource().SetSpecificHosts(config.SpecificHostList)
Original file line number Diff line number Diff line change
@@ -271,7 +271,7 @@ func (e *executor) executeLocalTask() *types.LocalTaskExecuteResult {
locallockweight = e.handler.LocalLockWeight(e.req.Commands)
}
if !e.lock(dcSDK.JobUsageLocalExe, locallockweight) {
blog.Infof("executor:failed to lock with local job usage(%s) weight %d", dcSDK.JobUsageLocalExe, locallockweight)
blog.Errorf("executor:failed to lock with local job usage(%s) weight %d", dcSDK.JobUsageLocalExe, locallockweight)
return &types.LocalTaskExecuteResult{
Result: &dcSDK.LocalTaskResult{
ExitCode: -1,
18 changes: 17 additions & 1 deletion src/backend/booster/bk_dist/controller/pkg/manager/manager.go
Original file line number Diff line number Diff line change
@@ -14,10 +14,12 @@ import (
"os"
"runtime"
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/booster/command"
"github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/env"
dcSDK "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/sdk"
"github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/controller/config"
@@ -99,6 +101,10 @@ func (m *mgr) RegisterWork(config *types.WorkRegisterConfig) (*types.WorkInfo, b
if work := m.worksPool.find(config.Apply.ProjectID, config.Apply.Scene, config.BatchMode); work != nil {
work.Lock()
info := work.Basic().Info()
bazelNoLauncher := strings.Contains(config.Apply.Extra, command.FlagBazelNoLauncher)
info.SetBazelNoLauncher(bazelNoLauncher)
blog.Infof("mgr: work(%s) project(%s) scene(%s) update bazelNoLauncher to %v",
work.ID(), config.Apply.ProjectID, config.Apply.Scene, bazelNoLauncher)
if info.CanBeHeartbeat() {
defer work.Unlock()
_ = work.Basic().Heartbeat()
@@ -367,7 +373,7 @@ func (m *mgr) ExecuteLocalTask(
var err error
if workID == dcSDK.EmptyWorkerID {
if m.conf.UseDefaultWorker {
work, err = m.worksPool.getFirstWork()
work, err = m.worksPool.getFirstBazelNoLauncherWork()
if err != nil {
blog.Errorf("mgr: get first work failed: %v", err)
return nil, err
@@ -890,3 +896,13 @@ func (m *mgr) GetFirstWorkID() (string, error) {
return "", err
}
}

// Get first bazel workid
func (m *mgr) GetFirstBazelNoLauncherWorkID() (string, error) {
work, err := m.worksPool.getFirstBazelNoLauncherWork()
if err == nil {
return work.ID(), nil
} else {
return "", err
}
}
45 changes: 34 additions & 11 deletions src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go
Original file line number Diff line number Diff line change
@@ -74,6 +74,7 @@ func newResource(hl []*dcProtocol.Host) *resource {
usageMap: usageMap,
lockChan: make(lockWorkerChan, 1000),
unlockChan: make(lockWorkerChan, 1000),
emptyChan: make(chan bool, 1000),
worker: wl,

waitingList: list.New(),
@@ -94,6 +95,9 @@ type resource struct {
lockChan lockWorkerChan
unlockChan lockWorkerChan

// trigger when worker change to empty
emptyChan chan bool

handling bool

// to save waiting requests
@@ -237,6 +241,10 @@ func (wr *resource) disableWorker(host *dcProtocol.Host) {
}
}

if wr.totalSlots <= 0 {
wr.emptyChan <- true
}

blog.Infof("remote slot: total slot:%d after disable host:%v", wr.totalSlots, *host)
return
}
@@ -274,6 +282,10 @@ func (wr *resource) workerDead(w *worker) {
}
}

if wr.totalSlots <= 0 {
wr.emptyChan <- true
}

blog.Infof("remote slot: total slot:%d after host is dead:%v", wr.totalSlots, w.host)
return
}
@@ -298,6 +310,10 @@ func (wr *resource) disableAllWorker() {
blog.Infof("remote slot: usage map:%v after disable all host", *v)
}

if wr.totalSlots <= 0 {
wr.emptyChan <- true
}

blog.Infof("remote slot: total slot:%d after disable all host", wr.totalSlots)
return
}
@@ -543,6 +559,8 @@ func (wr *resource) handleLock(ctx context.Context) {
wr.putSlot(msg)
case msg := <-wr.lockChan:
wr.getSlot(msg)
case <-wr.emptyChan:
wr.onSlotEmpty()
}
}
}
@@ -570,6 +588,11 @@ func (wr *resource) isIdle(set *usageWorkerSet) bool {
}

func (wr *resource) getSlot(msg lockWorkerMessage) {
if wr.totalSlots <= 0 {
msg.result <- nil
return
}

satisfied := false
usage := msg.jobUsage
if wr.occupiedSlots < wr.totalSlots || wr.totalSlots <= 0 {
@@ -603,34 +626,34 @@ func (wr *resource) putSlot(msg lockWorkerMessage) {

// check whether other waiting is satisfied now
if wr.waitingList.Len() > 0 {
// index := 0
for e := wr.waitingList.Front(); e != nil; e = e.Next() {
// if index%2 == 0 {
msg := e.Value.(*lockWorkerMessage)
// usage := e.Value.(dcSDK.JobUsage)
// set := wr.getUsageSet(usage)
set := wr.getUsageSet(msg.jobUsage)
if wr.isIdle(set) {
set.occupied++
wr.occupiedSlots++

msg.result <- wr.occupyWorkerSlots(msg.largeFile, []*dcProtocol.Host{})

// chanElement := e.Next()
// chanElement.Value.(chan *dcProtocol.Host) <- wr.occupyWorkerSlots()

// delete this element
wr.waitingList.Remove(e)
// wr.waitingList.Remove(chanElement)

break
}
// }
// index++
}
}
}

func (wr *resource) onSlotEmpty() {
blog.Infof("remote slot: on slot empty: occupy:%d,total:%d,waiting:%d", wr.occupiedSlots, wr.totalSlots, wr.waitingList.Len())

for wr.waitingList.Len() > 0 {
e := wr.waitingList.Front()
msg := e.Value.(*lockWorkerMessage)
msg.result <- nil
wr.waitingList.Remove(e)
}
}

// worker describe the worker information includes the host details and the slots status
// and it is the caller's responsibility to ensure the lock.
type worker struct {
18 changes: 17 additions & 1 deletion src/backend/booster/bk_dist/controller/pkg/manager/works_pool.go
Original file line number Diff line number Diff line change
@@ -85,7 +85,23 @@ func (wp *worksPool) getFirstWork() (*types.Work, error) {
defer wp.RUnlock()

for _, work := range wp.works {
return work, nil
// check whether work is valid
if work.Basic().Info().IsWorking() {
return work, nil
}
}

return nil, types.ErrNoWork
}

func (wp *worksPool) getFirstBazelNoLauncherWork() (*types.Work, error) {
wp.RLock()
defer wp.RUnlock()

for _, work := range wp.works {
if work.Basic().Info().IsWorking() && work.Basic().Info().BazelNoLauncher() {
return work, nil
}
}

return nil, types.ErrNoWork
3 changes: 3 additions & 0 deletions src/backend/booster/bk_dist/controller/pkg/types/interface.go
Original file line number Diff line number Diff line change
@@ -84,6 +84,9 @@ type Mgr interface {

// Get first workid
GetFirstWorkID() (string, error)

// Get first bazelNoLauncher workid
GetFirstBazelNoLauncherWorkID() (string, error)
}

// RemoteMgr describe a manager for handling all actions with remote workers for work
12 changes: 12 additions & 0 deletions src/backend/booster/bk_dist/controller/pkg/types/work.go
Original file line number Diff line number Diff line change
@@ -197,6 +197,8 @@ type WorkInfo struct {
success bool
batchMode bool

bazelNoLauncher bool

lastHeartbeat time.Time

commonStatus *WorkCommonStatus
@@ -430,6 +432,16 @@ func (wi *WorkInfo) GetPrepared() int32 {
return atomic.LoadInt32(&wi.preparedRemoteTasks)
}

// BazelNoLauncher return bazelNoLauncher
func (wi WorkInfo) BazelNoLauncher() bool {
return wi.bazelNoLauncher
}

// SetBazelNoLauncher update bazelNoLauncher
func (wi *WorkInfo) SetBazelNoLauncher(flag bool) {
wi.bazelNoLauncher = flag
}

// WorkCommonStatus describe the work status and actions timestamp
type WorkCommonStatus struct {
Status dcSDK.WorkStatus
7 changes: 0 additions & 7 deletions src/backend/booster/bk_dist/executor/pkg/dist_executor.go
Original file line number Diff line number Diff line change
@@ -180,13 +180,6 @@ func (d *DistExecutor) runWork() (int, string, error) {
return 0, "", fmt.Errorf("not enough args to execute")
}

// ignore argv[0], it's itself
// _, _, r, err := d.work.Job(d.stats).ExecuteLocalTask(os.Args[1:], "")
// if err != nil {
// blog.Errorf("executor: execute failed, error: %v, exit code: -1", err)
// return -1, err
// }

retcode, retmsg, r, err := d.work.Job(d.stats).ExecuteLocalTask(os.Args[1:], "")
if err != nil || retcode != 0 {
if r != nil {
26 changes: 14 additions & 12 deletions src/backend/booster/bk_dist/handler/ue4/cc/utils.go
Original file line number Diff line number Diff line change
@@ -513,18 +513,20 @@ var (

// skip options start with flags
skipLocalOptionStartWith = map[string]bool{
"-Wp,": true,
"-Wl,": true,
"-D": true,
"-I": true,
"-U": true,
"-L": true,
"-l": true,
"-MF": true,
"-MT": true,
"-MQ": true,
"-isystem": true,
"@": true, // such as @"..\XXX\XXX.rsp"
"-Wp,": true,
"-Wl,": true,
"-D": true,
"-I": true,
"-U": true,
"-L": true,
"-l": true,
"-MF": true,
"-MT": true,
"-MQ": true,
"-isystem": true,
"@": true, // such as @"..\XXX\XXX.rsp"
"--gcc-toolchain": true,
"--sysroot": true,
}
)