Skip to content

Commit

Permalink
v0.3.4, fixed a problem that cancel request during relay http, pipe t…
Browse files Browse the repository at this point in the history
…ee is not work if content-encoding is gzip
  • Loading branch information
superisaac committed Jan 4, 2024
1 parent 527b4e6 commit a81ea8d
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 44 deletions.
37 changes: 18 additions & 19 deletions chains/eosapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,25 +62,24 @@ func (self *EOSAPI) DelegateREST(rootCtx context.Context, m *nodemuxcore.Multipl
chain.Log().Infof("retrieved block number %d from get_block request", requiredHeight)
}
r.Body = io.NopCloser(bytes.NewBuffer(body))
_, _, err0 := m.DefaultPipeTeeREST(rootCtx, chain, path, w, r, requiredHeight)
return err0
} else if path == "/v1/chain/get_info" {
requiredHeight = 0
ep, body, err := m.DefaultPipeTeeREST(rootCtx, chain, path, w, r, requiredHeight)
if err != nil || ep == nil || body == nil {
return err
}

var chainInfo eosapiChainInfo
if err := json.Unmarshal(body, &chainInfo); err != nil {
chain.Log().Warnf("json unmarshal body %#v", err)
}
block := &nodemuxcore.Block{
Height: chainInfo.LastBlockNum,
Hash: chainInfo.LastBlockId,
}
m.UpdateBlockIfChanged(ep, block)
return nil
}
// } else if path == "/v1/chain/get_infoxx" {
// requiredHeight = 0
// ep, body, err := m.DefaultPipeTeeREST(rootCtx, chain, path, w, r, requiredHeight)
// if err != nil || ep == nil || body == nil {
// return err
// }

// var chainInfo eosapiChainInfo
// if err := json.Unmarshal(body, &chainInfo); err != nil {
// chain.Log().Warnf("json unmarshal body %#v", err)
// }
// block := &nodemuxcore.Block{
// Height: chainInfo.LastBlockNum,
// Hash: chainInfo.LastBlockId,
// }
// m.UpdateBlockIfChanged(ep, block)
// return nil
// }
return m.DefaultPipeREST(rootCtx, chain, path, w, r, requiredHeight)
}
25 changes: 19 additions & 6 deletions core/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,26 @@ func (self *Endpoint) PipeRequest(rootCtx context.Context, path string, w http.R
}

// pipe the response

w.WriteHeader(resp.StatusCode)
//w.Header().Add("content-type", resp.Header.Get("content-type"))

for hn, hvs := range resp.Header {
for _, hv := range hvs {
w.Header().Add(hn, hv)
}
}
w.WriteHeader(resp.StatusCode)
io.Copy(w, resp.Body)
if written, err := io.Copy(w, resp.Body); err != nil {
self.Log().WithFields(log.Fields{
"written": written,
"path": path,
}).Warnf("io copy error %#v", err)
return err
}
return nil
}

func (self *Endpoint) PipeTeeRequest(rootCtx context.Context, path string, w http.ResponseWriter, r *http.Request) ([]byte, error) {
func (self *Endpoint) xxPipeTeeRequest(rootCtx context.Context, path string, w http.ResponseWriter, r *http.Request) ([]byte, error) {
resp, err := self.RespondRequest(rootCtx, path, r)
if err != nil {
if os.IsTimeout(err) {
Expand All @@ -132,6 +141,7 @@ func (self *Endpoint) PipeTeeRequest(rootCtx context.Context, path string, w htt

// pipe the response
w.WriteHeader(resp.StatusCode)

if resp.StatusCode == http.StatusOK {
for hn, hvs := range resp.Header {
if strings.ToLower(hn) == "content-type" {
Expand Down Expand Up @@ -160,8 +170,11 @@ func (self *Endpoint) PipeTeeRequest(rootCtx context.Context, path string, w htt
func (self *Endpoint) RespondRequest(rootCtx context.Context, path string, r *http.Request) (*http.Response, error) {
self.Connect()
self.incrRelayCount()
ctx, cancel := context.WithCancel(rootCtx)
defer cancel()
//ctx, _ := context.WithCancel(rootCtx)
//defer cancel()

// ctx, cancel := context.WithTimeout(rootCtx, time.Second * 600)
// defer cancel()

// prepare request
// TODO: join the server url and method
Expand All @@ -170,7 +183,7 @@ func (self *Endpoint) RespondRequest(rootCtx context.Context, path string, r *ht
if err != nil {
return nil, err
}
req, err := http.NewRequestWithContext(ctx, r.Method, url, io.NopCloser(bytes.NewBuffer(body)))
req, err := http.NewRequestWithContext(rootCtx, r.Method, url, io.NopCloser(bytes.NewBuffer(body)))
if err != nil {
return nil, errors.Wrap(err, "http.NewRequestWithContext")
}
Expand Down
38 changes: 19 additions & 19 deletions core/multiplexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,26 +200,26 @@ func (self *Multiplexer) DefaultRelayRPC(

// Pipe the request to response and tee the choosed endpoint and the response body
// for possible caching and handlers
func (self *Multiplexer) DefaultPipeTeeREST(
rootCtx context.Context,
chain ChainRef,
path string,
w http.ResponseWriter,
r *http.Request, overHeight int) (*Endpoint, []byte, error) {
// func (self *Multiplexer) DefaultPipeTeeREST(
// rootCtx context.Context,
// chain ChainRef,
// path string,
// w http.ResponseWriter,
// r *http.Request, overHeight int) (*Endpoint, []byte, error) {

ep, found := self.SelectOverHeight(chain, path, overHeight)
if !found {
if overHeight > 0 {
// if not find then relay to any healthy endpoint
return self.DefaultPipeTeeREST(rootCtx, chain, path, w, r, -2)
}
w.WriteHeader(404)
w.Write([]byte("not found"))
return ep, nil, nil
}
body, err := ep.PipeTeeRequest(rootCtx, path, w, r)
return ep, body, err
}
// ep, found := self.SelectOverHeight(chain, path, overHeight)
// if !found {
// if overHeight > 0 {
// // if not find then relay to any healthy endpoint
// return self.DefaultPipeTeeREST(rootCtx, chain, path, w, r, -2)
// }
// w.WriteHeader(404)
// w.Write([]byte("not found"))
// return ep, nil, nil
// }
// body, err := ep.xxPipeTeeRequest(rootCtx, path, w, r)
// return ep, body, err
// }

// Pipe the request to response
func (self *Multiplexer) DefaultPipeREST(rootCtx context.Context, chain ChainRef, path string, w http.ResponseWriter, r *http.Request, overHeight int) error {
Expand Down

0 comments on commit a81ea8d

Please sign in to comment.