Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions pkg/event/dispatcher.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/****************************************************************************
* Copyright 2019, Optimizely, Inc. and contributors *
* Copyright 2019,2023 Optimizely, Inc. and contributors *
* *
* Licensed under the Apache License, Version 2.0 (the "License"); *
* you may not use this file except in compliance with the License. *
Expand All @@ -18,6 +18,7 @@
package event

import (
"context"
"fmt"
"net/http"
"time"
Expand Down Expand Up @@ -100,9 +101,26 @@ func (ed *QueueEventDispatcher) DispatchEvent(event LogEvent) (bool, error) {
return true, nil
}

// waitForDispatchingEventsOnClose will wait until all the event are dispatched
func (ed *QueueEventDispatcher) waitForDispatchingEventsOnClose(timeout time.Duration) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

for {
select {
case <-ctx.Done():
return
default:
if ed.eventQueue.Size() == 0 {
return
}
time.Sleep(CloseEventDispatchWaitTime)
}
}
}

// flush the events
func (ed *QueueEventDispatcher) flushEvents() {

// Limit flushing to a single worker
if !ed.processing.TryAcquire(1) {
return
Expand Down
39 changes: 38 additions & 1 deletion pkg/event/dispatcher_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/****************************************************************************
* Copyright 2019-2020,2022 Optimizely, Inc. and contributors *
* Copyright 2019-2020,2022-2023 Optimizely, Inc. and contributors *
* *
* Licensed under the Apache License, Version 2.0 (the "License"); *
* you may not use this file except in compliance with the License. *
Expand Down Expand Up @@ -197,3 +197,40 @@ func TestQueueEventDispatcher_FailDispath(t *testing.T) {
assert.Equal(t, float64(1), metricsRegistry.GetGauge(metrics.DispatcherQueueSize).(*MetricsGauge).Get())
assert.Equal(t, float64(0), metricsRegistry.GetCounter(metrics.DispatcherSuccessFlush).(*MetricsCounter).Get())
}

func TestQueueEventDispatcher_WaitForDispatchingEventsOnClose(t *testing.T) {
metricsRegistry := NewMetricsRegistry()

q := NewQueueEventDispatcher("", metricsRegistry)

assert.True(t, q.Dispatcher != nil)
if d, ok := q.Dispatcher.(*httpEventDispatcher); ok {
assert.True(t, d.requester != nil && d.logger != nil)
} else {
assert.True(t, false)
}
sender := &MockDispatcher{Events: NewInMemoryQueue(100), eventsQueue: NewInMemoryQueue(100)}
q.Dispatcher = sender

eventTags := map[string]interface{}{"revenue": 55.0, "value": 25.1}
config := TestConfig{}

for i := 0; i < 10; i++ {
conversionUserEvent := CreateConversionUserEvent(config, entities.Event{ExperimentIds: []string{"15402980349"}, ID: "15368860886", Key: "sample_conversion"}, userContext, eventTags)

batch := createBatchEvent(conversionUserEvent, createVisitorFromUserEvent(conversionUserEvent))
assert.Equal(t, conversionUserEvent.Timestamp, batch.Visitors[0].Snapshots[0].Events[0].Timestamp)

logEvent := createLogEvent(batch, DefaultEventEndPoint)

success, _ := q.DispatchEvent(logEvent)

assert.True(t, success)
}

// wait for the events to be dispatched
q.waitForDispatchingEventsOnClose(10 * time.Second)

// check the queue
assert.Equal(t, 0, q.eventQueue.Size())
}
7 changes: 7 additions & 0 deletions pkg/event/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ const DefaultEventQueueSize = 2000
// DefaultEventFlushInterval holds the default value for the event flush interval
const DefaultEventFlushInterval = 30 * time.Second

// CloseEventDispatchWaitTime holds the checking interval for the dispatching events on client close
const CloseEventDispatchWaitTime = 500 * time.Millisecond

// CloseEventDispatchTimeout holds the timeout value for the waiting for the dispatching events on client close
const CloseEventDispatchTimeout = 30 * time.Second

// DefaultEventEndPoint is used as the default endpoint for sending events.
const DefaultEventEndPoint = "https://logx.optimizely.com/v1/events"

Expand Down Expand Up @@ -245,6 +251,7 @@ func (p *BatchEventProcessor) startTicker(ctx context.Context) {
d, ok := p.EventDispatcher.(*QueueEventDispatcher)
if ok {
d.flushEvents()
d.waitForDispatchingEventsOnClose(CloseEventDispatchTimeout)
}
p.Ticker.Stop()
return
Expand Down
23 changes: 17 additions & 6 deletions pkg/event/processor_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/****************************************************************************
* Copyright 2019-2020, Optimizely, Inc. and contributors *
* Copyright 2019-2020,2023 Optimizely, Inc. and contributors *
* *
* Licensed under the Apache License, Version 2.0 (the "License"); *
* you may not use this file except in compliance with the License. *
Expand Down Expand Up @@ -43,8 +43,9 @@ func (c *CountingDispatcher) DispatchEvent(event LogEvent) (bool, error) {
}

type MockDispatcher struct {
ShouldFail bool
Events Queue
ShouldFail bool
Events Queue
eventsQueue Queue // dispatch events from this queue
}

func (m *MockDispatcher) DispatchEvent(event LogEvent) (bool, error) {
Expand All @@ -53,11 +54,22 @@ func (m *MockDispatcher) DispatchEvent(event LogEvent) (bool, error) {
}

m.Events.Add(event)
if m.eventsQueue != nil {
m.eventsQueue.Add(event)
go m.flushEvents()
}
return true, nil
}

func (m *MockDispatcher) flushEvents() {
queueSize := m.eventsQueue.Size()
for ; queueSize > 0; queueSize = m.eventsQueue.Size() {
m.eventsQueue.Remove(1)
}
}

func NewMockDispatcher(queueSize int, shouldFail bool) *MockDispatcher {
return &MockDispatcher{Events: NewInMemoryQueue(queueSize), ShouldFail: shouldFail}
return &MockDispatcher{Events: NewInMemoryQueue(queueSize), eventsQueue: NewInMemoryQueue(queueSize), ShouldFail: shouldFail}
}

func newExecutionContext() *utils.ExecGroup {
Expand Down Expand Up @@ -180,7 +192,6 @@ func TestDefaultEventProcessor_BatchSizes(t *testing.T) {
assert.Equal(t, 50, len(logEvent.Event.Visitors))
logEvent, _ = evs[1].(LogEvent)
assert.Equal(t, 50, len(logEvent.Event.Visitors))

}
eg.TerminateAndWait()
}
Expand Down Expand Up @@ -493,7 +504,7 @@ func (l *NoOpLogger) SetLogLevel(level logging.LogLevel) {

}

/**
/*
goos: darwin
goarch: amd64
pkg: github.com/optimizely/go-sdk/pkg/event
Expand Down