Skip to content

Commit

Permalink
Improvement for #5 and #12 (#13)
Browse files Browse the repository at this point in the history
* resolve issue #5 and #12

* remove redundant empty lines and fix some comments

* Update main.go

sort library imports

* Address review comments
Refactor code, move metricsprovider to internal

Signed-off-by: Abdul Qadeer <aqadeer@paypal.com>

* Update copyright in Makefile

* Delete manifests directory

Signed-off-by: Abdul Qadeer <aqadeer@paypal.com>

Co-authored-by: Abdul Qadeer <aqadeer@paypal.com>
  • Loading branch information
wangchen615 and Abdul Qadeer authored Feb 22, 2021
1 parent d3ea39d commit fd8dfd7
Show file tree
Hide file tree
Showing 15 changed files with 430 additions and 155 deletions.
12 changes: 9 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
FROM golang:1.5-alpine
FROM golang:1.15.5

ADD ./load-watcher /usr/local/bin/load-watcher
WORKDIR /go/src/github.com/paypal/load-watcher
COPY . .
RUN make build

CMD ["/usr/local/bin/load-watcher"]
FROM alpine:3.12

COPY --from=0 /go/src/github.com/paypal/load-watcher/bin/load-watcher /bin/load-watcher

CMD ["/bin/load-watcher"]
27 changes: 27 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright 2021 PayPal
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

COMMONENVVAR=GOOS=$(shell uname -s | tr A-Z a-z) GOARCH=$(subst x86_64,amd64,$(patsubst i%86,386,$(shell uname -m)))
BUILDENVVAR=CGO_ENABLED=0

.PHONY: all
all: build

.PHONY: build
build:
$(COMMONENVVAR) $(BUILDENVVAR) go build -o bin/load-watcher main.go

.PHONY: clean
clean:
rm -rf ./bin
25 changes: 17 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,20 @@ GET /watcher

This will return metrics for all nodes. A query parameter to filter by host can be added with `host`.

## Client Configuration
- To use the Kubernetes metric server client out of a cluster, please configure your `KUBE_CONFIG` environment varirables to your
kubernetes client configuration file path.

- To use the prometheus client out of a cluster, please configure `PROM_HOST` and `PROM_TOKEN` environment variables to
your Prometheus endpoint and token. Please ignore `PROM_TOKEN` as empty string if no authentication is needed to access
the Prometheus APIs. When using the prometheus in a cluster, the default endpoint is `prometheus-k8s:9090`. You need to
configure `PROM_HOST` if your Prometheus endpoint is different.
## Metrics Provider Configuration
- By default Kubernetes Metrics Server client is configured. Set `KUBE_CONFIG` env var to your kubernetes client configuration file path if running out of cluster.

- To use the Prometheus client, please configure environment variables `METRICS_PROVIDER_NAME`, `METRICS_PROVIDER_ADDRESS` and `METRICS_PROVIDER_TOKEN` to `Prometheus`, Prometheus address and auth token. Please do not set `METRICS_PROVIDER_TOKEN` if no authentication
is needed to access the Prometheus APIs. Default value of address set is `http://prometheus-k8s:9090` for Prometheus client.

- To use the SignalFx client, please configure `METRICS_PROVIDER_NAME`, `METRICS_PROVIDER_ADDRESS` and `METRICS_PROVIDER_TOKEN` to `SignalFx`, SignalFx address and auth token respectively. Default value of address set is `https://api.signalfx.com` for Prometheus client.

## Deploy `load-watcher` as a service
To deploy `load-watcher` as a monitoring service in your Kubernetes cluster, you can run the following.
```bash
> kubectl create -f manifests/load-watcher-deployment.yaml
```

## Using `load-watcher` client
- `load-watcher-client.go` shows an example to use `load-watcher` packages as libraries in a client mode. When `load-watcher` is running as a
service exposing an endpoint in a cluster, a client, such as Trimaran plugins, can use its libraries to create a client getting the latest metrics.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ require (
github.com/stretchr/testify v1.5.1
k8s.io/apimachinery v0.19.0
k8s.io/client-go v0.19.0
k8s.io/klog/v2 v2.2.0
k8s.io/metrics v0.19.0
)
21 changes: 12 additions & 9 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,22 @@ limitations under the License.
package main

import (
"log"

"github.com/paypal/load-watcher/pkg/metricsprovider"
"github.com/paypal/load-watcher/pkg/watcher"
"github.com/paypal/load-watcher/pkg/watcher/api"
log "github.com/sirupsen/logrus"
)

