[healthcheckextensionv2] Introduce health check extension based on component status reporting #30673

Closed
Changes from 27 commits
28 commits
33deeeb
Introduce health check extension based on component status reporting
mwear Jan 19, 2024
d4abdf9
Refactor aggregator
mwear Jan 25, 2024
79c59e2
Make gotidy
mwear Jan 25, 2024
298c343
Update extension/healthcheckextensionv2/config.go
mwear Jan 25, 2024
bf369bb
Apply suggestions from code review
mwear Jan 26, 2024
e742ed0
Implement suggestions from review
mwear Jan 26, 2024
bd25302
Add Scope and Detail types to Aggregator
mwear Jan 26, 2024
f2765fd
Use pipeline status for availability during start/shutdown
mwear Jan 30, 2024
e9b0f14
Multiple health strategies for http
mwear Feb 7, 2024
5a4ae61
Multiple health strategies via different aggregation funcs
mwear Feb 9, 2024
9084fd5
Simplify HTTP strategies
mwear Feb 21, 2024
166870b
gRPC support for component health options
mwear Feb 22, 2024
2accfd2
Fusion
mwear Feb 29, 2024
1f455d3
Update deps / rename settings -> config
mwear Mar 1, 2024
6b343dd
Readme fusion
mwear Mar 2, 2024
276f326
Update extension/healthcheckextensionv2/README.md
mwear Mar 5, 2024
168cb3c
Address code review feedback
mwear Mar 5, 2024
f6d09b2
Address more feedback; additional cleanup
mwear Mar 5, 2024
43b4ce1
Post rebase fixes
mwear Mar 7, 2024
ab04e29
Post rebase fixes
mwear Mar 28, 2024
bf07351
Rename healthcheckextensionv2 -> healthcheckv2extension
mwear Mar 29, 2024
e1a17c4
Update metadata
mwear Mar 29, 2024
e8725e7
Post rebase updates
mwear Apr 15, 2024
3582452
Enable goleak
mwear Apr 18, 2024
e972c0b
Update go.mod
mwear Apr 25, 2024
36f4169
Post rebase fixes
mwear Jun 11, 2024
62c1bb0
Check listen error
mwear Jul 10, 2024
1ab2038
whitespace
mwear Jul 12, 2024
27 changes: 27 additions & 0 deletions .chloggen/healthcheck-v2.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'new_component'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: healthcheckv2extension

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Health Check Extension V2 is meant to be a replacement for the current Health Check Extension. It is based on component status reporting and provides HTTP and gRPC health check services.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26661]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
6 changes: 3 additions & 3 deletions .github/CODEOWNERS
@@ -299,10 +299,10 @@ testbed/mockdatasenders/mockdatadogagentexporter/ @open-teleme
# List of distribution maintainers for OpenTelemetry Collector Contrib
#
#####################################################
-reports/distributions/core.yaml @open-telemetry/collector-contrib-approvers
-reports/distributions/contrib.yaml @open-telemetry/collector-contrib-approvers
+reports/distributions/core.yaml @open-telemetry/collector-contrib-approvers
+reports/distributions/contrib.yaml @open-telemetry/collector-contrib-approvers


## UNMAINTAINED components

-receiver/googlecloudspannerreceiver/ @open-telemetry/collector-contrib-approvers
+receiver/googlecloudspannerreceiver/ @open-telemetry/collector-contrib-approvers
Contributor

Please revert those white space changes, this file is generated

Member Author

Will do. FYI, this PR is just a reference. I've been keeping it up to date as I split off smaller PRs and they are integrated.

178 changes: 170 additions & 8 deletions extension/healthcheckv2extension/extension.go
@@ -7,36 +7,198 @@ import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/extension"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/grpc"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/http"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/status"
)

type eventSourcePair struct {
source *component.InstanceID
event *component.StatusEvent
}

 type healthCheckExtension struct {
-	config Config
-	telemetry component.TelemetrySettings
+	config Config
+	telemetry component.TelemetrySettings
+	aggregator *status.Aggregator
+	subcomponents []component.Component
+	eventCh chan *eventSourcePair
+	readyCh chan struct{}
 }

var _ component.Component = (*healthCheckExtension)(nil)
var _ extension.ConfigWatcher = (*healthCheckExtension)(nil)
var _ extension.PipelineWatcher = (*healthCheckExtension)(nil)

 func newExtension(
-	_ context.Context,
+	ctx context.Context,
 	config Config,
 	set extension.Settings,
 ) *healthCheckExtension {
-	return &healthCheckExtension{
-		config: config,
-		telemetry: set.TelemetrySettings,
+	var comps []component.Component

errPriority := status.PriorityPermanent
if config.ComponentHealthConfig != nil &&
config.ComponentHealthConfig.IncludeRecoverable &&
!config.ComponentHealthConfig.IncludePermanent {
errPriority = status.PriorityRecoverable
}

aggregator := status.NewAggregator(errPriority)

if config.UseV2 && config.GRPCConfig != nil {
grpcServer := grpc.NewServer(
config.GRPCConfig,
config.ComponentHealthConfig,
set.TelemetrySettings,
aggregator,
)
comps = append(comps, grpcServer)
}

if !config.UseV2 || config.UseV2 && config.HTTPConfig != nil {
httpServer := http.NewServer(
config.HTTPConfig,
config.LegacyConfig,
config.ComponentHealthConfig,
set.TelemetrySettings,
aggregator,
)
comps = append(comps, httpServer)
}

hc := &healthCheckExtension{
config: config,
subcomponents: comps,
telemetry: set.TelemetrySettings,
aggregator: aggregator,
eventCh: make(chan *eventSourcePair),
readyCh: make(chan struct{}),
}

// Start processing events in the background so that our status watcher doesn't
// block others before the extension starts.
go hc.eventLoop(ctx)

return hc
}
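
A note on the HTTP server condition in `newExtension`: `!config.UseV2 || config.UseV2 && config.HTTPConfig != nil` appears to be logically equivalent to the shorter `!config.UseV2 || config.HTTPConfig != nil`, because `&&` binds tighter than `||` in Go. The sketch below checks this over all four input combinations. The function names `original` and `simplified` and the boolean parameters are hypothetical stand-ins for the config fields, not part of this PR.

```go
package main

import "fmt"

// original mirrors the shape of the condition as written in newExtension.
// hasHTTP stands in for "config.HTTPConfig != nil" (an assumption for this sketch).
func original(useV2, hasHTTP bool) bool { return !useV2 || useV2 && hasHTTP }

// simplified drops the redundant useV2 term.
func simplified(useV2, hasHTTP bool) bool { return !useV2 || hasHTTP }

func main() {
	// Enumerate every combination and report whether the two forms agree.
	for _, useV2 := range []bool{false, true} {
		for _, hasHTTP := range []bool{false, true} {
			fmt.Println(useV2, hasHTTP, original(useV2, hasHTTP) == simplified(useV2, hasHTTP))
		}
	}
}
```

Whether the shorter form reads better is a style choice; the behavior should be the same either way.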

 // Start implements the component.Component interface.
-func (hc *healthCheckExtension) Start(context.Context, component.Host) error {
+func (hc *healthCheckExtension) Start(ctx context.Context, host component.Host) error {
hc.telemetry.Logger.Debug("Starting health check extension V2", zap.Any("config", hc.config))

for _, comp := range hc.subcomponents {
if err := comp.Start(ctx, host); err != nil {
return err
}
}

return nil
}

 // Shutdown implements the component.Component interface.
-func (hc *healthCheckExtension) Shutdown(context.Context) error {
+func (hc *healthCheckExtension) Shutdown(ctx context.Context) error {
// Preemptively send the stopped event, so it can be exported before shutdown
hc.telemetry.ReportStatus(component.NewStatusEvent(component.StatusStopped))

close(hc.eventCh)
hc.aggregator.Close()

var err error
for _, comp := range hc.subcomponents {
err = multierr.Append(err, comp.Shutdown(ctx))
}

return err
}

// ComponentStatusChanged implements the extension.StatusWatcher interface.
func (hc *healthCheckExtension) ComponentStatusChanged(
source *component.InstanceID,
event *component.StatusEvent,
) {
// There can be late arriving events after shutdown. We need to close
// the event channel so that this function doesn't block and we release all
// goroutines, but attempting to write to a closed channel will panic; log
// and recover.
defer func() {
if r := recover(); r != nil {
hc.telemetry.Logger.Info(
"discarding event received after shutdown",
zap.Any("source", source),
zap.Any("event", event),
)
}
}()
hc.eventCh <- &eventSourcePair{source: source, event: event}
}
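
The deferred `recover` in `ComponentStatusChanged` relies on the fact that sending on a closed channel panics in Go. A minimal sketch of that guard, outside the extension, might look like the following. The `trySend` helper is a hypothetical name, and a buffered channel is used only so the example runs without a receiver.

```go
package main

import "fmt"

// trySend attempts to send v on ch and reports whether the send succeeded.
// Sending on a closed channel panics; the deferred recover turns that panic
// into a false return value instead of crashing the program.
// Note that recover catches any panic raised in this function, not only
// the closed-channel one.
func trySend(ch chan<- int, v int) (sent bool) {
	defer func() {
		if r := recover(); r != nil {
			sent = false
		}
	}()
	ch <- v
	return true
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // send on an open channel
	close(ch)
	fmt.Println(trySend(ch, 2)) // send after close is recovered
}
```

The trade-off is that a late event is silently dropped (here) or logged (in the extension) rather than delivered, which seems acceptable after shutdown.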

// NotifyConfig implements the extension.ConfigWatcher interface.
func (hc *healthCheckExtension) NotifyConfig(ctx context.Context, conf *confmap.Conf) error {
var err error
for _, comp := range hc.subcomponents {
if cw, ok := comp.(extension.ConfigWatcher); ok {
err = multierr.Append(err, cw.NotifyConfig(ctx, conf))
}
}
return err
}

// Ready implements the extension.PipelineWatcher interface.
func (hc *healthCheckExtension) Ready() error {
close(hc.readyCh)
return nil
}

// NotReady implements the extension.PipelineWatcher interface.
func (hc *healthCheckExtension) NotReady() error {
return nil
}

func (hc *healthCheckExtension) eventLoop(ctx context.Context) {
// Record events with component.StatusStarting, but queue other events until
// PipelineWatcher.Ready is called. This prevents aggregate statuses from
// flapping between StatusStarting and StatusOK as components are started
// individually by the service.
var eventQueue []*eventSourcePair

for loop := true; loop; {
select {
case esp, ok := <-hc.eventCh:
if !ok {
return
}
if esp.event.Status() != component.StatusStarting {
eventQueue = append(eventQueue, esp)
continue
}
hc.aggregator.RecordStatus(esp.source, esp.event)
case <-hc.readyCh:
for _, esp := range eventQueue {
hc.aggregator.RecordStatus(esp.source, esp.event)
}
eventQueue = nil
loop = false
case <-ctx.Done():
return
}
}

// After PipelineWatcher.Ready, record statuses as they are received.
for {
select {
case esp, ok := <-hc.eventCh:
if !ok {
return
}
hc.aggregator.RecordStatus(esp.source, esp.event)
case <-ctx.Done():
return
}
}
}
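
The two-phase structure of `eventLoop` (record starting events right away, queue everything else until `Ready`, then pass events straight through) can be tried in isolation. The following is a simplified sketch under stated assumptions, not the extension's code: the `event` type, the `run` function, and the `record` callback are hypothetical stand-ins for `eventSourcePair`, `eventLoop`, and `aggregator.RecordStatus`, statuses are plain strings, and context cancellation is omitted.

```go
package main

import "fmt"

// event is a hypothetical stand-in for eventSourcePair.
type event struct {
	source string
	status string
}

// run records "starting" events immediately and queues all other events
// until ready is closed. It then flushes the queue in arrival order and
// records later events as they arrive. It returns when events is closed.
func run(events <-chan event, ready <-chan struct{}, record func(event)) {
	var queue []event

	for waiting := true; waiting; {
		select {
		case ev, ok := <-events:
			if !ok {
				return
			}
			if ev.status != "starting" {
				queue = append(queue, ev)
				continue
			}
			record(ev)
		case <-ready:
			for _, ev := range queue {
				record(ev)
			}
			queue = nil
			waiting = false
		}
	}

	// After ready, record events as they are received.
	for ev := range events {
		record(ev)
	}
}

func main() {
	events := make(chan event)
	ready := make(chan struct{})
	done := make(chan struct{})

	var recorded []string
	go func() {
		defer close(done)
		run(events, ready, func(ev event) {
			recorded = append(recorded, ev.source+":"+ev.status)
		})
	}()

	events <- event{"receiver", "starting"}
	events <- event{"receiver", "ok"} // queued until ready is closed
	close(ready)
	events <- event{"receiver", "stopping"}
	close(events)
	<-done

	fmt.Println(recorded)
}
```

Because queued events are flushed in arrival order before any later event is recorded, the recorded sequence should match the order in which events were sent, regardless of which `select` case fires first once `ready` is closed.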
135 changes: 135 additions & 0 deletions extension/healthcheckv2extension/extension_test.go
@@ -0,0 +1,135 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package healthcheckv2extension

import (
"context"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/confmap/confmaptest"
"go.opentelemetry.io/collector/extension/extensiontest"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/status"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/testhelpers"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
)

func TestComponentStatus(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.HTTPConfig.Endpoint = testutil.GetAvailableLocalAddress(t)
cfg.GRPCConfig.NetAddr.Endpoint = testutil.GetAvailableLocalAddress(t)
cfg.UseV2 = true
ext := newExtension(context.Background(), *cfg, extensiontest.NewNopSettings())

// Status before Start will be StatusNone
st, ok := ext.aggregator.AggregateStatus(status.ScopeAll, status.Concise)
require.True(t, ok)
assert.Equal(t, component.StatusNone, st.Status())

require.NoError(t, ext.Start(context.Background(), componenttest.NewNopHost()))

traces := testhelpers.NewPipelineMetadata("traces")

// StatusStarting will be sent immediately.
for _, id := range traces.InstanceIDs() {
ext.ComponentStatusChanged(id, component.NewStatusEvent(component.StatusStarting))
}

// StatusOK will be queued until the PipelineWatcher Ready method is called.
for _, id := range traces.InstanceIDs() {
ext.ComponentStatusChanged(id, component.NewStatusEvent(component.StatusOK))
}

// Note the use of assert.Eventually here and throughout this test is because
// status events are processed asynchronously in the background.
assert.Eventually(t, func() bool {
st, ok = ext.aggregator.AggregateStatus(status.ScopeAll, status.Concise)
require.True(t, ok)
return st.Status() == component.StatusStarting
}, time.Second, 10*time.Millisecond)

require.NoError(t, ext.Ready())

assert.Eventually(t, func() bool {
st, ok = ext.aggregator.AggregateStatus(status.ScopeAll, status.Concise)
require.True(t, ok)
return st.Status() == component.StatusOK
}, time.Second, 10*time.Millisecond)

// StatusStopping will be sent immediately.
for _, id := range traces.InstanceIDs() {
ext.ComponentStatusChanged(id, component.NewStatusEvent(component.StatusStopping))
}

assert.Eventually(t, func() bool {
st, ok = ext.aggregator.AggregateStatus(status.ScopeAll, status.Concise)
require.True(t, ok)
return st.Status() == component.StatusStopping
}, time.Second, 10*time.Millisecond)

require.NoError(t, ext.NotReady())
require.NoError(t, ext.Shutdown(context.Background()))

// Events sent after shutdown will be discarded
for _, id := range traces.InstanceIDs() {
ext.ComponentStatusChanged(id, component.NewStatusEvent(component.StatusStopped))
}

st, ok = ext.aggregator.AggregateStatus(status.ScopeAll, status.Concise)
require.True(t, ok)
assert.Equal(t, component.StatusStopping, st.Status())
}

func TestNotifyConfig(t *testing.T) {
confMap, err := confmaptest.LoadConf(
filepath.Join("internal", "http", "testdata", "config.yaml"),
)
require.NoError(t, err)
confJSON, err := os.ReadFile(
filepath.Clean(filepath.Join("internal", "http", "testdata", "config.json")),
)
require.NoError(t, err)

endpoint := testutil.GetAvailableLocalAddress(t)

cfg := createDefaultConfig().(*Config)
cfg.UseV2 = true
cfg.HTTPConfig.Endpoint = endpoint
cfg.HTTPConfig.Config.Enabled = true
cfg.HTTPConfig.Config.Path = "/config"

ext := newExtension(context.Background(), *cfg, extensiontest.NewNopSettings())

require.NoError(t, ext.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() { require.NoError(t, ext.Shutdown(context.Background())) })

client := &http.Client{}
url := fmt.Sprintf("http://%s/config", endpoint)

var resp *http.Response

resp, err = client.Get(url)
require.NoError(t, err)
assert.Equal(t, http.StatusServiceUnavailable, resp.StatusCode)

require.NoError(t, ext.NotifyConfig(context.Background(), confMap))

resp, err = client.Get(url)
require.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)

body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
assert.Equal(t, confJSON, body)
}