Skip to content

Commit

Permalink
Trace ID aware load-balancing exporter - 3/4 (#1412)
Browse files Browse the repository at this point in the history
* Added the backend resolver
* Added the metrics definitions

**Link to tracking Issue:** Partially solves open-telemetry/opentelemetry-collector#1724, next step after #1349

**Testing:** unit tests

**Documentation:** godoc

Signed-off-by: Juraci Paixão Kröhling <juraci@kroehling.de>
  • Loading branch information
jpkrohling authored Nov 10, 2020
1 parent e1a9ff1 commit e0204ba
Show file tree
Hide file tree
Showing 11 changed files with 829 additions and 0 deletions.
1 change: 1 addition & 0 deletions exporter/loadbalancingexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.14

require (
github.com/stretchr/testify v1.6.1
go.opencensus.io v0.22.5
go.opentelemetry.io/collector v0.14.1-0.20201106183657-c6b8f28c60b5
go.uber.org/zap v1.16.0
)
1 change: 1 addition & 0 deletions exporter/loadbalancingexporter/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1128,6 +1128,7 @@ go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5 h1:dntmOdLpSpHlVqbW5Eay97DelsZHe+55D+xC6i0dDS0=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opentelemetry.io/collector v0.14.1-0.20201106172639-0f8e82223a4b h1:SzlMszC7ScKIi2Eoz96gvFE8J9vwKI7fatHLMOQFa4A=
Expand Down
80 changes: 80 additions & 0 deletions exporter/loadbalancingexporter/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package loadbalancingexporter

import (
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)

var (
mNumResolutions = stats.Int64("loadbalancer_num_resolutions", "Number of times the resolver triggered a new resolutions", stats.UnitDimensionless)
mNumBackends = stats.Int64("loadbalancer_num_backends", "Current number of backends in use", stats.UnitDimensionless)
mBackendLatency = stats.Int64("loadbalancer_backend_latency", "Response latency in ms for the backends", stats.UnitMilliseconds)
)

// MetricViews return the metrics views according to given telemetry level.
func MetricViews() []*view.View {
return []*view.View{
{
Name: mNumResolutions.Name(),
Measure: mNumResolutions,
Description: mNumResolutions.Description(),
Aggregation: view.Count(),
TagKeys: []tag.Key{
tag.MustNewKey("resolver"),
tag.MustNewKey("success"),
},
},
{
Name: mNumBackends.Name(),
Measure: mNumBackends,
Description: mNumBackends.Description(),
Aggregation: view.LastValue(),
TagKeys: []tag.Key{
tag.MustNewKey("resolver"),
},
},
{
Name: "loadbalancer_num_backend_updates", // counts the number of times the measure was changed
Measure: mNumBackends,
Description: "Number of times the list of backends was updated",
Aggregation: view.Count(),
TagKeys: []tag.Key{
tag.MustNewKey("resolver"),
},
},
{
Name: mBackendLatency.Name(),
Measure: mBackendLatency,
Description: mBackendLatency.Description(),
TagKeys: []tag.Key{
tag.MustNewKey("endpoint"),
},
Aggregation: view.Distribution(0, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000),
},
{
Name: "loadbalancer_backend_outcome",
Measure: mBackendLatency,
Description: "Number of success/failures for each endpoint",
TagKeys: []tag.Key{
tag.MustNewKey("endpoint"),
tag.MustNewKey("success"),
},
Aggregation: view.Count(),
},
}
}
35 changes: 35 additions & 0 deletions exporter/loadbalancingexporter/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package loadbalancingexporter

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestProcessorMetrics(t *testing.T) {
expectedViewNames := []string{
"loadbalancer_num_resolutions",
"loadbalancer_num_backends",
"loadbalancer_num_backend_updates",
"loadbalancer_backend_latency",
}

views := MetricViews()
for i, viewName := range expectedViewNames {
assert.Equal(t, viewName, views[i].Name)
}
}
36 changes: 36 additions & 0 deletions exporter/loadbalancingexporter/resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package loadbalancingexporter

import "context"

// resolver determines the contract for sources of backend endpoint information
type resolver interface {
// resolve returns the current list of endpoints.
// returns either a non-nil error and a nil list of endpoints, or a non-nil list of endpoints and nil error.
resolve(context.Context) ([]string, error)

// start signals the resolver to start its work
start(context.Context) error

// shutdown signals the resolver to finish its work. This should block until the current resolutions are finished.
// Once this is invoked, callbacks will not be triggered anymore and will need to be registered again in case the consumer
// decides to restart the resolver.
shutdown(context.Context) error

// onChange registers a function to call back whenever the list of backends is updated.
// Make sure to register the callbacks before starting the exporter.
onChange(func([]string))
}
180 changes: 180 additions & 0 deletions exporter/loadbalancingexporter/resolver_dns.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package loadbalancingexporter

import (
"context"
"errors"
"fmt"
"net"
"sort"
"sync"
"time"

"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.uber.org/zap"
)

var _ resolver = (*dnsResolver)(nil)

const (
defaultResInterval = 5 * time.Second
defaultResTimeout = time.Second
)

var errNoHostname = errors.New("no hostname specified to resolve the backends")

type dnsResolver struct {
logger *zap.Logger

hostname string
resolver netResolver
resInterval time.Duration
resTimeout time.Duration

endpoints []string
onChangeCallbacks []func([]string)

stopCh chan (struct{})
updateLock sync.Mutex
shutdownWg sync.WaitGroup
changeCallbackLock sync.RWMutex
}

type netResolver interface {
LookupIPAddr(ctx context.Context, host string) ([]net.IPAddr, error)
}

func newDNSResolver(logger *zap.Logger, hostname string) (*dnsResolver, error) {
if len(hostname) == 0 {
return nil, errNoHostname
}

return &dnsResolver{
logger: logger,
hostname: hostname,
resolver: &net.Resolver{},
resInterval: defaultResInterval,
resTimeout: defaultResTimeout,
stopCh: make(chan struct{}),
}, nil
}

func (r *dnsResolver) start(ctx context.Context) error {
if _, err := r.resolve(ctx); err != nil {
return err
}

go r.periodicallyResolve()

return nil
}

func (r *dnsResolver) shutdown(ctx context.Context) error {
r.changeCallbackLock.Lock()
r.onChangeCallbacks = nil
r.changeCallbackLock.Unlock()

close(r.stopCh)
r.shutdownWg.Wait()
return nil
}

func (r *dnsResolver) periodicallyResolve() {
ticker := time.NewTicker(r.resInterval)
select {
case <-ticker.C:
ctx, cancel := context.WithTimeout(context.Background(), r.resTimeout)
if _, err := r.resolve(ctx); err != nil {
r.logger.Warn("failed to resolve", zap.Error(err))
}
cancel()
case <-r.stopCh:
return
}
}

func (r *dnsResolver) resolve(ctx context.Context) ([]string, error) {
r.shutdownWg.Add(1)
defer r.shutdownWg.Done()

// the context to use for all metrics in this function
mCtx, _ := tag.New(ctx, tag.Upsert(tag.MustNewKey("resolver"), "dns"))

addrs, err := r.resolver.LookupIPAddr(ctx, r.hostname)
if err != nil {
failedCtx, _ := tag.New(mCtx, tag.Upsert(tag.MustNewKey("success"), "false"))
stats.Record(failedCtx, mNumResolutions.M(1))
return nil, err
}

// from this point, we don't fail anymore
successCtx, _ := tag.New(mCtx, tag.Upsert(tag.MustNewKey("success"), "true"))
stats.Record(successCtx, mNumResolutions.M(1))

var backends []string
for _, ip := range addrs {
var backend string
if ip.IP.To4() != nil {
backend = ip.String()
} else {
// it's an IPv6 address
backend = fmt.Sprintf("[%s]", ip.String())
}
backends = append(backends, backend)
}

// keep it always in the same order
sort.Strings(backends)

if equalStringSlice(r.endpoints, backends) {
return r.endpoints, nil
}

// the list has changed!
r.updateLock.Lock()
r.endpoints = backends
r.updateLock.Unlock()
stats.Record(mCtx, mNumBackends.M(int64(len(backends))))

// propagate the change
r.changeCallbackLock.RLock()
for _, callback := range r.onChangeCallbacks {
callback(r.endpoints)
}
r.changeCallbackLock.RUnlock()

return r.endpoints, nil
}

func (r *dnsResolver) onChange(f func([]string)) {
r.changeCallbackLock.Lock()
defer r.changeCallbackLock.Unlock()
r.onChangeCallbacks = append(r.onChangeCallbacks, f)
}

func equalStringSlice(source, candidate []string) bool {
if len(source) != len(candidate) {
return false
}
for i := range source {
if source[i] != candidate[i] {
return false
}
}

return true
}
Loading

0 comments on commit e0204ba

Please sign in to comment.