forked from open-telemetry/opamp-go
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- The Supervisor can manage OpenTelemetry Collector. - Demonstrates basic features: applying config, configuring Collector to collect its own metrics. TODO: - Find a way to fetch Collector version instead of hard-coding it. - Set instance id in the Collector config file to make sure OpAMP and Collector metrics use the same instance id. (Related open issue open-telemetry/opentelemetry-collector#4272) - Re-think callbacks to avoid unnecessary restarts (See open-telemetry#77) - Add a way for Supervisor to understand why the Collector process exited unexpectedly.
- Loading branch information
1 parent
2a41cf9
commit b438fb1
Showing
9 changed files
with
735 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
agent.log | ||
effective.yaml | ||
supervisor |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
server: | ||
endpoint: ws://127.0.0.1:4320/v1/opamp | ||
|
||
agent: | ||
executable: ../../../../../collector-contrib/bin/otelcontribcol_darwin_amd64 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
package main | ||
|
||
import ( | ||
"log" | ||
"os" | ||
"os/signal" | ||
|
||
"github.com/open-telemetry/opamp-go/internal/examples/supervisor/supervisor" | ||
) | ||
|
||
func main() { | ||
logger := &supervisor.Logger{Logger: log.Default()} | ||
supervisor, err := supervisor.NewSupervisor(logger) | ||
if err != nil { | ||
logger.Errorf(err.Error()) | ||
os.Exit(-1) | ||
return | ||
} | ||
|
||
interrupt := make(chan os.Signal, 1) | ||
signal.Notify(interrupt, os.Interrupt) | ||
<-interrupt | ||
supervisor.Shutdown() | ||
} |
158 changes: 158 additions & 0 deletions
158
internal/examples/supervisor/supervisor/commander/commander.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,158 @@ | ||
package commander | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"os" | ||
"os/exec" | ||
"syscall" | ||
"time" | ||
|
||
"github.com/open-telemetry/opamp-go/client/types" | ||
"github.com/open-telemetry/opamp-go/internal/examples/supervisor/supervisor/config" | ||
) | ||
|
||
// Commander can start/stop/restat the Agent executable and also watch for a signal | ||
// for the Agent process to finish. | ||
type Commander struct { | ||
logger types.Logger | ||
cfg *config.Agent | ||
args []string | ||
cmd *exec.Cmd | ||
doneCh chan struct{} | ||
waitCh chan struct{} | ||
} | ||
|
||
func NewCommander(logger types.Logger, cfg *config.Agent, args ...string) (*Commander, error) { | ||
if cfg.Executable == "" { | ||
return nil, errors.New("agent.executable config option must be specified") | ||
} | ||
|
||
return &Commander{ | ||
logger: logger, | ||
cfg: cfg, | ||
args: args, | ||
}, nil | ||
} | ||
|
||
// Start the Agent and begin watching the process. | ||
// Agent's stdout and stderr are written to a file. | ||
func (c *Commander) Start(ctx context.Context) error { | ||
c.logger.Debugf("Starting agent %s", c.cfg.Executable) | ||
|
||
logFilePath := "agent.log" | ||
logFile, err := os.Create(logFilePath) | ||
if err != nil { | ||
return fmt.Errorf("cannot create %s: %s", logFilePath, err.Error()) | ||
} | ||
|
||
c.cmd = exec.CommandContext(ctx, c.cfg.Executable, c.args...) | ||
|
||
// Capture standard output and standard error. | ||
c.cmd.Stdout = logFile | ||
c.cmd.Stderr = logFile | ||
|
||
c.doneCh = make(chan struct{}, 1) | ||
c.waitCh = make(chan struct{}) | ||
|
||
if err := c.cmd.Start(); err != nil { | ||
return err | ||
} | ||
|
||
c.logger.Debugf("Agent process started, PID=%d", c.cmd.Process.Pid) | ||
|
||
go c.watch() | ||
|
||
return nil | ||
} | ||
|
||
func (c *Commander) Restart(ctx context.Context) error { | ||
if err := c.Stop(ctx); err != nil { | ||
return err | ||
} | ||
if err := c.Start(ctx); err != nil { | ||
return err | ||
} | ||
return nil | ||
} | ||
|
||
func (c *Commander) watch() { | ||
c.cmd.Wait() | ||
c.doneCh <- struct{}{} | ||
close(c.waitCh) | ||
} | ||
|
||
// Done returns a channel that will send a signal when the Agent process is finished. | ||
func (c *Commander) Done() <-chan struct{} { | ||
return c.doneCh | ||
} | ||
|
||
// Pid returns Agent process PID if it is started or 0 if it is not. | ||
func (c *Commander) Pid() int { | ||
if c.cmd == nil || c.cmd.Process == nil { | ||
return 0 | ||
} | ||
return c.cmd.Process.Pid | ||
} | ||
|
||
// ExitCode returns Agent process exit code if it exited or 0 if it is not. | ||
func (c *Commander) ExitCode() int { | ||
if c.cmd == nil || c.cmd.ProcessState == nil { | ||
return 0 | ||
} | ||
return c.cmd.ProcessState.ExitCode() | ||
} | ||
|
||
// Stop the Agent process. Sends SIGTERM to the process and wait for up 10 seconds | ||
// and if the process does not finish kills it forcedly by sending SIGKILL. | ||
// Returns after the process is terminated. | ||
func (c *Commander) Stop(ctx context.Context) error { | ||
if c.cmd == nil || c.cmd.Process == nil { | ||
// Not started, nothing to do. | ||
return nil | ||
} | ||
|
||
c.logger.Debugf("Stopping agent process, PID=%v", c.cmd.Process.Pid) | ||
|
||
// Gracefully signal process to stop. | ||
if err := c.cmd.Process.Signal(syscall.SIGTERM); err != nil { | ||
return err | ||
} | ||
|
||
finished := make(chan struct{}) | ||
|
||
// Setup a goroutine to wait a while for process to finish and send kill signal | ||
// to the process if it doesn't finish. | ||
var innerErr error | ||
go func() { | ||
// Wait 10 seconds. | ||
t := time.After(10 * time.Second) | ||
select { | ||
case <-ctx.Done(): | ||
break | ||
case <-t: | ||
break | ||
case <-finished: | ||
// Process is successfully finished. | ||
c.logger.Debugf("Agent process PID=%v successfully stopped.", c.cmd.Process.Pid) | ||
return | ||
} | ||
|
||
// Time is out. Kill the process. | ||
c.logger.Debugf( | ||
"Agent process PID=%d is not responding to SIGTERM. Sending SIGKILL to kill forcedly.", | ||
c.cmd.Process.Pid) | ||
if innerErr = c.cmd.Process.Signal(syscall.SIGKILL); innerErr != nil { | ||
return | ||
} | ||
}() | ||
|
||
// Wait for process to terminate | ||
<-c.waitCh | ||
|
||
// Let goroutine know process is finished. | ||
close(finished) | ||
|
||
return innerErr | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package config | ||
|
||
// Supervisor is the Supervisor config file format. | ||
type Supervisor struct { | ||
Server *OpAMPServer | ||
Agent *Agent | ||
} | ||
|
||
// OpAMPServer holds the settings of the OpAMP server to connect to.
type OpAMPServer struct {
	// Endpoint is the server URL, e.g. ws://127.0.0.1:4320/v1/opamp.
	Endpoint string
}
|
||
// Agent holds the settings of the Agent process managed by the Supervisor.
type Agent struct {
	// Executable is the path of the Agent program to run.
	Executable string
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package supervisor | ||
|
||
import "log" | ||
|
||
// Logger adapts a standard library *log.Logger to the Debugf/Errorf style
// logging methods used by the Supervisor.
type Logger struct {
	// Logger is the underlying logger that every message is written to.
	Logger *log.Logger
}

// Debugf formats the message like fmt.Printf and writes it to the underlying
// logger.
func (l *Logger) Debugf(format string, v ...interface{}) {
	out := l.Logger
	out.Printf(format, v...)
}

// Errorf formats the message like fmt.Printf and writes it to the underlying
// logger. It currently produces the same output as Debugf.
func (l *Logger) Errorf(format string, v ...interface{}) {
	out := l.Logger
	out.Printf(format, v...)
}
Oops, something went wrong.