func main() {
// client, err := metricsprovider.NewMetricsServerClient()
client, err := metricsprovider.NewPromClient()
client, err := api.NewLibraryClient(watcher.EnvMetricProviderOpts)
if err != nil {
log.Fatalf("unable to create client: %v", err)
}
metrics, err := client.GetLatestWatcherMetrics()
if err != nil {
log.Fatalf("unable to create new client: %v", err)
log.Errorf("unable to get watcher metrics: %v", err)
}
w := watcher.NewWatcher(client)
w.StartWatching()
log.Infof("received metrics: %v", metrics)

// Keep the watcher server up
select {}
}
}
20 changes: 9 additions & 11 deletions pkg/watcher/fetcher.go → pkg/watcher/api/api.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2020 PayPal
Copyright 2021 PayPal
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -14,14 +14,12 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package watcher
package api

// Interface to be implemented by any metrics provider client to interact with Watcher
type FetcherClient interface {
// Return the client name
Name() string
// Fetch metrics for given host
FetchHostMetrics(host string, window *Window) ([]Metric, error)
// Fetch metrics for all hosts
FetchAllHostsMetrics(window *Window) (map[string][]Metric, error)
}
import "github.com/paypal/load-watcher/pkg/watcher"

// Watcher Client API
type Client interface {
// Returns latest metrics present in load Watcher cache
GetLatestWatcherMetrics() (*watcher.WatcherMetrics, error)
}
110 changes: 110 additions & 0 deletions pkg/watcher/api/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
Copyright 2021 PayPal
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package api

import (
"net/http"
"time"

"github.com/francoispqt/gojay"
"github.com/paypal/load-watcher/pkg/watcher"
"github.com/paypal/load-watcher/pkg/watcher/internal/metricsprovider"

"k8s.io/klog/v2"
)

const (
httpClientTimeoutSeconds = 55 * time.Second
)

// Client for Watcher APIs as a library
type libraryClient struct {
fetcherClient watcher.MetricsProviderClient
watcher *watcher.Watcher
}

// Client for Watcher APIs as a service
type serviceClient struct {
httpClient http.Client
watcherAddress string
}

// Creates a new watcher client when using watcher as a library
func NewLibraryClient(opts watcher.MetricsProviderOpts) (Client, error) {
var err error
client := libraryClient{}
switch opts.Name {
case watcher.PromClientName:
client.fetcherClient, err = metricsprovider.NewPromClient(opts)
case watcher.SignalFxClientName:
client.fetcherClient, err = metricsprovider.NewSignalFxClient(opts)
default:
client.fetcherClient, err = metricsprovider.NewMetricsServerClient()
}
if err != nil {
return client, err
}
client.watcher = watcher.NewWatcher(client.fetcherClient)
client.watcher.StartWatching()
return client, nil
}

// Creates a new watcher client when using watcher as a service
func NewServiceClient(watcherAddress string) (Client, error) {
return serviceClient{
httpClient: http.Client{
Timeout: httpClientTimeoutSeconds,
},
watcherAddress: watcherAddress,
}, nil
}

func (c libraryClient) GetLatestWatcherMetrics() (*watcher.WatcherMetrics, error) {
return c.watcher.GetLatestWatcherMetrics(watcher.FifteenMinutes)
}

