Skip to content

Commit

Permalink
WIP(x/net/http/client): Extract outwards libuv.Loop and timeout logic
Browse files Browse the repository at this point in the history
  • Loading branch information
spongehah committed Aug 30, 2024
1 parent cebff18 commit c4d7315
Show file tree
Hide file tree
Showing 9 changed files with 639 additions and 445 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/goplus/llgoexamples
go 1.20

require (
github.com/goplus/llgo v0.9.7-0.20240816085229-53d2d080f4c4
github.com/goplus/llgo v0.9.7-0.20240830010153-2434fd778f0b
golang.org/x/net v0.28.0
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/goplus/llgo v0.9.7-0.20240816085229-53d2d080f4c4 h1:fqqbWhWaoseSplLJF8OTkNGl4Kruqm1wQWT/Yooq6E4=
github.com/goplus/llgo v0.9.7-0.20240816085229-53d2d080f4c4/go.mod h1:5Fs+08NslqofJ7xtOiIXugkurYOoQvY02ZkFNWA1uEI=
github.com/goplus/llgo v0.9.7-0.20240830010153-2434fd778f0b h1:iC0vVA8F2DNJ9wVyHI9fP9U0nM+si3LSQJ1TtGftXyo=
github.com/goplus/llgo v0.9.7-0.20240830010153-2434fd778f0b/go.mod h1:5Fs+08NslqofJ7xtOiIXugkurYOoQvY02ZkFNWA1uEI=
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
Expand Down
4 changes: 2 additions & 2 deletions x/net/http/_demo/timeout/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (

func main() {
client := &http.Client{
Timeout: time.Millisecond, // Set a small timeout to ensure it will time out
//Timeout: time.Second * 5,
//Timeout: time.Millisecond, // Set a small timeout to ensure it will time out
Timeout: time.Second * 5,
}
req, err := http.NewRequest("GET", "https://www.baidu.com", nil)
if err != nil {
Expand Down
50 changes: 42 additions & 8 deletions x/net/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"log"
"net/url"
"reflect"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -158,6 +159,9 @@ func (c *Client) do(req *Request) (retres *Response, reterr error) {
Host: host,
Cancel: ireq.Cancel,
ctx: ireq.ctx,

timer: ireq.timer,
timeoutch: ireq.timeoutch,
}
if includeBody && ireq.GetBody != nil {
req.Body, err = ireq.GetBody()
Expand Down Expand Up @@ -305,11 +309,14 @@ func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, d
}

// TODO(spongehah) timeout(send)
req.timeoutch = make(chan struct{}, 1)
req.deadline = deadline
if deadline.IsZero() {
didTimeout = alwaysFalse
} else {
didTimeout = func() bool { return req.timer.GetDueIn() == 0 }
}
//stopTimer, didTimeout := setRequestCancel(req, rt, deadline)

sub := deadline.Sub(time.Now())
req.timeout = sub
resp, err = rt.RoundTrip(req)
if err != nil {
//stopTimer()
Expand Down Expand Up @@ -469,6 +476,34 @@ func (b *cancelTimerBody) Close() error {
return err
}

// knownRoundTripperImpl reports whether rt is a RoundTripper that's
// maintained by the Go team and known to implement the latest
// optional semantics (notably contexts). The Request is used
// to check whether this particular request is using an alternate protocol,
// in which case we need to check the RoundTripper for that protocol.
func knownRoundTripperImpl(rt RoundTripper, req *Request) bool {
switch t := rt.(type) {
case *Transport:
if altRT := t.alternateRoundTripper(req); altRT != nil {
return knownRoundTripperImpl(altRT, req)
}
return true
// TODO(spongehah)
//case *http2Transport, http2noDialH2RoundTripper:
// return true
}
// There's a very minor chance of a false positive with this.
// Instead of detecting our golang.org/x/net/http2.Transport,
// it might detect a Transport type in a different http2
// package. But I know of none, and the only problem would be
// some temporarily leaked goroutines if the transport didn't
// support contexts. So this is a good enough heuristic:
if reflect.TypeOf(rt).String() == "*http2.Transport" {
return true
}
return false
}

// setRequestCancel sets req.Cancel and adds a deadline context to req
// if deadline is non-zero. The RoundTripper's type is used to
// determine whether the legacy CancelRequest behavior should be used.
Expand All @@ -482,11 +517,10 @@ func setRequestCancel(req *Request, rt RoundTripper, deadline time.Time) (stopTi
if deadline.IsZero() {
return nop, alwaysFalse
}
//knownTransport := knownRoundTripperImpl(rt, req)
knownTransport := knownRoundTripperImpl(rt, req)
oldCtx := req.Context()

//if req.Cancel == nil && knownTransport {
if req.Cancel == nil {
if req.Cancel == nil && knownTransport {
// If they already had a Request.Context that's
// expiring sooner, do nothing:
if !timeBeforeContextDeadline(deadline, oldCtx) {
Expand All @@ -504,7 +538,7 @@ func setRequestCancel(req *Request, rt RoundTripper, deadline time.Time) (stopTi
req.ctx, cancelCtx = context.WithDeadline(oldCtx, deadline)
}

cancel := make(chan struct{}, 1)
cancel := make(chan struct{})
req.Cancel = cancel

doCancel := func() {
Expand All @@ -518,7 +552,7 @@ func setRequestCancel(req *Request, rt RoundTripper, deadline time.Time) (stopTi
}
}

stopTimerCh := make(chan struct{}, 1)
stopTimerCh := make(chan struct{})
var once sync.Once
stopTimer = func() {
once.Do(func() {
Expand Down
17 changes: 11 additions & 6 deletions x/net/http/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"
"time"

"github.com/goplus/llgo/c/libuv"
"golang.org/x/net/idna"

"github.com/goplus/llgo/c"
Expand All @@ -37,12 +38,14 @@ type Request struct {
RemoteAddr string
RequestURI string
//TLS *tls.ConnectionState
Cancel <-chan struct{}
timeoutch chan struct{} //optional
Cancel <-chan struct{}

Response *Response
timeout time.Duration
ctx context.Context

deadline time.Time
timeoutch chan struct{} //tmp timeout
timer *libuv.Timer
}

const defaultChunkSize = 8192
Expand Down Expand Up @@ -117,6 +120,7 @@ func NewRequestWithContext(ctx context.Context, method, urlStr string, body io.R
Header: make(Header),
Body: rc,
Host: u.Host,
timer: nil,
}
if body != nil {
switch v := body.(type) {
Expand Down Expand Up @@ -258,7 +262,7 @@ var reqWriteExcludeHeader = map[string]bool{
// extraHeaders may be nil
// waitForContinue may be nil
// always closes body
func (r *Request) write(usingProxy bool, extraHeader Header, client *hyper.ClientConn, exec *hyper.Executor) (err error) {
func (r *Request) write(client *hyper.ClientConn, taskData *taskData, exec *hyper.Executor) (err error) {
//trace := httptrace.ContextClientTrace(r.Context())
//if trace != nil && trace.WroteRequest != nil {
// defer func() {
Expand All @@ -269,13 +273,14 @@ func (r *Request) write(usingProxy bool, extraHeader Header, client *hyper.Clien
//}

// Prepare the hyper.Request
hyperReq, err := r.newHyperRequest(usingProxy, extraHeader)
hyperReq, err := r.newHyperRequest(taskData.pc.isProxy, taskData.req.extra)
if err != nil {
return err
}
// Send it!
sendTask := client.Send(hyperReq)
setTaskId(sendTask, read)
taskData.taskId = read
sendTask.SetUserdata(c.Pointer(taskData))
sendRes := exec.Push(sendTask)
if sendRes != hyper.OK {
err = errors.New("failed to send the request")
Expand Down
1 change: 1 addition & 0 deletions x/net/http/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func appendToResponseBody(userdata c.Pointer, chunk *hyper.Buf) c.Int {
_, err := writer.Write(bytes)
if err != nil {
fmt.Println("Error writing to response body:", err)
writer.Close()
return hyper.IterBreak
}
return hyper.IterContinue
Expand Down
15 changes: 4 additions & 11 deletions x/net/http/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,11 +670,9 @@ func (r *Request) writeBody(hyperReq *hyper.Request) error {
var body = r.unwrapBody()
hyperReqBody := hyper.NewBody()
buf := make([]byte, defaultChunkSize)
//hyperBuf := hyper.CopyBuf(&buf[0], uintptr(defaultChunkSize))
reqData := &bodyReq{
body: body,
buf: buf,
//hyperBuf: hyperBuf,
body: body,
buf: buf,
closeBody: r.closeBody,
}
hyperReqBody.SetUserdata(c.Pointer(reqData))
Expand All @@ -685,18 +683,14 @@ func (r *Request) writeBody(hyperReq *hyper.Request) error {
}

type bodyReq struct {
body io.Reader
buf []byte
//hyperBuf *hyper.Buf
body io.Reader
buf []byte
closeBody func() error
}

func setPostData(userdata c.Pointer, ctx *hyper.Context, chunk **hyper.Buf) c.Int {
req := (*bodyReq)(userdata)
n, err := req.body.Read(req.buf)
//buf := req.hyperBuf.Bytes()
//bufLen := req.hyperBuf.Len()
//n, err := req.body.Read(unsafe.Slice(buf, bufLen))
if err != nil {
if err == io.EOF {
*chunk = nil
Expand All @@ -708,7 +702,6 @@ func setPostData(userdata c.Pointer, ctx *hyper.Context, chunk **hyper.Buf) c.In
}
if n > 0 {
*chunk = hyper.CopyBuf(&req.buf[0], uintptr(n))
//*chunk = req.hyperBuf
return hyper.PollReady
}
if n == 0 {
Expand Down
Loading

0 comments on commit c4d7315

Please sign in to comment.