This repository has been archived by the owner on Jun 7, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
/
server.go
228 lines (203 loc) · 6.08 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
package lumina
import (
"bytes"
"context"
"encoding/hex"
"errors"
"github.com/palantir/stacktrace"
"log"
"net"
)
var _ = hex.EncodeToString
type HeloCallback func(context.Context) (context.Context, error)
type Server struct {
Handler Handler
// Called once after PKT_HELO is received. Protocol processing is deferred
// until the callback returns.
OnHELO HeloCallback
}
func (srv *Server) Serve(ln net.Listener) error {
if srv.Handler == nil {
return stacktrace.NewError("server handler not set")
}
defer ln.Close()
for {
conn, err := ln.Accept()
if err != nil {
return stacktrace.Propagate(err, "unable to accept")
}
go srv.serveConn(conn)
}
}
// Shutdown gracefully shuts down the server without interrupting any active
// connections. Shutdown works by first closing all open listeners, then closing
// all idle connections, and then waiting indefinitely for connections to return
// to idle and then shut down. If the provided context expires before the shut-
// down is complete, Shutdown returns the context's error, otherwise it returns
// any error returned from closing the Server's underlying Listener(s).
//
// When Shutdown is called, Serve immediately return ErrServerClosed. Make sure
// the program doesn't exit and waits instead for Shutdown to return.
//
// Once Shutdown has been called on a server, it may not be reused; future calls
// to methods such as Serve will return ErrServerClosed.
// func (s *Server) Shutdown(ctx context.Context) error {
// return nil
// }
func (srv *Server) serveConn(conn net.Conn) {
defer conn.Close()
logger := newTaggedLogger()
logger.Print("incoming connection from ", conn.RemoteAddr())
ctx := context.TODO()
ctx = setConn(ctx, conn)
ctx = setLogger(ctx, logger)
s := ServerSession{
conn: conn,
logger: logger,
ctx: ctx,
}
s.serve(srv.Handler, srv.OnHELO)
}
type ServerSession struct {
conn net.Conn
logger *log.Logger
ctx context.Context
}
var errInternalServerError = errors.New("internal server error")
func (s *ServerSession) serveOne(handler Handler) error {
var (
req Request
reqErr error // error when recv and serve request
rsp Packet
rspErr error // error when send response
)
req, reqErr = s.recvRequest(handler)
switch reqErr {
case nil:
// TODO: further refine context
rsp, reqErr = handler.ServeRequest(s.ctx, req)
if reqErr != nil {
reqErr = stacktrace.Propagate(reqErr, "error while serving request")
} else {
reqType := req.getType()
rspType := rsp.getType()
if !(rspType == PKT_RPC_FAIL || reqType == PKT_HELO && rspType == PKT_HELO_RESULT || rspType == req.getResponseType()) {
reqErr = stacktrace.NewError("%s response is not allowed for %s request", rspType, reqType)
}
}
case errInternalServerError: // non-critical
break
default: // critical
return stacktrace.Propagate(reqErr, "unrecoverable error while receiving request")
}
if reqErr != nil {
s.logger.Print(reqErr)
rsp = &RpcFailPacket{
Result: -1,
Error: "internal server error",
}
}
if rspErr = s.sendResponse(rsp); rspErr != nil {
rspErr = stacktrace.Propagate(rspErr, "unrecoverable error while sending response")
}
if reqErr != nil {
return reqErr
}
return rspErr
}
func (s *ServerSession) serve(handler Handler, onHelo HeloCallback) {
var err error
var newCtx context.Context
if err = s.serveOne(&heloHandler{s}); err != nil {
goto _ret
}
if onHelo != nil {
if newCtx, err = onHelo(s.ctx); err != nil {
goto _ret
}
s.ctx = newCtx
}
for {
if err = s.serveOne(handler); err != nil {
goto _ret
}
}
_ret:
s.logger.Print(err)
return
}
func (s *ServerSession) recvRequest(h Handler) (req Request, err error) {
var reqRaw RawPacket
err = reqRaw.readFrom(s.conn)
if err != nil {
err = stacktrace.Propagate(err, "unable to read raw packet from client")
return
}
// TODO: return a SERVFAIL instead of abort connection
reqType := reqRaw.GetType()
s.logger.Print("server recv request type: ", reqType)
// logger.Print("server recv request rawdata: ", hex.EncodeToString(reqRaw.GetPayload()))
if !h.AcceptRequest(reqType) {
s.logger.Printf("request of type %v is not accepted by server", reqType)
err = errInternalServerError
return
}
req, _ = h.GetPacketOfType(reqType).(Request)
if req == nil {
s.logger.Printf("request of type %v is not supported by server", reqType)
err = errInternalServerError
return
}
r := bytes.NewReader(reqRaw.GetPayload())
err = req.readFrom(r)
if err == nil && r.Len() > 0 {
err = errTrailingData
}
if err != nil {
s.logger.Print(stacktrace.Propagate(err, "unable to unmarshal request of type %v", reqType))
err = errInternalServerError
return
}
// logger.Printf("server recv request data: %+v", req)
if err = req.validateFields(); err != nil {
s.logger.Print(stacktrace.Propagate(err, "validation failed for request of type %v", reqType))
err = errInternalServerError
return
}
if cw, _ := req.(cacheWriter); cw != nil {
cw.setCache(reqRaw)
}
return
}
// TODO: standalone code path for internal server error
func (s *ServerSession) sendResponse(rsp Packet) (err error) {
rspType := rsp.getType()
s.logger.Print("server send response type: ", rspType)
// logger.Printf("server send response data: %+v", rsp)
var rspRaw RawPacket
if cr, _ := rsp.(cacheReader); cr != nil {
rspRaw = cr.getCache()
}
if rspRaw == nil {
if err = rsp.validateFields(); err != nil {
s.logger.Print(stacktrace.Propagate(err, "validation failed for response of type %v", rspType))
return
}
rspPayload := &bytes.Buffer{}
if err = rsp.writeTo(rspPayload); err != nil {
s.logger.Print(stacktrace.Propagate(err, "unable to marshal response of type %v", rspType))
return
}
rspRaw, err = NewRawPacket(rspType, rspPayload.Bytes())
if err != nil {
s.logger.Print(stacktrace.Propagate(err, "unable to compose raw packet for response of type %v", rspType))
return
}
}
// logger.Print("server send response rawdata: ", hex.EncodeToString(rspRaw.GetPayload()))
if err = rspRaw.writeTo(s.conn); err != nil {
s.logger.Print(stacktrace.Propagate(err, "unable to write raw packet to client"))
return
}
return
}