From 96d480bd1407e3f1814ab7b50330a6b49f0d37ef Mon Sep 17 00:00:00 2001 From: Max Thonagel <12283268+thoniTUB@users.noreply.github.com> Date: Tue, 7 May 2024 11:36:55 +0200 Subject: [PATCH 1/2] adds shard reconnect mechanic if connection to manager is lost Signed-off-by: Max Thonagel <12283268+thoniTUB@users.noreply.github.com> --- .../bakdata/conquery/commands/ShardNode.java | 58 +++++++++++++++---- 1 file changed, 47 insertions(+), 11 deletions(-) diff --git a/backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java b/backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java index cb4e7b8b87..c3f34a6970 100644 --- a/backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java +++ b/backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java @@ -29,6 +29,7 @@ import com.bakdata.conquery.models.messages.network.specific.AddShardNode; import com.bakdata.conquery.models.messages.network.specific.RegisterWorker; import com.bakdata.conquery.models.messages.network.specific.UpdateJobManagerStatus; +import com.bakdata.conquery.models.worker.IdResolveContext; import com.bakdata.conquery.models.worker.Worker; import com.bakdata.conquery.models.worker.WorkerInformation; import com.bakdata.conquery.models.worker.Workers; @@ -51,6 +52,7 @@ import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.FilterEvent; import org.apache.mina.transport.socket.nio.NioSocketConnector; +import org.jetbrains.annotations.NotNull; /** * This node holds a shard of data (in so called {@link Worker}s for the different datasets in conquery. @@ -64,6 +66,7 @@ public class ShardNode extends ConqueryCommand implements IoHandler, Managed { public static final String DEFAULT_NAME = "shard-node"; private NioSocketConnector connector; + private ConnectFuture future; private JobManager jobManager; private Validator validator; private ConqueryConfig config; @@ -88,7 +91,6 @@ protected void run(Environment environment, Namespace namespace, ConqueryConfig this.environment = environment; this.config = config; - connector = new NioSocketConnector(); jobManager = new JobManager(getName(), config.isFailOnError()); environment.lifecycle().manage(this); @@ -106,6 +108,7 @@ protected void run(Environment environment, Namespace namespace, ConqueryConfig getConfig().getQueries().getSecondaryIdSubPlanRetention() ); + final Collection workerStorages = config.getStorage().discoverWorkerStorages(); @@ -178,7 +181,7 @@ public ObjectMapper createInternalObjectMapper(Class viewClass) final MutableInjectableValues injectableValues = new MutableInjectableValues(); objectMapper.setInjectableValues(injectableValues); - injectableValues.add(Validator.class, getValidator()); + injectableValues.add(Validator.class, getEnvironment().getValidator()); // Set serialization config @@ -268,6 +271,8 @@ private static void scheduleIdleLogger(ScheduledExecutorService scheduler, IoSes public void sessionClosed(IoSession session) { setLocation(session); log.info("Disconnected from ManagerNode."); + + scheduler.schedule(this::connectToCluster, 2, TimeUnit.SECONDS); } @Override @@ -291,7 +296,9 @@ public void messageSent(IoSession session, Object message) { @Override public void inputClosed(IoSession session) { setLocation(session); - log.info("Session closed."); + log.info("Input closed."); + session.closeNow(); + scheduler.schedule(this::disconnectFromCluster, 0, TimeUnit.SECONDS); } @Override @@ -306,24 +313,27 @@ public void start() throws Exception { value.getJobManager().addSlowJob(new SimpleJob("Update Bucket Manager", value.getBucketManager()::fullUpdate)); } - ObjectMapper om = createInternalObjectMapper(View.InternalCommunication.class); + scheduler.schedule(this::connectToCluster, 0, TimeUnit.MINUTES); - BinaryJacksonCoder coder = new BinaryJacksonCoder(workers, validator, om); - connector.getFilterChain().addLast("codec", new CQProtocolCodecFilter(new ChunkWriter(coder), new ChunkReader(coder, om))); - connector.setHandler(this); - connector.getSessionConfig().setAll(config.getCluster().getMina()); + } + + private void connectToCluster() { InetSocketAddress address = new InetSocketAddress( config.getCluster().getManagerURL().getHostAddress(), config.getCluster().getPort() ); + disconnectFromCluster(); + + connector = getClusterConnector(workers); + while (true) { try { log.info("Trying to connect to {}", address); // Try opening a connection (Note: This fails immediately instead of waiting a minute to try and connect) - ConnectFuture future = connector.connect(address); + future = connector.connect(address); future.awaitUninterruptibly(); @@ -339,9 +349,24 @@ public void start() throws Exception { catch (RuntimeIoException e) { log.warn("Failed to connect to {}", address, e); } + catch (InterruptedException e) { + log.warn("Interrupted while trying to connector to cluster, giving up.", e); + break; + } } + } + @NotNull + private NioSocketConnector getClusterConnector(IdResolveContext workers) { + ObjectMapper om = createInternalObjectMapper(View.InternalCommunication.class); + + NioSocketConnector connector = new NioSocketConnector(); + BinaryJacksonCoder coder = new BinaryJacksonCoder(workers, validator, om); + connector.getFilterChain().addLast("codec", new CQProtocolCodecFilter(new ChunkWriter(coder), new ChunkReader(coder, om))); + connector.setHandler(this); + connector.getSessionConfig().setAll(config.getCluster().getMina()); + return connector; } @Override @@ -350,12 +375,23 @@ public void stop() throws Exception { workers.stop(); + disconnectFromCluster(); + } + + private void disconnectFromCluster() { + if (future != null) { + future.cancel(); + } + //after the close command was send if (context != null) { context.awaitClose(); } - log.info("Connection was closed by ManagerNode"); - connector.dispose(); + + if (connector != null) { + log.info("Connection was closed by ManagerNode"); + connector.dispose(); + } } public boolean isBusy() { From a3f44fe849d677ef40f642271bfb29cdd44b8c7d Mon Sep 17 00:00:00 2001 From: Max Thonagel <12283268+thoniTUB@users.noreply.github.com> Date: Tue, 7 May 2024 14:35:26 +0200 Subject: [PATCH 2/2] fix getValidator NPE Signed-off-by: Max Thonagel <12283268+thoniTUB@users.noreply.github.com> --- .../src/main/java/com/bakdata/conquery/commands/ShardNode.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java b/backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java index c3f34a6970..2293959fec 100644 --- a/backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java +++ b/backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java @@ -181,7 +181,7 @@ public ObjectMapper createInternalObjectMapper(Class viewClass) final MutableInjectableValues injectableValues = new MutableInjectableValues(); objectMapper.setInjectableValues(injectableValues); - injectableValues.add(Validator.class, getEnvironment().getValidator()); + injectableValues.add(Validator.class, getValidator()); // Set serialization config