IllegalStateException when client closes connection #9

Open
junglie85 opened this issue Jun 24, 2017 · 2 comments
Comments

@junglie85

Hopefully you're open to being asked questions in the issues? I'm really enjoying this library, but I'm trying to figure out how to deal gracefully with the remote client disconnecting, rather than getting the following exception:

java.lang.IllegalStateException: Response has already been written
	at io.vertx.core.http.impl.HttpServerResponseImpl.checkWritten(HttpServerResponseImpl.java:588)
	at io.vertx.core.http.impl.HttpServerResponseImpl.write(HttpServerResponseImpl.java:613)
	at io.vertx.core.http.impl.HttpServerResponseImpl.write(HttpServerResponseImpl.java:296)
	at io.vertx.core.http.impl.HttpServerResponseImpl.write(HttpServerResponseImpl.java:55)
	at io.vertx.ext.web.handler.sse.impl.SSEConnectionImpl.writeHeader(SSEConnectionImpl.java:143)
	at io.vertx.ext.web.handler.sse.impl.SSEConnectionImpl.withHeader(SSEConnectionImpl.java:131)
	at io.vertx.ext.web.handler.sse.impl.SSEConnectionImpl.id(SSEConnectionImpl.java:101)
	at uk.ashleybye.rxweb.verticles.ApiVerticle$testSse$1$1.accept(ApiVerticle.kt:71)
	at uk.ashleybye.rxweb.verticles.ApiVerticle$testSse$1$1.accept(ApiVerticle.kt:20)
	at io.reactivex.internal.observers.LambdaObserver.onNext(LambdaObserver.java:60)
	at io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.drainNormal(ObservableObserveOn.java:200)
	at io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.run(ObservableObserveOn.java:252)
	at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61)
	at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

I can't figure out from the tests how you're going about it. Perhaps I should use EventSource rather than just push data from the Observable directly? But that still seems like it would not affect the client connection.

Anyway, this is my rather noddy test example (excuse the Kotlin rather than Java, but it should be reasonably clear what's going on):

private fun testSse(): SSEHandler {
    val sse = SSEHandler.create()

    sse.connectHandler { connection ->
        val address = connection.request().connection().remoteAddress()

        Observable
                .interval(0, 1000, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .subscribe(
                        { value ->
                            val dateTime = LocalDateTime.now()
                            connection.id("$value", "$address")
                            connection.data("${connection.lastId()} | " +
                                    "${dateTime.hour}:" +
                                    "${dateTime.minute}:" +
                                    "${dateTime.second}." +
                                    "${dateTime.nano}")
                        },
                        { error ->
                            error.printStackTrace()
                        },
                        {
                            sse.closeHandler { sseConnection ->
                                sseConnection?.close()
                            }
                        }
                )
    }

    return sse
}

As an aside (probably another question really), why does connection.id(...) send the details to the client? And why does lastId() return null?

http localhost:8080/sse Accept:text/event-stream --stream
HTTP/1.1 200 OK
Cache-Control: no-cache
Connection: keep-alive
Content-Type: text/event-stream
Transfer-Encoding: chunked

id: 0
data: 0:0:0:0:0:0:0:1:60576

data: null | 7:31:37.835000000

id: 1
data: 0:0:0:0:0:0:0:1:60576

data: null | 7:31:38.826000000
@aesteve
Owner

aesteve commented Jun 30, 2017

Hopefully you're open to being asked questions in the issues?

Absolutely, sorry for being so late to answer.

The main idea for me to look at the issue would be to try to reproduce it without the Observable.

What I'd like to try first is simply:

sse.closeHandler { sseConnection ->
    sseConnection?.close()
}

And I think it could reproduce the issue.

Simply because these few lines of code try to close a connection that has already been closed by the client, hence the IllegalStateException.

But I'd need to reproduce the issue, in Java, without any fancy Observable involved. If you have time to do so, that'd be awesome; otherwise I'll try to check it myself when I have the time.
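
To illustrate the general idea of not writing to a connection the client has already closed, here is a minimal Java sketch that uses neither Vert.x nor RxJava. `FakeConnection` and `Producer` are hypothetical stand-ins, not vertx-sse classes: the fake connection throws the same kind of `IllegalStateException` once the client is gone, and the producer stops emitting as soon as the close callback fires. Whether the library's `closeHandler` is invoked on a client disconnect in the same way is an assumption here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for an SSE connection (NOT the vertx-sse API).
class FakeConnection {
    private boolean ended = false;
    private final List<String> sent = new ArrayList<>();
    private Runnable onClose = () -> {};

    // Registers a callback to run when the client goes away.
    void closeHandler(Runnable handler) {
        this.onClose = handler;
    }

    // Writing after the response has ended fails, as in the stack trace.
    void data(String message) {
        if (ended) {
            throw new IllegalStateException("Response has already been written");
        }
        sent.add(message);
    }

    // Simulates the remote client dropping the connection.
    void clientDisconnected() {
        ended = true;
        onClose.run();
    }

    List<String> sent() {
        return sent;
    }
}

// Hypothetical producer that stops writing once the connection is closed.
class Producer {
    private final AtomicBoolean active = new AtomicBoolean(true);
    private final FakeConnection connection;

    Producer(FakeConnection connection) {
        this.connection = connection;
        // Register the close callback up front, not inside a completion callback.
        connection.closeHandler(() -> active.set(false));
    }

    // Returns true if the tick was written, false if it was dropped.
    boolean tick(long value) {
        if (!active.get()) {
            return false;
        }
        connection.data("tick " + value);
        return true;
    }
}
```

In the RxJava version from the question, the equivalent step would presumably be to keep the `Disposable` returned by `subscribe(...)` and call `dispose()` on it when the connection closes, rather than registering the close handler inside the completion callback, which `Observable.interval` never reaches.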

For the lastId stuff, same thing: I'd need to reproduce the issue in a unit test to give it a proper look.

@ctranxuan

lastId is null because you have to pass it as a header:

curl -v "http://localhost:8080/sse" -H "Last-Event-ID: 2"

It's the last ID the client has received. If the client wants to reconnect and get the events it missed, it has to send this header when reconnecting. Otherwise, the client just gets the current event, and in that case the last event ID is null.
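
As a rough illustration of that reconnect behaviour, here is a minimal Java sketch of a server-side event log that replays missed events based on the `Last-Event-ID` value. `EventLog` and `replayAfter` are hypothetical names made up for this example and are not part of vertx-sse; the sketch also assumes that event IDs are simply the zero-based position of each event in the log.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory log; the event ID is the index in the list.
class EventLog {
    private final List<String> events = new ArrayList<>();

    void append(String data) {
        events.add(data);
    }

    // Returns the SSE frames the client has not seen yet.
    // A null or non-numeric Last-Event-ID means nothing is replayed:
    // the client only receives events sent from now on.
    List<String> replayAfter(String lastEventId) {
        List<String> frames = new ArrayList<>();
        if (lastEventId == null) {
            return frames;
        }
        long last;
        try {
            last = Long.parseLong(lastEventId.trim());
        } catch (NumberFormatException e) {
            return frames;
        }
        if (last >= events.size() - 1) {
            return frames; // client is already up to date
        }
        int from = (int) Math.max(last + 1, 0);
        for (int id = from; id < events.size(); id++) {
            // Standard SSE framing: "id:" and "data:" fields, blank line to end.
            frames.add("id: " + id + "\ndata: " + events.get(id) + "\n\n");
        }
        return frames;
    }
}
```

With four events in the log (IDs 0 to 3), a client reconnecting with `Last-Event-ID: 2` would be sent only the event with ID 3, while a client connecting without the header would be sent nothing from the log.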
