Skip to content

Commit

Permalink
refactor testing of serveRequest method
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicolas Wolf committed Jun 30, 2023
1 parent fe60cf5 commit 8dae0c4
Showing 1 changed file with 155 additions and 78 deletions.
233 changes: 155 additions & 78 deletions request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,85 +58,160 @@ func TestExtractLastEventID(t *testing.T) {
assert.Equal(t, "id4711", got)
}

func TestServeRequestReplayOnly(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 0)
defer cancel()

replayMessages := []Messager{
NewMessage().WithData([]byte("replay")).WithID("1"),
NewMessage().WithEvent("e").WithData([]byte("test")).WithID("2"),
}

buf := &bytes.Buffer{}
prop := observer.NewProperty[Messager](nil)

serveRequest(ctx, buf, DefaultMessageToBytesConverter, prop.Observe(), replayMessages, 0, nil)

assert.Equal(t, messagesToBytes(replayMessages, DefaultMessageToBytesConverter), buf.Bytes())
}

func TestServeRequestNoKeepAlive(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10000*time.Second)
defer cancel()

replayMessages := []Messager{
NewMessage().WithData([]byte("replay")).WithID("1"),
NewMessage().WithEvent("e").WithData([]byte("test")).WithID("2"),
}
messages := []Messager{
NewMessage().WithEvent("e1").WithData([]byte("message 1")),
NewMessage().WithEvent("e2").WithData([]byte("message 2")),
}

writer := newMaxWriter(len(replayMessages) + len(messages))

prop := observer.NewProperty[Messager](nil)
stream := prop.Observe()
for _, msg := range messages {
prop.Update(msg)
}

// trigger error in writer
prop.Update(DefaultKeepAliveMessage)

serveRequest(ctx, writer, DefaultMessageToBytesConverter, stream, replayMessages, 0, nil)

assert.Equal(t, messagesToBytes(append(replayMessages, messages...), DefaultMessageToBytesConverter), writer.Bytes())
}

