Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add allocation_fallback_strategy option as fallback strategy for per-node strategy #3482

Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
12e1282
Add least-weighted as fallback strategy for per-node strategy
lhp-nemlig Nov 21, 2024
b7649be
Add changelog file
lhp-nemlig Nov 21, 2024
31982b7
Change fallback strategy to consistent-hashing
lhp-nemlig Nov 21, 2024
92d0069
Update changelog
lhp-nemlig Nov 21, 2024
381222c
Fix bad test condition that might pass even if target was not assigned
lhp-nemlig Nov 21, 2024
ee6e856
Make fallback strategy a config option
lhp-nemlig Nov 21, 2024
ef2d720
Update changelog
lhp-nemlig Nov 21, 2024
ea6e956
Merge branch 'main' into add_fallback_strategy_for_per_node_strategy
lhp-nemlig Nov 21, 2024
6f757c3
Add period to test comments
lhp-nemlig Nov 21, 2024
e62f7d2
Merge branch 'main' into add_fallback_strategy_for_per_node_strategy
lhp-nemlig Nov 22, 2024
7711a32
Add feature gate for enabling fallback strategy
lhp-nemlig Nov 22, 2024
e1207cd
Fix featuregate id
lhp-nemlig Nov 22, 2024
7eabd10
Update cmd/otel-allocator/allocation/per_node_test.go
lhp-nemlig Nov 22, 2024
6be618c
Update cmd/otel-allocator/allocation/per_node_test.go
lhp-nemlig Nov 22, 2024
7d32f7e
Update cmd/otel-allocator/allocation/per_node_test.go
lhp-nemlig Nov 22, 2024
0254654
Only add fallbackstrategy if nonempty
lhp-nemlig Nov 22, 2024
a5b8dca
Remove unnecessary comments
lhp-nemlig Nov 22, 2024
d6435e3
Add unit test for fallbackstrategy feature gate
lhp-nemlig Nov 22, 2024
0f878f0
Update changelog
lhp-nemlig Nov 22, 2024
bf5a367
Merge branch 'main' into add_fallback_strategy_for_per_node_strategy
lhp-nemlig Nov 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions .chloggen/add_fallback_strategy_for_per_node_strategy.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action)
component: target allocator

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Added allocation_fallback_strategy option as fallback strategy for per-node allocation strategy

# One or more tracking issues related to the change
issues: [3477]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
  If using the per-node allocation strategy, targets that are not attached to a node will not
  be allocated. As the per-node strategy is required when running as a daemonset, it is
  otherwise not possible to assign such targets under a daemonset deployment.
swiatekm marked this conversation as resolved.
Show resolved Hide resolved
5 changes: 5 additions & 0 deletions cmd/otel-allocator/allocation/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ func (a *allocator) SetFilter(filter Filter) {
a.filter = filter
}

// SetFallbackStrategy sets the fallback strategy to use.
// The call is delegated to the underlying allocation strategy; per-node uses
// the fallback for targets without a node name, while other strategies
// implement this as a no-op.
func (a *allocator) SetFallbackStrategy(strategy Strategy) {
	a.strategy.SetFallbackStrategy(strategy)
}

// SetTargets accepts a list of targets that will be used to make
// load balancing decisions. This method should be called when there are
// new targets discovered or existing targets are shutdown.
Expand Down
2 changes: 2 additions & 0 deletions cmd/otel-allocator/allocation/consistent_hashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,5 @@ func (s *consistentHashingStrategy) SetCollectors(collectors map[string]*Collect
s.consistentHasher = consistent.New(members, s.config)

}

func (s *consistentHashingStrategy) SetFallbackStrategy(fallbackStrategy Strategy) {}
2 changes: 2 additions & 0 deletions cmd/otel-allocator/allocation/least_weighted.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,5 @@ func (s *leastWeightedStrategy) GetCollectorForTarget(collectors map[string]*Col
}

func (s *leastWeightedStrategy) SetCollectors(_ map[string]*Collector) {}

func (s *leastWeightedStrategy) SetFallbackStrategy(fallbackStrategy Strategy) {}
18 changes: 16 additions & 2 deletions cmd/otel-allocator/allocation/per_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,31 @@ const perNodeStrategyName = "per-node"
var _ Strategy = &perNodeStrategy{}

// perNodeStrategy assigns each target to the collector running on the
// target's node. Targets that carry no node name are delegated to the
// optional fallback strategy, if one has been set.
type perNodeStrategy struct {
	// collectorByNode maps a node name to the collector scheduled on that node.
	collectorByNode map[string]*Collector
	// fallbackStrategy, when non-nil, handles targets without a node name.
	fallbackStrategy Strategy
}

