-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathsynchronous.go
404 lines (346 loc) · 15.6 KB
/
synchronous.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
package proxy
import (
"bytes"
"context"
"crypto/tls"
"fmt"
"io"
"net/http"
"net/url"
"path"
"time"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
sdktypes "github.com/pokt-network/shannon-sdk/types"
"github.com/pokt-network/poktroll/pkg/polylog"
"github.com/pokt-network/poktroll/pkg/relayer"
"github.com/pokt-network/poktroll/pkg/relayer/config"
"github.com/pokt-network/poktroll/x/service/types"
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
)
// Compile-time assertion that *synchronousRPCServer satisfies the
// relayer.RelayServer interface.
var _ relayer.RelayServer = (*synchronousRPCServer)(nil)
func init() {
reg := codectypes.NewInterfaceRegistry()
types.RegisterInterfaces(reg)
}
// synchronousRPCServer holds the state of the synchronous RPC server.
// It listens for and responds to relay requests where there is a one-to-one
// correspondence between relay requests and relay responses, and it acts as
// the http.Handler of its own HTTP server.
type synchronousRPCServer struct {
	// logger is used for all log output produced by the server.
	logger polylog.Logger
	// supplierServiceMap maps a service ID to the sharedtypes.Service that the
	// supplier advertises for it.
	supplierServiceMap map[string]*sharedtypes.Service
	// serverConfig is the configuration of the proxy server. The code in this
	// file reads its listen address, the X-Forwarded-Host lookup flag and the
	// per-supplier service configurations (publicly exposed endpoints and
	// backend service configuration) from it.
	serverConfig *config.RelayMinerServerConfig
	// server is the HTTP server that listens for incoming relay requests.
	server *http.Server
	// relayerProxy is the main relayer proxy that the server uses to perform
	// its operations, e.g. verifying incoming relay requests.
	relayerProxy relayer.RelayerProxy
	// servedRelaysProducer is a send-only channel on which every successfully
	// served relay is emitted, allowing the servedRelays observable to fan-out
	// notifications to its subscribers.
	servedRelaysProducer chan<- *types.Relay
}
// NewSynchronousServer builds a RelayServer that accepts relay requests over
// HTTP and forwards them to the proxied service backends.
//
// The returned server listens on serverConfig.ListenAddress once started. It
// keeps references to the given logger, server configuration, supplier service
// map, served relays channel and relayer proxy; none of them are copied.
//
// TODO_RESEARCH(#590): Currently, the communication between the AppGateServer and the
// RelayMiner uses HTTP. This could be changed to a more generic and performant
// one, such as pure TCP.
func NewSynchronousServer(
	logger polylog.Logger,
	serverConfig *config.RelayMinerServerConfig,
	supplierServiceMap map[string]*sharedtypes.Service,
	servedRelaysProducer chan<- *types.Relay,
	proxy relayer.RelayerProxy,
) relayer.RelayServer {
	// The handler is attached later, when the server is started.
	httpServer := &http.Server{Addr: serverConfig.ListenAddress}

	relayServer := new(synchronousRPCServer)
	relayServer.logger = logger
	relayServer.serverConfig = serverConfig
	relayServer.supplierServiceMap = supplierServiceMap
	relayServer.servedRelaysProducer = servedRelaysProducer
	relayServer.relayerProxy = proxy
	relayServer.server = httpServer

	return relayServer
}
// Start serves relay requests on the configured listen address until the
// server is shut down. This method is blocking and should be called in a
// goroutine.
//
// When ctx is done, the server is shut down gracefully: it stops accepting new
// connections and in-flight relay requests get a bounded amount of time to
// complete. Start only returns once that shutdown attempt has finished.
//
// The returned error is the one reported by http.Server.ListenAndServe, which
// is always non-nil (http.ErrServerClosed after a shutdown).
func (sync *synchronousRPCServer) Start(ctx context.Context) error {
	// shutdownTimeout bounds how long in-flight relay requests may take to
	// complete once ctx is done.
	const shutdownTimeout = 10 * time.Second

	// Set the HTTP handler before the server starts accepting connections.
	sync.server.Handler = sync

	// stopped tells the watcher goroutine that ListenAndServe has returned, so
	// that it does not linger until ctx is done (e.g. when listening failed).
	stopped := make(chan struct{})
	// shutdownDone is closed when the watcher goroutine has finished.
	shutdownDone := make(chan struct{})

	go func() {
		defer close(shutdownDone)

		select {
		case <-ctx.Done():
		case <-stopped:
			return
		}

		// ctx is already done at this point; handing it to Shutdown would make
		// Shutdown give up immediately instead of draining active connections.
		// Use a fresh, time-bounded context instead.
		shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
		defer cancel()

		if err := sync.server.Shutdown(shutdownCtx); err != nil {
			sync.logger.Warn().Err(err).Msg("failed shutting down synchronous relay server")
		}
	}()

	err := sync.server.ListenAndServe()

	close(stopped)
	<-shutdownDone

	return err
}
// Stop shuts the HTTP server down, using ctx to bound how long the shutdown
// may take. It returns the error reported by http.Server.Shutdown, if any.
func (sync *synchronousRPCServer) Stop(ctx context.Context) error {
	if err := sync.server.Shutdown(ctx); err != nil {
		return err
	}

	return nil
}
// ServeHTTP handles a single incoming relay request. It implements the
// http.Handler interface and is invoked by the http.Server that has this
// synchronousRPCServer set as its handler.
// (see https://pkg.go.dev/net/http#Handler)
//
// The handler:
//  1. extracts and validates the relay request from the HTTP request body,
//  2. resolves the backend service configuration from the request's origin host
//     and the service ID of the relay request's session header,
//  3. serves the relay against the backend service (see serveHTTP),
//  4. writes the relay response to writer and emits the served relay on
//     servedRelaysProducer.
//
// Any failure is reported to the client through replyWithError.
func (sync *synchronousRPCServer) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
	startTime := time.Now()
	ctx := request.Context()

	sync.logger.Debug().Msg("serving synchronous relay request")

	// Extract the relay request from the request body.
	sync.logger.Debug().Msg("extracting relay request from request body")
	relayRequest, err := sync.newRelayRequest(request)
	// The body is not needed past this point. The close error is deliberately
	// ignored: there is nothing useful the handler could do with it.
	request.Body.Close()
	if err != nil {
		sync.replyWithError(err, nil, writer)
		sync.logger.Warn().Err(err).Msg("failed serving relay request")
		return
	}

	if err := relayRequest.ValidateBasic(); err != nil {
		sync.replyWithError(err, relayRequest, writer)
		sync.logger.Warn().Err(err).Msg("failed validating relay request")
		return
	}

	supplierService := relayRequest.Meta.SessionHeader.Service

	// When the proxy is behind a reverse proxy, or is getting its requests from
	// a CDN or a load balancer, the host header may not contain the on-chain
	// advertised address needed to determine the service that the relay request
	// is for. These CDNs and reverse proxies usually set the X-Forwarded-Host
	// header to the original host.
	// RelayMiner operators that have such a setup can set the
	// XForwardedHostLookup option to true in the server config to enable the
	// proxy to look up the original host from the X-Forwarded-Host header.
	//
	// Fall back to the Host header when the lookup is disabled or when the
	// X-Forwarded-Host header is absent/empty.
	originHost := request.Host
	if sync.serverConfig.XForwardedHostLookup {
		if forwardedHost := request.Header.Get("X-Forwarded-Host"); forwardedHost != "" {
			originHost = forwardedHost
		}
	}

	// Extract the hostname from the origin host to match it with the publicly
	// exposed endpoints of the supplier service which are hostnames
	// (i.e. hosts without the port number).
	// Add the http scheme to the originHost to parse it as a URL.
	originHostUrl, err := url.Parse(fmt.Sprintf("http://%s", originHost))
	if err != nil {
		// If the originHost cannot be parsed, reply with an internal error so that
		// the original error is not exposed to the client.
		clientError := ErrRelayerProxyInternalError.Wrap(err.Error())
		sync.replyWithError(clientError, relayRequest, writer)
		return
	}
	originHostname := originHostUrl.Hostname()

	var serviceConfig *config.RelayMinerSupplierServiceConfig

	// Get the service configuration corresponding to the originHost and the
	// requested service ID.
	// TODO_IMPROVE(red-0ne): Checking that the originHost is currently done by
	// iterating over the server config's suppliers and checking if the originHost
	// is present in any of the supplier's service's hosts. We could improve this
	// by building a map at the server initialization level with originHost as the
	// key so that we can get the service and serviceUrl in O(1) time.
	for _, supplierServiceConfig := range sync.serverConfig.SupplierConfigsMap {
		for _, host := range supplierServiceConfig.PubliclyExposedEndpoints {
			if host == originHostname && supplierService.Id == supplierServiceConfig.ServiceId {
				serviceConfig = supplierServiceConfig.ServiceConfig
				break
			}
		}

		if serviceConfig != nil {
			break
		}
	}

	if serviceConfig == nil {
		sync.replyWithError(ErrRelayerProxyServiceEndpointNotHandled, relayRequest, writer)
		return
	}

	// Increment the relays counter.
	relaysTotal.With("service_id", supplierService.Id).Add(1)
	defer func() {
		duration := time.Since(startTime).Seconds()

		// Capture the relay request duration metric.
		relaysDurationSeconds.With("service_id", supplierService.Id).Observe(duration)
	}()

	relayRequestSizeBytes.With("service_id", supplierService.Id).
		Observe(float64(relayRequest.Size()))

	relay, err := sync.serveHTTP(ctx, serviceConfig, supplierService, relayRequest)
	if err != nil {
		// Reply with an error if the relay could not be served.
		sync.replyWithError(err, relayRequest, writer)
		sync.logger.Warn().Err(err).Msg("failed serving relay request")
		return
	}

	// Send the relay response to the client.
	if err := sync.sendRelayResponse(relay.Res, writer); err != nil {
		// If the relay response could not be marshaled or written, reply with an
		// internal error so that the original error is not exposed to the client.
		clientError := ErrRelayerProxyInternalError.Wrap(err.Error())
		sync.replyWithError(clientError, relayRequest, writer)
		sync.logger.Warn().Err(err).Msg("failed sending relay response")
		return
	}

	sync.logger.Info().Fields(map[string]any{
		"application_address":  relay.Res.Meta.SessionHeader.ApplicationAddress,
		"service_id":           relay.Res.Meta.SessionHeader.Service.Id,
		"session_start_height": relay.Res.Meta.SessionHeader.SessionStartBlockHeight,
		"server_addr":          sync.server.Addr,
	}).Msg("relay request served successfully")

	relaysSuccessTotal.With("service_id", supplierService.Id).Add(1)

	relayResponseSizeBytes.With("service_id", supplierService.Id).
		Observe(float64(relay.Res.Size()))

	// Emit the relay to the servedRelays observable. This is a plain channel
	// send, so the handler blocks here until the relay has been accepted.
	sync.servedRelaysProducer <- relay
}
// serveHTTP holds the underlying logic of ServeHTTP.
//
// It verifies relayRequest, translates its payload into an HTTP request aimed
// at the backend described by serviceConfig, sends it, and wraps the backend's
// response into a relay response.
//
// The backend request is bound to ctx, so it is cancelled as soon as ctx is
// done (e.g. when the caller of the relay goes away).
//
// It returns the relay (request and response) on success. Errors caused by
// the communication with the backend service or by building the relay
// response are wrapped in ErrRelayerProxyInternalError so that their details
// are not presented to the client as-is; other errors are returned unchanged.
func (sync *synchronousRPCServer) serveHTTP(
	ctx context.Context,
	serviceConfig *config.RelayMinerSupplierServiceConfig,
	supplierService *sharedtypes.Service,
	relayRequest *types.RelayRequest,
) (*types.Relay, error) {
	// Verify the relay request signature and session.
	// TODO_TECHDEBT(red-0ne): Currently, the relayer proxy is responsible for verifying
	// the relay request signature. This responsibility should be shifted to the relayer itself.
	// Consider using a middleware pattern to handle non-proxy specific logic, such as
	// request signature verification, session verification, and response signature.
	// This would help in separating concerns and improving code maintainability.
	// See https://github.com/pokt-network/poktroll/issues/160
	if err := sync.relayerProxy.VerifyRelayRequest(ctx, relayRequest, supplierService); err != nil {
		return nil, err
	}

	// Deserialize the relay request payload to get the upstream HTTP request.
	poktHTTPRequest, err := sdktypes.DeserializeHTTPRequest(relayRequest.Payload)
	if err != nil {
		return nil, err
	}

	// Build the request to be sent to the native service by substituting
	// the destination URL's host with the native service's listen address.
	// This business logic is specific to the RelayMiner, and Gateways do not need
	// to have knowledge of it.
	// It is the translation of the full Gateway->RelayMiner request to a
	// RelayMiner->BackendService and needs to be as transparent as possible.
	// The reply being sent back to the Gateway needs to be the same as the original,
	// "as if" the request was sent directly to the BackendService. Which means
	// the inclusion of any response headers, status codes and bodies.
	sync.logger.Debug().
		Str("destination_url", serviceConfig.BackendUrl.String()).
		Msg("building relay request payload to service")

	// Replace the upstream request URL with the host and scheme of the service
	// backend's while preserving the other components.
	// This is to ensure that the request complies with the requested service's API,
	// while being served from another location.
	requestUrl, err := url.Parse(poktHTTPRequest.Url)
	if err != nil {
		return nil, err
	}
	requestUrl.Host = serviceConfig.BackendUrl.Host
	requestUrl.Scheme = serviceConfig.BackendUrl.Scheme

	// Prepend the path of the service's backend URL to the path of the upstream request.
	// This is done to ensure that the request complies with the service's backend URL,
	// while preserving the path of the original request.
	// This is particularly important for RESTful APIs where the path is used to
	// determine the resource being accessed.
	// For example, if the service's backend URL is "http://host:8080/api/v1",
	// and the upstream request path is "/users", the final request path will be
	// "http://host:8080/api/v1/users".
	// Note that path.Join also cleans the result, which removes a trailing slash.
	requestUrl.Path = path.Join(serviceConfig.BackendUrl.Path, requestUrl.Path)

	// Merge the query parameters of the upstream request with the query parameters
	// of the service's backend URL.
	// This is done to ensure that the query parameters of the original request are
	// passed and that the service's backend URL query parameters are also included.
	// This is important for RESTful APIs where query parameters are used to filter
	// and paginate resources.
	// For example, if the service's backend URL is "http://host:8080/api/v1?key=abc",
	// and the upstream request has a query parameter "page=1", the final request URL
	// will be "http://host:8080/api/v1?key=abc&page=1".
	query := requestUrl.Query()
	for key, values := range serviceConfig.BackendUrl.Query() {
		for _, value := range values {
			query.Add(key, value)
		}
	}
	requestUrl.RawQuery = query.Encode()

	// TODO_TEST(red0ne): Test the request URL construction with different upstream
	// request paths and query parameters.

	// Create the HTTP headers for the request by converting the RelayRequest's
	// POKTHTTPRequest.Header to an http.Header.
	headers := http.Header{}
	poktHTTPRequest.CopyToHTTPHeader(headers)

	// Use the same method, headers, and body as the original request to query the
	// backend URL.
	// The request is created with ctx so that it is cancelled together with the
	// relay request being served. Passing a *bytes.Reader lets net/http determine
	// the content length of the body (and use http.NoBody when it is empty).
	var requestBody io.Reader = bytes.NewReader(poktHTTPRequest.BodyBz)
	httpRequest, err := http.NewRequestWithContext(
		ctx,
		poktHTTPRequest.Method,
		requestUrl.String(),
		requestBody,
	)
	if err != nil {
		return nil, err
	}
	httpRequest.Header = headers
	httpRequest.Host = serviceConfig.BackendUrl.Host

	if serviceConfig.Authentication != nil {
		httpRequest.SetBasicAuth(
			serviceConfig.Authentication.Username,
			serviceConfig.Authentication.Password,
		)
	}

	// Add any service configuration specific headers to the request, such as
	// authentication or authorization headers. These will override any upstream
	// request headers with the same key.
	for key, value := range serviceConfig.Headers {
		httpRequest.Header.Set(key, value)
	}

	// Pick the HTTP client based on the backend URL scheme. The clients are
	// shared across relays so that backend connections are reused rather than
	// re-established (and left idle) for every relay.
	client := http.DefaultClient
	if serviceConfig.BackendUrl.Scheme == "https" {
		client = synchronousHTTPSBackendClient
	}

	// Send the relay request to the native service.
	httpResponse, err := client.Do(httpRequest)
	if err != nil {
		// Do not expose connection errors with the backend service to the client.
		return nil, ErrRelayerProxyInternalError.Wrap(err.Error())
	}
	defer httpResponse.Body.Close()

	// Serialize the service response to be sent back to the client.
	// This will include the status code, headers, and body.
	_, responseBz, err := sdktypes.SerializeHTTPResponse(httpResponse)
	if err != nil {
		return nil, err
	}

	sync.logger.Debug().
		Str("relay_request_session_header", relayRequest.Meta.SessionHeader.String()).
		Msg("building relay response protobuf from service response")

	// Build the relay response using the original service's response.
	// Use relayRequest.Meta.SessionHeader on the relayResponse session header since it
	// was verified to be valid and has to be the same as the relayResponse session header.
	relayResponse, err := sync.newRelayResponse(
		responseBz,
		relayRequest.Meta.SessionHeader,
		relayRequest.Meta.SupplierAddress,
	)
	if err != nil {
		// The client should not have knowledge about the RelayMiner's issues with
		// building the relay response. Reply with an internal error so that the
		// original error is not exposed to the client.
		return nil, ErrRelayerProxyInternalError.Wrap(err.Error())
	}

	return &types.Relay{Req: relayRequest, Res: relayResponse}, nil
}

// synchronousHTTPSBackendClient is the HTTP client used for every relay whose
// service backend URL has the "https" scheme. It is created once and shared
// (http.Client and http.Transport are safe for concurrent use) so that its
// transport can pool and reuse backend connections.
var synchronousHTTPSBackendClient = &http.Client{
	Transport: &http.Transport{
		TLSClientConfig: &tls.Config{},
	},
}
// sendRelayResponse serializes relayResponse and writes the resulting bytes to
// writer. It returns the marshaling error or the write error, whichever occurs
// first, and nil when the response was written.
func (sync *synchronousRPCServer) sendRelayResponse(
	relayResponse *types.RelayResponse,
	writer http.ResponseWriter,
) error {
	responseBz, marshalErr := relayResponse.Marshal()
	if marshalErr != nil {
		return marshalErr
	}

	if _, writeErr := writer.Write(responseBz); writeErr != nil {
		return writeErr
	}

	return nil
}