Skip to content

Commit

Permalink
Merge pull request #297 from splitio/SDKS-9085-healthcheck-monitor
Browse files Browse the repository at this point in the history
[SDKS-9085] Adding large segment healthcheck monitor
  • Loading branch information
sanzmauro authored Dec 2, 2024
2 parents 77ed8ba + 014c1ec commit 1204cac
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 52 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/gin-gonic/gin v1.10.0
github.com/google/uuid v1.3.0
github.com/splitio/gincache v1.0.1
github.com/splitio/go-split-commons/v6 v6.0.2-0.20241125153044-959311072c68
github.com/splitio/go-split-commons/v6 v6.0.2-0.20241202205419-67ca9241a954
github.com/splitio/go-toolkit/v5 v5.4.0
github.com/stretchr/testify v1.9.0
go.etcd.io/bbolt v1.3.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUA
github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
github.com/splitio/gincache v1.0.1 h1:dLYdANY/BqH4KcUMCe/LluLyV5WtuE/LEdQWRE06IXU=
github.com/splitio/gincache v1.0.1/go.mod h1:CcgJDSM9Af75kyBH0724v55URVwMBuSj5x1eCWIOECY=
github.com/splitio/go-split-commons/v6 v6.0.2-0.20241125153044-959311072c68 h1:Nr48cVYJZCOQzfKPGPsYcHykzEa4M/ADPkzO+eo3GOI=
github.com/splitio/go-split-commons/v6 v6.0.2-0.20241125153044-959311072c68/go.mod h1:D/XIY/9Hmfk9ivWsRsJVp439kEdmHbzUi3PKzQQDOXY=
github.com/splitio/go-split-commons/v6 v6.0.2-0.20241202205419-67ca9241a954 h1:vqz8xWtmCoBc6jk1ijcjTMAStrpdQDa7EjOEgBJ97ew=
github.com/splitio/go-split-commons/v6 v6.0.2-0.20241202205419-67ca9241a954/go.mod h1:D/XIY/9Hmfk9ivWsRsJVp439kEdmHbzUi3PKzQQDOXY=
github.com/splitio/go-toolkit/v5 v5.4.0 h1:g5WFpRhQomnXCmvfsNOWV4s5AuUrWIZ+amM68G8NBKM=
github.com/splitio/go-toolkit/v5 v5.4.0/go.mod h1:xYhUvV1gga9/1029Wbp5pjnR6Cy8nvBpjw99wAbsMko=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
2 changes: 1 addition & 1 deletion splitio/producer/initialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func Start(logger logging.LoggerInterface, cfg *conf.Main) error {

// Healthcheck Monitor
splitsConfig, segmentsConfig, storageConfig := getAppCounterConfigs(storages.SplitStorage)
appMonitor := hcApplication.NewMonitorImp(splitsConfig, segmentsConfig, &storageConfig, logger)
appMonitor := hcApplication.NewMonitorImp(splitsConfig, segmentsConfig, nil, &storageConfig, logger)
servicesMonitor := hcServices.NewMonitorImp(getServicesCountersConfig(advanced), logger)

impressionsCounter := strategy.NewImpressionsCounter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,6 @@ const (
Low
)

const (
// Splits counter type
Splits = iota
// Segments counter type
Segments
// Storage counter type
Storage
)

// HealthyResult description
type HealthyResult struct {
Name string
Expand Down
71 changes: 43 additions & 28 deletions splitio/provisional/healthcheck/application/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"time"

hc "github.com/splitio/go-split-commons/v6/healthcheck/application"
"github.com/splitio/go-toolkit/v5/logging"
toolkitsync "github.com/splitio/go-toolkit/v5/sync"
"github.com/splitio/split-synchronizer/v5/splitio/provisional/healthcheck/application/counter"
Expand All @@ -21,13 +22,12 @@ type MonitorIterface interface {

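// MonitorImp aggregates the application health counters: threshold counters keyed by counter type, plus a periodic storage counter that is only used in producer mode.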
type MonitorImp struct {
splitsCounter counter.ThresholdCounterInterface
segmentsCounter counter.ThresholdCounterInterface
storageCounter counter.PeriodicCounterInterface
producerMode toolkitsync.AtomicBool
healthySince *time.Time
lock sync.RWMutex
logger logging.LoggerInterface
counters map[int]counter.ThresholdCounterInterface
storageCounter counter.PeriodicCounterInterface
producerMode toolkitsync.AtomicBool
healthySince *time.Time
lock sync.RWMutex
logger logging.LoggerInterface
}

// HealthDto struct
Expand Down Expand Up @@ -56,7 +56,7 @@ func (m *MonitorImp) getHealthySince(healthy bool) *time.Time {

func checkIfIsHealthy(result []ItemDto) bool {
for _, r := range result {
if r.Healthy == false && r.Severity == counter.Critical {
if !r.Healthy && r.Severity == counter.Critical {
return false
}
}
Expand All @@ -71,7 +71,10 @@ func (m *MonitorImp) GetHealthStatus() HealthDto {

var items []ItemDto
var results []counter.HealthyResult
results = append(results, m.splitsCounter.IsHealthy(), m.segmentsCounter.IsHealthy())

for _, mc := range m.counters {
results = append(results, mc.IsHealthy())
}

if m.producerMode.IsSet() {
results = append(results, m.storageCounter.IsHealthy())
Expand Down Expand Up @@ -104,12 +107,12 @@ func (m *MonitorImp) NotifyEvent(counterType int) {

m.logger.Debug(fmt.Sprintf("Notify Event. Type: %d.", counterType))

switch counterType {
case counter.Splits:
m.splitsCounter.NotifyHit()
case counter.Segments:
m.segmentsCounter.NotifyHit()
counter, ok := m.counters[counterType]
if !ok {
m.logger.Debug(fmt.Sprintf("wrong counterType: %d", counterType))
return
}
counter.NotifyHit()
}

// Reset counter value
Expand All @@ -119,21 +122,23 @@ func (m *MonitorImp) Reset(counterType int, value int) {

m.logger.Debug(fmt.Sprintf("Reset. Type: %d. Value: %d", counterType, value))

switch counterType {
case counter.Splits:
m.splitsCounter.ResetThreshold(value)
case counter.Segments:
m.segmentsCounter.ResetThreshold(value)
counter, ok := m.counters[counterType]
if !ok {
m.logger.Debug(fmt.Sprintf("wrong counterType: %d", counterType))
return
}
counter.ResetThreshold(value)
}

// Start counters
func (m *MonitorImp) Start() {
m.lock.Lock()
defer m.lock.Unlock()

m.splitsCounter.Start()
m.segmentsCounter.Start()
for _, counter := range m.counters {
counter.Start()
}

if m.producerMode.IsSet() {
m.storageCounter.Start()
}
Expand All @@ -146,8 +151,9 @@ func (m *MonitorImp) Stop() {
m.lock.Lock()
defer m.lock.Unlock()

m.splitsCounter.Stop()
m.segmentsCounter.Stop()
for _, counter := range m.counters {
counter.Stop()
}

if m.producerMode.IsSet() {
m.storageCounter.Stop()
Expand All @@ -158,16 +164,25 @@ func (m *MonitorImp) Stop() {
func NewMonitorImp(
splitsConfig counter.ThresholdConfig,
segmentsConfig counter.ThresholdConfig,
largeSegmentsConfig *counter.ThresholdConfig,
storageConfig *counter.PeriodicConfig,
logger logging.LoggerInterface,
) *MonitorImp {
now := time.Now()
splitsCounter := counter.NewThresholdCounter(splitsConfig, logger)
segmentsCounter := counter.NewThresholdCounter(segmentsConfig, logger)
monitor := &MonitorImp{
splitsCounter: counter.NewThresholdCounter(splitsConfig, logger),
segmentsCounter: counter.NewThresholdCounter(segmentsConfig, logger),
producerMode: *toolkitsync.NewAtomicBool(storageConfig != nil),
logger: logger,
healthySince: &now,
counters: map[int]counter.ThresholdCounterInterface{},
producerMode: *toolkitsync.NewAtomicBool(storageConfig != nil),
logger: logger,
healthySince: &now,
}

monitor.counters[hc.Splits] = splitsCounter
monitor.counters[hc.Segments] = segmentsCounter

if largeSegmentsConfig != nil {
monitor.counters[hc.LargeSegments] = counter.NewThresholdCounter(*largeSegmentsConfig, logger)
}

if monitor.producerMode.IsSet() {
Expand Down
8 changes: 7 additions & 1 deletion splitio/provisional/healthcheck/application/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ func TestMonitor(t *testing.T) {
Severity: counter.Critical,
}

lsCfg := counter.ThresholdConfig{
Name: "LargeSegments",
Period: 10,
Severity: counter.Critical,
}

storageCfg := counter.PeriodicConfig{
Name: "Storage",
Period: 10,
Expand All @@ -46,7 +52,7 @@ func TestMonitor(t *testing.T) {
},
}

monitor := NewMonitorImp(splitsCfg, segmentsCfg, &storageCfg, logging.NewLogger(nil))
monitor := NewMonitorImp(splitsCfg, segmentsCfg, &lsCfg, &storageCfg, logging.NewLogger(nil))

monitor.Start()

Expand Down
6 changes: 4 additions & 2 deletions splitio/proxy/conf/sections.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package conf

import (
cconf "github.com/splitio/go-split-commons/v6/conf"
"github.com/splitio/go-split-commons/v6/service/api/specs"
"github.com/splitio/split-synchronizer/v5/splitio/common/conf"
)

Expand All @@ -21,7 +22,6 @@ type Main struct {
Healthcheck Healthcheck `json:"healthcheck" s-nested:"true"`
Observability Observability `json:"observability" s-nested:"true"`
FlagSpecVersion string `json:"flagSpecVersion" s-cli:"flag-spec-version" s-def:"1.2" s-desc:"Spec version for flags"`
LargeSegmentVersion string `json:"largeSegmentVersion" s-cli:"largesegment-version" s-def:"1.0" s-desc:"Spec version for large segments"`
}

// BuildAdvancedConfig generates a commons-compatible advancedconfig with default + overridden parameters
Expand All @@ -34,6 +34,8 @@ func (m *Main) BuildAdvancedConfig() *cconf.AdvancedConfig {
tmp.SplitsRefreshRate = int(m.Sync.SplitRefreshRateMs / 1000)
tmp.SegmentsRefreshRate = int(m.Sync.SegmentRefreshRateMs / 1000)
tmp.LargeSegment.LazyLoad = m.Sync.Advanced.LargeSegmentLazyLoad
tmp.LargeSegment.RefreshRate = int(m.Sync.LargeSegmentRefreshRateMs / 1000)
tmp.LargeSegment.Version = specs.LARGESEGMENT_V10
return tmp
}

Expand Down Expand Up @@ -72,7 +74,7 @@ type Persistent struct {
type Sync struct {
SplitRefreshRateMs int64 `json:"splitRefreshRateMs" s-cli:"split-refresh-rate-ms" s-def:"60000" s-desc:"How often to refresh feature flags"`
SegmentRefreshRateMs int64 `json:"segmentRefreshRateMs" s-cli:"segment-refresh-rate-ms" s-def:"60000" s-desc:"How often to refresh segments"`
LargeSegmentRefreshRateMs int64 `json:"largeSegmentRefreshRateMs" s-cli:"largesegment-refresh-rate-ms" s-def:"3600000" s-desc:"How often to refresh large segments"`
LargeSegmentRefreshRateMs int64 `json:"largeSegmentRefreshRateMs" s-cli:"largesegment-refresh-rate-ms" s-def:"600000" s-desc:"How often to refresh large segments"`
Advanced AdvancedSync `json:"advanced" s-nested:"true"`
}

Expand Down
10 changes: 7 additions & 3 deletions splitio/proxy/controllers/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,14 @@ func (c *SdkServerController) SplitChanges(ctx *gin.Context) {
return
}

spec, _ := ctx.GetQuery("s")
if spec != specs.FLAG_V1_1 {
spec = specs.FLAG_V1_0
sParam, _ := ctx.GetQuery("s")
spec, err := specs.ParseAndValidate(sParam)
if err != nil {
c.logger.Error(fmt.Sprintf("error parsing spec version: %s.", err))
ctx.JSON(http.StatusBadRequest, gin.H{"code": 400, "message": err.Error()})
return
}

splits.Splits = c.patchUnsupportedMatchers(splits.Splits, spec)

ctx.JSON(http.StatusOK, splits)
Expand Down
11 changes: 6 additions & 5 deletions splitio/proxy/initialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ func Start(logger logging.LoggerInterface, cfg *pconf.Main) error {
)

// Healthcheck Monitor
splitsConfig, segmentsConfig := getAppCounterConfigs()
appMonitor := hcApplication.NewMonitorImp(splitsConfig, segmentsConfig, nil, logger)
splitsConfig, segmentsConfig, lsConfig := getAppCounterConfigs()
appMonitor := hcApplication.NewMonitorImp(splitsConfig, segmentsConfig, &lsConfig, nil, logger)
servicesMonitor := hcServices.NewMonitorImp(getServicesCountersConfig(*advanced), logger)

// Creating Workers and Tasks
Expand Down Expand Up @@ -137,7 +137,7 @@ func Start(logger logging.LoggerInterface, cfg *pconf.Main) error {
ImpressionSyncTask: impressionTask,
ImpressionsCountSyncTask: impressionCountTask,
EventSyncTask: eventsTask,
LargeSegmentSyncTask: tasks.NewFetchLargeSegmentsTask(workers.LargeSegmentUpdater, splitStorage, int(cfg.Sync.LargeSegmentRefreshRateMs/1000), advanced.LargeSegment.Workers, advanced.LargeSegment.QueueSize, logger),
LargeSegmentSyncTask: tasks.NewFetchLargeSegmentsTask(workers.LargeSegmentUpdater, splitStorage, advanced.LargeSegment.RefreshRate, advanced.LargeSegment.Workers, advanced.LargeSegment.QueueSize, logger, appMonitor),
}

// Creating Synchronizer for tasks
Expand Down Expand Up @@ -321,11 +321,12 @@ func startBGSyng(m synchronizer.Manager, mstatus chan int, haveSnapshot bool, on

}

func getAppCounterConfigs() (hcAppCounter.ThresholdConfig, hcAppCounter.ThresholdConfig) {
func getAppCounterConfigs() (hcAppCounter.ThresholdConfig, hcAppCounter.ThresholdConfig, hcAppCounter.ThresholdConfig) {
splitsConfig := hcAppCounter.DefaultThresholdConfig("Splits")
segmentsConfig := hcAppCounter.DefaultThresholdConfig("Segments")
LargeSegmentsConfig := hcAppCounter.DefaultThresholdConfig("LargeSegments")

return splitsConfig, segmentsConfig
return splitsConfig, segmentsConfig, LargeSegmentsConfig
}

func getServicesCountersConfig(advanced conf.AdvancedConfig) []hcServicesCounter.Config {
Expand Down

0 comments on commit 1204cac

Please sign in to comment.