Skip to content

Commit

Permalink
Fixed race test to have changing config values
Browse files Browse the repository at this point in the history
  • Loading branch information
Achooo committed Aug 1, 2023
1 parent b46d5b9 commit 7ea6280
Showing 1 changed file with 90 additions and 47 deletions.
137 changes: 90 additions & 47 deletions agent/hcp/telemetry_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ import (
)

const (
testRefreshInterval = 100 * time.Millisecond
testSinkServiceName = "test.telemetry_config_provider"
testRaceSampleCount = 5000
testRefreshInterval = 100 * time.Millisecond
testSinkServiceName = "test.telemetry_config_provider"
testRaceWriteSampleCount = 100
testRaceReadSampleCount = 5000
)

var (
Expand Down Expand Up @@ -222,58 +223,100 @@ func TestTelemetryConfigProviderGetUpdate(t *testing.T) {
}
}

func TestTelemetryConfigProvider_Race(t *testing.T) {
cfg, err := testTelemetryCfg(&testConfig{
endpoint: "http://test.com/v1/metrics",
filters: "test",
labels: map[string]string{
"test_label": "123",
// mockRaceClient is a mock HCP client that fetches TelemetryConfig.
// The mock TelemetryConfig returned can be manually updated at any time.
// It manages concurrent read/write access to config with a sync.RWMutex.
type mockRaceClient struct {
// Embedded client.Client — presumably so the mock satisfies the full client
// interface while only defining the methods this test needs; verify against
// the client package.
client.Client
// cfg is the TelemetryConfig returned by FetchTelemetryConfig and replaced
// by updateCfg.
cfg *client.TelemetryConfig
// rw guards cfg: updateCfg takes the write lock, FetchTelemetryConfig takes
// the read lock.
rw sync.RWMutex
}

// updateCfg acquires a write lock and updates client config to a new value given a count.
func (m *mockRaceClient) updateCfg(count int) (*client.TelemetryConfig, error) {
m.rw.Lock()
defer m.rw.Unlock()

labels := map[string]string{fmt.Sprintf("label_%d", count): fmt.Sprintf("value_%d", count)}

filters, err := regexp.Compile(fmt.Sprintf("consul_filter_%d", count))
if err != nil {
return nil, err
}

endpoint, err := url.Parse(fmt.Sprintf("http://consul-endpoint-%d.com", count))
if err != nil {
return nil, err
}

cfg := &client.TelemetryConfig{
MetricsConfig: &client.MetricsConfig{
Filters: filters,
Endpoint: endpoint,
Labels: labels,
},
refreshInterval: testRefreshInterval,
})
require.NoError(t, err)
RefreshConfig: &client.RefreshConfig{
RefreshInterval: testRefreshInterval,
},
}
m.cfg = cfg

return cfg, nil
}

// FetchTelemetryConfig hands back whichever TelemetryConfig the mockRaceClient
// holds at the time of the call. The read lock is held only while the config
// pointer is copied. ctx is not consulted and the returned error is always nil.
func (m *mockRaceClient) FetchTelemetryConfig(ctx context.Context) (*client.TelemetryConfig, error) {
	m.rw.RLock()
	current := m.cfg
	m.rw.RUnlock()

	return current, nil
}

func TestTelemetryConfigProvider_Race(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

mockClient := client.NewMockClient(t)
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(cfg, nil)
initCfg, err := testTelemetryCfg(&testConfig{
endpoint: "test.com",
filters: "test",
labels: map[string]string{"test_label": "test_value"},
refreshInterval: testRefreshInterval,
})
require.NoError(t, err)

m := &mockRaceClient{
cfg: initCfg,
}

// Start the provider goroutine
// Every refresh interval, config will be modified.
provider, err := NewHCPProvider(ctx, mockClient, cfg)
// Start the provider goroutine, which fetches client TelemetryConfig every RefreshInterval.
provider, err := NewHCPProvider(ctx, m, m.cfg)
require.NoError(t, err)

// Every refresh interval, try to query config using Get* methods inducing a race condition.
timer := time.NewTimer(testRefreshInterval)
defer timer.Stop()
for {
select {
case <-timer.C:
wg := &sync.WaitGroup{}
// Start goroutines that try to access label configuration.
kickOff(wg, testRaceSampleCount, provider, func(provider *hcpProviderImpl) {
require.Equal(t, provider.GetLabels(), cfg.MetricsConfig.Labels)
})

// Start goroutines that try to access filter configuration.
kickOff(wg, testRaceSampleCount, provider, func(provider *hcpProviderImpl) {
require.Equal(t, provider.GetFilters(), cfg.MetricsConfig.Filters)
})

// Start goroutines that try to access endpoint configuration.
kickOff(wg, testRaceSampleCount, provider, func(provider *hcpProviderImpl) {
require.Equal(t, provider.GetEndpoint(), cfg.MetricsConfig.Endpoint)
})

wg.Wait()
// Stop after 10 refresh intervals.
case <-time.After(10 * testRefreshInterval):
return
case <-ctx.Done():
require.Fail(t, "Context cancelled before test finishes")
return
}
for count := 0; count < testRaceWriteSampleCount; count++ {
// Force a TelemetryConfig value change in the mockRaceClient.
newCfg, err := m.updateCfg(count)
require.NoError(t, err)
// Force provider to obtain new client TelemetryConfig immediately.
// This call is necessary to guarantee the provider has picked up the new TelemetryConfig before the assertions below run.
provider.getUpdate(context.Background())

// Start goroutines to access label configuration.
wg := &sync.WaitGroup{}
kickOff(wg, testRaceReadSampleCount, provider, func(provider *hcpProviderImpl) {
require.Equal(t, provider.GetLabels(), newCfg.MetricsConfig.Labels)
})

// Start goroutines to access filter configuration.
kickOff(wg, testRaceReadSampleCount, provider, func(provider *hcpProviderImpl) {
require.Equal(t, provider.GetFilters(), newCfg.MetricsConfig.Filters)
})

// Start goroutines to access endpoint configuration.
kickOff(wg, testRaceReadSampleCount, provider, func(provider *hcpProviderImpl) {
require.Equal(t, provider.GetEndpoint(), newCfg.MetricsConfig.Endpoint)
})

wg.Wait()
}
}

Expand Down

0 comments on commit 7ea6280

Please sign in to comment.