From 2df38511dfcdb731dedaf89489df4b6dae375c60 Mon Sep 17 00:00:00 2001 From: Pulak Bhowmick Date: Tue, 4 Jul 2023 20:55:50 +0600 Subject: [PATCH 01/10] wait for dispatching events on close --- pkg/client/client.go | 2 ++ pkg/event/dispatcher.go | 9 +++++++++ pkg/event/processor.go | 16 ++++++++++++++++ 3 files changed, 27 insertions(+) diff --git a/pkg/client/client.go b/pkg/client/client.go index 2cc01989e..e3890dfee 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -24,6 +24,7 @@ import ( "reflect" "runtime/debug" "strconv" + "time" "github.com/optimizely/go-sdk/pkg/config" "github.com/optimizely/go-sdk/pkg/decide" @@ -1064,6 +1065,7 @@ func (o *OptimizelyClient) GetOptimizelyConfig() (optimizelyConfig *config.Optim // Close closes the Optimizely instance and stops any ongoing tasks from its children components. func (o *OptimizelyClient) Close() { o.execGroup.TerminateAndWait() + o.EventProcessor.WaitForDispatchingEventsOnClose(time.Minute, time.Second) } func (o *OptimizelyClient) getDecisionVariableMap(feature entities.Feature, variation *entities.Variation, featureEnabled bool) (map[string]interface{}, decide.DecisionReasons) { diff --git a/pkg/event/dispatcher.go b/pkg/event/dispatcher.go index 1fd7610b8..8d49a5fb8 100644 --- a/pkg/event/dispatcher.go +++ b/pkg/event/dispatcher.go @@ -37,6 +37,7 @@ const sleepTime = 1 * time.Second // Dispatcher dispatches events type Dispatcher interface { DispatchEvent(event LogEvent) (bool, error) + EventsCount() int } // httpEventDispatcher is the HTTP implementation of the Dispatcher interface @@ -67,6 +68,10 @@ func (ed *httpEventDispatcher) DispatchEvent(event LogEvent) (bool, error) { return success, err } +func (ed *httpEventDispatcher) EventsCount() int { + return 0 +} + // NewHTTPEventDispatcher creates a full http dispatcher. The requester and logger parameters can be nil. func NewHTTPEventDispatcher(sdkKey string, requester *utils.HTTPRequester, logger logging.OptimizelyLogProducer) Dispatcher { if requester == nil { @@ -100,6 +105,10 @@ func (ed *QueueEventDispatcher) DispatchEvent(event LogEvent) (bool, error) { return true, nil } +func (ed *QueueEventDispatcher) EventsCount() int { + return ed.eventQueue.Size() +} + // flush the events func (ed *QueueEventDispatcher) flushEvents() { diff --git a/pkg/event/processor.go b/pkg/event/processor.go index f765cc1b8..b6323e657 100644 --- a/pkg/event/processor.go +++ b/pkg/event/processor.go @@ -37,6 +37,7 @@ type Processor interface { ProcessEvent(event UserEvent) bool OnEventDispatch(callback func(logEvent LogEvent)) (int, error) RemoveOnEventDispatch(id int) error + WaitForDispatchingEventsOnClose(timeout, interval time.Duration) } // BatchEventProcessor is used out of the box by the SDK to queue up and batch events to be sent to the Optimizely @@ -184,6 +185,21 @@ func (p *BatchEventProcessor) Start(ctx context.Context) { p.startTicker(ctx) } +// Start does not do any initialization, just starts the ticker +func (p *BatchEventProcessor) WaitForDispatchingEventsOnClose(timeout, interval time.Duration) { + startTime := time.Now() + for { + if p.Q.Size() == 0 && p.EventDispatcher.EventsCount() == 0 { + break + } + + if time.Since(startTime) > timeout { + break + } + time.Sleep(interval) + } +} + // ProcessEvent takes the given user event (can be an impression or conversion event) and queues it up to be dispatched // to the Optimizely log endpoint. A dispatch happens when we flush the events, which can happen on a set interval or // when the specified batch size (defaulted to 10) is reached. From b73ea5394e498a921adef57ea7c3288252bcc984 Mon Sep 17 00:00:00 2001 From: Pulak Bhowmick Date: Tue, 4 Jul 2023 21:13:11 +0600 Subject: [PATCH 02/10] update tests --- pkg/client/client_test.go | 2 ++ pkg/client/factory_test.go | 4 ++++ pkg/event/dispatcher.go | 6 +++--- pkg/event/processor.go | 2 +- pkg/event/processor_test.go | 11 ++++++++++- 5 files changed, 20 insertions(+), 5 deletions(-) diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index 96e65a696..d870c25fe 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -105,6 +105,8 @@ func (m *MockProcessor) RemoveOnEventDispatch(id int) error { return nil } +func (m *MockProcessor) WaitForDispatchingEventsOnClose(_, _ time.Duration) {} + type MockNotificationCenter struct { notification.Center mock.Mock diff --git a/pkg/client/factory_test.go b/pkg/client/factory_test.go index 0d9ae2b03..b089c6e60 100644 --- a/pkg/client/factory_test.go +++ b/pkg/client/factory_test.go @@ -61,6 +61,10 @@ func (f *MockDispatcher) DispatchEvent(event event.LogEvent) (bool, error) { return true, nil } +func (f *MockDispatcher) EventsInQueue() int { + return 0 +} + type MockConfigManager struct { config.ProjectConfigManager projectConfig config.ProjectConfig diff --git a/pkg/event/dispatcher.go b/pkg/event/dispatcher.go index 8d49a5fb8..2e1fd4a59 100644 --- a/pkg/event/dispatcher.go +++ b/pkg/event/dispatcher.go @@ -37,7 +37,7 @@ const sleepTime = 1 * time.Second // Dispatcher dispatches events type Dispatcher interface { DispatchEvent(event LogEvent) (bool, error) - EventsCount() int + EventsInQueue() int } // httpEventDispatcher is the HTTP implementation of the Dispatcher interface @@ -68,7 +68,7 @@ func (ed *httpEventDispatcher) DispatchEvent(event LogEvent) (bool, error) { return success, err } -func (ed *httpEventDispatcher) EventsCount() int { +func (ed *httpEventDispatcher) EventsInQueue() int { return 0 } @@ -105,7 +105,7 @@ func (ed *QueueEventDispatcher) DispatchEvent(event LogEvent) (bool, error) { return true, nil } -func (ed *QueueEventDispatcher) EventsCount() int { +func (ed *QueueEventDispatcher) EventsInQueue() int { return ed.eventQueue.Size() } diff --git a/pkg/event/processor.go b/pkg/event/processor.go index b6323e657..e31ca6641 100644 --- a/pkg/event/processor.go +++ b/pkg/event/processor.go @@ -189,7 +189,7 @@ func (p *BatchEventProcessor) Start(ctx context.Context) { func (p *BatchEventProcessor) WaitForDispatchingEventsOnClose(timeout, interval time.Duration) { startTime := time.Now() for { - if p.Q.Size() == 0 && p.EventDispatcher.EventsCount() == 0 { + if p.Q.Size() == 0 && p.EventDispatcher.EventsInQueue() == 0 { break } diff --git a/pkg/event/processor_test.go b/pkg/event/processor_test.go index 2a77fb8c6..0d0974be6 100644 --- a/pkg/event/processor_test.go +++ b/pkg/event/processor_test.go @@ -42,6 +42,10 @@ func (c *CountingDispatcher) DispatchEvent(event LogEvent) (bool, error) { return true, nil } +func (c *CountingDispatcher) EventsInQueue() int { + return 0 +} + type MockDispatcher struct { ShouldFail bool Events Queue @@ -56,6 +60,10 @@ func (m *MockDispatcher) DispatchEvent(event LogEvent) (bool, error) { return true, nil } +func (m *MockDispatcher) EventsInQueue() int { + return m.Events.Size() +} + func NewMockDispatcher(queueSize int, shouldFail bool) *MockDispatcher { return &MockDispatcher{Events: NewInMemoryQueue(queueSize), ShouldFail: shouldFail} } @@ -493,7 +501,8 @@ func (l *NoOpLogger) SetLogLevel(level logging.LogLevel) { } -/** +/* +* goos: darwin goarch: amd64 pkg: github.com/optimizely/go-sdk/pkg/event From 43600658362f2ba4ae38f967f84b54977515f94b Mon Sep 17 00:00:00 2001 From: Pulak Bhowmick Date: Tue, 4 Jul 2023 21:35:01 +0600 Subject: [PATCH 03/10] use context --- pkg/client/client.go | 8 +++++++- pkg/client/client_test.go | 2 +- pkg/event/processor.go | 25 ++++++++++++++----------- 3 files changed, 22 insertions(+), 13 deletions(-) diff --git a/pkg/client/client.go b/pkg/client/client.go index e3890dfee..2bad13448 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -18,6 +18,7 @@ package client import ( + "context" "encoding/json" "errors" "fmt" @@ -55,6 +56,9 @@ type OptimizelyClient struct { defaultDecideOptions *decide.Options } +// WaitForDispatchingEventsTimeout holds the timeout value for the waiting for the dispatching events on client close +const WaitForDispatchingEventsTimeout = 10 * time.Second + // CreateUserContext creates a context of the user for which decision APIs will be called. // A user context will be created successfully even when the SDK is not fully configured yet. func (o *OptimizelyClient) CreateUserContext(userID string, attributes map[string]interface{}) OptimizelyUserContext { @@ -1065,7 +1069,9 @@ func (o *OptimizelyClient) GetOptimizelyConfig() (optimizelyConfig *config.Optim // Close closes the Optimizely instance and stops any ongoing tasks from its children components. func (o *OptimizelyClient) Close() { o.execGroup.TerminateAndWait() - o.EventProcessor.WaitForDispatchingEventsOnClose(time.Minute, time.Second) + ctx, cancel := context.WithTimeout(context.Background(), WaitForDispatchingEventsTimeout) + defer cancel() + o.EventProcessor.WaitForDispatchingEventsOnClose(ctx) } func (o *OptimizelyClient) getDecisionVariableMap(feature entities.Feature, variation *entities.Variation, featureEnabled bool) (map[string]interface{}, decide.DecisionReasons) { diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index d870c25fe..0a4e5c7cb 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -105,7 +105,7 @@ func (m *MockProcessor) RemoveOnEventDispatch(id int) error { return nil } -func (m *MockProcessor) WaitForDispatchingEventsOnClose(_, _ time.Duration) {} +func (m *MockProcessor) WaitForDispatchingEventsOnClose(ctx context.Context) {} type MockNotificationCenter struct { notification.Center diff --git a/pkg/event/processor.go b/pkg/event/processor.go index e31ca6641..b75ddee13 100644 --- a/pkg/event/processor.go +++ b/pkg/event/processor.go @@ -37,7 +37,7 @@ type Processor interface { ProcessEvent(event UserEvent) bool OnEventDispatch(callback func(logEvent LogEvent)) (int, error) RemoveOnEventDispatch(id int) error - WaitForDispatchingEventsOnClose(timeout, interval time.Duration) + WaitForDispatchingEventsOnClose(ctx context.Context) } // BatchEventProcessor is used out of the box by the SDK to queue up and batch events to be sent to the Optimizely @@ -66,6 +66,9 @@ const DefaultEventQueueSize = 2000 // DefaultEventFlushInterval holds the default value for the event flush interval const DefaultEventFlushInterval = 30 * time.Second +// WaitForDispatchingEventsInterval holds the checking interval for the dispatching events on client close +const WaitForDispatchingEventsInterval = 500 * time.Millisecond + // DefaultEventEndPoint is used as the default endpoint for sending events. const DefaultEventEndPoint = "https://logx.optimizely.com/v1/events" @@ -185,18 +188,18 @@ func (p *BatchEventProcessor) Start(ctx context.Context) { p.startTicker(ctx) } -// Start does not do any initialization, just starts the ticker -func (p *BatchEventProcessor) WaitForDispatchingEventsOnClose(timeout, interval time.Duration) { - startTime := time.Now() +// WaitForDispatchingEventsOnClose waits until all the events are dispatched +func (p *BatchEventProcessor) WaitForDispatchingEventsOnClose(ctx context.Context) { for { - if p.Q.Size() == 0 && p.EventDispatcher.EventsInQueue() == 0 { - break - } - - if time.Since(startTime) > timeout { - break + select { + case <-ctx.Done(): + return + default: + if p.Q.Size() == 0 && p.EventDispatcher.EventsInQueue() == 0 { + return + } + time.Sleep(WaitForDispatchingEventsInterval) } - time.Sleep(interval) } } From 4a290ad81d60c409036511a88384eec7ff0b5b40 Mon Sep 17 00:00:00 2001 From: Pulak Bhowmick Date: Wed, 5 Jul 2023 18:28:40 +0600 Subject: [PATCH 04/10] add unit test --- pkg/event/dispatcher.go | 1 + pkg/event/processor_test.go | 26 ++++++++++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/pkg/event/dispatcher.go b/pkg/event/dispatcher.go index 2e1fd4a59..59b11da52 100644 --- a/pkg/event/dispatcher.go +++ b/pkg/event/dispatcher.go @@ -105,6 +105,7 @@ func (ed *QueueEventDispatcher) DispatchEvent(event LogEvent) (bool, error) { return true, nil } +// EventsInQueue returns the events count in the queue. func (ed *QueueEventDispatcher) EventsInQueue() int { return ed.eventQueue.Size() } diff --git a/pkg/event/processor_test.go b/pkg/event/processor_test.go index 0d0974be6..dbb64c0b5 100644 --- a/pkg/event/processor_test.go +++ b/pkg/event/processor_test.go @@ -377,6 +377,32 @@ func TestBatchEventProcessor_FlushesOnClose(t *testing.T) { assert.Equal(t, 0, processor.eventsCount()) } +func TestBatchEventProcessor_WaitForDispatchingEventsOnClose(t *testing.T) { + eg := newExecutionContext() + processor := NewBatchEventProcessor( + WithQueueSize(100), + WithQueue(NewInMemoryQueue(100)), + WithEventDispatcher(NewMockDispatcher(100, false))) + eg.Go(processor.Start) + + impression := BuildTestImpressionEvent() + + for i := 0; i < 100; i++ { + processor.ProcessEvent(impression) + } + + assert.Equal(t, 100, processor.eventsCount()) + + // Triggers the flush in the processor + eg.TerminateAndWait() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + processor.WaitForDispatchingEventsOnClose(ctx) + + assert.Equal(t, 0, processor.eventsCount()) + assert.Equal(t, 0, processor.EventDispatcher.EventsInQueue()) +} + func TestDefaultEventProcessor_ProcessBatchRevisionMismatch(t *testing.T) { eg := newExecutionContext() dispatcher := NewMockDispatcher(100, false) From 9eed895be100a1fa969c458dbd22a8e3f1be4cdf Mon Sep 17 00:00:00 2001 From: Pulak Bhowmick Date: Wed, 5 Jul 2023 21:25:23 +0600 Subject: [PATCH 05/10] update tests --- pkg/event/processor_test.go | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/pkg/event/processor_test.go b/pkg/event/processor_test.go index dbb64c0b5..0166e165a 100644 --- a/pkg/event/processor_test.go +++ b/pkg/event/processor_test.go @@ -47,8 +47,9 @@ func (c *CountingDispatcher) EventsInQueue() int { } type MockDispatcher struct { - ShouldFail bool - Events Queue + ShouldFail bool + Events Queue + eventsQueue Queue // dispatch events from this queue } func (m *MockDispatcher) DispatchEvent(event LogEvent) (bool, error) { @@ -57,15 +58,27 @@ func (m *MockDispatcher) DispatchEvent(event LogEvent) (bool, error) { } m.Events.Add(event) + if m.eventsQueue != nil { + m.eventsQueue.Add(event) + go m.flushEvents() + } return true, nil } func (m *MockDispatcher) EventsInQueue() int { - return m.Events.Size() + return m.eventsQueue.Size() +} + +func (m *MockDispatcher) flushEvents() { + queueSize := m.eventsQueue.Size() + for ; queueSize > 0; queueSize = m.eventsQueue.Size() { + m.eventsQueue.Remove(1) + time.Sleep(5 * time.Millisecond) + } } func NewMockDispatcher(queueSize int, shouldFail bool) *MockDispatcher { - return &MockDispatcher{Events: NewInMemoryQueue(queueSize), ShouldFail: shouldFail} + return &MockDispatcher{Events: NewInMemoryQueue(queueSize), eventsQueue: NewInMemoryQueue(queueSize), ShouldFail: shouldFail} } func newExecutionContext() *utils.ExecGroup { @@ -188,7 +201,6 @@ func TestDefaultEventProcessor_BatchSizes(t *testing.T) { assert.Equal(t, 50, len(logEvent.Event.Visitors)) logEvent, _ = evs[1].(LogEvent) assert.Equal(t, 50, len(logEvent.Event.Visitors)) - } eg.TerminateAndWait() } From 363eb10d599ac3a5d1ea1522a162a4edf6a6f856 Mon Sep 17 00:00:00 2001 From: Pulak Bhowmick Date: Thu, 6 Jul 2023 21:23:29 +0600 Subject: [PATCH 06/10] refactor code --- pkg/client/client.go | 8 -------- pkg/client/factory_test.go | 4 ---- pkg/event/dispatcher.go | 24 +++++++++++++++--------- pkg/event/processor.go | 22 ++++++---------------- pkg/event/processor_test.go | 21 ++++++--------------- 5 files changed, 27 insertions(+), 52 deletions(-) diff --git a/pkg/client/client.go b/pkg/client/client.go index 2bad13448..2cc01989e 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -18,14 +18,12 @@ package client import ( - "context" "encoding/json" "errors" "fmt" "reflect" "runtime/debug" "strconv" - "time" "github.com/optimizely/go-sdk/pkg/config" "github.com/optimizely/go-sdk/pkg/decide" @@ -56,9 +54,6 @@ type OptimizelyClient struct { defaultDecideOptions *decide.Options } -// WaitForDispatchingEventsTimeout holds the timeout value for the waiting for the dispatching events on client close -const WaitForDispatchingEventsTimeout = 10 * time.Second - // CreateUserContext creates a context of the user for which decision APIs will be called. // A user context will be created successfully even when the SDK is not fully configured yet. func (o *OptimizelyClient) CreateUserContext(userID string, attributes map[string]interface{}) OptimizelyUserContext { @@ -1069,9 +1064,6 @@ func (o *OptimizelyClient) GetOptimizelyConfig() (optimizelyConfig *config.Optim // Close closes the Optimizely instance and stops any ongoing tasks from its children components. func (o *OptimizelyClient) Close() { o.execGroup.TerminateAndWait() - ctx, cancel := context.WithTimeout(context.Background(), WaitForDispatchingEventsTimeout) - defer cancel() - o.EventProcessor.WaitForDispatchingEventsOnClose(ctx) } func (o *OptimizelyClient) getDecisionVariableMap(feature entities.Feature, variation *entities.Variation, featureEnabled bool) (map[string]interface{}, decide.DecisionReasons) { diff --git a/pkg/client/factory_test.go b/pkg/client/factory_test.go index b089c6e60..0d9ae2b03 100644 --- a/pkg/client/factory_test.go +++ b/pkg/client/factory_test.go @@ -61,10 +61,6 @@ func (f *MockDispatcher) DispatchEvent(event event.LogEvent) (bool, error) { return true, nil } -func (f *MockDispatcher) EventsInQueue() int { - return 0 -} - type MockConfigManager struct { config.ProjectConfigManager projectConfig config.ProjectConfig diff --git a/pkg/event/dispatcher.go b/pkg/event/dispatcher.go index 59b11da52..4536e8647 100644 --- a/pkg/event/dispatcher.go +++ b/pkg/event/dispatcher.go @@ -18,6 +18,7 @@ package event import ( + "context" "fmt" "net/http" "time" @@ -37,7 +38,6 @@ const sleepTime = 1 * time.Second // Dispatcher dispatches events type Dispatcher interface { DispatchEvent(event LogEvent) (bool, error) - EventsInQueue() int } // httpEventDispatcher is the HTTP implementation of the Dispatcher interface @@ -68,10 +68,6 @@ func (ed *httpEventDispatcher) DispatchEvent(event LogEvent) (bool, error) { return success, err } -func (ed *httpEventDispatcher) EventsInQueue() int { - return 0 -} - // NewHTTPEventDispatcher creates a full http dispatcher. The requester and logger parameters can be nil. func NewHTTPEventDispatcher(sdkKey string, requester *utils.HTTPRequester, logger logging.OptimizelyLogProducer) Dispatcher { if requester == nil { @@ -105,14 +101,24 @@ func (ed *QueueEventDispatcher) DispatchEvent(event LogEvent) (bool, error) { return true, nil } -// EventsInQueue returns the events count in the queue. -func (ed *QueueEventDispatcher) EventsInQueue() int { - return ed.eventQueue.Size() +// waitForDispatchingEventsOnClose will wait until all the event are dispatched or +// until the given context is alive +func (ed *QueueEventDispatcher) waitForDispatchingEventsOnClose(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + default: + if ed.eventQueue.Size() == 0 { + return + } + time.Sleep(WaitForDispatchingEventsInterval) + } + } } // flush the events func (ed *QueueEventDispatcher) flushEvents() { - // Limit flushing to a single worker if !ed.processing.TryAcquire(1) { return diff --git a/pkg/event/processor.go b/pkg/event/processor.go index b75ddee13..0c20b7366 100644 --- a/pkg/event/processor.go +++ b/pkg/event/processor.go @@ -37,7 +37,6 @@ type Processor interface { ProcessEvent(event UserEvent) bool OnEventDispatch(callback func(logEvent LogEvent)) (int, error) RemoveOnEventDispatch(id int) error - WaitForDispatchingEventsOnClose(ctx context.Context) } // BatchEventProcessor is used out of the box by the SDK to queue up and batch events to be sent to the Optimizely @@ -69,6 +68,9 @@ const DefaultEventFlushInterval = 30 * time.Second // WaitForDispatchingEventsInterval holds the checking interval for the dispatching events on client close const WaitForDispatchingEventsInterval = 500 * time.Millisecond +// WaitForDispatchingEventsTimeout holds the timeout value for the waiting for the dispatching events on client close +const WaitForDispatchingEventsTimeout = 30 * time.Second + // DefaultEventEndPoint is used as the default endpoint for sending events. const DefaultEventEndPoint = "https://logx.optimizely.com/v1/events" @@ -188,21 +190,6 @@ func (p *BatchEventProcessor) Start(ctx context.Context) { p.startTicker(ctx) } -// WaitForDispatchingEventsOnClose waits until all the events are dispatched -func (p *BatchEventProcessor) WaitForDispatchingEventsOnClose(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - default: - if p.Q.Size() == 0 && p.EventDispatcher.EventsInQueue() == 0 { - return - } - time.Sleep(WaitForDispatchingEventsInterval) - } - } -} - // ProcessEvent takes the given user event (can be an impression or conversion event) and queues it up to be dispatched // to the Optimizely log endpoint. A dispatch happens when we flush the events, which can happen on a set interval or // when the specified batch size (defaulted to 10) is reached. @@ -264,6 +251,9 @@ func (p *BatchEventProcessor) startTicker(ctx context.Context) { d, ok := p.EventDispatcher.(*QueueEventDispatcher) if ok { d.flushEvents() + waitCtx, cancel := context.WithTimeout(context.Background(), WaitForDispatchingEventsTimeout) + defer cancel() + d.waitForDispatchingEventsOnClose(waitCtx) } p.Ticker.Stop() return diff --git a/pkg/event/processor_test.go b/pkg/event/processor_test.go index 0166e165a..82529d240 100644 --- a/pkg/event/processor_test.go +++ b/pkg/event/processor_test.go @@ -42,10 +42,6 @@ func (c *CountingDispatcher) DispatchEvent(event LogEvent) (bool, error) { return true, nil } -func (c *CountingDispatcher) EventsInQueue() int { - return 0 -} - type MockDispatcher struct { ShouldFail bool Events Queue @@ -65,10 +61,6 @@ func (m *MockDispatcher) DispatchEvent(event LogEvent) (bool, error) { return true, nil } -func (m *MockDispatcher) EventsInQueue() int { - return m.eventsQueue.Size() -} - func (m *MockDispatcher) flushEvents() { queueSize := m.eventsQueue.Size() for ; queueSize > 0; queueSize = m.eventsQueue.Size() { @@ -399,20 +391,20 @@ func TestBatchEventProcessor_WaitForDispatchingEventsOnClose(t *testing.T) { impression := BuildTestImpressionEvent() - for i := 0; i < 100; i++ { + for i := 0; i < 10; i++ { processor.ProcessEvent(impression) } - assert.Equal(t, 100, processor.eventsCount()) + assert.Equal(t, 10, processor.eventsCount()) // Triggers the flush in the processor eg.TerminateAndWait() - ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) - defer cancel() - processor.WaitForDispatchingEventsOnClose(ctx) + + dispatcher, ok := processor.EventDispatcher.(*MockDispatcher) + assert.True(t, ok) assert.Equal(t, 0, processor.eventsCount()) - assert.Equal(t, 0, processor.EventDispatcher.EventsInQueue()) + assert.Equal(t, 0, dispatcher.eventsQueue.Size()) } func TestDefaultEventProcessor_ProcessBatchRevisionMismatch(t *testing.T) { @@ -540,7 +532,6 @@ func (l *NoOpLogger) SetLogLevel(level logging.LogLevel) { } /* -* goos: darwin goarch: amd64 pkg: github.com/optimizely/go-sdk/pkg/event From 5bf3e140b7ebc9e22a4b539a1e92b5d776c3bd55 Mon Sep 17 00:00:00 2001 From: Pulak Bhowmick Date: Thu, 6 Jul 2023 21:50:15 +0600 Subject: [PATCH 07/10] refactor code --- pkg/client/client_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index 0a4e5c7cb..96e65a696 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -105,8 +105,6 @@ func (m *MockProcessor) RemoveOnEventDispatch(id int) error { return nil } -func (m *MockProcessor) WaitForDispatchingEventsOnClose(ctx context.Context) {} - type MockNotificationCenter struct { notification.Center mock.Mock From fdf9858c0cf22db61e4f2b24010f3cb6f29e3613 Mon Sep 17 00:00:00 2001 From: Pulak Bhowmick Date: Thu, 6 Jul 2023 22:07:58 +0600 Subject: [PATCH 08/10] fix linter --- pkg/event/dispatcher.go | 8 +++++--- pkg/event/processor.go | 4 +--- pkg/event/processor_test.go | 1 - 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/pkg/event/dispatcher.go b/pkg/event/dispatcher.go index 4536e8647..01c715b9c 100644 --- a/pkg/event/dispatcher.go +++ b/pkg/event/dispatcher.go @@ -101,9 +101,11 @@ func (ed *QueueEventDispatcher) DispatchEvent(event LogEvent) (bool, error) { return true, nil } -// waitForDispatchingEventsOnClose will wait until all the event are dispatched or -// until the given context is alive -func (ed *QueueEventDispatcher) waitForDispatchingEventsOnClose(ctx context.Context) { +// waitForDispatchingEventsOnClose will wait until all the event are dispatched +func (ed *QueueEventDispatcher) waitForDispatchingEventsOnClose(timeout time.Duration) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + for { select { case <-ctx.Done(): diff --git a/pkg/event/processor.go b/pkg/event/processor.go index 0c20b7366..aa8d9d8d9 100644 --- a/pkg/event/processor.go +++ b/pkg/event/processor.go @@ -251,9 +251,7 @@ func (p *BatchEventProcessor) startTicker(ctx context.Context) { d, ok := p.EventDispatcher.(*QueueEventDispatcher) if ok { d.flushEvents() - waitCtx, cancel := context.WithTimeout(context.Background(), WaitForDispatchingEventsTimeout) - defer cancel() - d.waitForDispatchingEventsOnClose(waitCtx) + d.waitForDispatchingEventsOnClose(WaitForDispatchingEventsTimeout) } p.Ticker.Stop() return diff --git a/pkg/event/processor_test.go b/pkg/event/processor_test.go index 82529d240..164e9e44f 100644 --- a/pkg/event/processor_test.go +++ b/pkg/event/processor_test.go @@ -65,7 +65,6 @@ func (m *MockDispatcher) flushEvents() { queueSize := m.eventsQueue.Size() for ; queueSize > 0; queueSize = m.eventsQueue.Size() { m.eventsQueue.Remove(1) - time.Sleep(5 * time.Millisecond) } } From f15a897a36020a8ca7e08bb8ce55958cb5c330a1 Mon Sep 17 00:00:00 2001 From: Pulak Bhowmick Date: Thu, 6 Jul 2023 22:48:54 +0600 Subject: [PATCH 09/10] update test --- pkg/event/dispatcher_test.go | 37 ++++++++++++++++++++++++++++++++++++ pkg/event/processor_test.go | 26 ------------------------- 2 files changed, 37 insertions(+), 26 deletions(-) diff --git a/pkg/event/dispatcher_test.go b/pkg/event/dispatcher_test.go index 3ccd596dd..04622c9b4 100644 --- a/pkg/event/dispatcher_test.go +++ b/pkg/event/dispatcher_test.go @@ -197,3 +197,40 @@ func TestQueueEventDispatcher_FailDispath(t *testing.T) { assert.Equal(t, float64(1), metricsRegistry.GetGauge(metrics.DispatcherQueueSize).(*MetricsGauge).Get()) assert.Equal(t, float64(0), metricsRegistry.GetCounter(metrics.DispatcherSuccessFlush).(*MetricsCounter).Get()) } + +func TestQueueEventDispatcher_WaitForDispatchingEventsOnClose(t *testing.T) { + metricsRegistry := NewMetricsRegistry() + + q := NewQueueEventDispatcher("", metricsRegistry) + + assert.True(t, q.Dispatcher != nil) + if d, ok := q.Dispatcher.(*httpEventDispatcher); ok { + assert.True(t, d.requester != nil && d.logger != nil) + } else { + assert.True(t, false) + } + sender := &MockDispatcher{Events: NewInMemoryQueue(100), eventsQueue: NewInMemoryQueue(100)} + q.Dispatcher = sender + + eventTags := map[string]interface{}{"revenue": 55.0, "value": 25.1} + config := TestConfig{} + + for i := 0; i < 10; i++ { + conversionUserEvent := CreateConversionUserEvent(config, entities.Event{ExperimentIds: []string{"15402980349"}, ID: "15368860886", Key: "sample_conversion"}, userContext, eventTags) + + batch := createBatchEvent(conversionUserEvent, createVisitorFromUserEvent(conversionUserEvent)) + assert.Equal(t, conversionUserEvent.Timestamp, batch.Visitors[0].Snapshots[0].Events[0].Timestamp) + + logEvent := createLogEvent(batch, DefaultEventEndPoint) + + success, _ := q.DispatchEvent(logEvent) + + assert.True(t, success) + } + + // wait for the events to be dispatched + q.waitForDispatchingEventsOnClose(10 * time.Second) + + // check the queue + assert.Equal(t, 0, q.eventQueue.Size()) +} diff --git a/pkg/event/processor_test.go b/pkg/event/processor_test.go index 164e9e44f..fe7c12613 100644 --- a/pkg/event/processor_test.go +++ b/pkg/event/processor_test.go @@ -380,32 +380,6 @@ func TestBatchEventProcessor_FlushesOnClose(t *testing.T) { assert.Equal(t, 0, processor.eventsCount()) } -func TestBatchEventProcessor_WaitForDispatchingEventsOnClose(t *testing.T) { - eg := newExecutionContext() - processor := NewBatchEventProcessor( - WithQueueSize(100), - WithQueue(NewInMemoryQueue(100)), - WithEventDispatcher(NewMockDispatcher(100, false))) - eg.Go(processor.Start) - - impression := BuildTestImpressionEvent() - - for i := 0; i < 10; i++ { - processor.ProcessEvent(impression) - } - - assert.Equal(t, 10, processor.eventsCount()) - - // Triggers the flush in the processor - eg.TerminateAndWait() - - dispatcher, ok := processor.EventDispatcher.(*MockDispatcher) - assert.True(t, ok) - - assert.Equal(t, 0, processor.eventsCount()) - assert.Equal(t, 0, dispatcher.eventsQueue.Size()) -} - func TestDefaultEventProcessor_ProcessBatchRevisionMismatch(t *testing.T) { eg := newExecutionContext() dispatcher := NewMockDispatcher(100, false) From 704fdaee7869b15b8ca0e0f329615bb50251ac24 Mon Sep 17 00:00:00 2001 From: Pulak Bhowmick Date: Thu, 6 Jul 2023 23:21:05 +0600 Subject: [PATCH 10/10] update license header --- pkg/event/dispatcher.go | 4 ++-- pkg/event/dispatcher_test.go | 2 +- pkg/event/processor.go | 10 +++++----- pkg/event/processor_test.go | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/event/dispatcher.go b/pkg/event/dispatcher.go index 01c715b9c..987990359 100644 --- a/pkg/event/dispatcher.go +++ b/pkg/event/dispatcher.go @@ -1,5 +1,5 @@ /**************************************************************************** - * Copyright 2019, Optimizely, Inc. and contributors * + * Copyright 2019,2023 Optimizely, Inc. and contributors * * * * Licensed under the Apache License, Version 2.0 (the "License"); * * you may not use this file except in compliance with the License. * @@ -114,7 +114,7 @@ func (ed *QueueEventDispatcher) waitForDispatchingEventsOnClose(timeout time.Dur if ed.eventQueue.Size() == 0 { return } - time.Sleep(WaitForDispatchingEventsInterval) + time.Sleep(CloseEventDispatchWaitTime) } } } diff --git a/pkg/event/dispatcher_test.go b/pkg/event/dispatcher_test.go index 04622c9b4..a3c4e07c9 100644 --- a/pkg/event/dispatcher_test.go +++ b/pkg/event/dispatcher_test.go @@ -1,5 +1,5 @@ /**************************************************************************** - * Copyright 2019-2020,2022 Optimizely, Inc. and contributors * + * Copyright 2019-2020,2022-2023 Optimizely, Inc. and contributors * * * * Licensed under the Apache License, Version 2.0 (the "License"); * * you may not use this file except in compliance with the License. * diff --git a/pkg/event/processor.go b/pkg/event/processor.go index aa8d9d8d9..f1727d9c7 100644 --- a/pkg/event/processor.go +++ b/pkg/event/processor.go @@ -65,11 +65,11 @@ const DefaultEventQueueSize = 2000 // DefaultEventFlushInterval holds the default value for the event flush interval const DefaultEventFlushInterval = 30 * time.Second -// WaitForDispatchingEventsInterval holds the checking interval for the dispatching events on client close -const WaitForDispatchingEventsInterval = 500 * time.Millisecond +// CloseEventDispatchWaitTime holds the checking interval for the dispatching events on client close +const CloseEventDispatchWaitTime = 500 * time.Millisecond -// WaitForDispatchingEventsTimeout holds the timeout value for the waiting for the dispatching events on client close -const WaitForDispatchingEventsTimeout = 30 * time.Second +// CloseEventDispatchTimeout holds the timeout value for the waiting for the dispatching events on client close +const CloseEventDispatchTimeout = 30 * time.Second // DefaultEventEndPoint is used as the default endpoint for sending events. const DefaultEventEndPoint = "https://logx.optimizely.com/v1/events" @@ -251,7 +251,7 @@ func (p *BatchEventProcessor) startTicker(ctx context.Context) { d, ok := p.EventDispatcher.(*QueueEventDispatcher) if ok { d.flushEvents() - d.waitForDispatchingEventsOnClose(WaitForDispatchingEventsTimeout) + d.waitForDispatchingEventsOnClose(CloseEventDispatchTimeout) } p.Ticker.Stop() return diff --git a/pkg/event/processor_test.go b/pkg/event/processor_test.go index fe7c12613..15b346d28 100644 --- a/pkg/event/processor_test.go +++ b/pkg/event/processor_test.go @@ -1,5 +1,5 @@ /**************************************************************************** - * Copyright 2019-2020, Optimizely, Inc. and contributors * + * Copyright 2019-2020,2023 Optimizely, Inc. and contributors * * * * Licensed under the Apache License, Version 2.0 (the "License"); * * you may not use this file except in compliance with the License. *