-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient_conn_handler.go
156 lines (123 loc) · 2.83 KB
/
client_conn_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package goentangle
import (
"errors"
"io"
"sync"
)
// Client connection is shut down.
var ErrShutdown = errors.New("connection is shut down")
// Client connection handler.
//
// Convenience type for handling requests and responses for clients.
type ClientConnHandler struct {
// Underlying connection.
conn *Conn
// Pending messages.
pending map[MessageId]chan Message
// Lock for pending table.
pendingLock sync.Mutex
// Closing.
closing bool
// Shut down.
shutdown bool
// Lock for state.
stateLock sync.Mutex
}
// Receive messages and dispatch.
func (h *ClientConnHandler) receive() {
// Receive messages for as long as possible.
var err error
for err == nil {
var msg Message
if msg, err = h.conn.Receive(); err != nil {
break
}
// Ignore anything that is not a response or exception.
ignore := true
switch msg.(type) {
case *ResponseMessage, *ExceptionMessage, *NotificationAcknowledgementMessage:
ignore = false
}
if ignore {
continue
}
// Determine the recipient of the message.
var done chan Message
h.pendingLock.Lock()
done, _ = h.pending[msg.MessageId()]
delete(h.pending, msg.MessageId())
h.pendingLock.Unlock()
// Send the message to the recipient if possible.
if done != nil {
done <- msg
}
}
// Close all pending message channels.
h.pendingLock.Lock()
h.stateLock.Lock()
h.shutdown = true
for _, done := range h.pending {
done <- nil
}
h.stateLock.Unlock()
h.pendingLock.Unlock()
}
// Call a remote function.
func (h *ClientConnHandler) Call(method string, args []interface{}, notify bool, trace bool) (resp Message, err error) {
// Make sure we're in normal operating state.
h.stateLock.Lock()
if h.shutdown || h.closing {
err = ErrShutdown
h.stateLock.Unlock()
return
}
h.stateLock.Unlock()
// Acquire the lock for the pending table.
h.pendingLock.Lock()
// Send the message.
var msgId MessageId
if notify {
msgId, err = h.conn.SendNotification(method, args)
} else {
msgId, err = h.conn.SendRequest(method, args, trace)
}
if err != nil {
if err == io.EOF {
err = ErrShutdown
}
h.pendingLock.Unlock()
return
}
// Allocate a channel for awaiting the response, update the pending table
// and unlock.
done := make(chan Message, 1)
h.pending[msgId] = done
h.pendingLock.Unlock()
// Wait for the response.
defer close(done)
if resp = <-done; resp == nil {
err = ErrShutdown
return
}
return
}
// Close the connection.
func (h *ClientConnHandler) Close() error {
h.stateLock.Lock()
defer h.stateLock.Unlock()
if h.closing || h.shutdown {
return ErrShutdown
}
h.closing = true
h.conn.Close()
return nil
}
// New client connection handler.
func NewClientConnHandler(conn *Conn) (h *ClientConnHandler) {
h = &ClientConnHandler{
conn: conn,
pending: make(map[MessageId]chan Message),
}
go h.receive()
return
}