Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support evented read #481

Open
ghost opened this issue Feb 15, 2019 · 15 comments
Open

Support evented read #481

ghost opened this issue Feb 15, 2019 · 15 comments

Comments

@ghost
Copy link

ghost commented Feb 15, 2019

This feature will avoid the need to dedicate a goroutine to read on the connection and enable read buffer pooling.

See eranyanay/1m-go-websockets for memory reduction possible with this feature.

See Go #15735 for discussion about network connection read events.

@eranyanay
Copy link

@garyburd @elithrar @stevenscott89
I'd like to address this and send a PR.

The problem here is mainly the implementation approach, which doesn't expose a way to read a single frame

If we want to support the usage of this library together with asynchronous IO mechanisms (select/epoll), we should introduce one of the following two changes that comes to mind:

  1. Introduce a new function func (c *Conn) NextFrame() (messageType int, r io.Reader, err error)
    which will behave very similar to NextReader, but the inner call to advanceFrame will read the next frame, do most of the logic until the 7th step, where it processes the control messages on its own. If those are ping/pong control messages, the NextFrame flow will return at this point with the entire message. The regular flow for NextReader is kept with no changes
  2. Keep the Conn struct exposed functions unchanged. In this solution we can change the Upgrader struct itself, and introduce a new member, AllowReadControlMessages bool (will come up with a decent name), which is then handed over to the newConn call, to let it know the operation method. This variable is then used during the NextReader and advanceFrame to decide whether to handle ping/pong messages internally, or expose them to the caller.

What's your thoughts on this?
I'd be happy to contribute this feature and help maintaining it over time.

@ghost
Copy link
Author

ghost commented Mar 25, 2019

The design of the standard library evented read API may determine API design choices in this package. Any change to the exported API to this package should wait on the standard library design.

If the standard library provides an edge trigger, then all that's needed is a new error value and a way to opt in to evented read:

// run is called on each edge trigger on c.ws.UnderlyingConn()
func (c *AppConn) run() {
    for {
        mt, r, err := c.ws.NextReader()
        if err == websocket.WaitIO {
             return
        } else if err != nil {
             c.cleanup(err)
        }
        // process mt, r
    }
}

Given that the switch from synchronous read to evented read necessitates changes to an application, the addition of the evented read is an opportunity holistically design an API that addresses other issues with this package. These issues are detection of dead connections and clean close of the connection. There are open issues on both of these topics. Although these features only require a few lines of application code, those few lines of application code are difficult to figure out. This is just spitballing here, but the holistic design may be to shift control over read from the application to this package by delivering data messages and connection closed notifications to the application as callbacks. Note that ping, pong and close messages are already delivered as callbacks. With this additional control, the package can implement the features for the application with little or no exposed API surface area to the application.

Even if the holistic design change in the previous paragraph is not considered, the detection of dead connections must be considered as part of the evented read API. The examples in this package use read with deadline to detect dead connections. What replaces that in the evented world? The replacement may not need any API changes, but we should know what that replacement is before committing to an API.

@eranyanay
Copy link

The standard library evented read API is not exposed at the moment and according to the issue there, doesnt seems like anything is going to change in that matter in the near future.

I dont think the approach of this library to perform epoll mechanism on each and every socket separately is a good idea, since it means the user will either:

  1. Keep using 1goroutine per socket
  2. Have a dedicated goroutine that calls ReadMessage() on all sockets, one at the time, performing epoll systemcalls as the number of open websockets

I think you refer to option 2, and it sort of contradicts the usage of polling mechanisms, where with a single call you can ask the condition of all the open sockets.

The whole point of asynchronous read APIs is that you perform them on multiple connections at once

Also note that performing it internally, how would you configure timeouts?
The beauty when using epoll/select is that you can dedicate one goroutine that calls the systemcall with infinite timeout, and in your suggested approach to hide it in the package we would still need some sort of timeout mechanism. and on connections with very few data, the suggested psuedocode will perform busy wait

I dont think that this logic should be part of the package logic. Its application logic.

