Skip to content

Commit

Permalink
feat(promtail): fix TargetManager.run() not exit
Browse files Browse the repository at this point in the history
  • Loading branch information
littlepangdi committed Jan 28, 2022
1 parent d0c6e3d commit cf54848
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 14 deletions.
27 changes: 18 additions & 9 deletions clients/pkg/promtail/targets/docker/targetmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type TargetManager struct {
logger log.Logger
positions positions.Positions
cancel context.CancelFunc
done chan struct{}
manager *discovery.Manager
pushClient api.EntryHandler
groups map[string]*targetGroup
Expand All @@ -47,6 +48,7 @@ func NewTargetManager(
metrics: metrics,
logger: logger,
cancel: cancel,
done: make(chan struct{}),
positions: positions,
manager: discovery.NewManager(ctx, log.With(logger, "component", "docker_discovery")),
pushClient: pushClient,
Expand Down Expand Up @@ -87,22 +89,28 @@ func NewTargetManager(
}
}

go tm.run()
go tm.run(ctx)
go util.LogError("running target manager", tm.manager.Run)

return tm, tm.manager.ApplyConfig(configs)
}

// run listens on the service discovery and adds new targets.
func (tm *TargetManager) run() {
for targetGroups := range tm.manager.SyncCh() {
for jobName, groups := range targetGroups {
tg, ok := tm.groups[jobName]
if !ok {
level.Debug(tm.logger).Log("msg", "unknown target for job", "job", jobName)
continue
func (tm *TargetManager) run(ctx context.Context) {
defer close(tm.done)
for {
select {
case targetGroups := <-tm.manager.SyncCh():
for jobName, groups := range targetGroups {
tg, ok := tm.groups[jobName]
if !ok {
level.Debug(tm.logger).Log("msg", "unknown target for job", "job", jobName)
continue
}
tg.sync(groups)
}
tg.sync(groups)
case <-ctx.Done():
return
}
}
}
Expand All @@ -119,6 +127,7 @@ func (tm *TargetManager) Ready() bool {

func (tm *TargetManager) Stop() {
tm.cancel()
<-tm.done
for _, s := range tm.groups {
s.Stop()
}
Expand Down
19 changes: 14 additions & 5 deletions clients/pkg/promtail/targets/file/filetargetmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
type FileTargetManager struct {
log log.Logger
quit context.CancelFunc
done chan struct{}
syncers map[string]*targetSyncer
manager *discovery.Manager

Expand Down Expand Up @@ -69,6 +70,7 @@ func NewFileTargetManager(
tm := &FileTargetManager{
log: logger,
quit: quit,
done: make(chan struct{}),
watcher: watcher,
targetEventHandler: make(chan fileTargetEvent),
syncers: map[string]*targetSyncer{},
Expand Down Expand Up @@ -132,7 +134,7 @@ func NewFileTargetManager(
configs[cfg.JobName] = cfg.ServiceDiscoveryConfig.Configs()
}

go tm.run()
go tm.run(ctx)
go tm.watch(ctx)
go util.LogError("running target manager", tm.manager.Run)

Expand Down Expand Up @@ -168,10 +170,16 @@ func (tm *FileTargetManager) watch(ctx context.Context) {
}
}

func (tm *FileTargetManager) run() {
for targetGroups := range tm.manager.SyncCh() {
for jobName, groups := range targetGroups {
tm.syncers[jobName].sync(groups, tm.targetEventHandler)
func (tm *FileTargetManager) run(ctx context.Context) {
defer close(tm.done)
for {
select {
case targetGroups := <-tm.manager.SyncCh():
for jobName, groups := range targetGroups {
tm.syncers[jobName].sync(groups, tm.targetEventHandler)
}
case <-ctx.Done():
return
}
}
}
Expand All @@ -189,6 +197,7 @@ func (tm *FileTargetManager) Ready() bool {
// Stop the TargetManager.
func (tm *FileTargetManager) Stop() {
tm.quit()
<-tm.done
for _, s := range tm.syncers {
s.stop()
}
Expand Down

0 comments on commit cf54848

Please sign in to comment.