diff --git a/chains/eosapi.go b/chains/eosapi.go index ba44a4c..aaa2a1d 100644 --- a/chains/eosapi.go +++ b/chains/eosapi.go @@ -62,25 +62,24 @@ func (self *EOSAPI) DelegateREST(rootCtx context.Context, m *nodemuxcore.Multipl chain.Log().Infof("retrieved block number %d from get_block request", requiredHeight) } r.Body = io.NopCloser(bytes.NewBuffer(body)) - _, _, err0 := m.DefaultPipeTeeREST(rootCtx, chain, path, w, r, requiredHeight) - return err0 - } else if path == "/v1/chain/get_info" { - requiredHeight = 0 - ep, body, err := m.DefaultPipeTeeREST(rootCtx, chain, path, w, r, requiredHeight) - if err != nil || ep == nil || body == nil { - return err - } - - var chainInfo eosapiChainInfo - if err := json.Unmarshal(body, &chainInfo); err != nil { - chain.Log().Warnf("json unmarshal body %#v", err) - } - block := &nodemuxcore.Block{ - Height: chainInfo.LastBlockNum, - Hash: chainInfo.LastBlockId, - } - m.UpdateBlockIfChanged(ep, block) - return nil } + // } else if path == "/v1/chain/get_infoxx" { + // requiredHeight = 0 + // ep, body, err := m.DefaultPipeTeeREST(rootCtx, chain, path, w, r, requiredHeight) + // if err != nil || ep == nil || body == nil { + // return err + // } + + // var chainInfo eosapiChainInfo + // if err := json.Unmarshal(body, &chainInfo); err != nil { + // chain.Log().Warnf("json unmarshal body %#v", err) + // } + // block := &nodemuxcore.Block{ + // Height: chainInfo.LastBlockNum, + // Hash: chainInfo.LastBlockId, + // } + // m.UpdateBlockIfChanged(ep, block) + // return nil + // } return m.DefaultPipeREST(rootCtx, chain, path, w, r, requiredHeight) } diff --git a/core/endpoint.go b/core/endpoint.go index fe70980..4bace3a 100644 --- a/core/endpoint.go +++ b/core/endpoint.go @@ -109,17 +109,26 @@ func (self *Endpoint) PipeRequest(rootCtx context.Context, path string, w http.R } // pipe the response + + w.WriteHeader(resp.StatusCode) + //w.Header().Add("content-type", resp.Header.Get("content-type")) + for hn, hvs := range resp.Header { for _, hv := range hvs { w.Header().Add(hn, hv) } } - w.WriteHeader(resp.StatusCode) - io.Copy(w, resp.Body) + if written, err := io.Copy(w, resp.Body); err != nil { + self.Log().WithFields(log.Fields{ + "written": written, + "path": path, + }).Warnf("io copy error %#v", err) + return err + } return nil } -func (self *Endpoint) PipeTeeRequest(rootCtx context.Context, path string, w http.ResponseWriter, r *http.Request) ([]byte, error) { +func (self *Endpoint) xxPipeTeeRequest(rootCtx context.Context, path string, w http.ResponseWriter, r *http.Request) ([]byte, error) { resp, err := self.RespondRequest(rootCtx, path, r) if err != nil { if os.IsTimeout(err) { @@ -132,6 +141,7 @@ func (self *Endpoint) PipeTeeRequest(rootCtx context.Context, path string, w htt // pipe the response w.WriteHeader(resp.StatusCode) + if resp.StatusCode == http.StatusOK { for hn, hvs := range resp.Header { if strings.ToLower(hn) == "content-type" { @@ -160,8 +170,11 @@ func (self *Endpoint) PipeTeeRequest(rootCtx context.Context, path string, w htt func (self *Endpoint) RespondRequest(rootCtx context.Context, path string, r *http.Request) (*http.Response, error) { self.Connect() self.incrRelayCount() - ctx, cancel := context.WithCancel(rootCtx) - defer cancel() + //ctx, _ := context.WithCancel(rootCtx) + //defer cancel() + + // ctx, cancel := context.WithTimeout(rootCtx, time.Second * 600) + // defer cancel() // prepare request // TODO: join the server url and method @@ -170,7 +183,7 @@ func (self *Endpoint) RespondRequest(rootCtx context.Context, path string, r *ht if err != nil { return nil, err } - req, err := http.NewRequestWithContext(ctx, r.Method, url, io.NopCloser(bytes.NewBuffer(body))) + req, err := http.NewRequestWithContext(rootCtx, r.Method, url, io.NopCloser(bytes.NewBuffer(body))) if err != nil { return nil, errors.Wrap(err, "http.NewRequestWithContext") } diff --git a/core/multiplexer.go b/core/multiplexer.go index f5f9554..a5bc1c7 100644 --- a/core/multiplexer.go +++ b/core/multiplexer.go @@ -200,26 +200,26 @@ func (self *Multiplexer) DefaultRelayRPC( // Pipe the request to response and tee the choosed endpoint and the response body // for possible caching and handlers -func (self *Multiplexer) DefaultPipeTeeREST( - rootCtx context.Context, - chain ChainRef, - path string, - w http.ResponseWriter, - r *http.Request, overHeight int) (*Endpoint, []byte, error) { +// func (self *Multiplexer) DefaultPipeTeeREST( +// rootCtx context.Context, +// chain ChainRef, +// path string, +// w http.ResponseWriter, +// r *http.Request, overHeight int) (*Endpoint, []byte, error) { - ep, found := self.SelectOverHeight(chain, path, overHeight) - if !found { - if overHeight > 0 { - // if not find then relay to any healthy endpoint - return self.DefaultPipeTeeREST(rootCtx, chain, path, w, r, -2) - } - w.WriteHeader(404) - w.Write([]byte("not found")) - return ep, nil, nil - } - body, err := ep.PipeTeeRequest(rootCtx, path, w, r) - return ep, body, err -} +// ep, found := self.SelectOverHeight(chain, path, overHeight) +// if !found { +// if overHeight > 0 { +// // if not find then relay to any healthy endpoint +// return self.DefaultPipeTeeREST(rootCtx, chain, path, w, r, -2) +// } +// w.WriteHeader(404) +// w.Write([]byte("not found")) +// return ep, nil, nil +// } +// body, err := ep.xxPipeTeeRequest(rootCtx, path, w, r) +// return ep, body, err +// } // Pipe the request to response func (self *Multiplexer) DefaultPipeREST(rootCtx context.Context, chain ChainRef, path string, w http.ResponseWriter, r *http.Request, overHeight int) error {