// newPerNodeStrategy returns a perNodeStrategy with no fallback configured.
// fallbackStrategy is intentionally left at its zero value (nil); callers opt
// in via SetFallbackStrategy.
func newPerNodeStrategy() Strategy {
	return &perNodeStrategy{
		collectorByNode: make(map[string]*Collector),
	}
}

// SetFallbackStrategy sets the strategy used for targets that have no node
// name and therefore cannot be matched to a per-node collector.
func (s *perNodeStrategy) SetFallbackStrategy(fallbackStrategy Strategy) {
	s.fallbackStrategy = fallbackStrategy
}

// GetName returns the registered name of this strategy ("per-node").
func (s *perNodeStrategy) GetName() string {
	return perNodeStrategyName
}

func (s *perNodeStrategy) GetCollectorForTarget(collectors map[string]*Collector, item *target.Item) (*Collector, error) {
targetNodeName := item.GetNodeName()
if targetNodeName == "" && s.fallbackStrategy != nil {
return s.fallbackStrategy.GetCollectorForTarget(collectors, item)
}

collector, ok := s.collectorByNode[targetNodeName]
if !ok {
return nil, fmt.Errorf("could not find collector for node %s", targetNodeName)
Expand All @@ -54,4 +64,8 @@ func (s *perNodeStrategy) SetCollectors(collectors map[string]*Collector) {
s.collectorByNode[collector.NodeName] = collector
}
}

if s.fallbackStrategy != nil {
s.fallbackStrategy.SetCollectors(collectors)
}
}
84 changes: 83 additions & 1 deletion cmd/otel-allocator/allocation/per_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,17 @@ import (

var loggerPerNode = logf.Log.WithName("unit-tests")

// Tests that two targets with the same target url and job name but different label set are both added.
// GetTargetsWithNodeName filters targets down to those that carry a node
// name, i.e. targets the per-node strategy itself could assign (as opposed to
// targets handled by a fallback strategy).
func GetTargetsWithNodeName(targets []*target.Item) (targetsWithNodeName []*target.Item) {
	for _, tgt := range targets {
		if tgt.GetNodeName() == "" {
			continue
		}
		targetsWithNodeName = append(targetsWithNodeName, tgt)
	}
	return targetsWithNodeName
}

// Tests that, of four targets, the three with node labels are assigned and the
// one lacking node labels is skipped.
func TestAllocationPerNode(t *testing.T) {
// prepare allocator with initial targets and collectors
s, _ := New("per-node", loggerPerNode)
Expand Down Expand Up @@ -93,6 +103,78 @@ func TestAllocationPerNode(t *testing.T) {
}
}

// Tests that four targets, with one of them missing node labels, are all assigned.
func TestAllocationPerNodeUsingFallback(t *testing.T) {
// prepare allocator with initial targets and collectors
s, _ := New("per-node", loggerPerNode, WithFallbackStrategy(consistentHashingStrategyName))

cols := MakeNCollectors(4, 0)
s.SetCollectors(cols)
firstLabels := labels.Labels{
{Name: "test", Value: "test1"},
{Name: "__meta_kubernetes_pod_node_name", Value: "node-0"},
}
secondLabels := labels.Labels{
{Name: "test", Value: "test2"},
{Name: "__meta_kubernetes_node_name", Value: "node-1"},
}
// no label, should be skipped
lhp-nemlig marked this conversation as resolved.
Show resolved Hide resolved
thirdLabels := labels.Labels{
{Name: "test", Value: "test3"},
}
// endpointslice target kind and name
fourthLabels := labels.Labels{
{Name: "test", Value: "test4"},
{Name: "__meta_kubernetes_endpointslice_address_target_kind", Value: "Node"},
{Name: "__meta_kubernetes_endpointslice_address_target_name", Value: "node-3"},
}

firstTarget := target.NewItem("sample-name", "0.0.0.0:8000", firstLabels, "")
secondTarget := target.NewItem("sample-name", "0.0.0.0:8000", secondLabels, "")
thirdTarget := target.NewItem("sample-name", "0.0.0.0:8000", thirdLabels, "")
fourthTarget := target.NewItem("sample-name", "0.0.0.0:8000", fourthLabels, "")

targetList := map[string]*target.Item{
firstTarget.Hash(): firstTarget,
secondTarget.Hash(): secondTarget,
thirdTarget.Hash(): thirdTarget,
fourthTarget.Hash(): fourthTarget,
}

// test that targets and collectors are added properly
s.SetTargets(targetList)

// verify length
actualItems := s.TargetItems()

// one target should be skipped
lhp-nemlig marked this conversation as resolved.
Show resolved Hide resolved
expectedTargetLen := len(targetList)
assert.Len(t, actualItems, expectedTargetLen)

// verify allocation to nodes
for targetHash, item := range targetList {
actualItem, found := actualItems[targetHash]
// if third target, should be skipped
lhp-nemlig marked this conversation as resolved.
Show resolved Hide resolved
assert.True(t, found, "target with hash %s not found", item.Hash())

// only the first two targets should be allocated
swiatekm marked this conversation as resolved.
Show resolved Hide resolved
itemsForCollector := s.GetTargetsForCollectorAndJob(actualItem.CollectorName, actualItem.JobName)

// first two should be assigned one to each collector; if third target, it should be assigned
// according to the fallback strategy which may assign it to the otherwise empty collector or
// one of the others, depending on the strategy and collector loop order
if targetHash == thirdTarget.Hash() {
assert.Empty(t, item.GetNodeName())
assert.NotZero(t, len(itemsForCollector))
continue
}

// Only check targets that have been assigned using the per-node (not fallback) strategy here
assert.Len(t, GetTargetsWithNodeName(itemsForCollector), 1)
assert.Equal(t, actualItem, GetTargetsWithNodeName(itemsForCollector)[0])
}
}

func TestTargetsWithNoCollectorsPerNode(t *testing.T) {
// prepare allocator with initial targets and collectors
c, _ := New("per-node", loggerPerNode)
Expand Down
43 changes: 27 additions & 16 deletions cmd/otel-allocator/allocation/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
type AllocatorProvider func(log logr.Logger, opts ...AllocationOption) Allocator

var (
strategies = map[string]Strategy{}

registry = map[string]AllocatorProvider{}

// TargetsPerCollector records how many targets have been assigned to each collector.
Expand Down Expand Up @@ -67,6 +69,16 @@ func WithFilter(filter Filter) AllocationOption {
}
}

// WithFallbackStrategy returns an option that sets the named registered
// strategy as the allocator's fallback. An empty name is allowed and results
// in a nil fallback (i.e. no fallback configured). A non-empty, unregistered
// name is a programming error and panics.
func WithFallbackStrategy(fallbackStrategy string) AllocationOption {
	strategy, ok := strategies[fallbackStrategy]
	if fallbackStrategy != "" && !ok {
		panic(fmt.Errorf("unregistered strategy used as fallback: %s", fallbackStrategy))
	}
	return func(allocator Allocator) {
		allocator.SetFallbackStrategy(strategy)
	}
}

func RecordTargetsKept(targets map[string]*target.Item) {
targetsRemaining.Add(float64(len(targets)))
}
Expand Down Expand Up @@ -101,6 +113,7 @@ type Allocator interface {
Collectors() map[string]*Collector
GetTargetsForCollectorAndJob(collector string, job string) []*target.Item
SetFilter(filter Filter)
SetFallbackStrategy(strategy Strategy)
}

type Strategy interface {
Expand All @@ -110,6 +123,8 @@ type Strategy interface {
// SetCollectors call. Strategies which don't need this information can just ignore it.
SetCollectors(map[string]*Collector)
GetName() string
// Add fallback strategy for strategies whose main allocation method can sometimes leave targets unassigned
SetFallbackStrategy(Strategy)
}

var _ consistent.Member = Collector{}
Expand All @@ -136,22 +151,18 @@ func NewCollector(name, node string) *Collector {
}

func init() {
err := Register(leastWeightedStrategyName, func(log logr.Logger, opts ...AllocationOption) Allocator {
return newAllocator(log, newleastWeightedStrategy(), opts...)
})
if err != nil {
panic(err)
}
err = Register(consistentHashingStrategyName, func(log logr.Logger, opts ...AllocationOption) Allocator {
return newAllocator(log, newConsistentHashingStrategy(), opts...)
})
if err != nil {
panic(err)
strategies = map[string]Strategy{
leastWeightedStrategyName: newleastWeightedStrategy(),
consistentHashingStrategyName: newConsistentHashingStrategy(),
perNodeStrategyName: newPerNodeStrategy(),
}
err = Register(perNodeStrategyName, func(log logr.Logger, opts ...AllocationOption) Allocator {
return newAllocator(log, newPerNodeStrategy(), opts...)
})
if err != nil {
panic(err)

for strategyName, strategy := range strategies {
err := Register(strategyName, func(log logr.Logger, opts ...AllocationOption) Allocator {
return newAllocator(log, strategy, opts...)
})
if err != nil {
panic(err)
}
}
}
26 changes: 14 additions & 12 deletions cmd/otel-allocator/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,17 @@ const (
)

type Config struct {
ListenAddr string `yaml:"listen_addr,omitempty"`
KubeConfigFilePath string `yaml:"kube_config_file_path,omitempty"`
ClusterConfig *rest.Config `yaml:"-"`
RootLogger logr.Logger `yaml:"-"`
CollectorSelector *metav1.LabelSelector `yaml:"collector_selector,omitempty"`
PromConfig *promconfig.Config `yaml:"config"`
AllocationStrategy string `yaml:"allocation_strategy,omitempty"`
FilterStrategy string `yaml:"filter_strategy,omitempty"`
PrometheusCR PrometheusCRConfig `yaml:"prometheus_cr,omitempty"`
HTTPS HTTPSServerConfig `yaml:"https,omitempty"`
ListenAddr string `yaml:"listen_addr,omitempty"`
KubeConfigFilePath string `yaml:"kube_config_file_path,omitempty"`
ClusterConfig *rest.Config `yaml:"-"`
RootLogger logr.Logger `yaml:"-"`
CollectorSelector *metav1.LabelSelector `yaml:"collector_selector,omitempty"`
PromConfig *promconfig.Config `yaml:"config"`
AllocationStrategy string `yaml:"allocation_strategy,omitempty"`
AllocationFallbackStrategy string `yaml:"allocation_fallback_strategy,omitempty"`
FilterStrategy string `yaml:"filter_strategy,omitempty"`
PrometheusCR PrometheusCRConfig `yaml:"prometheus_cr,omitempty"`
HTTPS HTTPSServerConfig `yaml:"https,omitempty"`
}

type PrometheusCRConfig struct {
Expand Down Expand Up @@ -165,8 +166,9 @@ func unmarshal(cfg *Config, configFile string) error {

func CreateDefaultConfig() Config {
return Config{
AllocationStrategy: DefaultAllocationStrategy,
FilterStrategy: DefaultFilterStrategy,
AllocationStrategy: DefaultAllocationStrategy,
AllocationFallbackStrategy: "",
FilterStrategy: DefaultFilterStrategy,
PrometheusCR: PrometheusCRConfig{
ScrapeInterval: DefaultCRScrapeInterval,
},
Expand Down
2 changes: 1 addition & 1 deletion cmd/otel-allocator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func main() {
log := ctrl.Log.WithName("allocator")

allocatorPrehook = prehook.New(cfg.FilterStrategy, log)
allocator, err = allocation.New(cfg.AllocationStrategy, log, allocation.WithFilter(allocatorPrehook))
allocator, err = allocation.New(cfg.AllocationStrategy, log, allocation.WithFilter(allocatorPrehook), allocation.WithFallbackStrategy(cfg.AllocationFallbackStrategy))
swiatekm marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
setupLog.Error(err, "Unable to initialize allocation strategy")
os.Exit(1)
Expand Down
1 change: 1 addition & 0 deletions cmd/otel-allocator/server/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func (m *mockAllocator) SetTargets(_ map[string]*target.Item)
func (m *mockAllocator) Collectors() map[string]*allocation.Collector { return nil }
func (m *mockAllocator) GetTargetsForCollectorAndJob(_ string, _ string) []*target.Item { return nil }
func (m *mockAllocator) SetFilter(_ allocation.Filter) {}
func (m *mockAllocator) SetFallbackStrategy(_ allocation.Strategy) {}

func (m *mockAllocator) TargetItems() map[string]*target.Item {
return m.targetItems
Expand Down
5 changes: 5 additions & 0 deletions internal/manifests/targetallocator/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ func ConfigMap(params Params) (*corev1.ConfigMap, error) {
} else {
taConfig["allocation_strategy"] = v1beta1.TargetAllocatorAllocationStrategyConsistentHashing
}

if featuregate.EnableTargetAllocatorFallbackStrategy.IsEnabled() {
taConfig["allocation_fallback_strategy"] = v1beta1.TargetAllocatorAllocationStrategyConsistentHashing
}
swiatekm marked this conversation as resolved.
Show resolved Hide resolved

taConfig["filter_strategy"] = taSpec.FilterStrategy

if taSpec.PrometheusCR.Enabled {
Expand Down
8 changes: 8 additions & 0 deletions pkg/featuregate/featuregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ var (
featuregate.WithRegisterDescription("enables mTLS between the target allocator and the collector"),
featuregate.WithRegisterFromVersion("v0.111.0"),
)
// EnableTargetAllocatorFallbackStrategy is the feature gate that enables consistent-hashing as the fallback
// strategy for allocation strategies that might not assign all jobs (per-node).
EnableTargetAllocatorFallbackStrategy = featuregate.GlobalRegistry().MustRegister(
"operator.targetallocator.fallbackstrategy",
featuregate.StageAlpha,
featuregate.WithRegisterDescription("enables fallback allocation strategy for the target allocator"),
featuregate.WithRegisterFromVersion("v0.114.0"),
)
// EnableConfigDefaulting is the feature gate that enables the operator to default the endpoint for known components.
EnableConfigDefaulting = featuregate.GlobalRegistry().MustRegister(
"operator.collector.default.config",
Expand Down
Loading