Another thing to note, is that its a solution that is only applicable to Linux, and this library is at the moment functional for other OS's too. Leaving it for the user to decide is probably the better approach

Regarding detecting dead connections, I couldnt find any other open issues that directly says that, can you explain? With the Close handler its pretty much working as far as I can tell, and also calling ReadMessage on a closed socket returns an error that can be checked. So Im not sure what else is there to do in that aspect

Regarding detection and actual usage from outside, I think simply exposing the connection file descriptor is enough (do some reflection on Go Conn interface and add a function ws.Fd()), and then use it in application space together with epoll logic.

@ghost
Copy link
Author

ghost commented Mar 26, 2019

This package currently uses the Go standard library for reading the network. The standard library does not perform a separate epoll on each and every socket.

My suggested run method is called in response to an edge trigger and reads messages until there is no data available. The design assumes that the standard library will provide a single poller and that there's a way to invoke a function in response to an IO trigger. The design also assumes that the standard library will provide a way to toggle a connection between blocking and non-blocking read. These design assumptions are based on discussion in the issue linked above.

I am not following the discussion on Linux vs. other platforms. I am assuming that the standard library will provide a cross platform evented read API.

The CloseHandler is called in response to a received close message. That does not help with detecting dead connections. ReadMessage in the evented world is called when data is received. What happens when no data is received?

The most recent open issue (492) is about detecting dead connections.

@garyburd
Copy link
Contributor

I do not have time to review this large feature, nor do I want the additional responsibility of maintaining the project with the feature. The feature will need to wait for a new maintainer.

@nhooyr
Copy link

nhooyr commented Sep 30, 2019

I don't think it's a good idea to add this feature to this library. It's a very unidiomatic way of writing Go and anyone who wants it can always use gobwas/ws instead.

@ghost
Copy link
Author

ghost commented Sep 30, 2019

If Go adds the evented read API required for this feature, will evented read still be unidiomatic?

@iluozhiqiang
Copy link

@garyburd @elithrar @stevenscott89
I'd like to address this and send a PR.

The problem here is mainly the implementation approach, which doesn't expose a way to read a single frame

If we want to support the usage of this library together with asynchronous IO mechanisms (select/epoll), we should introduce one of the following two changes that comes to mind:

  1. Introduce a new function func (c *Conn) NextFrame() (messageType int, r io.Reader, err error)
    which will behave very similar to NextReader, but the inner call to advanceFrame will read the next frame, do most of the logic until the 7th step, where it processes the control messages on its own. If those are ping/pong control messages, the NextFrame flow will return at this point with the entire message. The regular flow for NextReader is kept with no changes
  2. Keep the Conn struct exposed functions unchanged. In this solution we can change the Upgrader struct itself, and introduce a new member, AllowReadControlMessages bool (will come up with a decent name), which is then handed over to the newConn call, to let it know the operation method. This variable is then used during the NextReader and advanceFrame to decide whether to handle ping/pong messages internally, or expose them to the caller.

What's your thoughts on this?
I'd be happy to contribute this feature and help maintaining it over time.

public NextFrame is my need

@lesismal
Copy link

lesismal commented Jun 17, 2021

First of all, thanks for gorilla's great job, I use it in my projects, I like it and learn a lot from it!

All frameworks based on std net are not suitable for supporting non-blocking read and write at the application layer. Because tcp cannot guarantee that any byte can arrive in time, when we try to read a complete websocket frame from a connection, it may block for a long time, especially those maliciously attacking our connection. A solution similar to gobwas/ws+easygo/netpoll has the risk of service unavailability, as I commented here:
gobwas/ws#121 (comment)
and here:
gobwas/ws-examples#18

This problem can only be solved by replacing network read and write with pure asynchronous IO. And I have implemented a non-blocking lib to avoid using one or more goroutines for each connection and reduce memory usage.
It supports tls/http1.x/websocket, and is basically compatible with net/http. Many net/http-based web frameworks can be easily converted to use the non-blocking network layer.
Here is websocket example, welcome if anybody like to try:
https://github.com/lesismal/nbio/blob/master/examples/websocket/server/server.go

