Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion pkg/event/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package event

import (
"context"
"fmt"
"net/http"
"time"
Expand Down Expand Up @@ -100,9 +101,26 @@ func (ed *QueueEventDispatcher) DispatchEvent(event LogEvent) (bool, error) {
return true, nil
}

// waitForDispatchingEventsOnClose will wait until all the event are dispatched
func (ed *QueueEventDispatcher) waitForDispatchingEventsOnClose(timeout time.Duration) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

for {
select {
case <-ctx.Done():
return
default:
if ed.eventQueue.Size() == 0 {
return
}
time.Sleep(WaitForDispatchingEventsInterval)
}
}
}

// flush the events
func (ed *QueueEventDispatcher) flushEvents() {

// Limit flushing to a single worker
if !ed.processing.TryAcquire(1) {
return
Expand Down
37 changes: 37 additions & 0 deletions pkg/event/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,40 @@ func TestQueueEventDispatcher_FailDispath(t *testing.T) {
assert.Equal(t, float64(1), metricsRegistry.GetGauge(metrics.DispatcherQueueSize).(*MetricsGauge).Get())
assert.Equal(t, float64(0), metricsRegistry.GetCounter(metrics.DispatcherSuccessFlush).(*MetricsCounter).Get())
}

func TestQueueEventDispatcher_WaitForDispatchingEventsOnClose(t *testing.T) {
metricsRegistry := NewMetricsRegistry()

q := NewQueueEventDispatcher("", metricsRegistry)

assert.True(t, q.Dispatcher != nil)
if d, ok := q.Dispatcher.(*httpEventDispatcher); ok {
assert.True(t, d.requester != nil && d.logger != nil)
} else {
assert.True(t, false)
}
sender := &MockDispatcher{Events: NewInMemoryQueue(100), eventsQueue: NewInMemoryQueue(100)}
q.Dispatcher = sender

eventTags := map[string]interface{}{"revenue": 55.0, "value": 25.1}
config := TestConfig{}

for i := 0; i < 10; i++ {
conversionUserEvent := CreateConversionUserEvent(config, entities.Event{ExperimentIds: []string{"15402980349"}, ID: "15368860886", Key: "sample_conversion"}, userContext, eventTags)

batch := createBatchEvent(conversionUserEvent, createVisitorFromUserEvent(conversionUserEvent))
assert.Equal(t, conversionUserEvent.Timestamp, batch.Visitors[0].Snapshots[0].Events[0].Timestamp)

logEvent := createLogEvent(batch, DefaultEventEndPoint)

success, _ := q.DispatchEvent(logEvent)

assert.True(t, success)
}

// wait for the events to be dispatched
q.waitForDispatchingEventsOnClose(10 * time.Second)

// check the queue
assert.Equal(t, 0, q.eventQueue.Size())
}
7 changes: 7 additions & 0 deletions pkg/event/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ const DefaultEventQueueSize = 2000
// DefaultEventFlushInterval holds the default value for the event flush interval
const DefaultEventFlushInterval = 30 * time.Second

// WaitForDispatchingEventsInterval holds the checking interval for the dispatching events on client close
const WaitForDispatchingEventsInterval = 500 * time.Millisecond

// WaitForDispatchingEventsTimeout holds the timeout value for the waiting for the dispatching events on client close
const WaitForDispatchingEventsTimeout = 30 * time.Second

// DefaultEventEndPoint is used as the default endpoint for sending events.
const DefaultEventEndPoint = "https://logx.optimizely.com/v1/events"

Expand Down Expand Up @@ -245,6 +251,7 @@ func (p *BatchEventProcessor) startTicker(ctx context.Context) {
d, ok := p.EventDispatcher.(*QueueEventDispatcher)
if ok {
d.flushEvents()
d.waitForDispatchingEventsOnClose(WaitForDispatchingEventsTimeout)
}
p.Ticker.Stop()
return
Expand Down
21 changes: 16 additions & 5 deletions pkg/event/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ func (c *CountingDispatcher) DispatchEvent(event LogEvent) (bool, error) {
}

type MockDispatcher struct {
ShouldFail bool
Events Queue
ShouldFail bool
Events Queue
eventsQueue Queue // dispatch events from this queue
}

func (m *MockDispatcher) DispatchEvent(event LogEvent) (bool, error) {
Expand All @@ -53,11 +54,22 @@ func (m *MockDispatcher) DispatchEvent(event LogEvent) (bool, error) {
}

m.Events.Add(event)
if m.eventsQueue != nil {
m.eventsQueue.Add(event)
go m.flushEvents()
}
return true, nil
}

func (m *MockDispatcher) flushEvents() {
queueSize := m.eventsQueue.Size()
for ; queueSize > 0; queueSize = m.eventsQueue.Size() {
m.eventsQueue.Remove(1)
}
}

func NewMockDispatcher(queueSize int, shouldFail bool) *MockDispatcher {
return &MockDispatcher{Events: NewInMemoryQueue(queueSize), ShouldFail: shouldFail}
return &MockDispatcher{Events: NewInMemoryQueue(queueSize), eventsQueue: NewInMemoryQueue(queueSize), ShouldFail: shouldFail}
}

func newExecutionContext() *utils.ExecGroup {
Expand Down Expand Up @@ -180,7 +192,6 @@ func TestDefaultEventProcessor_BatchSizes(t *testing.T) {
assert.Equal(t, 50, len(logEvent.Event.Visitors))
logEvent, _ = evs[1].(LogEvent)
assert.Equal(t, 50, len(logEvent.Event.Visitors))

}
eg.TerminateAndWait()
}
Expand Down Expand Up @@ -493,7 +504,7 @@ func (l *NoOpLogger) SetLogLevel(level logging.LogLevel) {

}

/**
/*
goos: darwin
goarch: amd64
pkg: github.com/optimizely/go-sdk/pkg/event
Expand Down