Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow client config of WebSocket per message deflate #10357

Merged
merged 16 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,29 +39,38 @@ public class DefaultHttpClientConfiguration extends HttpClientConfiguration {
*/
public static final String PREFIX = "micronaut.http.client";
private final DefaultConnectionPoolConfiguration connectionPoolConfiguration;
private final DefaultWebSocketCompressionConfiguration webSocketCompressionConfiguration;

/**
* Default constructor.
*/
public DefaultHttpClientConfiguration() {
this.connectionPoolConfiguration = new DefaultConnectionPoolConfiguration();
this.webSocketCompressionConfiguration = new DefaultWebSocketCompressionConfiguration();
}

/**
* @param connectionPoolConfiguration The connection pool configuration
* @param webSocketCompressionConfiguration The WebSocket compression configuration
* @param applicationConfiguration The application configuration
*/
@Inject
public DefaultHttpClientConfiguration(DefaultConnectionPoolConfiguration connectionPoolConfiguration, ApplicationConfiguration applicationConfiguration) {
public DefaultHttpClientConfiguration(DefaultConnectionPoolConfiguration connectionPoolConfiguration, DefaultWebSocketCompressionConfiguration webSocketCompressionConfiguration, ApplicationConfiguration applicationConfiguration) {
super(applicationConfiguration);
this.connectionPoolConfiguration = connectionPoolConfiguration;
this.webSocketCompressionConfiguration = webSocketCompressionConfiguration;
}

@Override
public ConnectionPoolConfiguration getConnectionPoolConfiguration() {
return connectionPoolConfiguration;
}

@Override
public WebSocketCompressionConfiguration getWebSocketCompressionConfiguration() {
return webSocketCompressionConfiguration;
}

/**
* Uses the default SSL configuration.
*
Expand All @@ -82,4 +91,13 @@ public void setClientSslConfiguration(@Nullable ClientSslConfiguration sslConfig
@Primary
public static class DefaultConnectionPoolConfiguration extends ConnectionPoolConfiguration {
}

/**
* The default WebSocket compression configuration.
*/
@ConfigurationProperties(WebSocketCompressionConfiguration.PREFIX)
@BootstrapContextCompatible
@Primary
public static class DefaultWebSocketCompressionConfiguration extends WebSocketCompressionConfiguration {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ public abstract class HttpClientConfiguration {
/**
* The default value.
*/
@SuppressWarnings("WeakerAccess")
public static final boolean DEFAULT_EXCEPTION_ON_ERROR_STATUS = true;

/**
Expand Down Expand Up @@ -302,6 +301,13 @@ public void setSslConfiguration(SslConfiguration sslConfiguration) {
this.sslConfiguration = sslConfiguration;
}

/**
* Obtains the WebSocket compression configuration.
*
* @return The WebSocket compression configuration.
*/
public abstract WebSocketCompressionConfiguration getWebSocketCompressionConfiguration();
sdelamo marked this conversation as resolved.
Show resolved Hide resolved

/**
* @return Whether redirects should be followed
*/
Expand Down Expand Up @@ -934,4 +940,41 @@ public void setMaxConcurrentHttp2Connections(int maxConcurrentHttp2Connections)
}
}

/**
* Configuration for WebSocket client compression extensions.
*/
public static class WebSocketCompressionConfiguration implements Toggleable {

/**
* The prefix to use for configuration.
*/
public static final String PREFIX = "ws.compression";

/**
* The default enable value.
*/
@SuppressWarnings("WeakerAccess")
public static final boolean DEFAULT_ENABLED = true;

private boolean enabled = DEFAULT_ENABLED;

/**
* Whether deflate compression is enabled for client WebSocket connections.
*
* @return True if the per message deflate extension is enabled.
*/
public boolean isEnabled() {
return enabled;
}

/**
* Sets whether the per message deflate extension is enabled for WebSocket connections.
* Default value ({@link io.micronaut.http.client.HttpClientConfiguration.WebSocketCompressionConfiguration#DEFAULT_ENABLED}).
*
* @param enabled True is it is enabled.
*/
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class ServiceHttpClientConfiguration extends HttpClientConfiguration impl

private final String serviceId;
private final ServiceConnectionPoolConfiguration connectionPoolConfiguration;
private final ServiceWebSocketCompressionConfiguration webSocketCompressionConfiguration;
private List<URI> urls = Collections.emptyList();
private String healthCheckUri = DEFAULT_HEALTHCHECKURI;
private boolean healthCheck = DEFAULT_HEALTHCHECK;
Expand All @@ -79,12 +80,14 @@ public class ServiceHttpClientConfiguration extends HttpClientConfiguration impl
*
* @param serviceId The service id
* @param connectionPoolConfiguration The connection pool configuration
* @param webSocketCompressionConfiguration The WebSocket compression configuration
* @param sslConfiguration The SSL configuration
* @param applicationConfiguration The application configuration
*/
public ServiceHttpClientConfiguration(
@Parameter String serviceId,
@Nullable ServiceConnectionPoolConfiguration connectionPoolConfiguration,
@Nullable ServiceWebSocketCompressionConfiguration webSocketCompressionConfiguration,
@Nullable ServiceSslClientConfiguration sslConfiguration,
ApplicationConfiguration applicationConfiguration) {
super(applicationConfiguration);
Expand All @@ -97,20 +100,27 @@ public ServiceHttpClientConfiguration(
} else {
this.connectionPoolConfiguration = new ServiceConnectionPoolConfiguration();
}
if (webSocketCompressionConfiguration != null) {
this.webSocketCompressionConfiguration = webSocketCompressionConfiguration;
} else {
this.webSocketCompressionConfiguration = new ServiceWebSocketCompressionConfiguration();
}
}

/**
* Creates a new client configuration for the given service ID.
*
* @param serviceId The service id
* @param connectionPoolConfiguration The connection pool configuration
* @param webSocketCompressionConfiguration The WebSocket compression configuration
* @param sslConfiguration The SSL configuration
* @param defaultHttpClientConfiguration The default HTTP client configuration
*/
@Inject
public ServiceHttpClientConfiguration(
@Parameter String serviceId,
@Nullable ServiceConnectionPoolConfiguration connectionPoolConfiguration,
@Nullable ServiceWebSocketCompressionConfiguration webSocketCompressionConfiguration,
@Nullable ServiceSslClientConfiguration sslConfiguration,
HttpClientConfiguration defaultHttpClientConfiguration) {
super(defaultHttpClientConfiguration);
Expand All @@ -123,6 +133,11 @@ public ServiceHttpClientConfiguration(
} else {
this.connectionPoolConfiguration = new ServiceConnectionPoolConfiguration();
}
if (webSocketCompressionConfiguration != null) {
this.webSocketCompressionConfiguration = webSocketCompressionConfiguration;
} else {
this.webSocketCompressionConfiguration = new ServiceWebSocketCompressionConfiguration();
}
}

/**
Expand Down Expand Up @@ -248,13 +263,25 @@ public ConnectionPoolConfiguration getConnectionPoolConfiguration() {
return connectionPoolConfiguration;
}

@Override
public WebSocketCompressionConfiguration getWebSocketCompressionConfiguration() {
return webSocketCompressionConfiguration;
}

/**
* The default connection pool configuration.
*/
@ConfigurationProperties(ConnectionPoolConfiguration.PREFIX)
public static class ServiceConnectionPoolConfiguration extends ConnectionPoolConfiguration {
}

/**
* The default WebSocket compression configuration.
*/
@ConfigurationProperties(WebSocketCompressionConfiguration.PREFIX)
public static class ServiceWebSocketCompressionConfiguration extends WebSocketCompressionConfiguration {
}

/**
* The default connection pool configuration.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ class ClientLoggerNameSpec extends Specification {
ConnectionPoolConfiguration getConnectionPoolConfiguration() {
return null
}

@Override
WebSocketCompressionConfiguration getWebSocketCompressionConfiguration() {
return null
}
}

@Singleton
Expand All @@ -75,5 +80,10 @@ class ClientLoggerNameSpec extends Specification {
ConnectionPoolConfiguration getConnectionPoolConfiguration() {
return null
}

@Override
WebSocketCompressionConfiguration getWebSocketCompressionConfiguration() {
return null
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,9 @@ protected void initChannel(@NonNull Channel ch) {
}

try {
ch.pipeline().addLast(WebSocketClientCompressionHandler.INSTANCE);
if (configuration.getWebSocketCompressionConfiguration().isEnabled()) {
ch.pipeline().addLast(WebSocketClientCompressionHandler.INSTANCE);
}
ch.pipeline().addLast(ChannelPipelineCustomizer.HANDLER_MICRONAUT_WEBSOCKET_CLIENT, handler);
clientCustomizer.specializeForChannel(ch, NettyClientCustomizer.ChannelRole.CONNECTION).onInitialPipelineBuilt();
if (initial.tryEmitEmpty().isSuccess()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ class DefaultNettyHttpClientRegistry implements AutoCloseable,
*/
public DefaultNettyHttpClientRegistry(
HttpClientConfiguration defaultHttpClientConfiguration,
HttpClientFilterResolver httpClientFilterResolver,
HttpClientFilterResolver<ClientFilterResolutionContext> httpClientFilterResolver,
LoadBalancerResolver loadBalancerResolver,
ClientSslBuilder nettyClientSslBuilder,
ThreadFactory threadFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.micronaut.http.client
import io.micronaut.context.ApplicationContext
import io.micronaut.context.annotation.Requires
import io.micronaut.core.io.socket.SocketUtils
import io.micronaut.http.client.DefaultHttpClientConfiguration.DefaultWebSocketCompressionConfiguration
import io.micronaut.http.client.annotation.Client
import io.micronaut.http.client.netty.DefaultHttpClient
import io.micronaut.runtime.ApplicationConfiguration
Expand Down Expand Up @@ -54,17 +55,27 @@ class ClientSpecificLoggerSpec extends Specification {

private final DefaultHttpClientConfiguration.DefaultConnectionPoolConfiguration connectionPoolConfiguration

private final DefaultHttpClientConfiguration.DefaultWebSocketCompressionConfiguration webSocketCompressionConfiguration

@Inject
ClientTwoHttpConfiguration(ApplicationConfiguration applicationConfiguration, DefaultHttpClientConfiguration.DefaultConnectionPoolConfiguration connectionPoolConfiguration) {
ClientTwoHttpConfiguration(ApplicationConfiguration applicationConfiguration,
DefaultHttpClientConfiguration.DefaultConnectionPoolConfiguration connectionPoolConfiguration,
DefaultWebSocketCompressionConfiguration webSocketCompressionConfiguration) {
super(applicationConfiguration)
this.connectionPoolConfiguration = connectionPoolConfiguration
this.webSocketCompressionConfiguration = webSocketCompressionConfiguration
}

@Override
ConnectionPoolConfiguration getConnectionPoolConfiguration() {
return this.connectionPoolConfiguration
}

@Override
WebSocketCompressionConfiguration getWebSocketCompressionConfiguration() {
return this.webSocketCompressionConfiguration
}

@Override
Optional<String> getLoggerName() {
return Optional.of("${ClientSpecificLoggerSpec.class}.client.two".toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ class DefaultHttpClientConfigurationSpec extends Specification {
'shutdown-quiet-period' | 'shutdownQuietPeriod' | '2s' | Optional.of(Duration.ofSeconds(2))
'shutdown-timeout' | 'shutdownTimeout' | '100ms' | Optional.of(Duration.ofMillis(100))
'shutdown-timeout' | 'shutdownTimeout' | '15s' | Optional.of(Duration.ofSeconds(15))
'follow-redirects' | 'followRedirects' | 'false' | false
}


void "test pool config"() {
given:
def ctx = ApplicationContext.run(
Expand All @@ -78,6 +78,25 @@ class DefaultHttpClientConfigurationSpec extends Specification {
'enabled' | 'enabled' | 'false' | false
}

void "test WebSocket compression config"() {
given:
def ctx = ApplicationContext.run(
("micronaut.http.client.ws.compression.$key".toString()): value
)
HttpClientConfiguration config = ctx.getBean(HttpClientConfiguration)
HttpClientConfiguration.WebSocketCompressionConfiguration compressionConfig = config.getWebSocketCompressionConfiguration()

expect:
compressionConfig[property] == expected

cleanup:
ctx.close()

where:
key | property | value | expected
'enabled' | 'enabled' | 'false' | false
}

timyates marked this conversation as resolved.
Show resolved Hide resolved
void "test overriding logger for the client"() {
given:
def ctx = ApplicationContext.run(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ class BinaryWebSocketSpec extends Specification {

fred.sendMany()

then:
then: "Messages were compressed"
conditions.eventually {
bob.replies.contains("[fred] abcdef")
}
Expand All @@ -263,6 +263,64 @@ class BinaryWebSocketSpec extends Specification {
embeddedServer.close()
}

@Issue('https://github.com/micronaut-projects/micronaut-core/issues/7711')
void "test per-message compression disabled"() {
given:
def ctx = ApplicationContext.run([
'spec.name' : 'test per-message compression',
'micronaut.server.port': -1,
'micronaut.http.client.ws.compression.enabled': false
])
def cdcServer = ctx.getBean(CompressionDetectionCustomizerServer)
def cdcClient = ctx.getBean(CompressionDetectionCustomizerClient)
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer)
embeddedServer.start()
PollingConditions conditions = new PollingConditions(timeout: 15, delay: 0.5)

when: "a websocket connection is established"
WebSocketClient wsClient = embeddedServer.applicationContext.createBean(WebSocketClient, embeddedServer.getURI())
BinaryChatClientWebSocket fred = wsClient.connect(BinaryChatClientWebSocket, "/binary/chat/stuff/fred").blockFirst()
BinaryChatClientWebSocket bob = wsClient.connect(BinaryChatClientWebSocket, [topic: "stuff", username: "bob"]).blockFirst()


then: "The connection is valid"
fred.session != null
fred.session.id != null
conditions.eventually {
fred.replies.contains("[bob] Joined!")
fred.replies.size() == 1
}

cdcServer.getPipelines().size() == 2
cdcClient.getPipelines().size() == 2

when: "A message is sent"
List<MessageInterceptor> interceptors = new ArrayList<>()
for (ChannelPipeline pipeline : cdcServer.getPipelines() + cdcClient.getPipelines()) {
def interceptor = new MessageInterceptor()
if (pipeline.get('ws-encoder') != null) {
pipeline.addAfter('ws-encoder', 'MessageInterceptor', interceptor)
} else {
pipeline.addAfter('wsencoder', 'MessageInterceptor', interceptor)
}
interceptors.add(interceptor)
}

fred.sendMany()

then: "Messages were not compressed"
conditions.eventually {
bob.replies.contains("[fred] abcdef")
}
interceptors.every { !it.seenCompressedMessage }

cleanup:
fred.close()
bob.close()
wsClient.close()
embeddedServer.close()
}

@Singleton
@Requires(property = 'spec.name', value = 'test per-message compression')
static class CompressionDetectionCustomizerServer implements BeanCreatedEventListener<NettyServerCustomizer.Registry> {
Expand Down
Loading
Loading