Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/spf13/cobra v0.0.5
github.com/stretchr/testify v1.4.0
github.com/twmb/murmur3 v1.0.0
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
)

// Work around issue wtih git.apache.org/thrift.git
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ github.com/twmb/murmur3 v1.0.0/go.mod h1:5Y5m8Y8WIyucaICVP+Aep5C8ydggjEuRQHDq1ic
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
Expand Down
32 changes: 22 additions & 10 deletions pkg/event/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ func TestDefaultEventProcessor_ProcessImpression(t *testing.T) {

func TestCustomEventProcessor_Create(t *testing.T) {
exeCtx := utils.NewCancelableExecutionCtx()
processor := NewBatchEventProcessor(WithEventDispatcher(NewMockDispatcher(100, false)), WithQueueSize(10), WithFlushInterval(100))
processor := NewBatchEventProcessor(
WithEventDispatcher(NewMockDispatcher(100, false)),
WithQueueSize(10),
WithFlushInterval(100))
processor.Start(exeCtx)

impression := BuildTestImpressionEvent()
Expand All @@ -85,8 +88,10 @@ func TestCustomEventProcessor_Create(t *testing.T) {

func TestDefaultEventProcessor_LogEventNotification(t *testing.T) {
exeCtx := utils.NewCancelableExecutionCtx()
processor := NewBatchEventProcessor(WithFlushInterval(100), WithQueueSize(100),
WithQueue(NewInMemoryQueue(100)), WithEventDispatcher(NewMockDispatcher(100, false)),
processor := NewBatchEventProcessor(
WithQueueSize(100),
WithQueue(NewInMemoryQueue(100)),
WithEventDispatcher(NewMockDispatcher(100, false)),
WithSDKKey("fakeSDKKey"))

var logEvent LogEvent
Expand Down Expand Up @@ -118,7 +123,8 @@ func TestDefaultEventProcessor_LogEventNotification(t *testing.T) {

func TestDefaultEventProcessor_DefaultConfig(t *testing.T) {
exeCtx := utils.NewCancelableExecutionCtx()
processor := NewBatchEventProcessor(WithEventDispatcher(NewMockDispatcher(100, false)))
processor := NewBatchEventProcessor(
WithEventDispatcher(NewMockDispatcher(100, false)))
processor.Start(exeCtx)

impression := BuildTestImpressionEvent()
Expand Down Expand Up @@ -321,8 +327,10 @@ func TestBatchEventProcessor_FlushesOnClose(t *testing.T) {

func TestDefaultEventProcessor_ProcessBatchRevisionMismatch(t *testing.T) {
exeCtx := utils.NewCancelableExecutionCtx()
processor := NewBatchEventProcessor(WithQueueSize(100), WithFlushInterval(100),
WithQueue(NewInMemoryQueue(100)), WithEventDispatcher(NewMockDispatcher(100, false)))
processor := NewBatchEventProcessor(
WithQueueSize(100),
WithQueue(NewInMemoryQueue(100)),
WithEventDispatcher(NewMockDispatcher(100, false)))

processor.Start(exeCtx)

Expand Down Expand Up @@ -355,8 +363,10 @@ func TestDefaultEventProcessor_ProcessBatchRevisionMismatch(t *testing.T) {

func TestDefaultEventProcessor_ProcessBatchProjectMismatch(t *testing.T) {
exeCtx := utils.NewCancelableExecutionCtx()
processor := NewBatchEventProcessor(WithQueueSize(100), WithFlushInterval(100),
WithQueue(NewInMemoryQueue(100)), WithEventDispatcher(NewMockDispatcher(100, false)))
processor := NewBatchEventProcessor(
WithQueueSize(100),
WithQueue(NewInMemoryQueue(100)),
WithEventDispatcher(NewMockDispatcher(100, false)))

processor.Start(exeCtx)

Expand Down Expand Up @@ -389,8 +399,10 @@ func TestDefaultEventProcessor_ProcessBatchProjectMismatch(t *testing.T) {

func TestChanQueueEventProcessor_ProcessImpression(t *testing.T) {
exeCtx := utils.NewCancelableExecutionCtx()
processor := NewBatchEventProcessor(WithQueueSize(100), WithFlushInterval(100),
WithQueue(NewInMemoryQueue(100)), WithEventDispatcher(&HTTPEventDispatcher{}))
processor := NewBatchEventProcessor(
WithQueueSize(100),
WithQueue(NewInMemoryQueue(100)),
WithEventDispatcher(&HTTPEventDispatcher{}))

processor.Start(exeCtx)

Expand Down