-
Notifications
You must be signed in to change notification settings - Fork 14
/
channel_test.go
184 lines (162 loc) · 4.9 KB
/
channel_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package gorums
import (
"context"
"testing"
"time"
"github.com/relab/gorums/ordering"
"github.com/relab/gorums/tests/mock"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// mockSrv is a stateless stub whose Test method backs the handler that
// dummySrv registers on the test server.
type mockSrv struct{}

// Test ignores both the server context and the request and always returns a
// nil response together with a nil error.
func (mockSrv) Test(ServerCtx, *mock.Request) (*mock.Response, error) {
	var resp *mock.Response
	return resp, nil
}
// dummyMgr builds a RawManager whose only option is a gRPC dial option that
// selects insecure transport credentials.
func dummyMgr() *RawManager {
	creds := insecure.NewCredentials()
	dialOpt := grpc.WithTransportCredentials(creds)
	return NewRawManager(WithGrpcDialOptions(dialOpt))
}
// handlerName is the method name under which dummySrv registers its handler;
// the tests put the same name in the metadata of the requests they enqueue.
var handlerName = "mock.Server.Test"
// dummySrv returns a new Server with a single handler registered under
// handlerName. The handler forwards the incoming mock.Request to a mockSrv and
// sends the wrapped result back on the finished channel.
func dummySrv() *Server {
	impl := &mockSrv{}

	handler := func(ctx ServerCtx, in *Message, finished chan<- *Message) {
		// The assertion panics if the payload is not a *mock.Request.
		req := in.Message.(*mock.Request)
		defer ctx.Release()
		resp, err := impl.Test(ctx, req)
		SendMessage(ctx, finished, WrapMessage(in.Metadata, resp, err))
	}

	srv := NewServer()
	srv.RegisterHandler(handlerName, handler)
	return srv
}
// TestChannelCreation connects a node that is not part of any configuration and
// checks that a request enqueued on its channel produces a reply within three
// seconds instead of blocking forever.
func TestChannelCreation(t *testing.T) {
	node, err := NewRawNode("127.0.0.1:5000")
	if err != nil {
		t.Fatal(err)
	}
	// The node is never added to a configuration, so the test has to close it
	// itself.
	defer node.close()

	// A proper connection should NOT be established here.
	node.connect(dummyMgr())

	req := request{
		ctx: context.Background(),
		msg: &Message{
			Metadata: &ordering.Metadata{MessageID: 1, Method: handlerName},
			Message:  &mock.Request{},
		},
	}
	replies := make(chan response, 1)
	// Enqueue from a separate goroutine so that a blocked enqueue shows up as
	// the timeout below rather than hanging the test.
	go node.channel.enqueue(req, replies, false)

	timeout := time.After(3 * time.Second)
	select {
	case <-replies:
	case <-timeout:
		t.Fatal("deadlock: impossible to enqueue messages to the node")
	}
}
// TestChannelSuccessfulConnection starts one test server, adds a node for its
// address to a manager, and verifies that the node is listed by the manager,
// that its channel reports being connected, and that its conn is non-nil.
func TestChannelSuccessfulConnection(t *testing.T) {
	addrs, teardown := TestSetup(t, 1, func(int) ServerIface { return dummySrv() })
	defer teardown()

	mgr := dummyMgr()
	defer mgr.Close()

	node, err := NewRawNode(addrs[0])
	if err != nil {
		t.Fatal(err)
	}
	if err := mgr.AddNode(node); err != nil {
		t.Fatal(err)
	}

	// Cases are evaluated top to bottom; the first failing check aborts the test.
	switch {
	case len(mgr.Nodes()) == 0:
		t.Fatal("the node was not added to the configuration")
	case !node.channel.isConnected():
		t.Fatal("a connection could not be made to a live node")
	case node.conn == nil:
		t.Fatal("connection should not be nil")
	}
}
// TestChannelUnsuccessfulConnection adds a node for an address where no server
// is expected to be listening and verifies that the manager still lists the
// node and that the node's conn is non-nil.
func TestChannelUnsuccessfulConnection(t *testing.T) {
	mgr := dummyMgr()
	defer mgr.Close()

	// No servers are listening on the given address.
	node, err := NewRawNode("127.0.0.1:5000")
	if err != nil {
		t.Fatal(err)
	}
	// The node should still be added to the configuration.
	if err := mgr.AddNode(node); err != nil {
		t.Fatal(err)
	}

	// Cases are evaluated top to bottom; the first failing check aborts the test.
	switch {
	case len(mgr.Nodes()) == 0:
		t.Fatal("the node was not added to the configuration")
	case node.conn == nil:
		t.Fatal("connection should not be nil when NOT using WithBlock()")
	}
}
// TestChannelReconnection sends three requests over one node's channel while
// the server behind it goes through down -> up -> down, and checks that the
// replies carry an error, no error, and an error again, respectively. Each
// reply must arrive within three seconds.
func TestChannelReconnection(t *testing.T) {
	const srvAddr = "127.0.0.1:5000"

	// The server is only started once startServer is called further down.
	startServer, stopServer := testServerSetup(t, srvAddr, dummySrv())

	node, err := NewRawNode(srvAddr)
	if err != nil {
		t.Fatal(err)
	}
	// The node is never added to a configuration, so the test has to close it
	// itself.
	defer node.close()

	// A proper connection should NOT be established here because the server
	// has not been started.
	node.connect(dummyMgr())

	// newRequest wraps an empty mock.Request with the given metadata.
	newRequest := func(md *ordering.Metadata) request {
		return request{
			ctx: context.Background(),
			msg: &Message{Metadata: md, Message: &mock.Request{}},
		}
	}
	// send enqueues req from a separate goroutine and returns the channel on
	// which the reply is delivered.
	send := func(req request) <-chan response {
		replies := make(chan response, 1)
		go node.channel.enqueue(req, replies, false)
		return replies
	}
	// await returns the reply, or aborts the test if none arrives in time.
	await := func(replies <-chan response) (resp response) {
		select {
		case resp = <-replies:
		case <-time.After(3 * time.Second):
			t.Fatal("deadlock: impossible to enqueue messages to the node")
		}
		return resp
	}

	// First message: the server is down, so the reply should carry an error.
	first := newRequest(&ordering.Metadata{MessageID: 1, Method: handlerName})
	if resp := await(send(first)); resp.err == nil {
		t.Error("response err: got <nil>, want error")
	}

	startServer()

	// Second message: the server is up, so the reply should carry no error.
	second := newRequest(&ordering.Metadata{MessageID: 2, Method: handlerName})
	second.opts = getCallOptions(E_Multicast, nil)
	if resp := await(send(second)); resp.err != nil {
		t.Errorf("response err: got %v, want <nil>", resp.err)
	}

	stopServer()

	// Third message: the server was up but is down again, so the reply should
	// carry an error.
	third := newRequest(&ordering.Metadata{MessageID: 3, Method: handlerName})
	if resp := await(send(third)); resp.err == nil {
		t.Error("response err: got <nil>, want error")
	}
}