Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove RxJava - Part 4 #560

Conversation

OwenLindsell
Copy link
Contributor

@OwenLindsell OwenLindsell commented Dec 10, 2019

Request body now follows reactive streams back pressure protocol.

Copy link
Contributor

@mikkokar mikkokar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @OwenLindsell This code is broken. But not because of your changes (apart from my another comment, see below).

This class fails to correctly follow the reactive back pressure protocol. It merely triggers a channel read every on each back pressure request. But event count tracking is completely missing.

As a background info: it was designed that way: to allow only one request at the time. But I think the original implementation was broken. But we never had a need to fix this.

I suggest now is the time to roll in the FlowControllingHttpContentProducer from styx client to rectify this issue.

@OwenLindsell OwenLindsell requested a review from mikkokar January 13, 2020 14:53
/**
* A publisher to wrap the FlowControllingHttpContentProducer FSM and perform subscription operations via a Netty EventLoop.
*/
public final class ContentPublisher implements Publisher<Buffer> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should rename this to ByteStreamPublisher? Because it is the ByteStream publisher for which this class produces content. That might make it easier for other devs make the connection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That couples it deeply to the ByteStream class, which it knows nothing about. It only knows about the FlowControllingHttpContentProducer and even that doesn't know about ByteStream. However, I get your point, the name is not very meaningful, so I have renamed it to FlowControllingPublisher. Is that more helpful?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi. It was just a suggestion. I'm not too opinionated about this. But between FlowControllingPublisher and ContentPublisher, I would prefer the original because it is shorter and simpler :-)

}
} catch (BadRequestException ex) {
throw ex;
} catch (Exception ex) {
throw new BadRequestException(ex.getMessage() + " in " + httpObject, ex);
throw new BadRequestException(ex.getMessage() + " in " + msg, ex);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider what happens when the try block throws. For example due to headers decoding failure. This code would throw, and the resulting LiveHttpRequest is never propagated up to the HTTP pipeline handler. But all the subsequent content chunks would still get queued in the flow controller.

This would end up leaking (direct) memory.

The easiest way to handle this is to pass the exception to the FSM:

} catch (NNNException ex) {
    getContentProducer().channelException(ex);
    throw ex;
}

This will move the FSM to a TERMINATED state, where all subsequent content chunks are automatically down referenced.

We need to have a test case for this scenario, if possible.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered that. I think the exception will be handled by Netty, which will pass it onto the exceptionCaught method, which calls the FSM and then propagates it to the HttpPipelineHandler. Or have I not understood it correctly?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi yes. The exception will propagate. And ideally the pipeline handler responds by closing the connection. It is still better to be defensive, and move the FSM to TERMINATED. That guarantees that any extra pending content chunk events will be correctly dealt with. Also the FSM state would be consistent with the error, in case someone ever needs to debug it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The exceptionCaught method makes the same call to the FSM, so it will move it to terminated.

ctx.channel().eventLoop().schedule(
() -> producer.ifPresent(FlowControllingHttpContentProducer::tearDownResources),
TEARDOWN_DELAY,
MILLISECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now looking back retrospectively, it seems bit funny that we tear down resources by generating a ResponseTimeoutException with "channelClosed" reason phrase. Let's think about improving this in the near future.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the same token, I now realise that the scheduleTeardown trigger should be different for uplink traffic.

The FSM was developed for downlink traffic. In downlink direction we cannot have two back-to-back responses without subscriber fully consuming the response body. Therefore in downlink we can use channel idleness to drive the scheduleTeardown event.

  • The FSM must go to the COMPLETED state before the channel can be reused for the next request. Because origin cannot respond anything before we send the another request, the cannel (TCP connection) would remain idle if the content subscriber never consumes the content.

But for uplink, I believe, this assumption is not valid. It is possible for an origin to respond without consuming the request body at all. This response would allows a remote a sender to send a new request even if Styx never consumed the original request body. Therefore we cannot rely on channel idleness to clean up resources.

We need to think of something else that is guaranteed to trigger a clean up.

…th-reactor-part4

# Conflicts:
#	components/proxy/src/main/java/com/hotels/styx/ProxyConnectorFactory.java
#	components/server/src/main/java/com/hotels/styx/server/netty/codec/NettyToStyxRequestDecoder.java
@@ -70,6 +83,9 @@

private volatile Subscriber<? super ByteBuf> contentSubscriber;

private final HashedWheelTimer timer = new HashedWheelTimer();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From HashedWheelTimer:

Do not create many instances.
HashedWheelTimer creates a new thread whenever it is instantiated and started. Therefore, you should make sure to create only one instance and share it across your application. One of the common mistakes, that makes your application unresponsive, is to create a new instance for every connection.

However, it seems we are creating a Timer for every http request we receive.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed to static

dvlato
dvlato previously requested changes Jan 20, 2020
Copy link
Contributor

@dvlato dvlato left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, check my comment about HashedWheelTimer

import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timeout;
import io.netty.util.Timer;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import seems unnecessary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import seems unnecessary

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed


TimerTask timerTask = timeout -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need curly braces here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed :)