func (c serviceClient) GetLatestWatcherMetrics() (*watcher.WatcherMetrics, error) {
req, err := http.NewRequest(http.MethodGet, c.watcherAddress+watcher.BaseUrl, nil)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")

//TODO(aqadeer): Add a couple of retries for transient errors
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
klog.V(6).Infof("received status code %v from watcher", resp.StatusCode)
if resp.StatusCode == http.StatusOK {
data := watcher.Data{NodeMetricsMap: make(map[string]watcher.NodeMetrics)}
metrics := watcher.WatcherMetrics{Data: data}
dec := gojay.BorrowDecoder(resp.Body)
defer dec.Release()
err = dec.Decode(&metrics)
if err != nil {
klog.Errorf("unable to decode watcher metrics: %v", err)
return nil, err
} else {
return &metrics, nil
}
} else {
klog.Errorf("received status code %v from watcher", resp.StatusCode)
}
return nil, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ var (
)

const (
k8sClientName = "k8s"
// env variable that provides path to kube config file, if deploying from outside K8s cluster
kubeConfig = "KUBE_CONFIG"
)
Expand All @@ -54,10 +53,10 @@ type metricsServerClient struct {
// This client fetches node metrics from metric server
metricsClientSet *metricsv.Clientset
// This client fetches node capacity
coreClientSet *kubernetes.Clientset
coreClientSet *kubernetes.Clientset
}

func NewMetricsServerClient() (watcher.FetcherClient, error) {
func NewMetricsServerClient() (watcher.MetricsProviderClient, error) {
var config *rest.Config
var err error
kubeConfig := ""
Expand All @@ -80,7 +79,7 @@ func NewMetricsServerClient() (watcher.FetcherClient, error) {
}

func (m metricsServerClient) Name() string {
return k8sClientName
return watcher.K8sClientName
}

func (m metricsServerClient) FetchHostMetrics(host string, window *watcher.Window) ([]watcher.Metric, error) {
Expand All @@ -90,14 +89,24 @@ func (m metricsServerClient) FetchHostMetrics(host string, window *watcher.Windo
if err != nil {
return metrics, err
}
var fetchedMetric watcher.Metric
var cpuFetchedMetric watcher.Metric
var memFetchedMetric watcher.Metric
node, err := m.coreClientSet.CoreV1().Nodes().Get(context.Background(), host, metav1.GetOptions{})
if err != nil {
return metrics, err
}
fetchedMetric.Value = float64(100*nodeMetrics.Usage.Cpu().MilliValue()) / float64(node.Status.Capacity.Cpu().MilliValue())
fetchedMetric.Type = watcher.CPU
metrics = append(metrics, fetchedMetric)

// Added CPU latest metric
cpuFetchedMetric.Value = float64(100*nodeMetrics.Usage.Cpu().MilliValue()) / float64(node.Status.Capacity.Cpu().MilliValue())
cpuFetchedMetric.Type = watcher.CPU
cpuFetchedMetric.Operator = watcher.Latest
metrics = append(metrics, cpuFetchedMetric)

// Added Memory latest metric
memFetchedMetric.Value = float64(100*nodeMetrics.Usage.Memory().Value()) / float64(node.Status.Capacity.Memory().Value())
memFetchedMetric.Type = watcher.Memory
memFetchedMetric.Operator = watcher.Latest
metrics = append(metrics, memFetchedMetric)
return metrics, nil
}

Expand All @@ -112,19 +121,35 @@ func (m metricsServerClient) FetchAllHostsMetrics(window *watcher.Window) (map[s
if err != nil {
return metrics, err
}
nodeCapacityMap := make(map[string]int64)

cpuNodeCapacityMap := make(map[string]int64)
memNodeCPUCapacityMap := make(map[string]int64)
for _, host := range nodeList.Items {
nodeCapacityMap[host.Name] = host.Status.Capacity.Cpu().MilliValue()
cpuNodeCapacityMap[host.Name] = host.Status.Capacity.Cpu().MilliValue()
memNodeCPUCapacityMap[host.Name] = host.Status.Capacity.Memory().Value()
}
for _, host := range nodeMetricsList.Items {
var fetchedMetric watcher.Metric
fetchedMetric.Type = watcher.CPU
if _, ok := nodeCapacityMap[host.Name]; !ok {
log.Errorf("unable to find host %v in node list", host.Name)
var cpuFetchedMetric watcher.Metric
cpuFetchedMetric.Type = watcher.CPU
cpuFetchedMetric.Operator = watcher.Latest
if _, ok := cpuNodeCapacityMap[host.Name]; !ok {
log.Errorf("unable to find host %v in node list caching cpu capacity", host.Name)
continue
}

cpuFetchedMetric.Value = float64(100*host.Usage.Cpu().MilliValue()) / float64(cpuNodeCapacityMap[host.Name])
metrics[host.Name] = append(metrics[host.Name], cpuFetchedMetric)

var memFetchedMetric watcher.Metric
memFetchedMetric.Type = watcher.Memory
memFetchedMetric.Operator = watcher.Latest
if _, ok := memNodeCPUCapacityMap[host.Name]; !ok {
log.Errorf("unable to find host %v in node list caching memory capacity", host.Name)
continue
}
fetchedMetric.Value = float64(100*host.Usage.Cpu().MilliValue()) / float64(nodeCapacityMap[host.Name])
metrics[host.Name] = append(metrics[host.Name], fetchedMetric)
memFetchedMetric.Value = float64(100*host.Usage.Memory().Value()) / float64(memNodeCPUCapacityMap[host.Name])
metrics[host.Name] = append(metrics[host.Name], memFetchedMetric)
}

return metrics, nil
}
}
Loading

0 comments on commit fd8dfd7

Please sign in to comment.