Skip to content

Commit 10af60b

Browse files
committed
Transport: Refactor guice startup
* Removed & refactored unused module code * Allowed to set transports programmatically * Allow to set the source of the changed transport Note: The current implementation breaks BWC as you need to specify a concrete transport now instead of a module if you want to use a different Transport or HttpServerTransport Closes #7289
1 parent 1efb685 commit 10af60b

11 files changed

+303
-358
lines changed

src/main/java/org/elasticsearch/http/HttpServerModule.java

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,33 +19,50 @@
1919

2020
package org.elasticsearch.http;
2121

22-
import com.google.common.collect.ImmutableList;
2322
import org.elasticsearch.common.inject.AbstractModule;
24-
import org.elasticsearch.common.inject.Module;
25-
import org.elasticsearch.common.inject.Modules;
26-
import org.elasticsearch.common.inject.SpawnModules;
23+
import org.elasticsearch.common.logging.ESLogger;
24+
import org.elasticsearch.common.logging.Loggers;
2725
import org.elasticsearch.common.settings.Settings;
28-
import org.elasticsearch.http.netty.NettyHttpServerTransportModule;
26+
import org.elasticsearch.http.netty.NettyHttpServerTransport;
27+
import org.elasticsearch.plugins.Plugin;
28+
29+
import static org.elasticsearch.common.Preconditions.checkNotNull;
2930

