Skip to content

Commit

Permalink
feature(kcmas): scrape with basic auth
Browse files Browse the repository at this point in the history
  • Loading branch information
zzzzhhb authored and waynepeking348 committed Oct 19, 2023
1 parent a5e0e3d commit 4694a8b
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 8 deletions.
4 changes: 2 additions & 2 deletions cmd/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import (

const (
healthZPath = "/healthz"
pprofPrefix = "/debug/pprof"
debugPrefix = "/debug"
)

// GenericOptions is used as an extendable way to support
Expand Down Expand Up @@ -152,7 +152,7 @@ func NewGenericContext(
// it will use corev1 event recorder and wrap it with a v1 event recorder adapter.
broadcastAdapter := events.NewEventBroadcasterAdapter(clientSet.KubeClient)

httpHandler := process.NewHTTPHandler(genericConf.GenericEndpointHandleChains, []string{healthZPath, pprofPrefix})
httpHandler := process.NewHTTPHandler(genericConf.GenericEndpointHandleChains, []string{healthZPath, debugPrefix})

// since some authentication implementation needs kcc and kcc only support agent component, so we only enable
// authentication for agent component for now.
Expand Down
6 changes: 6 additions & 0 deletions cmd/katalyst-metric/app/options/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type CollectorOptions struct {
ShardNum int
CollectorName string
CollectInterval time.Duration
CredentialPath string
}

// NewCollectorOptions creates a new CollectorOptions with a default config.
Expand All @@ -50,6 +51,8 @@ func NewCollectorOptions() *CollectorOptions {

PodLabelSelector: labels.Nothing().String(),
NodeLabelSelector: labels.Everything().String(),

CredentialPath: "/etc/katalyst/credential",
}
}

Expand All @@ -69,6 +72,8 @@ func (o *CollectorOptions) AddFlags(fss *cliflag.NamedFlagSets) {
fs.DurationVar(&o.CollectInterval, "collector-interval", o.CollectInterval, fmt.Sprintf(
"the interval between two collecting actions"))

fs.StringVar(&o.CredentialPath, "credential-path", o.CredentialPath, fmt.Sprintf(
"directory path where credential files should be in"))
}

// ApplyTo fills up config with options
Expand All @@ -89,6 +94,7 @@ func (o *CollectorOptions) ApplyTo(c *metric.CollectorConfiguration) error {
}
c.NodeSelector = nodeSelector

c.CredentialPath = o.CredentialPath
return nil
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/config/metric/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ type CollectorConfiguration struct {

// CollectorName is used to switch from different collector implementations.
CollectorName string

// CredentialPath is the path where the credential files should be in. Which and how many files should be in it
// depends on the authentication method. For now, we only support basic auth,so there should be two files with name
// username and password.
CredentialPath string
}

func NewCollectorConfiguration() *CollectorConfiguration {
Expand Down
51 changes: 49 additions & 2 deletions pkg/custom-metric/collector/prometheus/collector_promethes.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"net"
"net/http"
"path"
"sync"
"time"

Expand Down Expand Up @@ -59,6 +60,9 @@ const (
metricNamePromCollectorStoreReqCount = "kcmas_collector_store_req_cnt"
metricNamePromCollectorStoreItemCount = "kcmas_collector_store_item_cnt"
metricNamePromCollectorStoreLatency = "kcmas_collector_store_latency"

fileNameUsername = "username"
fileNamePassword = "password"
)

// prometheusCollector implements MetricCollector using self-defined parser functionality
Expand All @@ -70,7 +74,10 @@ type prometheusCollector struct {
collectConf *metric.CollectorConfiguration
genericConf *metric.GenericMetricConfiguration

client *http.Client
client *http.Client
username string
password string

emitter metrics.MetricEmitter
metricStore store.MetricStore

Expand Down Expand Up @@ -98,6 +105,8 @@ func NewPrometheusCollector(ctx context.Context, baseCtx *katalystbase.GenericCo
return nil, fmt.Errorf("creating HTTP client failed: %v", err)
}

username, password := extractCredential(collectConf.CredentialPath)

// since collector will define its own pod/node label selectors, so we will construct informer separately
klog.Infof("enabled with pod selector: %v, node selector: %v", collectConf.PodSelector.String(), collectConf.NodeSelector.String())
podFactory := informers.NewSharedInformerFactoryWithOptions(baseCtx.Client.KubeClient, time.Hour*24,
Expand Down Expand Up @@ -125,6 +134,8 @@ func NewPrometheusCollector(ctx context.Context, baseCtx *katalystbase.GenericCo
nodeInformer.Informer().HasSynced,
},
client: client,
username: username,
password: password,
emitter: baseCtx.EmitterPool.GetDefaultMetricsEmitter().WithTags("prom_collector"),
scrapes: make(map[string]*ScrapeManager),
syncSuccess: false,
Expand Down Expand Up @@ -325,7 +336,8 @@ func (p *prometheusCollector) addRequest(pod *v1.Pod) {

// todo all ScrapeManager will share the same http connection now,
// reconsider whether it's reasonable in production
s, err := NewScrapeManager(p.ctx, p.genericConf.OutOfDataPeriod, p.client, pod.Spec.NodeName, targetURL, p.emitter)
s, err := NewScrapeManager(p.ctx, p.genericConf.OutOfDataPeriod, p.client, pod.Spec.NodeName, targetURL,
p.emitter, p.username, p.password)
if err != nil {
klog.Errorf("failed to new http.Request: %v", err)
return
Expand Down Expand Up @@ -425,3 +437,38 @@ func (p *prometheusCollector) sync() {
{Key: "type", Val: "failed"},
}...)
}

// extractCredential get username and password from the credential directory
func extractCredential(credentialDir string) (string, string) {
usernameFilePath := path.Join(credentialDir, fileNameUsername)
username, usernameErr := extractCredentialFile(usernameFilePath)
if usernameErr != nil {
general.Warningf("get username failed, err:%v", usernameErr)
return "", ""
}

passwordFilePath := path.Join(credentialDir, fileNamePassword)
password, passwordErr := extractCredentialFile(passwordFilePath)
if passwordErr != nil {
general.Warningf("get password failed, err:%v", passwordErr)
return "", ""
}

return username, password
}

func extractCredentialFile(filePath string) (string, error) {
FileExists := general.IsPathExists(filePath)
if !FileExists {
return "", fmt.Errorf("file %v does not exist", filePath)
}

lines, err := general.ReadFileIntoLines(filePath)
if err != nil {
return "", fmt.Errorf("read username file failed, err:%v", err)
}
if len(lines) != 1 {
return "", fmt.Errorf("username is more than 1 line which is unexpected")
}
return lines[0], nil
}
47 changes: 45 additions & 2 deletions pkg/custom-metric/collector/prometheus/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ limitations under the License.
package prometheus

import (
"bufio"
"context"
"net/http"
"net/http/httptest"
"os"
"path"
"strconv"
"strings"
"testing"
Expand All @@ -33,9 +36,43 @@ import (
"github.com/kubewharf/katalyst-core/pkg/config/metric"
metricconf "github.com/kubewharf/katalyst-core/pkg/config/metric"
"github.com/kubewharf/katalyst-core/pkg/custom-metric/store/local"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/native"
)

var (
credentialPath = "/tmp/katalyst-ut/credential"
username = "katalyst"
password = "password"
)

func setupCredential(credentialPath string) error {
err := general.EnsureDirectory(credentialPath)
if err != nil {
return err
}

usernameFile, err := os.OpenFile(path.Join(credentialPath, fileNameUsername), os.O_WRONLY|os.O_CREATE, 0755)
if err != nil {
return err
}
defer usernameFile.Close()
usernameWriter := bufio.NewWriter(usernameFile)
_, _ = usernameWriter.WriteString(username)
_ = usernameWriter.Flush()

passwordFile, err := os.OpenFile(path.Join(credentialPath, fileNamePassword), os.O_WRONLY|os.O_CREATE, 0755)
if err != nil {
return err
}
defer passwordFile.Close()
passwordWriter := bufio.NewWriter(passwordFile)
_, _ = passwordWriter.WriteString(password)
_ = passwordWriter.Flush()

return nil
}

func TestPrometheusAddRequests(t *testing.T) {
t.Parallel()

Expand All @@ -45,18 +82,24 @@ func TestPrometheusAddRequests(t *testing.T) {
}))
defer server.Close()

err2 := setupCredential(credentialPath)
assert.NoError(t, err2)

baseCtx, _ := katalystbase.GenerateFakeGenericContext(nil, nil, nil, nil)
genericConf := &metricconf.GenericMetricConfiguration{}
collectConf := &metric.CollectorConfiguration{
PodSelector: labels.NewSelector(),
NodeSelector: labels.NewSelector(),
PodSelector: labels.NewSelector(),
NodeSelector: labels.NewSelector(),
CredentialPath: credentialPath,
}
storeConf := &metricconf.StoreConfiguration{}
localStore, _ := local.NewLocalMemoryMetricStore(ctx, baseCtx, genericConf, storeConf)

promCollector, err := NewPrometheusCollector(ctx, baseCtx, genericConf, collectConf, localStore)
assert.NoError(t, err)
promCollector.(*prometheusCollector).client, _ = newPrometheusClient()
assert.Equal(t, promCollector.(*prometheusCollector).username, username)
assert.Equal(t, promCollector.(*prometheusCollector).password, password)

hostAndPort := strings.Split(strings.TrimPrefix(server.URL, "http://"), ":")
assert.Equal(t, 2, len(hostAndPort))
Expand Down
3 changes: 2 additions & 1 deletion pkg/custom-metric/collector/prometheus/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type ScrapeManager struct {
metricTags []metrics.MetricTag
}

func NewScrapeManager(ctx context.Context, outOfDataPeriod time.Duration, client *http.Client, node, url string, emitter metrics.MetricEmitter) (*ScrapeManager, error) {
func NewScrapeManager(ctx context.Context, outOfDataPeriod time.Duration, client *http.Client, node, url string, emitter metrics.MetricEmitter, username, password string) (*ScrapeManager, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
Expand All @@ -79,6 +79,7 @@ func NewScrapeManager(ctx context.Context, outOfDataPeriod time.Duration, client
req.Header.Add("Accept-Encoding", "gzip")
req.Header.Set("User-Agent", httpUserAgent)
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", strconv.FormatFloat(60, 'f', -1, 64))
req.SetBasicAuth(username, password)

sCtx, cancel := context.WithCancel(ctx)
return &ScrapeManager{
Expand Down
2 changes: 1 addition & 1 deletion pkg/custom-metric/collector/prometheus/scrape_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ without_timestamp{label_test="without_timestamp",namespace="n1",object="pod",obj
defer server.Close()

client, _ := newPrometheusClient()
s, _ := NewScrapeManager(ctx, time.Hour, client, "fake-node", server.URL, metrics.DummyMetrics{})
s, _ := NewScrapeManager(ctx, time.Hour, client, "fake-node", server.URL, metrics.DummyMetrics{}, "", "")
// to make sure the metric will only be collected once
s.scrape()
time.Sleep(time.Millisecond * 300)
Expand Down

0 comments on commit 4694a8b

Please sign in to comment.