Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/separate manager restart #3469

Merged
merged 5 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;

import com.bakdata.conquery.Conquery;
import com.bakdata.conquery.mode.cluster.ClusterManager;
Expand Down Expand Up @@ -84,7 +80,7 @@ public void startStandalone(Environment environment, Namespace namespace, Conque
.setNameFormat("ShardNode Storage Loader %d")
.setUncaughtExceptionHandler((t, e) -> {
ConqueryMDC.setLocation(t.getName());
log.error(t.getName() + " failed to init storage of ShardNode", e);
log.error("{} failed to init storage of ShardNode", t.getName(), e);
})
.build()
);
Expand Down Expand Up @@ -121,6 +117,7 @@ public void startStandalone(Environment environment, Namespace namespace, Conque
for (Future<ShardNode> f : tasks) {
try {
f.get();

}
catch (ExecutionException e) {
log.error("during ShardNodes creation", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package com.bakdata.conquery.commands;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import jakarta.validation.Validator;

import com.bakdata.conquery.io.cps.CPSTypeIdResolver;
import com.bakdata.conquery.io.jackson.MutableInjectableValues;
Expand All @@ -31,24 +31,18 @@
import com.bakdata.conquery.tasks.PermissionCleanupTask;
import com.bakdata.conquery.tasks.QueryCleanupTask;
import com.bakdata.conquery.tasks.ReloadMetaStorageTask;
import com.bakdata.conquery.util.io.ConqueryMDC;
import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationConfig;
import com.google.common.base.Throwables;
import io.dropwizard.core.setup.Environment;
import io.dropwizard.jersey.DropwizardResourceConfig;
import io.dropwizard.lifecycle.Managed;
import jakarta.validation.Validator;
import lombok.Getter;
import lombok.NonNull;
import lombok.SneakyThrows;
import lombok.experimental.Delegate;
import lombok.extern.slf4j.Slf4j;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.FilterEvent;
import org.glassfish.jersey.internal.inject.AbstractBinder;

/**
Expand All @@ -58,7 +52,7 @@
*/
@Slf4j
@Getter
public class ManagerNode extends IoHandlerAdapter implements Managed {
public class ManagerNode implements Managed {

public static final String DEFAULT_NAME = "manager";

Expand Down Expand Up @@ -281,45 +275,4 @@ public void stop() throws Exception {
}

}

private void setLocation(IoSession session) {
final String loc = session.getLocalAddress().toString();
ConqueryMDC.setLocation(loc);
}

@Override
public void sessionClosed(IoSession session) {
setLocation(session);
log.info("Disconnected.");
}

@Override
public void sessionCreated(IoSession session) {
setLocation(session);
log.debug("Session created.");
}

@Override
public void sessionIdle(IoSession session, IdleStatus status) {
setLocation(session);
log.warn("Session idle {}. Last read: {}. Last write: {}.", status, Instant.ofEpochMilli(session.getLastReadTime()), Instant.ofEpochMilli(session.getLastWriteTime()));
}

@Override
public void messageSent(IoSession session, Object message) {
setLocation(session);
log.trace("Message sent: {}", message);
}

@Override
public void inputClosed(IoSession session) {
setLocation(session);
log.info("Session closed.");
}

@Override
public void event(IoSession session, FilterEvent event) throws Exception {
setLocation(session);
log.trace("Event handled: {}", event);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import com.bakdata.conquery.models.index.IndexService;
import com.bakdata.conquery.models.jobs.JobManager;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.DistributedNamespace;
import com.bakdata.conquery.models.worker.LocalNamespace;
import com.bakdata.conquery.models.worker.Namespace;
import io.dropwizard.core.setup.Environment;

Expand All @@ -29,26 +27,10 @@ static InternalObjectMapperCreator newInternalObjectMapperCreator(ConqueryConfig
return new InternalObjectMapperCreator(config, validator);
}

static DatasetRegistry<DistributedNamespace> createDistributedDatasetRegistry(
NamespaceHandler<DistributedNamespace> namespaceHandler,
ConqueryConfig config,
InternalObjectMapperCreator creator
) {
return createDatasetRegistry(namespaceHandler, creator, config);
}

static DatasetRegistry<LocalNamespace> createLocalDatasetRegistry(
NamespaceHandler<LocalNamespace> namespaceHandler,
static <N extends Namespace> DatasetRegistry<N> createDatasetRegistry(
NamespaceHandler<N> namespaceHandler,
ConqueryConfig config,
InternalObjectMapperCreator creator
) {
return createDatasetRegistry(namespaceHandler, creator, config);
}

private static <N extends Namespace> DatasetRegistry<N> createDatasetRegistry(
NamespaceHandler<N> namespaceHandler,
InternalObjectMapperCreator creator,
ConqueryConfig config
) {
final IndexService indexService = new IndexService(config.getCsv().createCsvParserSettings(), config.getIndex().getEmptyLabel());
DatasetRegistry<N> datasetRegistry = new DatasetRegistry<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,22 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import jakarta.validation.Validator;

import com.bakdata.conquery.io.jackson.View;
import com.bakdata.conquery.io.mina.BinaryJacksonCoder;
import com.bakdata.conquery.io.mina.CQProtocolCodecFilter;
import com.bakdata.conquery.io.mina.ChunkReader;
import com.bakdata.conquery.io.mina.ChunkWriter;
import com.bakdata.conquery.io.mina.MinaAttributes;
import com.bakdata.conquery.io.mina.NetworkSession;
import com.bakdata.conquery.io.mina.*;
import com.bakdata.conquery.mode.InternalObjectMapperCreator;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.jobs.Job;
import com.bakdata.conquery.models.jobs.JobManager;
import com.bakdata.conquery.models.jobs.ReactingJob;
import com.bakdata.conquery.models.messages.SlowMessage;
import com.bakdata.conquery.models.messages.namespaces.specific.ShutdownShard;
import com.bakdata.conquery.models.messages.network.MessageToManagerNode;
import com.bakdata.conquery.models.messages.network.NetworkMessageContext;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.DistributedNamespace;
import com.bakdata.conquery.util.io.ConqueryMDC;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.validation.Validator;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -108,8 +102,6 @@ public void start() throws IOException {
}

public void stop() {
clusterState.getShardNodes().forEach(((socketAddress, shardNodeInformation) -> shardNodeInformation.send(new ShutdownShard())));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hat eh nie geklappt :D


try {
acceptor.dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,7 @@
import java.util.List;
import java.util.function.Supplier;

import com.bakdata.conquery.mode.DelegateManager;
import com.bakdata.conquery.mode.ImportHandler;
import com.bakdata.conquery.mode.InternalObjectMapperCreator;
import com.bakdata.conquery.mode.ManagerProvider;
import com.bakdata.conquery.mode.NamespaceHandler;
import com.bakdata.conquery.mode.StorageListener;
import com.bakdata.conquery.mode.*;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.jobs.JobManager;
import com.bakdata.conquery.models.worker.ClusterHealthCheck;
Expand All @@ -27,7 +22,7 @@ public ClusterManager provideManager(ConqueryConfig config, Environment environm
final InternalObjectMapperCreator creator = ManagerProvider.newInternalObjectMapperCreator(config, environment.getValidator());
final ClusterState clusterState = new ClusterState();
final NamespaceHandler<DistributedNamespace> namespaceHandler = new ClusterNamespaceHandler(clusterState, config, creator);
final DatasetRegistry<DistributedNamespace> datasetRegistry = ManagerProvider.createDistributedDatasetRegistry(namespaceHandler, config, creator);
final DatasetRegistry<DistributedNamespace> datasetRegistry = ManagerProvider.createDatasetRegistry(namespaceHandler, config, creator);
creator.init(datasetRegistry);

final ClusterConnectionManager connectionManager =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public DelegateManager<LocalNamespace> provideManager(ConqueryConfig config, Env

InternalObjectMapperCreator creator = ManagerProvider.newInternalObjectMapperCreator(config, environment.getValidator());
NamespaceHandler<LocalNamespace> namespaceHandler = new LocalNamespaceHandler(config, creator, dialectFactory);
DatasetRegistry<LocalNamespace> datasetRegistry = ManagerProvider.createLocalDatasetRegistry(namespaceHandler, config, creator);
DatasetRegistry<LocalNamespace> datasetRegistry = ManagerProvider.createDatasetRegistry(namespaceHandler, config, creator);
creator.init(datasetRegistry);

return new DelegateManager<>(
Expand Down

This file was deleted.

Loading