
Make federate sharding possible in metrics-collector by using workers #1628

Merged: 5 commits into stolostron:main on Sep 23, 2024

Conversation

@saswatamcode (Member) commented Sep 15, 2024

Attempting to implement federate sharding by sharding the match rules (passed both as a file and as flags) between a number of workers.

If the code looks messy, that is because it is: there is a lot of unused and scattered logic throughout metrics-collector. I have tried to refactor a lot of this.
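(For illustration, the general shape of this sharding, splitting the configured match rules across the workers, could look like the following self-contained sketch. The round-robin split, the shardMatchers function, and all names here are illustrative, not the actual metrics-collector code.)

package main

import "fmt"

// shardMatchers splits the configured --match rules round-robin across n
// workers, so each worker issues a smaller /federate request.
func shardMatchers(matchers []string, n int) [][]string {
	if n <= 1 {
		return [][]string{matchers}
	}
	shards := make([][]string, n)
	for i, m := range matchers {
		shards[i%n] = append(shards[i%n], m)
	}
	return shards
}

func main() {
	matchers := []string{`{__name__="up"}`, `{__name__="go_goroutines"}`, `{__name__="process_cpu_seconds_total"}`}
	for i, shard := range shardMatchers(matchers, 2) {
		fmt.Printf("worker %d: %v\n", i, shard)
	}
}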

@saswatamcode (Member Author)

/retest

@@ -44,6 +44,14 @@ type ObservabilityAddonSpec struct {
// +kubebuilder:validation:Maximum=3600
Interval int32 `json:"interval,omitempty"`

// Workers is the number of workers that work parallelly to push metrics to hub server.
Contributor

nit: Workers is the number of workers that work in parallel ... sounds more natural than the way this is written

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>
…ve unused recording-file and collect-file flags

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>
…nd collectrules, also removed sort-of-defunct last metrics and reload handling

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>
…st only tests for reconfiguring, which again, is not a critical function of metrics-collector

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>
var statusClient client.Client

// TODO(saswatamcode): Evaluate better way for status reporting.
// Currently, it reports status for every 3 errors in forward request. This is too much of an overhead
Contributor

I'm not sure there is much to win here, given that it will be replaced by MCOA. In my view, it's not worth the effort. It's good enough.

Member Author

Hmm, MCOA would still use metrics-collector, right? Or are you saying we won't be reporting status from here in MCOA anymore?

Contributor

The metrics collector should be replaced by the prometheusAgent. Nothing is set in stone though.

Member Author

The metrics collector should be replaced by the prometheusAgent. Nothing is set in stone though.

Yup, @moadz and I discussed this, but I don't think it would be feasible yet, as you can't downsample, so we would end up sending all raw 30s scrape-interval data.

Contributor

Nope - we would federate every 5 mins in PrometheusAgent, and then remote-write on every successful federate. So it is feasible in the same way that it works for metrics-collector currently (federate-interval-driven push).

Member Author

Ah yeah, in that case it is, but PrometheusAgent would still end up doing a single large /federate call.

"id",
opt.Identifier,
"The unique identifier for metrics sent with this client.")
cmd.Flags().BoolVar(
Contributor

What is the purpose for exposing these flags?

@saswatamcode (Member Author) commented Sep 23, 2024

e2e tests in the future without Kube, basically. The main function of metrics-collector is to federate and remote write; it ideally shouldn't be doing kube API calls. I already have one like so, but adding it here would need some go.mod changes, which we can tackle in later PRs.

package e2e_test

import (
	"fmt"
	"os"
	"strconv"
	"testing"
	"time"

	"github.com/efficientgo/e2e"
	e2edb "github.com/efficientgo/e2e/db"
	e2einteractive "github.com/efficientgo/e2e/interactive"
	e2emon "github.com/efficientgo/e2e/monitoring"
	e2eobs "github.com/efficientgo/e2e/observable"
	"github.com/thanos-io/thanos/test/e2e/e2ethanos"

	"github.com/efficientgo/core/backoff"

	"github.com/efficientgo/core/testutil"
)

// TestMetricsCollectorE2E starts Prometheus, a Thanos receiver, and the metrics-collector in Docker,
// waits for federated series to land in the receiver, then opens the UIs for interactive inspection.
func TestMetricsCollectorE2E(t *testing.T) {
	e, err := e2e.NewDockerEnvironment("metrics-col")
	testutil.Ok(t, err)
	t.Cleanup(e.Close)

	m, err := e2emon.Start(e)
	testutil.Ok(t, err)

	prom := e2edb.NewPrometheus(e, "prom2")
	receive1 := e2ethanos.NewReceiveBuilder(e, "receiver-1").WithImage("quay.io/thanos/thanos:v0.35.1").WithIngestionEnabled().Init()

	testutil.Ok(t, e2e.StartAndWaitReady(prom, receive1))

	collector := NewMetricsCollector(e, "collector-1", MetricsCollectorOptions{
		From:     prom.InternalEndpoint("http"),
		To:       receive1.InternalEndpoint("remote-write"),
		Matchers: []string{`{__name__="up"}`, `{__name__="go_gc_duration_seconds"}`, `{__name__="go_goroutines"}`, `{__name__="go_memstats_alloc_bytes"}`, `{__name__="go_memstats_heap_alloc_bytes"}`, `{__name__="go_memstats_heap_idle_bytes"}`, `{__name__="go_memstats_heap_inuse_bytes"}`, `{__name__="go_memstats_heap_objects"}`, `{__name__="go_memstats_heap_released_bytes"}`, `{__name__="go_memstats_heap_sys_bytes"}`, `{__name__="go_memstats_last_gc_time_seconds"}`, `{__name__="go_memstats_lookups_total"}`, `{__name__="go_memstats_mallocs_total"}`, `{__name__="go_memstats_sys_bytes"}`, `{__name__="go_threads"}`, `{__name__="process_cpu_seconds_total"}`, `{__name__="process_open_fds"}`, `{__name__="process_resident_memory_bytes"}`, `{__name__="process_start_time_seconds"}`, `{__name__="process_virtual_memory_bytes"}`},
		// Matchers: []string{`{__name__="up"}`},
		LogLevel: "info",
		Interval: "2s",
		Workers:  4,
	})

	testutil.Ok(t, e2e.StartAndWaitReady(collector))

	time.Sleep(10 * time.Second)

	testutil.Ok(t, receive1.WaitSumMetricsWithOptions(e2emon.GreaterOrEqual(10), []string{"prometheus_tsdb_head_series"}, e2emon.WaitMissingMetrics()))

	query1 := e2edb.NewThanosQuerier(
		e,
		"query1",
		[]string{
			receive1.InternalEndpoint("grpc"),
		},
		e2edb.WithImage("quay.io/thanos/thanos:v0.35.1"),
	)
	testutil.Ok(t, e2e.StartAndWaitReady(query1))
	testutil.Ok(t, query1.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics()))
	testutil.Ok(t, e2einteractive.OpenInBrowser(fmt.Sprintf("http://%s", prom.Endpoint("http"))))
	testutil.Ok(t, e2einteractive.OpenInBrowser(fmt.Sprintf("http://%s", query1.Endpoint("http"))))
	testutil.Ok(t, m.OpenUserInterfaceInBrowser())
	testutil.Ok(t, e2einteractive.RunUntilEndpointHit())

}

type MetricsCollectorOptions struct {
	From     string
	To       string
	Matchers []string
	LogLevel string
	Interval string
	Workers  int
}

// NewMetricsCollector runs a metrics-collector container that federates from the given Prometheus
// endpoint and remote writes to the given receiver, using the supplied matchers, interval, and worker count.
func NewMetricsCollector(e e2e.Environment, name string, o MetricsCollectorOptions) *e2eobs.Observable {
	f := e.Runnable(name).WithPorts(map[string]int{"metrics": 9002}).Future()

	args := e2e.BuildArgs(map[string]string{
		"--from":                     "http://" + o.From,
		"--to-upload":                "http://" + o.To + "/api/v1/receive",
		"--log-level":                o.LogLevel,
		"--interval":                 o.Interval,
		"--disable-status-reporting": "true",
		"--disable-hypershift":       "true",
		"--worker-number":            strconv.Itoa(o.Workers),
	})

	for _, matcher := range o.Matchers {
		args = append(args, "--match="+matcher)
	}

	return e2eobs.AsObservable(f.Init(wrapWithDefaults(e2e.StartOptions{
		Image:   "example.com/metrics-collector:v0.0.1",
		Command: e2e.NewCommandWithoutEntrypoint("metrics-collector", args...),
	})), "metrics")
}

var defaultBackoffConfig = backoff.Config{
	Min:        300 * time.Millisecond,
	Max:        600 * time.Millisecond,
	MaxRetries: 50,
}

func wrapWithDefaults(opt e2e.StartOptions) e2e.StartOptions {
	if opt.User == "" {
		opt.User = strconv.Itoa(os.Getuid())
	}
	if opt.WaitReadyBackoff == nil {
		opt.WaitReadyBackoff = &defaultBackoffConfig
	}
	return opt
}

@thibaultmg (Contributor) left a comment

Awesome! Made some comments but no blocker.

/lgtm

@saswatamcode (Member Author)

/hold

Holding for a bit so others can take a look

@moadz (Contributor) left a comment

LGTM - awesome work Saswata!

I don't, however, think it's appropriate to merge this the day before code-freeze. It feels like there is substantial testing risk associated with this change, and I wouldn't want to compromise the GA date. We can discuss this further in sync/offline.

@@ -57,7 +57,7 @@ func main() {
&opt.WorkerNum,
"worker-number",
opt.WorkerNum,
"The number of client runs in the simulate environment.")
"The number of workers that work parallelly to federate and remote write metrics.")
Contributor

Suggested change
"The number of workers that work parallelly to federate and remote write metrics.")
"The number of workers that work in parallel to federate and remote write metrics.")

}
for i, shardWorker := range shardWorkers {
go func(i int, shardWorker *forwarder.Worker) {
fmt.Printf("Starting shard worker %d\n", i)
Contributor

nit: Logger instead?

Member Author

Ah yes missed that one
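(For illustration, a self-contained version of that loop with structured logging instead of fmt.Printf. This sketch uses the standard library's log/slog and a stub worker type purely for the example, not the project's own logger package or forwarder.Worker.)

package main

import (
	"log/slog"
	"os"
	"sync"
)

// worker stands in for *forwarder.Worker; only the start log line matters here.
type worker struct{ id int }

func (w *worker) run() { /* federate + remote-write loop would go here */ }

func main() {
	log := slog.New(slog.NewTextHandler(os.Stderr, nil))
	shardWorkers := []*worker{{0}, {1}, {2}, {3}}

	var wg sync.WaitGroup
	for i, sw := range shardWorkers {
		wg.Add(1)
		go func(i int, sw *worker) {
			defer wg.Done()
			// Structured log line instead of fmt.Printf, as suggested in review.
			log.Info("starting shard worker", "worker", i)
			sw.run()
		}(i, sw)
	}
	wg.Wait()
}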


if err != nil {
logger.Log(o.Logger, logger.Error, "msg", "failed to close listener", "err", err)
}
})
}
}

err = runMultiWorkers(o, cfg)
// Run the simulation agent.
Contributor

I'm not sure I understand this comment - what are we simulating here?

Member Author

This is the perfscale simulation, I believe; nothing I added, it existed originally.
https://github.com/stolostron/multicluster-observability-operator/tree/main/tools/simulator/metrics-collector

We really should drop this.

}

func initConfig(o *Options) (error, *forwarder.Config) {
// Agent is the type of the worker agent that will be running.
// They are classed according to what they collect.
Contributor

Suggested change
// They are classed according to what they collect.
// They are classified according to what they collect.

},
err: false,
},
}

for i := range tc {
tc[i].c.Metrics = NewWorkerMetrics(prometheus.NewRegistry())
if tc[i].c.ToUploadCA == customCA {
if i == 10 {
Contributor

I actually think checking if customCA is defined is probably more robust. Relying on test-case ordering seems fishy to me.

Member Author

This was set in a kind of wrong way tbh, but got handled using an else branch that checks for the UNIT_TEST var directly in business logic, which is even more fishy. I don't think we are passing it as the upload CA exactly.

I can just edit the test struct instead.
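(A sketch of the kind of change being described: carrying an explicit flag on each test case rather than relying on its position in the slice. Field names here are made up for illustration, not the actual test code.)

package main

import "fmt"

// testCase carries an explicit useCustomCA flag instead of relying on the
// case's slice index (the i == 10 check).
type testCase struct {
	name        string
	useCustomCA bool
	wantErr     bool
}

func main() {
	tc := []testCase{
		{name: "default CA"},
		{name: "custom upload CA", useCustomCA: true},
	}
	for _, c := range tc {
		if c.useCustomCA {
			fmt.Println("wiring custom CA for case:", c.name)
		}
	}
}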

@@ -214,75 +246,3 @@ func TestReconfigure(t *testing.T) {
}
}
}

Contributor

I suppose TestRun is more of an integration test as opposed to a unit test. Has it been moved elsewhere? (I'm going file by file in the review; ignore this if it's mentioned later.)

Member Author

This is basically the feature we dropped, which is worker reconfigure. This test was testing exactly that.

@@ -22,7 +22,7 @@ func TestDefaultTransport(t *testing.T) {
TLSHandshakeTimeout: 10 * time.Second,
DisableKeepAlives: true,
}
http := DefaultTransport(logger, true)
http := DefaultTransport(logger)
if http.Dial == nil || reflect.TypeOf(http) != reflect.TypeOf(want) {
Contributor

http.Dial is deprecated, use http.DialContext instead.

Member Author

Ack, will update, this was old code
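(For reference, the non-deprecated shape of such a transport sets DialContext via net.Dialer; a standalone sketch with the same timeout/keep-alive settings as the test's expected transport, not the actual DefaultTransport implementation.)

package main

import (
	"net"
	"net/http"
	"time"
)

// newTransport uses DialContext instead of the deprecated Dial field.
func newTransport() *http.Transport {
	return &http.Transport{
		DialContext: (&net.Dialer{
			Timeout:   30 * time.Second,
			KeepAlive: 30 * time.Second,
		}).DialContext,
		TLSHandshakeTimeout: 10 * time.Second,
		DisableKeepAlives:   true,
	}
}

func main() {
	client := &http.Client{Transport: newTransport()}
	_ = client
}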

@@ -93,3 +69,9 @@ func (s *StatusReport) UpdateStatus(ctx context.Context, reason status.Reason, m

return nil
}

type NoopReporter struct{}
Contributor

In forwarder.go we use the noop reporter by default - why the need for a status reporter at all there?

Member Author

The forwarder is in charge of federating and remote writing that federated data, so that is where we report status.

Earlier it always reported status using the kube API, but for local testing we might not need that. So I made the status reporting kind of optional.
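(A rough sketch of that pattern: status reporting behind an interface with a no-op default for local runs. The real UpdateStatus takes a status.Reason, so the plain string parameters here are a simplification to keep the sketch self-contained.)

package main

import (
	"context"
	"fmt"
)

// Reporter abstracts status reporting so the forwarder can run without a
// Kubernetes client, e.g. in local e2e runs.
type Reporter interface {
	UpdateStatus(ctx context.Context, reason, message string) error
}

// NoopReporter satisfies Reporter but does nothing; used when status
// reporting is disabled, e.g. via --disable-status-reporting.
type NoopReporter struct{}

func (NoopReporter) UpdateStatus(_ context.Context, _, _ string) error { return nil }

func main() {
	var r Reporter = NoopReporter{}
	fmt.Println(r.UpdateStatus(context.Background(), "ForwardSuccessful", "metrics pushed"))
}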

// +optional
// +kubebuilder:default:=1
// +kubebuilder:validation:Minimum=1
Workers int32 `json:"workers,omitempty"`
Contributor

nit: Is it clear on the addon spec that workers corresponds to prometheus federate workers and not something else?

Member Author

I'll add a better description
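(One possible wording for a more descriptive field comment; the package name and the comment text below are a suggestion, not the merged change.)

package observability

// ObservabilityAddonSpec excerpt, for illustration only.
type ObservabilityAddonSpec struct {
	// Workers is the number of metrics-collector workers that federate and
	// push metrics to the hub in parallel. The federate match rules are
	// sharded across the workers, so each worker issues a smaller /federate
	// request against the in-cluster Prometheus.
	// +optional
	// +kubebuilder:default:=1
	// +kubebuilder:validation:Minimum=1
	Workers int32 `json:"workers,omitempty"`
}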

@moadz (Contributor) left a comment

LGTM pending comments and the hold

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>
@openshift-ci bot removed the lgtm label Sep 23, 2024
sonarcloud bot commented Sep 23, 2024

Quality Gate failed

Failed conditions
39.5% Coverage on New Code (required ≥ 70%)
Security Rating on New Code: E (required ≥ A)

See analysis details on SonarCloud


openshift-ci bot commented Sep 23, 2024

@saswatamcode: The following test failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name: ci/prow/sonarcloud
Commit: 278ccf3
Required: false
Rerun command: /test sonarcloud

Full PR test history. Your PR dashboard.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.

@saswatamcode (Member Author)

/hold cancel

@philipgough (Contributor) left a comment

/lgtm

openshift-ci bot commented Sep 23, 2024

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: moadz, philipgough, saswatamcode, thibaultmg

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:
  • OWNERS [moadz,philipgough,saswatamcode,thibaultmg]

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@saswatamcode saswatamcode merged commit 4fdfb99 into stolostron:main Sep 23, 2024
19 of 22 checks passed