@@ -16,6 +16,5 @@
<logger name="com.hotels.styx.http-messages.inbound" level="INFO"/>
<logger name="com.hotels.styx.http-messages.outbound" level="INFO"/>
<logger name="com.hotels.styx.client.OriginRestrictionLoadBalancingStrategy" level="INFO"/>
<logger name="com.hotels.styx.client.netty.connectionpool.FlowControllingHttpContentProducer" level="INFO"/>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we remove this or just rename with the appropriate package name?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove it as the class has moved to a different module

private static final Logger LOGGER = LoggerFactory.getLogger(FlowControllingHttpContentProducer.class);
private static final int MAX_DEPTH = 1;

private final StateMachine<ProducerState> stateMachine;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be kept final

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, not sure why I removed final

@OwenLindsell OwenLindsell requested a review from mikkokar April 6, 2020 12:53
@OwenLindsell OwenLindsell dismissed dvlato’s stale review April 6, 2020 12:55

David comments have been addressed and David left the company

Copy link
Contributor Author

@OwenLindsell OwenLindsell left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

made changes as requested :)

private static final Logger LOGGER = LoggerFactory.getLogger(FlowControllingHttpContentProducer.class);
private static final int MAX_DEPTH = 1;

private final StateMachine<ProducerState> stateMachine;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, not sure why I removed final

@@ -70,6 +83,9 @@

private volatile Subscriber<? super ByteBuf> contentSubscriber;

private final HashedWheelTimer timer = new HashedWheelTimer();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed to static

Copy link
Contributor Author

@OwenLindsell OwenLindsell left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Copy link
Contributor Author

@OwenLindsell OwenLindsell left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

getContentProducer(ctx).tearDownResources("idle timeout");
FlowControllingHttpContentProducer producer = getContentProducer(ctx);
getContentProducer(ctx).tearDownResources(
new ResponseTimeoutException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A strictly correct behaviour is to use a ResponseTimeoutException whenever the HTTP headers have yet not been received, and a ContentTimeoutException otherwise. We could use a presence of producer to determine this.

But this retains the existing behaviour, and therefore we can do this as a separate increment. Could you raise a GH issue for this please?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do

@mikkokar
Copy link
Contributor

mikkokar commented Apr 7, 2020

HI @OwenLindsell there are check style related changes to the pom.xml files. They look bit unrelated. Could you add a quick comment to this PR to shed some light on them?

Just a quick note will serve as a record for the future maintainers. This would be really helpful.

@OwenLindsell
Copy link
Contributor Author

HI @OwenLindsell there are check style related changes to the pom.xml files. They look bit unrelated. Could you add a quick comment to this PR to shed some light on them?

Just a quick note will serve as a record for the future maintainers. This would be really helpful.

pom changes were reverted yesterday

… an InactiveSubscriberException or a ResponseTimeoutException. This is because both timeouts are set by the same config - idleResponseTimeout. In the future we should implement these to be separately configurable.
@OwenLindsell OwenLindsell merged commit cd2ae43 into ExpediaGroup:master Apr 8, 2020
@OwenLindsell OwenLindsell deleted the replace-rxjava-with-reactor-part4 branch April 9, 2020 09:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants