Skip to content

Commit

Permalink
Enable to Collector to be run as a Windows service
Browse files Browse the repository at this point in the history
  • Loading branch information
james-bebbington committed Jun 24, 2020
1 parent a8db627 commit 081424c
Show file tree
Hide file tree
Showing 10 changed files with 399 additions and 31 deletions.
32 changes: 21 additions & 11 deletions cmd/otelcol/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,18 @@ package main
import (
"log"

"github.com/pkg/errors"

"go.opentelemetry.io/collector/internal/version"
"go.opentelemetry.io/collector/service"
"go.opentelemetry.io/collector/service/defaultcomponents"
)

func main() {
handleErr := func(message string, err error) {
if err != nil {
log.Fatalf("%s: %v", message, err)
}
}

factories, err := defaultcomponents.Components()
handleErr("Failed to build default components", err)
if err != nil {
log.Fatalf("failed to build default components: %v", err)
}

info := service.ApplicationStartInfo{
ExeName: "otelcol",
Expand All @@ -41,9 +39,21 @@ func main() {
GitHash: version.GitHash,
}

svc, err := service.New(service.Parameters{ApplicationStartInfo: info, Factories: factories})
handleErr("Failed to construct the application", err)
if err := run(service.Parameters{ApplicationStartInfo: info, Factories: factories}); err != nil {
log.Fatal(err)
}
}

func runInteractive(params service.Parameters) error {
app, err := service.New(params)
if err != nil {
return errors.Wrap(err, "failed to construct the application")
}

err = app.Start()
if err != nil {
return errors.Wrap(err, "application run finished with error: %v")
}

err = svc.Start()
handleErr("Application run finished with error", err)
return nil
}
23 changes: 23 additions & 0 deletions cmd/otelcol/main_others.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build !windows

package main

import "go.opentelemetry.io/collector/service"

func run(params service.Parameters) error {
return runInteractive(params)
}
46 changes: 46 additions & 0 deletions cmd/otelcol/main_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build windows

package main

import (
"github.com/pkg/errors"
"golang.org/x/sys/windows/svc"

"go.opentelemetry.io/collector/service"
)

func run(params service.Parameters) error {
isInteractive, err := svc.IsAnInteractiveSession()
if err != nil {
return errors.Wrap(err, "failed to determine if we are running in an interactive session")
}

if isInteractive {
return runInteractive(params)
} else {
return runService(params)
}
}

func runService(params service.Parameters) error {
// do not need to supply service name when startup is invoked through Service Control Manager directly
if err := svc.Run("", service.NewWindowsService(params)); err != nil {
return errors.Wrap(err, "failed to start service")
}

return nil
}
4 changes: 2 additions & 2 deletions service/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func loggerFlags(flags *flag.FlagSet) {
loggerProfilePtr = flags.String(logProfileCfg, "", "Logging profile to use (dev, prod)")
}

func newLogger() (*zap.Logger, error) {
func newLogger(hooks ...func(zapcore.Entry) error) (*zap.Logger, error) {
var level zapcore.Level
err := (&level).UnmarshalText([]byte(*loggerLevelPtr))
if err != nil {
Expand All @@ -62,5 +62,5 @@ func newLogger() (*zap.Logger, error) {
}

conf.Level.SetLevel(level)
return conf.Build()
return conf.Build(zap.Hooks(hooks...))
}
27 changes: 17 additions & 10 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
Expand Down Expand Up @@ -83,6 +84,9 @@ type Application struct {
// stopTestChan is used to terminate the application in end to end tests.
stopTestChan chan struct{}

// signalsChannel is used to receive termination signals from the OS.
signalsChannel chan os.Signal

// asyncErrorChannel is used to signal a fatal error from any component.
asyncErrorChannel chan error
}
Expand Down Expand Up @@ -118,6 +122,8 @@ type Parameters struct {
// If it is not provided the default factory (FileLoaderConfigFactory) is used.
// The default factory loads the configuration specified as a command line flag.
ConfigFactory ConfigFactory
// LoggingHooks provides a way to supply a hook into logging events
LoggingHooks []func(zapcore.Entry) error
}

// ConfigFactory creates config.
Expand Down Expand Up @@ -156,7 +162,7 @@ func New(params Parameters) (*Application, error) {
Use: params.ApplicationStartInfo.ExeName,
Long: params.ApplicationStartInfo.LongName,
RunE: func(cmd *cobra.Command, args []string) error {
err := app.init()
err := app.init(params.LoggingHooks...)
if err != nil {
return err
}
Expand Down Expand Up @@ -237,8 +243,8 @@ func (app *Application) SignalTestComplete() {
close(app.stopTestChan)
}

func (app *Application) init() error {
l, err := newLogger()
func (app *Application) init(hooks ...func(zapcore.Entry) error) error {
l, err := newLogger(hooks...)
if err != nil {
return errors.Wrap(err, "failed to get logger")
}
Expand All @@ -261,19 +267,17 @@ func (app *Application) setupTelemetry(ballastSizeBytes uint64) error {
func (app *Application) runAndWaitForShutdownEvent() {
app.logger.Info("Everything is ready. Begin running and processing data.")

// Plug SIGTERM signal into a channel.
signalsChannel := make(chan os.Signal, 1)
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)
// plug SIGTERM signal into a channel.
app.signalsChannel = make(chan os.Signal, 1)
signal.Notify(app.signalsChannel, os.Interrupt, syscall.SIGTERM)

// set the channel to stop testing.
app.stopTestChan = make(chan struct{})
// notify tests that it is ready.

app.stateChannel <- Running
select {
case err := <-app.asyncErrorChannel:
app.logger.Error("Asynchronous error received, terminating process", zap.Error(err))
case s := <-signalsChannel:
case s := <-app.signalsChannel:
app.logger.Info("Received signal from OS", zap.String("signal", s.String()))
case <-app.stopTestChan:
app.logger.Info("Received stop test request")
Expand Down Expand Up @@ -462,7 +466,10 @@ func (app *Application) execute(ctx context.Context, factory ConfigFactory) erro
errs = append(errs, errors.Wrap(err, "failed to shutdown extensions"))
}

AppTelemetry.shutdown()
err = AppTelemetry.shutdown()
if err != nil {
errs = append(errs, errors.Wrap(err, "failed to shutdown extensions"))
}

app.logger.Info("Shutdown complete.")
app.stateChannel <- Closed
Expand Down
52 changes: 50 additions & 2 deletions service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ import (
"sort"
"strconv"
"strings"
"syscall"
"testing"

"github.com/pkg/errors"
"github.com/prometheus/common/expfmt"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
Expand All @@ -43,7 +46,13 @@ func TestApplication_Start(t *testing.T) {
factories, err := defaultcomponents.Components()
require.NoError(t, err)

app, err := New(Parameters{Factories: factories, ApplicationStartInfo: ApplicationStartInfo{}})
loggingHookCalled := false
hook := func(entry zapcore.Entry) error {
loggingHookCalled = true
return nil
}

app, err := New(Parameters{Factories: factories, ApplicationStartInfo: ApplicationStartInfo{}, LoggingHooks: []func(entry zapcore.Entry) error{hook}})
require.NoError(t, err)
assert.Equal(t, app.rootCmd, app.Command())

Expand All @@ -65,6 +74,7 @@ func TestApplication_Start(t *testing.T) {
assert.Equal(t, Running, <-app.GetStateChannel())
require.True(t, isAppAvailable(t, "http://localhost:13133"))
assert.Equal(t, app.logger, app.GetLogger())
assert.True(t, loggingHookCalled)

// All labels added to all collector metrics by default are listed below.
// These labels are hard coded here in order to avoid inadvertent changes:
Expand All @@ -77,7 +87,45 @@ func TestApplication_Start(t *testing.T) {
}
assertMetrics(t, testPrefix, metricsPort, mandatoryLabels)

close(app.stopTestChan)
app.signalsChannel <- syscall.SIGTERM
<-appDone
assert.Equal(t, Closing, <-app.GetStateChannel())
assert.Equal(t, Closed, <-app.GetStateChannel())
}

type mockAppTelemetry struct{}

func (tel *mockAppTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes uint64, logger *zap.Logger) error {
return nil
}

func (tel *mockAppTelemetry) shutdown() error {
return errors.New("err1")
}

func TestApplication_ReportError(t *testing.T) {
// use a mock AppTelemetry struct to return an error on shutdown
preservedAppTelemetry := AppTelemetry
AppTelemetry = &mockAppTelemetry{}
defer func() { AppTelemetry = preservedAppTelemetry }()

factories, err := defaultcomponents.Components()
require.NoError(t, err)

app, err := New(Parameters{Factories: factories, ApplicationStartInfo: ApplicationStartInfo{}})
require.NoError(t, err)

app.rootCmd.SetArgs([]string{"--config=testdata/otelcol-config-minimal.yaml"})

appDone := make(chan struct{})
go func() {
defer close(appDone)
assert.EqualError(t, app.Start(), "failed to shutdown extensions: err1")
}()

assert.Equal(t, Starting, <-app.GetStateChannel())
assert.Equal(t, Running, <-app.GetStateChannel())
app.ReportFatalError(errors.New("err2"))
<-appDone
assert.Equal(t, Closing, <-app.GetStateChannel())
assert.Equal(t, Closed, <-app.GetStateChannel())
Expand Down
Loading

0 comments on commit 081424c

Please sign in to comment.