Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev tming #141

Merged
merged 6 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ package basic
import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/booster/command"
dcSDK "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/sdk"
"github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/controller/pkg/types"
"github.com/TencentBlueKing/bk-turbo/src/backend/booster/common/blog"
Expand Down Expand Up @@ -157,6 +159,12 @@ func (m *Mgr) Register(config *types.WorkRegisterConfig) error {
m.info.SetProjectID(config.Apply.ProjectID)
m.info.SetScene(config.Apply.Scene)

// 新加bazelNoLauncher状态
bazelNoLauncher := strings.Contains(config.Apply.Extra, command.FlagBazelNoLauncher)
m.info.SetBazelNoLauncher(bazelNoLauncher)
blog.Infof("basic: work(%s) project(%s) scene(%s) set bazelNoLauncher to %v",
m.work.ID(), config.Apply.ProjectID, config.Apply.Scene, bazelNoLauncher)

m.work.Resource().SetServerHost(config.ServerHost)
if len(config.SpecificHostList) > 0 {
m.work.Resource().SetSpecificHosts(config.SpecificHostList)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (e *executor) executeLocalTask() *types.LocalTaskExecuteResult {
locallockweight = e.handler.LocalLockWeight(e.req.Commands)
}
if !e.lock(dcSDK.JobUsageLocalExe, locallockweight) {
blog.Infof("executor:failed to lock with local job usage(%s) weight %d", dcSDK.JobUsageLocalExe, locallockweight)
blog.Errorf("executor:failed to lock with local job usage(%s) weight %d", dcSDK.JobUsageLocalExe, locallockweight)
return &types.LocalTaskExecuteResult{
Result: &dcSDK.LocalTaskResult{
ExitCode: -1,
Expand Down
18 changes: 17 additions & 1 deletion src/backend/booster/bk_dist/controller/pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ import (
"os"
"runtime"
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/booster/command"
"github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/env"
dcSDK "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/sdk"
"github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/controller/config"
Expand Down Expand Up @@ -99,6 +101,10 @@ func (m *mgr) RegisterWork(config *types.WorkRegisterConfig) (*types.WorkInfo, b
if work := m.worksPool.find(config.Apply.ProjectID, config.Apply.Scene, config.BatchMode); work != nil {
work.Lock()
info := work.Basic().Info()
bazelNoLauncher := strings.Contains(config.Apply.Extra, command.FlagBazelNoLauncher)
info.SetBazelNoLauncher(bazelNoLauncher)
blog.Infof("mgr: work(%s) project(%s) scene(%s) update bazelNoLauncher to %v",
work.ID(), config.Apply.ProjectID, config.Apply.Scene, bazelNoLauncher)
if info.CanBeHeartbeat() {
defer work.Unlock()
_ = work.Basic().Heartbeat()
Expand Down Expand Up @@ -367,7 +373,7 @@ func (m *mgr) ExecuteLocalTask(
var err error
if workID == dcSDK.EmptyWorkerID {
if m.conf.UseDefaultWorker {
work, err = m.worksPool.getFirstWork()
work, err = m.worksPool.getFirstBazelNoLauncherWork()
if err != nil {
blog.Errorf("mgr: get first work failed: %v", err)
return nil, err
Expand Down Expand Up @@ -890,3 +896,13 @@ func (m *mgr) GetFirstWorkID() (string, error) {
return "", err
}
}

// Get first bazel workid
func (m *mgr) GetFirstBazelNoLauncherWorkID() (string, error) {
work, err := m.worksPool.getFirstBazelNoLauncherWork()
if err == nil {
return work.ID(), nil
} else {
return "", err
}
}
45 changes: 34 additions & 11 deletions src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func newResource(hl []*dcProtocol.Host) *resource {
usageMap: usageMap,
lockChan: make(lockWorkerChan, 1000),
unlockChan: make(lockWorkerChan, 1000),
emptyChan: make(chan bool, 1000),
worker: wl,

waitingList: list.New(),
Expand All @@ -94,6 +95,9 @@ type resource struct {
lockChan lockWorkerChan
unlockChan lockWorkerChan

// trigger when worker change to empty
emptyChan chan bool

handling bool

// to save waiting requests
Expand Down Expand Up @@ -237,6 +241,10 @@ func (wr *resource) disableWorker(host *dcProtocol.Host) {
}
}

if wr.totalSlots <= 0 {
wr.emptyChan <- true
}

blog.Infof("remote slot: total slot:%d after disable host:%v", wr.totalSlots, *host)
return
}
Expand Down Expand Up @@ -274,6 +282,10 @@ func (wr *resource) workerDead(w *worker) {
}
}

if wr.totalSlots <= 0 {
wr.emptyChan <- true
}

blog.Infof("remote slot: total slot:%d after host is dead:%v", wr.totalSlots, w.host)
return
}
Expand All @@ -298,6 +310,10 @@ func (wr *resource) disableAllWorker() {
blog.Infof("remote slot: usage map:%v after disable all host", *v)
}

if wr.totalSlots <= 0 {
wr.emptyChan <- true
}

blog.Infof("remote slot: total slot:%d after disable all host", wr.totalSlots)
return
}
Expand Down Expand Up @@ -543,6 +559,8 @@ func (wr *resource) handleLock(ctx context.Context) {
wr.putSlot(msg)
case msg := <-wr.lockChan:
wr.getSlot(msg)
case <-wr.emptyChan:
wr.onSlotEmpty()
}
}
}
Expand Down Expand Up @@ -570,6 +588,11 @@ func (wr *resource) isIdle(set *usageWorkerSet) bool {
}

func (wr *resource) getSlot(msg lockWorkerMessage) {
if wr.totalSlots <= 0 {
msg.result <- nil
return
}

satisfied := false
usage := msg.jobUsage
if wr.occupiedSlots < wr.totalSlots || wr.totalSlots <= 0 {
Expand Down Expand Up @@ -603,34 +626,34 @@ func (wr *resource) putSlot(msg lockWorkerMessage) {

// check whether other waiting is satisfied now
if wr.waitingList.Len() > 0 {
// index := 0
for e := wr.waitingList.Front(); e != nil; e = e.Next() {
// if index%2 == 0 {
msg := e.Value.(*lockWorkerMessage)
// usage := e.Value.(dcSDK.JobUsage)
// set := wr.getUsageSet(usage)
set := wr.getUsageSet(msg.jobUsage)
if wr.isIdle(set) {
set.occupied++
wr.occupiedSlots++

msg.result <- wr.occupyWorkerSlots(msg.largeFile, []*dcProtocol.Host{})

// chanElement := e.Next()
// chanElement.Value.(chan *dcProtocol.Host) <- wr.occupyWorkerSlots()

// delete this element
wr.waitingList.Remove(e)
// wr.waitingList.Remove(chanElement)

break
}
// }
// index++
}
}
}

func (wr *resource) onSlotEmpty() {
blog.Infof("remote slot: on slot empty: occupy:%d,total:%d,waiting:%d", wr.occupiedSlots, wr.totalSlots, wr.waitingList.Len())

for wr.waitingList.Len() > 0 {
e := wr.waitingList.Front()
msg := e.Value.(*lockWorkerMessage)
msg.result <- nil
wr.waitingList.Remove(e)
}
}

// worker describe the worker information includes the host details and the slots status
// and it is the caller's responsibility to ensure the lock.
type worker struct {
Expand Down
18 changes: 17 additions & 1 deletion src/backend/booster/bk_dist/controller/pkg/manager/works_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,23 @@ func (wp *worksPool) getFirstWork() (*types.Work, error) {
defer wp.RUnlock()

for _, work := range wp.works {
return work, nil
// check whether work is valid
if work.Basic().Info().IsWorking() {
return work, nil
}
}

return nil, types.ErrNoWork
}

func (wp *worksPool) getFirstBazelNoLauncherWork() (*types.Work, error) {
wp.RLock()
defer wp.RUnlock()

for _, work := range wp.works {
if work.Basic().Info().IsWorking() && work.Basic().Info().BazelNoLauncher() {
return work, nil
}
}

return nil, types.ErrNoWork
Expand Down
3 changes: 3 additions & 0 deletions src/backend/booster/bk_dist/controller/pkg/types/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ type Mgr interface {

// Get first workid
GetFirstWorkID() (string, error)

// Get first bazelNoLauncher workid
GetFirstBazelNoLauncherWorkID() (string, error)
}

// RemoteMgr describe a manager for handling all actions with remote workers for work
Expand Down
12 changes: 12 additions & 0 deletions src/backend/booster/bk_dist/controller/pkg/types/work.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ type WorkInfo struct {
success bool
batchMode bool

bazelNoLauncher bool

lastHeartbeat time.Time

commonStatus *WorkCommonStatus
Expand Down Expand Up @@ -430,6 +432,16 @@ func (wi *WorkInfo) GetPrepared() int32 {
return atomic.LoadInt32(&wi.preparedRemoteTasks)
}

// BazelNoLauncher return bazelNoLauncher
func (wi WorkInfo) BazelNoLauncher() bool {
return wi.bazelNoLauncher
}

// SetBazelNoLauncher update bazelNoLauncher
func (wi *WorkInfo) SetBazelNoLauncher(flag bool) {
wi.bazelNoLauncher = flag
}

// WorkCommonStatus describe the work status and actions timestamp
type WorkCommonStatus struct {
Status dcSDK.WorkStatus
Expand Down
7 changes: 0 additions & 7 deletions src/backend/booster/bk_dist/executor/pkg/dist_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,13 +180,6 @@ func (d *DistExecutor) runWork() (int, string, error) {
return 0, "", fmt.Errorf("not enough args to execute")
}

// ignore argv[0], it's itself
// _, _, r, err := d.work.Job(d.stats).ExecuteLocalTask(os.Args[1:], "")
// if err != nil {
// blog.Errorf("executor: execute failed, error: %v, exit code: -1", err)
// return -1, err
// }

retcode, retmsg, r, err := d.work.Job(d.stats).ExecuteLocalTask(os.Args[1:], "")
if err != nil || retcode != 0 {
if r != nil {
Expand Down
26 changes: 14 additions & 12 deletions src/backend/booster/bk_dist/handler/ue4/cc/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,18 +513,20 @@ var (

// skip options start with flags
skipLocalOptionStartWith = map[string]bool{
"-Wp,": true,
"-Wl,": true,
"-D": true,
"-I": true,
"-U": true,
"-L": true,
"-l": true,
"-MF": true,
"-MT": true,
"-MQ": true,
"-isystem": true,
"@": true, // such as @"..\XXX\XXX.rsp"
"-Wp,": true,
"-Wl,": true,
"-D": true,
"-I": true,
"-U": true,
"-L": true,
"-l": true,
"-MF": true,
"-MT": true,
"-MQ": true,
"-isystem": true,
"@": true, // such as @"..\XXX\XXX.rsp"
"--gcc-toolchain": true,
"--sysroot": true,
}
)

Expand Down