From 16a7b294919f22a9f4edb63320e33f2dd305b07c Mon Sep 17 00:00:00 2001 From: Matthew Wear Date: Thu, 11 Jul 2024 01:49:13 -0700 Subject: [PATCH] [extension/healthcheckv2] Add partial gRPC service implementation (#34028) **Description:** This PR is the fourth in a series to decompose #30673 into more manageable pieces for review. This PR introduces the basic structure for the gRPC health check service, which is an implementation of the [grpc_health_v1 service](https://github.com/grpc/grpc-proto/blob/master/grpc/health/v1/health.proto). This PR implements the unary `Check` RPC. In order to minimize PR size, the streaming `Watch` RPC will be introduced in a followup. **Link to tracking Issue:** #26661 **Testing:** Units / manual **Documentation:** Comments, etc. --- .chloggen/healthcheckv2-grpc-check.yaml | 27 + extension/healthcheckv2extension/go.mod | 2 +- .../internal/grpc/grpc.go | 61 ++ .../internal/grpc/grpc_test.go | 743 ++++++++++++++++++ .../internal/grpc/package_test.go | 14 + .../internal/grpc/server.go | 82 ++ 6 files changed, 928 insertions(+), 1 deletion(-) create mode 100644 .chloggen/healthcheckv2-grpc-check.yaml create mode 100644 extension/healthcheckv2extension/internal/grpc/grpc.go create mode 100644 extension/healthcheckv2extension/internal/grpc/grpc_test.go create mode 100644 extension/healthcheckv2extension/internal/grpc/package_test.go create mode 100644 extension/healthcheckv2extension/internal/grpc/server.go diff --git a/.chloggen/healthcheckv2-grpc-check.yaml b/.chloggen/healthcheckv2-grpc-check.yaml new file mode 100644 index 000000000000..417adb019b5f --- /dev/null +++ b/.chloggen/healthcheckv2-grpc-check.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g.
filelogreceiver) +component: 'healthcheckv2extension' + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add partial gRPC service implementation to healthcheckv2. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [26661] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [] diff --git a/extension/healthcheckv2extension/go.mod b/extension/healthcheckv2extension/go.mod index d4a46938b7c2..418f5eecb7a1 100644 --- a/extension/healthcheckv2extension/go.mod +++ b/extension/healthcheckv2extension/go.mod @@ -16,6 +16,7 @@ require ( go.opentelemetry.io/otel/trace v1.28.0 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 + google.golang.org/grpc v1.65.0 ) require ( @@ -66,7 +67,6 @@ require ( golang.org/x/sys v0.21.0 // indirect golang.org/x/text v0.16.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect - google.golang.org/grpc v1.65.0 // indirect google.golang.org/protobuf v1.34.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/extension/healthcheckv2extension/internal/grpc/grpc.go b/extension/healthcheckv2extension/internal/grpc/grpc.go new file mode 100644 index 000000000000..e89a8996c49f --- /dev/null +++ b/extension/healthcheckv2extension/internal/grpc/grpc.go @@ -0,0 +1,61 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package grpc // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/grpc" + +import ( + "context" + "time" + + "go.opentelemetry.io/collector/component" + "google.golang.org/grpc/codes" + healthpb "google.golang.org/grpc/health/grpc_health_v1" + grpcstatus "google.golang.org/grpc/status" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/status" +) + +var ( + errNotFound = grpcstatus.Error(codes.NotFound, "Service not found.") + + statusToServingStatusMap = map[component.Status]healthpb.HealthCheckResponse_ServingStatus{ + component.StatusNone: healthpb.HealthCheckResponse_NOT_SERVING, + component.StatusStarting: healthpb.HealthCheckResponse_NOT_SERVING, + component.StatusOK: healthpb.HealthCheckResponse_SERVING, + component.StatusRecoverableError: 
healthpb.HealthCheckResponse_SERVING,
+		component.StatusPermanentError:   healthpb.HealthCheckResponse_SERVING,
+		component.StatusFatalError:       healthpb.HealthCheckResponse_NOT_SERVING,
+		component.StatusStopping:         healthpb.HealthCheckResponse_NOT_SERVING,
+		component.StatusStopped:          healthpb.HealthCheckResponse_NOT_SERVING,
+	}
+)
+
+// Check implements the unary Check RPC of the grpc_health_v1 Health service.
+//
+// The service name from the request is used as the scope passed to the status
+// aggregator. When the aggregator reports no status for that scope, the
+// NotFound error errNotFound is returned; otherwise the aggregated event is
+// translated into a serving status by toServingStatus.
+func (s *Server) Check(
+	_ context.Context,
+	req *healthpb.HealthCheckRequest,
+) (*healthpb.HealthCheckResponse, error) {
+	// GetService is nil-safe, so a nil request is handled like one with an
+	// empty service name instead of panicking on the field access.
+	scope := status.Scope(req.GetService())
+
+	st, ok := s.aggregator.AggregateStatus(scope, status.Concise)
+	if !ok {
+		return nil, errNotFound
+	}
+
+	return &healthpb.HealthCheckResponse{
+		Status: s.toServingStatus(st.Event),
+	}, nil
+}
+
+// toServingStatus converts an aggregated status event into a gRPC serving
+// status.
+//
+// The component health settings can turn error statuses into NOT_SERVING:
+// a recoverable error when IncludeRecoverable is set and the event is older
+// than RecoveryDuration, and a permanent error when IncludePermanent is set.
+// Every other case is resolved through statusToServingStatusMap.
+func (s *Server) toServingStatus(
+	ev status.Event,
+) healthpb.HealthCheckResponse_ServingStatus {
+	cfg := s.componentHealthConfig
+
+	switch ev.Status() {
+	case component.StatusRecoverableError:
+		// Only unhealthy once the error has been outstanding for longer
+		// than the configured recovery window.
+		if cfg.IncludeRecoverable &&
+			time.Now().After(ev.Timestamp().Add(cfg.RecoveryDuration)) {
+			return healthpb.HealthCheckResponse_NOT_SERVING
+		}
+	case component.StatusPermanentError:
+		if cfg.IncludePermanent {
+			return healthpb.HealthCheckResponse_NOT_SERVING
+		}
+	}
+
+	return statusToServingStatusMap[ev.Status()]
+}
diff --git a/extension/healthcheckv2extension/internal/grpc/grpc_test.go b/extension/healthcheckv2extension/internal/grpc/grpc_test.go
new file mode 100644
index 000000000000..866acd446ff0
--- /dev/null
+++ b/extension/healthcheckv2extension/internal/grpc/grpc_test.go
@@ -0,0 +1,743 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package grpc
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/component/componenttest"
+	"go.opentelemetry.io/collector/config/configgrpc"
+	"go.opentelemetry.io/collector/config/confignet"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+
"google.golang.org/grpc/credentials/insecure" + healthpb "google.golang.org/grpc/health/grpc_health_v1" + grpcstatus "google.golang.org/grpc/status" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/common" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/status" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/testhelpers" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" +) + +func TestCheck(t *testing.T) { + addr := testutil.GetAvailableLocalAddress(t) + config := &Config{ + ServerConfig: configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: addr, + Transport: "tcp", + }, + }, + } + var server *Server + traces := testhelpers.NewPipelineMetadata("traces") + metrics := testhelpers.NewPipelineMetadata("metrics") + + type teststep struct { + step func() + eventually bool + service string + expectedStatus healthpb.HealthCheckResponse_ServingStatus + expectedErr error + } + + tests := []struct { + name string + config *Config + componentHealthSettings *common.ComponentHealthConfig + teststeps []teststep + }{ + { + name: "exclude recoverable and permanent errors", + config: config, + teststeps: []teststep{ + { + service: "", + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedErr: grpcstatus.Error(codes.NotFound, "Service not found."), + }, + { + service: metrics.PipelineID.String(), + expectedErr: grpcstatus.Error(codes.NotFound, "Service not found."), + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusStarting, + ) + testhelpers.SeedAggregator( + server.aggregator, + metrics.InstanceIDs(), + component.StatusStarting, + ) + }, + service: "", + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: 
traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusOK, + ) + testhelpers.SeedAggregator( + server.aggregator, + metrics.InstanceIDs(), + component.StatusOK, + ) + }, + service: "", + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + step: func() { + // errors will be ignored + server.aggregator.RecordStatus( + metrics.ExporterID, + component.NewRecoverableErrorEvent(assert.AnError), + ) + }, + service: "", + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + step: func() { + server.aggregator.RecordStatus( + metrics.ExporterID, + component.NewStatusEvent(component.StatusOK), + ) + }, + service: "", + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusStopping, + ) + testhelpers.SeedAggregator( + server.aggregator, + metrics.InstanceIDs(), + component.StatusStopping, + ) + }, + service: "", + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: 
healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusStopped, + ) + testhelpers.SeedAggregator( + server.aggregator, + metrics.InstanceIDs(), + component.StatusStopped, + ) + }, + service: "", + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + }, + }, + { + name: "include recoverable and exclude permanent errors", + config: config, + componentHealthSettings: &common.ComponentHealthConfig{ + IncludePermanent: false, + IncludeRecoverable: true, + RecoveryDuration: 2 * time.Millisecond, + }, + teststeps: []teststep{ + { + service: "", + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedErr: grpcstatus.Error(codes.NotFound, "Service not found."), + }, + { + service: metrics.PipelineID.String(), + expectedErr: grpcstatus.Error(codes.NotFound, "Service not found."), + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusStarting, + ) + testhelpers.SeedAggregator( + server.aggregator, + metrics.InstanceIDs(), + component.StatusStarting, + ) + }, + service: "", + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusOK, + ) + testhelpers.SeedAggregator( + server.aggregator, + metrics.InstanceIDs(), + component.StatusOK, + ) + }, + service: "", + expectedStatus: 
healthpb.HealthCheckResponse_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + step: func() { + // metrics and overall status will be NOT_SERVING + server.aggregator.RecordStatus( + metrics.ExporterID, + component.NewRecoverableErrorEvent(assert.AnError), + ) + }, + service: "", + eventually: true, + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + step: func() { + // metrics and overall status will recover and resume SERVING + server.aggregator.RecordStatus( + metrics.ExporterID, + component.NewStatusEvent(component.StatusOK), + ) + }, + service: "", + eventually: true, + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + step: func() { + // permament error will be ignored + server.aggregator.RecordStatus( + metrics.ExporterID, + component.NewPermanentErrorEvent(assert.AnError), + ) + }, + service: "", + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusStopping, + ) + testhelpers.SeedAggregator( + server.aggregator, + metrics.InstanceIDs(), + component.StatusStopping, + ) + }, + service: "", + eventually: true, + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: traces.PipelineID.String(), 
+ expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusStopped, + ) + testhelpers.SeedAggregator( + server.aggregator, + metrics.InstanceIDs(), + component.StatusStopped, + ) + }, + service: "", + eventually: true, + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + }, + }, + { + name: "include permanent and exclude recoverable errors", + config: config, + componentHealthSettings: &common.ComponentHealthConfig{ + IncludePermanent: true, + }, + teststeps: []teststep{ + { + service: "", + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedErr: grpcstatus.Error(codes.NotFound, "Service not found."), + }, + { + service: metrics.PipelineID.String(), + expectedErr: grpcstatus.Error(codes.NotFound, "Service not found."), + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusStarting, + ) + testhelpers.SeedAggregator( + server.aggregator, + metrics.InstanceIDs(), + component.StatusStarting, + ) + }, + service: "", + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusOK, + ) + testhelpers.SeedAggregator( + server.aggregator, + metrics.InstanceIDs(), + 
component.StatusOK, + ) + }, + service: "", + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + step: func() { + // recoverable will be ignored + server.aggregator.RecordStatus( + metrics.ExporterID, + component.NewRecoverableErrorEvent(assert.AnError), + ) + }, + service: "", + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + step: func() { + // permament error included + server.aggregator.RecordStatus( + metrics.ExporterID, + component.NewPermanentErrorEvent(assert.AnError), + ) + }, + service: "", + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusStopping, + ) + testhelpers.SeedAggregator( + server.aggregator, + metrics.InstanceIDs(), + component.StatusStopping, + ) + }, + service: "", + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusStopped, + ) + testhelpers.SeedAggregator( + server.aggregator, + metrics.InstanceIDs(), + 
component.StatusStopped, + ) + }, + service: "", + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + }, + }, + { + name: "include permanent and recoverable errors", + config: config, + componentHealthSettings: &common.ComponentHealthConfig{ + IncludePermanent: true, + IncludeRecoverable: true, + RecoveryDuration: 2 * time.Millisecond, + }, + teststeps: []teststep{ + { + service: "", + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedErr: grpcstatus.Error(codes.NotFound, "Service not found."), + }, + { + service: metrics.PipelineID.String(), + expectedErr: grpcstatus.Error(codes.NotFound, "Service not found."), + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusStarting, + ) + testhelpers.SeedAggregator( + server.aggregator, + metrics.InstanceIDs(), + component.StatusStarting, + ) + }, + service: "", + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusOK, + ) + testhelpers.SeedAggregator( + server.aggregator, + metrics.InstanceIDs(), + component.StatusOK, + ) + }, + service: "", + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + step: func() { + // 
metrics and overall status will be NOT_SERVING + server.aggregator.RecordStatus( + metrics.ExporterID, + component.NewRecoverableErrorEvent(assert.AnError), + ) + }, + service: "", + eventually: true, + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + step: func() { + // metrics and overall status will recover and resume SERVING + server.aggregator.RecordStatus( + metrics.ExporterID, + component.NewStatusEvent(component.StatusOK), + ) + }, + service: "", + eventually: true, + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusStopping, + ) + testhelpers.SeedAggregator( + server.aggregator, + metrics.InstanceIDs(), + component.StatusStopping, + ) + }, + service: "", + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusStopped, + ) + testhelpers.SeedAggregator( + server.aggregator, + metrics.InstanceIDs(), + component.StatusStopped, + ) + }, + service: "", + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + }, + }, + } + for _, tc := range tests { + 
t.Run(tc.name, func(t *testing.T) {
+			// Each subtest gets a fresh server and aggregator; the teststep
+			// closures reach it through the shared `server` variable.
+			server = NewServer(
+				tc.config,
+				tc.componentHealthSettings,
+				componenttest.NewNopTelemetrySettings(),
+				status.NewAggregator(testhelpers.ErrPriority(tc.componentHealthSettings)),
+			)
+			require.NoError(t, server.Start(context.Background(), componenttest.NewNopHost()))
+			t.Cleanup(func() { require.NoError(t, server.Shutdown(context.Background())) })
+
+			cc, err := grpc.NewClient(
+				addr,
+				grpc.WithTransportCredentials(insecure.NewCredentials()),
+			)
+			require.NoError(t, err)
+			defer func() {
+				assert.NoError(t, cc.Close())
+			}()
+
+			client := healthpb.NewHealthClient(cc)
+
+			for _, ts := range tc.teststeps {
+				if ts.step != nil {
+					ts.step()
+				}
+
+				if ts.eventually {
+					assert.Eventually(t, func() bool {
+						resp, err := client.Check(
+							context.Background(),
+							&healthpb.HealthCheckRequest{Service: ts.service},
+						)
+						// The condition runs on a goroutine owned by
+						// Eventually, where require's FailNow must not be
+						// called. Report "not yet" and let Eventually fail
+						// the test if the timeout elapses.
+						if err != nil {
+							return false
+						}
+						return ts.expectedStatus == resp.Status
+					}, time.Second, 10*time.Millisecond)
+					continue
+				}
+
+				resp, err := client.Check(
+					context.Background(),
+					&healthpb.HealthCheckRequest{Service: ts.service},
+				)
+				require.Equal(t, ts.expectedErr, err)
+				if ts.expectedErr != nil {
+					continue
+				}
+				assert.Equal(t, ts.expectedStatus, resp.Status)
+			}
+		})
+	}
+}
diff --git a/extension/healthcheckv2extension/internal/grpc/package_test.go b/extension/healthcheckv2extension/internal/grpc/package_test.go
new file mode 100644
index 000000000000..0a083f58c642
--- /dev/null
+++ b/extension/healthcheckv2extension/internal/grpc/package_test.go
@@ -0,0 +1,14 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package grpc // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/grpc"
+
+import (
+	"testing"
+
+	"go.uber.org/goleak"
+)
+
+// TestMain runs the package's tests through goleak.VerifyTestMain.
+func TestMain(m *testing.M) {
+	goleak.VerifyTestMain(m)
+}
diff --git a/extension/healthcheckv2extension/internal/grpc/server.go b/extension/healthcheckv2extension/internal/grpc/server.go
new file mode 100644
index 000000000000..1ab61b956fd3
--- /dev/null
+++ b/extension/healthcheckv2extension/internal/grpc/server.go
@@ -0,0 +1,82 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package grpc // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/grpc"
+
+import (
+	"context"
+	"errors"
+
+	"go.opentelemetry.io/collector/component"
+	"google.golang.org/grpc"
+	healthpb "google.golang.org/grpc/health/grpc_health_v1"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/common"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/status"
+)
+
+// Server is a gRPC health check server backed by a status aggregator. It
+// embeds healthpb.UnimplementedHealthServer and satisfies
+// component.Component.
+type Server struct {
+	healthpb.UnimplementedHealthServer
+	// grpcServer is assigned by Start; Shutdown treats nil as "never started".
+	grpcServer *grpc.Server
+	// aggregator supplies the aggregated status answered by the service.
+	aggregator *status.Aggregator
+	config     *Config
+	// componentHealthConfig is never nil after NewServer.
+	componentHealthConfig *common.ComponentHealthConfig
+	telemetry             component.TelemetrySettings
+	// doneCh is closed by the serving goroutine when it exits.
+	doneCh chan struct{}
+}
+
+var _ component.Component = (*Server)(nil)
+
+// NewServer returns a Server wired to the given configuration, telemetry
+// settings and aggregator. A nil componentHealthConfig is replaced by a
+// zero-value common.ComponentHealthConfig.
+func NewServer(
+	config *Config,
+	componentHealthConfig *common.ComponentHealthConfig,
+	telemetry component.TelemetrySettings,
+	aggregator *status.Aggregator,
+) *Server {
+	// Apply the default up front so the struct literal is the final value.
+	if componentHealthConfig == nil {
+		componentHealthConfig = &common.ComponentHealthConfig{}
+	}
+
+	return &Server{
+		aggregator:            aggregator,
+		config:                config,
+		componentHealthConfig: componentHealthConfig,
+		telemetry:             telemetry,
+		doneCh:                make(chan struct{}),
+	}
+}
+
+// Start implements the component.Component interface.
+// It builds the gRPC server from the configuration, registers the health
+// service on it, binds the configured listen address and serves from a
+// background goroutine that closes doneCh when it exits.
+//
+// s.grpcServer is only assigned once the listener is bound, so that Shutdown
+// after a failed Start returns immediately instead of waiting on a doneCh
+// that no goroutine would ever close.
+func (s *Server) Start(ctx context.Context, host component.Host) error {
+	grpcServer, err := s.config.ToServer(ctx, host, s.telemetry)
+	if err != nil {
+		return err
+	}
+
+	healthpb.RegisterHealthServer(grpcServer, s)
+
+	// Bind using the caller's context rather than a detached background one.
+	ln, err := s.config.NetAddr.Listen(ctx)
+	if err != nil {
+		// Nothing is serving yet; release the unused server before bailing out.
+		grpcServer.Stop()
+		return err
+	}
+
+	s.grpcServer = grpcServer
+
+	go func() {
+		defer close(s.doneCh)
+
+		// Keep the error local to the goroutine instead of writing to a
+		// variable owned by the already-returned Start call.
+		serveErr := grpcServer.Serve(ln)
+		if serveErr != nil && !errors.Is(serveErr, grpc.ErrServerStopped) {
+			s.telemetry.ReportStatus(component.NewPermanentErrorEvent(serveErr))
+		}
+	}()
+
+	return nil
+}
+
+// Shutdown implements the component.Component interface. It is a no-op when
+// no gRPC server has been set by Start; otherwise it gracefully stops the
+// server and waits for the serving goroutine to close doneCh.
+func (s *Server) Shutdown(context.Context) error {
+	if s.grpcServer == nil {
+		return nil
+	}
+	s.grpcServer.GracefulStop()
+	<-s.doneCh
+	return nil
+}