3031
/**
3132
*
3233
*/
33-
public class HttpServerModule extends AbstractModule implements SpawnModules {
34+
public class HttpServerModule extends AbstractModule {
3435

3536
private final Settings settings;
37+
private final ESLogger logger;
38+
39+
private Class<? extends HttpServerTransport> configuredHttpServerTransport;
40+
private String configuredHttpServerTransportSource;
3641

3742
public HttpServerModule(Settings settings) {
3843
this.settings = settings;
39-
}
40-
41-
@Override
42-
public Iterable<? extends Module> spawnModules() {
43-
return ImmutableList.of(Modules.createModule(settings.getAsClass("http.type", NettyHttpServerTransportModule.class, "org.elasticsearch.http.", "HttpServerTransportModule"), settings));
44+
this.logger = Loggers.getLogger(getClass(), settings);
4445
}
4546

4647
@SuppressWarnings({"unchecked"})
4748
@Override
4849
protected void configure() {
50+
if (configuredHttpServerTransport != null) {
51+
logger.info("Using [{}] as http transport, overridden by [{}]", configuredHttpServerTransport.getName(), configuredHttpServerTransportSource);
52+
bind(HttpServerTransport.class).to(configuredHttpServerTransport).asEagerSingleton();
53+
} else {
54+
Class<? extends HttpServerTransport> defaultHttpServerTransport = NettyHttpServerTransport.class;
55+
Class<? extends HttpServerTransport> httpServerTransport = settings.getAsClass("http.type", defaultHttpServerTransport, "org.elasticsearch.http.", "HttpServerTransport");
56+
bind(HttpServerTransport.class).to(httpServerTransport).asEagerSingleton();
57+
}
58+
4959
bind(HttpServer.class).asEagerSingleton();
5060
}
61+
62+
public void setHttpServerTransport(Class<? extends HttpServerTransport> httpServerTransport, String source) {
63+
checkNotNull(httpServerTransport, "Configured http server transport may not be null");
64+
checkNotNull(source, "Plugin, that changes transport may not be null");
65+
this.configuredHttpServerTransport = httpServerTransport;
66+
this.configuredHttpServerTransportSource = source;
67+
}
5168
}

src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransportModule.java

Lines changed: 0 additions & 34 deletions
This file was deleted.

src/main/java/org/elasticsearch/transport/TransportModule.java

Lines changed: 48 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,48 +19,74 @@
1919

2020
package org.elasticsearch.transport;
2121

22-
import com.google.common.collect.ImmutableList;
2322
import org.elasticsearch.cluster.node.DiscoveryNode;
2423
import org.elasticsearch.common.inject.AbstractModule;
25-
import org.elasticsearch.common.inject.Module;
26-
import org.elasticsearch.common.inject.Modules;
27-
import org.elasticsearch.common.inject.SpawnModules;
24+
import org.elasticsearch.common.logging.ESLogger;
25+
import org.elasticsearch.common.logging.Loggers;
2826
import org.elasticsearch.common.settings.Settings;
29-
import org.elasticsearch.transport.local.LocalTransportModule;
30-
import org.elasticsearch.transport.netty.NettyTransportModule;
27+
import org.elasticsearch.plugins.Plugin;
28+
import org.elasticsearch.transport.local.LocalTransport;
29+
import org.elasticsearch.transport.netty.NettyTransport;
30+
31+
import static org.elasticsearch.common.Preconditions.checkNotNull;
3132

3233
/**
3334
*
3435
*/
35-
public class TransportModule extends AbstractModule implements SpawnModules {
36+
public class TransportModule extends AbstractModule {
3637

37-
private final Settings settings;
38-
3938
public static final String TRANSPORT_TYPE_KEY = "transport.type";
4039
public static final String TRANSPORT_SERVICE_TYPE_KEY = "transport.service.type";
4140

41+
private final ESLogger logger;
42+
private final Settings settings;
43+
44+
private Class<? extends TransportService> configuredTransportService;
45+
private Class<? extends Transport> configuredTransport;
46+
private String configuredTransportServiceSource;
47+
private String configuredTransportSource;
48+
4249
public TransportModule(Settings settings) {
4350
this.settings = settings;
51+
this.logger = Loggers.getLogger(getClass(), settings);
4452
}
4553

4654
@Override
47-
public Iterable<? extends Module> spawnModules() {
48-
Class<? extends Module> defaultTransportModule;
49-
if (DiscoveryNode.localNode(settings)) {
50-
defaultTransportModule = LocalTransportModule.class;
55+
protected void configure() {
56+
if (configuredTransportService != null) {
57+
logger.info("Using [{}] as transport service, overridden by [{}]", configuredTransportService.getName(), configuredTransportServiceSource);
58+
bind(TransportService.class).to(configuredTransportService).asEagerSingleton();
5159
} else {
52-
defaultTransportModule = NettyTransportModule.class;
60+
Class<? extends TransportService> defaultTransportService = TransportService.class;
61+
Class<? extends TransportService> transportService = settings.getAsClass(TRANSPORT_SERVICE_TYPE_KEY, defaultTransportService, "org.elasticsearch.transport.", "TransportService");
62+
if (!TransportService.class.equals(transportService)) {
63+
bind(TransportService.class).to(transportService).asEagerSingleton();
64+
} else {
65+
bind(TransportService.class).asEagerSingleton();
66+
}
5367
}
54-
return ImmutableList.of(Modules.createModule(settings.getAsClass(TRANSPORT_TYPE_KEY, defaultTransportModule, "org.elasticsearch.transport.", "TransportModule"), settings));
55-
}
5668

57-
@Override
58-
protected void configure() {
59-
Class<? extends TransportService> transportService = settings.getAsClass(TRANSPORT_SERVICE_TYPE_KEY, TransportService.class, "org.elasticsearch.transport.", "TransportService");
60-
if (!TransportService.class.equals(transportService)) {
61-
bind(TransportService.class).to(transportService).asEagerSingleton();
69+
if (configuredTransport != null) {
70+
logger.info("Using [{}] as transport, overridden by [{}]", configuredTransport.getName(), configuredTransportSource);
71+
bind(Transport.class).to(configuredTransport).asEagerSingleton();
6272
} else {
63-
bind(TransportService.class).asEagerSingleton();
73+
Class<? extends Transport> defaultTransport = DiscoveryNode.localNode(settings) ? LocalTransport.class : NettyTransport.class;
74+
Class<? extends Transport> transport = settings.getAsClass(TRANSPORT_TYPE_KEY, defaultTransport, "org.elasticsearch.transport.", "Transport");
75+
bind(Transport.class).to(transport).asEagerSingleton();
6476
}
6577
}
78+
79+
public void setTransportService(Class<? extends TransportService> transportService, String source) {
80+
checkNotNull(transportService, "Configured transport service may not be null");
81+
checkNotNull(source, "Plugin, that changes transport service may not be null");
82+
this.configuredTransportService = transportService;
83+
this.configuredTransportServiceSource = source;
84+
}
85+
86+
public void setTransport(Class<? extends Transport> transport, String source) {
87+
checkNotNull(transport, "Configured transport may not be null");
88+
checkNotNull(source, "Plugin, that changes transport may not be null");
89+
this.configuredTransport = transport;
90+
this.configuredTransportSource = source;
91+
}
6692
}

src/main/java/org/elasticsearch/transport/local/LocalTransportModule.java

Lines changed: 0 additions & 42 deletions
This file was deleted.

src/main/java/org/elasticsearch/transport/netty/NettyTransportModule.java

Lines changed: 0 additions & 42 deletions
This file was deleted.
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.plugins;
20+
21+
import org.elasticsearch.Version;
22+
import org.elasticsearch.cluster.node.DiscoveryNode;
23+
import org.elasticsearch.common.inject.Inject;
24+
import org.elasticsearch.common.settings.Settings;
25+
import org.elasticsearch.test.ElasticsearchIntegrationTest;
26+
import org.elasticsearch.test.transport.AssertingLocalTransport;
27+
import org.elasticsearch.threadpool.ThreadPool;
28+
import org.elasticsearch.transport.*;
29+
import org.junit.Test;
30+
31+
import java.io.IOException;
32+
import java.util.concurrent.atomic.AtomicInteger;
33+
34+
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
35+
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
36+
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
37+
import static org.hamcrest.Matchers.*;
38+
39+
/**
40+
*
41+
*/
42+
@ClusterScope(scope = Scope.SUITE, numDataNodes = 2)
43+
public class PluggableTransportModuleTests extends ElasticsearchIntegrationTest {
44+
45+
public static final AtomicInteger SENT_REQUEST_COUNTER = new AtomicInteger(0);
46+
47+
@Override
48+
protected Settings nodeSettings(int nodeOrdinal) {
49+
return settingsBuilder()
50+
.put("plugin.types", CountingSentRequestsPlugin.class.getName())
51+
.put(super.nodeSettings(nodeOrdinal))
52+
.build();
53+
}
54+
55+
@Test
56+
public void testThatPluginFunctionalityIsLoadedWithoutConfiguration() throws Exception {
57+
for (Transport transport : internalCluster().getInstances(Transport.class)) {
58+
assertThat(transport, instanceOf(CountingAssertingLocalTransport.class));
59+
}
60+
61+
// the cluster node communication on start up is sufficient to increase the counter
62+
// no need to do anything specific
63+
int count = SENT_REQUEST_COUNTER.get();
64+
assertThat("Expected send request counter to be greather than zero", count, is(greaterThan(0)));
65+
66+
// sending a new request via client node will increase the sent requests
67+
internalCluster().clientNodeClient().admin().cluster().prepareHealth().get();
68+
assertThat("Expected send request counter to be greather than zero", SENT_REQUEST_COUNTER.get(), is(greaterThan(count)));
69+
}
70+
71+
public static class CountingSentRequestsPlugin extends AbstractPlugin {
72+
@Override
73+
public String name() {
74+
return "counting-pipelines-plugin";
75+
}
76+
77+
@Override
78+
public String description() {
79+
return "counting-pipelines-plugin";
80+
}
81+
82+
public void onModule(TransportModule transportModule) {
83+
transportModule.setTransport(CountingAssertingLocalTransport.class, this.name());
84+
}
85+
}
86+
87+
public static final class CountingAssertingLocalTransport extends AssertingLocalTransport {
88+
89+
@Inject
90+
public CountingAssertingLocalTransport(Settings settings, ThreadPool threadPool, Version version) {
91+
super(settings, threadPool, version);
92+
}
93+
94+
@Override
95+
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
96+
SENT_REQUEST_COUNTER.incrementAndGet();
97+
super.sendRequest(node, requestId, action, request, options);
98+
}
99+
}
100+
}

0 commit comments

Comments
 (0)