func TestServeRequestWithKeepAlive(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

messages := []Messager{
NewMessage().WithEvent("e1").WithData([]byte("message 1")),
NewMessage().WithEvent("e2").WithData([]byte("message 2")),
func TestServe(t *testing.T) {
type testCase struct {
name string
ctxTimeout time.Duration
expectedCtxError error
writer *maxWriter
expectedWriteCalled int
replayMessages []Messager
messages []Messager
expectedMessages []Messager
keepAliveInterval time.Duration
}

prop := observer.NewProperty[Messager](nil)
stream := prop.Observe()
for _, msg := range messages {
prop.Update(msg)
testCases := []testCase{
{
name: "replay only",
ctxTimeout: 0,
expectedCtxError: context.DeadlineExceeded,
writer: newMaxWriter(1337),
expectedWriteCalled: 2,
replayMessages: []Messager{
NewMessage().WithData([]byte("replay")).WithID("1"),
NewMessage().WithEvent("e").WithData([]byte("test")).WithID("2"),
},
messages: []Messager{
NewMessage().WithEvent("e1").WithData([]byte("message 1")),
NewMessage().WithEvent("e2").WithData([]byte("message 2")),
},
expectedMessages: []Messager{
NewMessage().WithData([]byte("replay")).WithID("1"),
NewMessage().WithEvent("e").WithData([]byte("test")).WithID("2"),
},
keepAliveInterval: 0,
},
{
name: "replay write error",
ctxTimeout: 3 * time.Second,
expectedCtxError: nil,
writer: newMaxWriter(1),
expectedWriteCalled: 2,
replayMessages: []Messager{
NewMessage().WithData([]byte("replay")).WithID("1"),
NewMessage().WithEvent("e").WithData([]byte("test")).WithID("2"),
},
messages: nil,
expectedMessages: []Messager{
NewMessage().WithData([]byte("replay")).WithID("1"),
},
keepAliveInterval: 0,
},
{
name: "replay and normal messages, no keep alive",
ctxTimeout: 3 * time.Second,
expectedCtxError: nil,
writer: newMaxWriter(4),
expectedWriteCalled: 5,
replayMessages: []Messager{
NewMessage().WithData([]byte("replay")).WithID("1"),
NewMessage().WithEvent("e").WithData([]byte("test")).WithID("2"),
},
messages: []Messager{
NewMessage().WithEvent("e1").WithData([]byte("message 1")),
NewMessage().WithEvent("e2").WithData([]byte("message 2")),
DefaultKeepAliveMessage,
},
expectedMessages: []Messager{
NewMessage().WithData([]byte("replay")).WithID("1"),
NewMessage().WithEvent("e").WithData([]byte("test")).WithID("2"),
NewMessage().WithEvent("e1").WithData([]byte("message 1")),
NewMessage().WithEvent("e2").WithData([]byte("message 2")),
},
keepAliveInterval: 0,
},
{
name: "replay, normal messages and keep alive",
ctxTimeout: 3 * time.Second,
expectedCtxError: nil,
writer: newMaxWriter(5),
expectedWriteCalled: 6,
replayMessages: nil,
messages: []Messager{
NewMessage().WithEvent("e1").WithData([]byte("message 1")),
NewMessage().WithEvent("e2").WithData([]byte("message 2")),
},
expectedMessages: []Messager{
DefaultKeepAliveMessage,
NewMessage().WithEvent("e1").WithData([]byte("message 1")),
NewMessage().WithEvent("e2").WithData([]byte("message 2")),
DefaultKeepAliveMessage,
DefaultKeepAliveMessage,
},
keepAliveInterval: 1 * time.Millisecond,
},
{
name: "only normal messages, context timeout",
ctxTimeout: 333 * time.Millisecond,
expectedCtxError: context.DeadlineExceeded,
writer: newMaxWriter(1337),
expectedWriteCalled: 2,
replayMessages: nil,
messages: []Messager{
NewMessage().WithEvent("e1").WithData([]byte("message 1")),
NewMessage().WithEvent("e2").WithData([]byte("message 2")),
},
expectedMessages: []Messager{
NewMessage().WithEvent("e1").WithData([]byte("message 1")),
NewMessage().WithEvent("e2").WithData([]byte("message 2")),
},
keepAliveInterval: 0,
},
{
name: "only keep alive",
ctxTimeout: 30 * time.Millisecond,
expectedCtxError: context.DeadlineExceeded,
writer: newMaxWriter(1337),
expectedWriteCalled: 2,
replayMessages: nil,
messages: nil,
expectedMessages: []Messager{
DefaultKeepAliveMessage, // initial one
DefaultKeepAliveMessage, // sent via keep alive logic
},
keepAliveInterval: 20 * time.Millisecond,
},
{
name: "initial keep alive error",
ctxTimeout: 3 * time.Second,
expectedCtxError: nil,
writer: newMaxWriter(0),
expectedWriteCalled: 1,
replayMessages: nil,
messages: nil,
expectedMessages: nil,
keepAliveInterval: 1 * time.Millisecond,
},
}

writer := newMaxWriter(len(messages) + 2) // +2 keep alive messages
serveRequest(ctx, writer, DefaultMessageToBytesConverter, stream, nil, 1*time.Millisecond, DefaultKeepAliveMessage)

splitMinLen := len(messages) + 1 + 1 + 1 // initial keep alive message + one keep alive msg + one empty split entry at the end

keepAliveMessageBytes := DefaultMessageToBytesConverter.Convert(DefaultKeepAliveMessage)

keepAliveDetected := false
split := bytes.SplitAfter(writer.Bytes(), []byte("\n\n"))
assert.True(t, len(split) >= splitMinLen)
assert.Equal(t, keepAliveMessageBytes, split[0])
assert.Equal(t, DefaultMessageToBytesConverter.Convert(messages[0]), split[1])
assert.Equal(t, DefaultMessageToBytesConverter.Convert(messages[1]), split[2])
for i := len(messages) + 1; i < len(split)-1; i++ { // last split entry is always empty
assert.Equal(t, keepAliveMessageBytes, split[i])
keepAliveDetected = true
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
prop := observer.NewProperty[Messager](nil)
stream := prop.Observe()
for _, msg := range tc.messages {
prop.Update(msg)
}

ctx, cancel := context.WithTimeout(context.Background(), tc.ctxTimeout)
serveRequest(ctx, tc.writer, DefaultMessageToBytesConverter, stream, tc.replayMessages, tc.keepAliveInterval, DefaultKeepAliveMessage)
assert.ErrorIs(t, ctx.Err(), tc.expectedCtxError)
cancel()

assert.Equal(t, messagesToBytes(tc.expectedMessages, DefaultMessageToBytesConverter), tc.writer.Bytes())
assert.Equal(t, tc.expectedWriteCalled, tc.writer.writeCalled)
})
}
assert.True(t, keepAliveDetected)
}

type noFlushMock struct {
Expand All @@ -153,8 +228,9 @@ func (n *noFlushMock) Write(i []byte) (int, error) { return 0, nil }
func (n *noFlushMock) WriteHeader(statusCode int) { n.statusCode = statusCode }

type maxWriter struct {
i int
maxWrites int
writeCalled int
writesExecuted int
maxWrites int
bytes.Buffer
}

Expand All @@ -163,11 +239,12 @@ func newMaxWriter(maxWrites int) *maxWriter {
}

func (m *maxWriter) Write(p []byte) (n int, err error) {
if m.i >= m.maxWrites {
m.writeCalled++
if m.writesExecuted >= m.maxWrites {
return 0, fmt.Errorf("max writes reached")
}

m.i++
m.writesExecuted++
return m.Buffer.Write(p)
}

Expand Down

0 comments on commit 8dae0c4

Please sign in to comment.