@lesismal
Copy link

lesismal commented Jun 17, 2021

@lesismal
Copy link

lesismal commented Jun 17, 2021

What we should support is the asynchronous parsing of tcp-sticky-packet and async-io, both of which are indispensable.

@ghost
Copy link

ghost commented Jun 18, 2021

Because tcp cannot guarantee that any byte can arrive in time, when we try to read a complete websocket frame from a connection, it may block for a long time,

Although there's no bound on how long it takes to deliver a TCP segment, the delivery time is not an issue in practice. A more concerning problem is a peer that stalls after writing part of a message or a peer that slowly drips out a message.

The package (or application) can use a read deadline to avoid blocking for a long time. The read deadline can be adjusted when reading a message to ensure timely delivery of the message from start to end.

And I have implemented a non-blocking lib to avoid using one or more goroutines for each connection and reduce memory usage.

There is an open issue for non-blocking read in the standard library with recent activity from a member of the Go team.

A feature from the standard library is generally preferred over a feature from a third-party package. While the current issue is in the "waiting on new maintainer" state, it seems best to wait on the standard library issue.

Edit: The waiting on new maintainer label was removed last year, but does seem like something that should wait on the maintainer. Any work in this area is a big change.

@lesismal
Copy link

Although there's no bound on how long it takes to deliver a TCP segment, the delivery time is not an issue in practice. A more concerning problem is a peer that stalls after writing part of a message or a peer that slowly drips out a message.

That's right.

The package (or application) can use a read deadline to avoid blocking for a long time. The read deadline can be adjusted when reading a message to ensure timely delivery of the message from start to end.

But this is incorrect. Our main goal is to reduce the number of goroutines, so the goroutine pool is usually used for reading at the event. Setting the deadline still requires the reading action to be blocked. Then the goroutine that executes the read may block until the deadline. If there are too many slow connections, the goroutine pool will be used out. If the number of goroutines we use is greater than or equal to the number of slow connections, the goal of reducing the number of goroutines is not achieved.

We need to support a non-blocking parsing first, my repo do this:
https://github.com/lesismal/nbio

@lesismal
Copy link

For example:

size := 1024
pool := NewWorkerPool(size)

server.OnData(func(c net.Conn){
	// Step 1: get a goroutine from pool to read and parse a complete frame
	pool.Exec(func(){
		c.SetReadDeadline(timeout)

		wsConn := getWsConn(c)

		// Step 2: blocking for a deadline time on a slow connection
		wsConn.NextFrame()
	})
})

If there are 100k connections in which there are more than 1024(pool size) slow connections, these slow connections make all the goroutines of pool blocking, most of the other healthy connections would wait for long.
Even if the num of slow connections is less than 1024(pool size), or even it's only blocked for less than a deadline time at [Step 2], they still cause the goroutine pool unable to handle more message/connections in time, then we get lower performance.
If we set the pool size greater, we can not reduce the num of goroutines.

@lesismal
Copy link

For example in my implementation:

size := 1024
pool := NewWorkerPool(size)

onWsMessage := (c *websocket.Conn, messageType int, data) {
	// Step 6: do sometihng, usually use another goroutine pool to handle message
}

// Step 1: net poller pass the data to application layer
server.OnData(func(c net.Conn, data []byte){
	// Step 2: get a goroutine from pool to read and parse a complete frame
	pool.Exec(func(){
		c.SetReadDeadline(timeout)

		wsConn := getWsConn(c)

		// Step 3: non-blocking parsing
		messageType, payload, ok, err := wsConn.NextMessage(data)
		if err {
			wsConn.Close()

			// Step 4: release goroutine when return
			return
		}

		if ok {
			// Step 5: we get a complete message
			onWsMessage(wsConn, messageType, payload)
		}

		// Step 6: release goroutine when return
	})
})

Step 1-4 are non-blocking and make sure the framework layer effective.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
Status: 🏗 In progress
Development

Successfully merging a pull request may close this issue.

7 participants