From dcb86e273f3e8194866919172a5496039b218d7f Mon Sep 17 00:00:00 2001 From: James Bebbington Date: Mon, 15 Jun 2020 17:22:30 +1000 Subject: [PATCH] Enable to Collector to be run as a Windows service --- cmd/otelcol/main.go | 32 ++-- cmd/otelcol/main_others.go | 23 +++ cmd/otelcol/main_windows.go | 46 ++++++ service/logger.go | 4 +- service/service.go | 27 ++-- service/service_test.go | 52 ++++++- service/service_windows.go | 147 +++++++++++++++++++ service/service_windows_test.go | 56 +++++++ service/telemetry.go | 25 +++- service/testdata/otelcol-config-minimal.yaml | 13 ++ 10 files changed, 394 insertions(+), 31 deletions(-) create mode 100644 cmd/otelcol/main_others.go create mode 100644 cmd/otelcol/main_windows.go create mode 100644 service/service_windows.go create mode 100644 service/service_windows_test.go create mode 100644 service/testdata/otelcol-config-minimal.yaml diff --git a/cmd/otelcol/main.go b/cmd/otelcol/main.go index f40a39532f0..1cf70800dd8 100644 --- a/cmd/otelcol/main.go +++ b/cmd/otelcol/main.go @@ -19,20 +19,18 @@ package main import ( "log" + "github.com/pkg/errors" + "go.opentelemetry.io/collector/internal/version" "go.opentelemetry.io/collector/service" "go.opentelemetry.io/collector/service/defaultcomponents" ) func main() { - handleErr := func(message string, err error) { - if err != nil { - log.Fatalf("%s: %v", message, err) - } - } - factories, err := defaultcomponents.Components() - handleErr("Failed to build default components", err) + if err != nil { + log.Fatalf("failed to build default components: %v", err) + } info := service.ApplicationStartInfo{ ExeName: "otelcol", @@ -41,9 +39,21 @@ func main() { GitHash: version.GitHash, } - svc, err := service.New(service.Parameters{ApplicationStartInfo: info, Factories: factories}) - handleErr("Failed to construct the application", err) + if err := run(service.Parameters{ApplicationStartInfo: info, Factories: factories}); err != nil { + log.Fatal(err) + } +} + +func runInteractive(params service.Parameters) error { + app, err := service.New(params) + if err != nil { + return errors.Wrap(err, "failed to construct the application") + } + + err = app.Start() + if err != nil { + return errors.Wrap(err, "application run finished with error: %v") + } - err = svc.Start() - handleErr("Application run finished with error", err) + return nil } diff --git a/cmd/otelcol/main_others.go b/cmd/otelcol/main_others.go new file mode 100644 index 00000000000..bacdadcba5e --- /dev/null +++ b/cmd/otelcol/main_others.go @@ -0,0 +1,23 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !windows + +package main + +import "go.opentelemetry.io/collector/service" + +func run(params service.Parameters) error { + return runInteractive(params) +} diff --git a/cmd/otelcol/main_windows.go b/cmd/otelcol/main_windows.go new file mode 100644 index 00000000000..300cd59a141 --- /dev/null +++ b/cmd/otelcol/main_windows.go @@ -0,0 +1,46 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build windows + +package main + +import ( + "github.com/pkg/errors" + "golang.org/x/sys/windows/svc" + + "go.opentelemetry.io/collector/service" +) + +func run(params service.Parameters) error { + isInteractive, err := svc.IsAnInteractiveSession() + if err != nil { + return errors.Wrap(err, "failed to determine if we are running in an interactive session") + } + + if isInteractive { + return runInteractive(params) + } else { + return runService(params) + } +} + +func runService(params service.Parameters) error { + // do not need to supply service name when startup is invoked through Service Control Manager directly + if err := svc.Run("", service.NewWindowsService(params)); err != nil { + return errors.Wrap(err, "failed to start service") + } + + return nil +} diff --git a/service/logger.go b/service/logger.go index 38b22c911e5..1246181611c 100644 --- a/service/logger.go +++ b/service/logger.go @@ -39,7 +39,7 @@ func loggerFlags(flags *flag.FlagSet) { loggerProfilePtr = flags.String(logProfileCfg, "", "Logging profile to use (dev, prod)") } -func newLogger() (*zap.Logger, error) { +func newLogger(hooks ...func(zapcore.Entry) error) (*zap.Logger, error) { var level zapcore.Level err := (&level).UnmarshalText([]byte(*loggerLevelPtr)) if err != nil { @@ -62,5 +62,5 @@ func newLogger() (*zap.Logger, error) { } conf.Level.SetLevel(level) - return conf.Build() + return conf.Build(zap.Hooks(hooks...)) } diff --git a/service/service.go b/service/service.go index 2da897deda9..78f3f6ed3db 100644 --- a/service/service.go +++ b/service/service.go @@ -32,6 +32,7 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenterror" @@ -83,6 +84,9 @@ type Application struct { // stopTestChan is used to terminate the application in end to end tests. stopTestChan chan struct{} + // signalsChannel is used to receive termination signals from the OS. + signalsChannel chan os.Signal + // asyncErrorChannel is used to signal a fatal error from any component. asyncErrorChannel chan error } @@ -118,6 +122,8 @@ type Parameters struct { // If it is not provided the default factory (FileLoaderConfigFactory) is used. // The default factory loads the configuration specified as a command line flag. ConfigFactory ConfigFactory + // LoggingHooks provides a way to supply a hook into logging events + LoggingHooks []func(zapcore.Entry) error } // ConfigFactory creates config. @@ -156,7 +162,7 @@ func New(params Parameters) (*Application, error) { Use: params.ApplicationStartInfo.ExeName, Long: params.ApplicationStartInfo.LongName, RunE: func(cmd *cobra.Command, args []string) error { - err := app.init() + err := app.init(params.LoggingHooks...) if err != nil { return err } @@ -237,8 +243,8 @@ func (app *Application) SignalTestComplete() { close(app.stopTestChan) } -func (app *Application) init() error { - l, err := newLogger() +func (app *Application) init(hooks ...func(zapcore.Entry) error) error { + l, err := newLogger(hooks...) if err != nil { return errors.Wrap(err, "failed to get logger") } @@ -261,19 +267,17 @@ func (app *Application) setupTelemetry(ballastSizeBytes uint64) error { func (app *Application) runAndWaitForShutdownEvent() { app.logger.Info("Everything is ready. Begin running and processing data.") - // Plug SIGTERM signal into a channel. - signalsChannel := make(chan os.Signal, 1) - signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM) + // plug SIGTERM signal into a channel. + app.signalsChannel = make(chan os.Signal, 1) + signal.Notify(app.signalsChannel, os.Interrupt, syscall.SIGTERM) // set the channel to stop testing. app.stopTestChan = make(chan struct{}) - // notify tests that it is ready. - app.stateChannel <- Running select { case err := <-app.asyncErrorChannel: app.logger.Error("Asynchronous error received, terminating process", zap.Error(err)) - case s := <-signalsChannel: + case s := <-app.signalsChannel: app.logger.Info("Received signal from OS", zap.String("signal", s.String())) case <-app.stopTestChan: app.logger.Info("Received stop test request") @@ -462,7 +466,10 @@ func (app *Application) execute(ctx context.Context, factory ConfigFactory) erro errs = append(errs, errors.Wrap(err, "failed to shutdown extensions")) } - AppTelemetry.shutdown() + err = AppTelemetry.shutdown() + if err != nil { + errs = append(errs, errors.Wrap(err, "failed to shutdown extensions")) + } app.logger.Info("Shutdown complete.") app.stateChannel <- Closed diff --git a/service/service_test.go b/service/service_test.go index e99fb208f08..b7837a3b5d0 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -23,13 +23,16 @@ import ( "sort" "strconv" "strings" + "syscall" "testing" + "github.com/pkg/errors" "github.com/prometheus/common/expfmt" "github.com/spf13/viper" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" @@ -43,7 +46,13 @@ func TestApplication_Start(t *testing.T) { factories, err := defaultcomponents.Components() require.NoError(t, err) - app, err := New(Parameters{Factories: factories, ApplicationStartInfo: ApplicationStartInfo{}}) + loggingHookCalled := false + hook := func(entry zapcore.Entry) error { + loggingHookCalled = true + return nil + } + + app, err := New(Parameters{Factories: factories, ApplicationStartInfo: ApplicationStartInfo{}, LoggingHooks: []func(entry zapcore.Entry) error{hook}}) require.NoError(t, err) assert.Equal(t, app.rootCmd, app.Command()) @@ -65,6 +74,7 @@ func TestApplication_Start(t *testing.T) { assert.Equal(t, Running, <-app.GetStateChannel()) require.True(t, isAppAvailable(t, "http://localhost:13133")) assert.Equal(t, app.logger, app.GetLogger()) + assert.True(t, loggingHookCalled) // All labels added to all collector metrics by default are listed below. // These labels are hard coded here in order to avoid inadvertent changes: @@ -77,7 +87,45 @@ func TestApplication_Start(t *testing.T) { } assertMetrics(t, testPrefix, metricsPort, mandatoryLabels) - close(app.stopTestChan) + app.signalsChannel <- syscall.SIGTERM + <-appDone + assert.Equal(t, Closing, <-app.GetStateChannel()) + assert.Equal(t, Closed, <-app.GetStateChannel()) +} + +type mockAppTelemetry struct{} + +func (tel *mockAppTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes uint64, logger *zap.Logger) error { + return nil +} + +func (tel *mockAppTelemetry) shutdown() error { + return errors.New("err1") +} + +func TestApplication_ReportError(t *testing.T) { + // use a mock AppTelemetry struct to return an error on shutdown + preservedAppTelemetry := AppTelemetry + AppTelemetry = &mockAppTelemetry{} + defer func() { AppTelemetry = preservedAppTelemetry }() + + factories, err := defaultcomponents.Components() + require.NoError(t, err) + + app, err := New(Parameters{Factories: factories, ApplicationStartInfo: ApplicationStartInfo{}}) + require.NoError(t, err) + + app.rootCmd.SetArgs([]string{"--config=testdata/otelcol-config-minimal.yaml"}) + + appDone := make(chan struct{}) + go func() { + defer close(appDone) + assert.EqualError(t, app.Start(), "failed to shutdown extensions: err1") + }() + + assert.Equal(t, Starting, <-app.GetStateChannel()) + assert.Equal(t, Running, <-app.GetStateChannel()) + app.ReportFatalError(errors.New("err2")) <-appDone assert.Equal(t, Closing, <-app.GetStateChannel()) assert.Equal(t, Closed, <-app.GetStateChannel()) diff --git a/service/service_windows.go b/service/service_windows.go new file mode 100644 index 00000000000..271e0ef23e5 --- /dev/null +++ b/service/service_windows.go @@ -0,0 +1,147 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build windows + +package service + +import ( + "fmt" + "syscall" + + "github.com/pkg/errors" + "go.uber.org/zap/zapcore" + "golang.org/x/sys/windows/svc" + "golang.org/x/sys/windows/svc/eventlog" +) + +type WindowsService struct { + params Parameters + app *Application +} + +func NewWindowsService(params Parameters) *WindowsService { + return &WindowsService{params: params} +} + +// Execute implements https://godoc.org/golang.org/x/sys/windows/svc#Handler +func (s *WindowsService) Execute(args []string, requests <-chan svc.ChangeRequest, changes chan<- svc.Status) (ssec bool, errno uint32) { + // The first argument supplied to service.Execute is the service name. If this is + // not provided for some reason, raise a relevant error to the system event log + if len(args) == 0 { + return false, 1213 // 1213: ERROR_INVALID_SERVICENAME + } + + elog, err := openEventLog(args[0]) + if err != nil { + return false, 1501 // 1501: ERROR_EVENTLOG_CANT_START + } + + appErrorChannel := make(chan error, 1) + + changes <- svc.Status{State: svc.StartPending} + if err = s.start(elog, appErrorChannel); err != nil { + elog.Error(3, fmt.Sprintf("failed to start service: %v", err)) + return false, 1064 // 1064: ERROR_EXCEPTION_IN_SERVICE + } + changes <- svc.Status{State: svc.Running, Accepts: svc.AcceptStop | svc.AcceptShutdown} + + for req := range requests { + switch req.Cmd { + case svc.Interrogate: + changes <- req.CurrentStatus + + case svc.Stop, svc.Shutdown: + changes <- svc.Status{State: svc.StopPending} + if err := s.stop(appErrorChannel); err != nil { + elog.Error(3, fmt.Sprintf("errors occurred while shutting down the service: %v", err)) + } + changes <- svc.Status{State: svc.Stopped} + return false, 0 + + default: + elog.Error(3, fmt.Sprintf("unexpected service control request #%d", req.Cmd)) + return false, 1052 // 1052: ERROR_INVALID_SERVICE_CONTROL + } + } + + return false, 0 +} + +func (s *WindowsService) start(elog *eventlog.Log, appErrorChannel chan error) error { + var err error + s.app, err = newWithEventViewerLoggingHook(s.params, elog) + if err != nil { + return err + } + + // app.Start blocks until receiving a SIGTERM signal, so needs to be started + // asynchronously, but it will exit early if an error occurs on startup + go func() { appErrorChannel <- s.app.Start() }() + + // wait until the app is in the Running state + go func() { + for state := range s.app.GetStateChannel() { + if state == Running { + appErrorChannel <- nil + break + } + } + }() + + // wait until the app is in the Running state, or an error was returned + return <-appErrorChannel +} + +func (s *WindowsService) stop(appErrorChannel chan error) error { + // simulate a SIGTERM signal to terminate the application + s.app.signalsChannel <- syscall.SIGTERM + // return the response of app.Start + return <-appErrorChannel +} + +func openEventLog(serviceName string) (*eventlog.Log, error) { + elog, err := eventlog.Open(serviceName) + if err != nil { + return nil, errors.Wrap(err, "service failed to open event log: %v") + } + + return elog, nil +} + +func newWithEventViewerLoggingHook(params Parameters, elog *eventlog.Log) (*Application, error) { + params.LoggingHooks = append( + params.LoggingHooks, + func(entry zapcore.Entry) error { + msg := fmt.Sprintf("%v\r\n\r\nStack Trace:\r\n%v", entry.Message, entry.Stack) + + switch entry.Level { + case zapcore.FatalLevel, zapcore.PanicLevel, zapcore.DPanicLevel: + // golang.org/x/sys/windows/svc/eventlog does not support Critical level event logs + return elog.Error(3, msg) + case zapcore.ErrorLevel: + return elog.Error(3, msg) + case zapcore.WarnLevel: + return elog.Warning(2, msg) + case zapcore.InfoLevel: + return elog.Info(1, msg) + } + + // ignore Debug level logs + return nil + }, + ) + + return New(params) +} diff --git a/service/service_windows_test.go b/service/service_windows_test.go new file mode 100644 index 00000000000..ed2f84f855b --- /dev/null +++ b/service/service_windows_test.go @@ -0,0 +1,56 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build windows + +package service + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/sys/windows/svc" + + "go.opentelemetry.io/collector/service/defaultcomponents" +) + +func TestWindowsService_Execute(t *testing.T) { + os.Args = []string{"otelcol", "--config", "testdata/otelcol-config-minimal.yaml"} + + factories, err := defaultcomponents.Components() + require.NoError(t, err) + + s := NewWindowsService(Parameters{Factories: factories, ApplicationStartInfo: ApplicationStartInfo{}}) + + appDone := make(chan struct{}) + requests := make(chan svc.ChangeRequest) + changes := make(chan svc.Status) + go func() { + defer close(appDone) + ssec, errno := s.Execute([]string{"svc name"}, requests, changes) + assert.Equal(t, uint32(0), errno) + assert.False(t, ssec) + }() + + assert.Equal(t, svc.StartPending, (<-changes).State) + assert.Equal(t, svc.Running, (<-changes).State) + requests <- svc.ChangeRequest{Cmd: svc.Interrogate, CurrentStatus: svc.Status{State: svc.Running}} + assert.Equal(t, svc.Running, (<-changes).State) + requests <- svc.ChangeRequest{Cmd: svc.Stop} + assert.Equal(t, svc.StopPending, (<-changes).State) + assert.Equal(t, svc.Stopped, (<-changes).State) + <-appDone +} diff --git a/service/telemetry.go b/service/telemetry.go index 6d924bbfdc3..06fc6b77844 100644 --- a/service/telemetry.go +++ b/service/telemetry.go @@ -36,11 +36,17 @@ import ( var ( // AppTelemetry is application's own telemetry. - AppTelemetry = &appTelemetry{} + AppTelemetry appTelemetryExporter = &appTelemetry{} ) +type appTelemetryExporter interface { + init(asyncErrorChannel chan<- error, ballastSizeBytes uint64, logger *zap.Logger) error + shutdown() error +} + type appTelemetry struct { - views []*view.View + views []*view.View + server *http.Server } func (tel *appTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes uint64, logger *zap.Logger) error { @@ -100,10 +106,16 @@ func (tel *appTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes u zap.String(conventions.AttributeServiceInstance, instanceID), ) + mux := http.NewServeMux() + mux.Handle("/metrics", pe) + + tel.server = &http.Server{ + Addr: metricsAddr, + Handler: mux, + } + go func() { - mux := http.NewServeMux() - mux.Handle("/metrics", pe) - serveErr := http.ListenAndServe(metricsAddr, mux) + serveErr := tel.server.ListenAndServe() if serveErr != nil && serveErr != http.ErrServerClosed { asyncErrorChannel <- serveErr } @@ -112,8 +124,9 @@ func (tel *appTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes u return nil } -func (tel *appTelemetry) shutdown() { +func (tel *appTelemetry) shutdown() error { view.Unregister(tel.views...) + return tel.server.Close() } func sanitizePrometheusKey(str string) string { diff --git a/service/testdata/otelcol-config-minimal.yaml b/service/testdata/otelcol-config-minimal.yaml new file mode 100644 index 00000000000..448973ae172 --- /dev/null +++ b/service/testdata/otelcol-config-minimal.yaml @@ -0,0 +1,13 @@ +receivers: + otlp: + +exporters: + otlp: + endpoint: "locahost:14250" + +service: + pipelines: + traces: + receivers: [otlp] + exporters: [otlp] +