Skip to content

Commit

Permalink
Extract service.pipelines interface, add skeleton graph implementation (
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski authored Jan 23, 2023
1 parent 701fe0d commit dc3071c
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 3 deletions.
45 changes: 45 additions & 0 deletions service/graph.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package service // import "go.opentelemetry.io/collector/service"

import (
"context"
"net/http"

"go.opentelemetry.io/collector/component"
)

var _ pipelines = (*pipelinesGraph)(nil)

type pipelinesGraph struct{}

func (g *pipelinesGraph) StartAll(ctx context.Context, host component.Host) error {
// TODO actual implementation
return nil
}

func (g *pipelinesGraph) ShutdownAll(ctx context.Context) error {
// TODO actual implementation
return nil
}

func (g *pipelinesGraph) GetExporters() map[component.DataType]map[component.ID]component.Component {
// TODO actual implementation
return make(map[component.DataType]map[component.ID]component.Component)
}

func (g *pipelinesGraph) HandleZPages(w http.ResponseWriter, r *http.Request) {
// TODO actual implementation
}
2 changes: 1 addition & 1 deletion service/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type serviceHost struct {

buildInfo component.BuildInfo

pipelines *builtPipelines
pipelines
serviceExtensions *extensions.Extensions
}

Expand Down
13 changes: 12 additions & 1 deletion service/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.uber.org/multierr"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/processor"
Expand All @@ -39,6 +40,15 @@ const (
zComponentKind = "zcomponentkind"
)

type pipelines interface {
StartAll(ctx context.Context, host component.Host) error
ShutdownAll(ctx context.Context) error
GetExporters() map[component.DataType]map[component.ID]component.Component
HandleZPages(w http.ResponseWriter, r *http.Request)
}

var _ pipelines = (*builtPipelines)(nil)

// baseConsumer redeclared here since not public in consumer package. May consider to make that public.
type baseConsumer interface {
Capabilities() consumer.Capabilities
Expand Down Expand Up @@ -187,13 +197,14 @@ type pipelinesSettings struct {
Receivers *receiver.Builder
Processors *processor.Builder
Exporters *exporter.Builder
Connectors *connector.Builder

// PipelineConfigs is a map of component.ID to PipelineConfig.
PipelineConfigs map[component.ID]*PipelineConfig
}

// buildPipelines builds all pipelines from config.
func buildPipelines(ctx context.Context, set pipelinesSettings) (*builtPipelines, error) {
func buildPipelines(ctx context.Context, set pipelinesSettings) (pipelines, error) {
exps := &builtPipelines{
telemetry: set.Telemetry,
allReceivers: make(map[component.DataType]map[component.ID]component.Component),
Expand Down
4 changes: 3 additions & 1 deletion service/pipelines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func TestBuildPipelines(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// Build the pipeline
pipelines, err := buildPipelines(context.Background(), pipelinesSettings{
pips, err := buildPipelines(context.Background(), pipelinesSettings{
Telemetry: componenttest.NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
Receivers: receiver.NewBuilder(
Expand Down Expand Up @@ -211,7 +211,9 @@ func TestBuildPipelines(t *testing.T) {
PipelineConfigs: test.pipelineConfigs,
})
assert.NoError(t, err)
assert.IsType(t, &builtPipelines{}, pips)

pipelines := pips.(*builtPipelines)
assert.NoError(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost()))

for dt, pipeline := range test.pipelineConfigs {
Expand Down

0 comments on commit dc3071c

Please sign in to comment.