diff --git a/client/src/main/java/com/orientechnologies/orient/core/db/OrientDBRemote.java b/client/src/main/java/com/orientechnologies/orient/core/db/OrientDBRemote.java index f5c4ec42fb4..8e945113ea4 100755 --- a/client/src/main/java/com/orientechnologies/orient/core/db/OrientDBRemote.java +++ b/client/src/main/java/com/orientechnologies/orient/core/db/OrientDBRemote.java @@ -445,6 +445,11 @@ public void loadAllDatabases() { // In remote does nothing } + @Override + public ODatabaseDocumentInternal openInternal(String iDbUrl, String user) { + throw new UnsupportedOperationException("Open for internal use is not supported in remote"); + } + @Override public ODatabaseDocumentInternal openNoAuthenticate(String iDbUrl, String user) { throw new UnsupportedOperationException( diff --git a/core/src/main/java/com/orientechnologies/common/thread/OScheduledThreadPoolExecutorWithLogging.java b/core/src/main/java/com/orientechnologies/common/thread/OScheduledThreadPoolExecutorWithLogging.java index 67eb0014c7d..4a8e106d10f 100755 --- a/core/src/main/java/com/orientechnologies/common/thread/OScheduledThreadPoolExecutorWithLogging.java +++ b/core/src/main/java/com/orientechnologies/common/thread/OScheduledThreadPoolExecutorWithLogging.java @@ -1,18 +1,14 @@ package com.orientechnologies.common.thread; import com.orientechnologies.common.log.OLogManager; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; +import java.util.concurrent.*; /** * The same as thread {@link ScheduledThreadPoolExecutor} but also logs all exceptions happened * inside of the tasks which caused tasks to stop. */ -public class OScheduledThreadPoolExecutorWithLogging extends ScheduledThreadPoolExecutor { +public class OScheduledThreadPoolExecutorWithLogging extends ScheduledThreadPoolExecutor + implements TracingScheduledExecutorService { public OScheduledThreadPoolExecutorWithLogging(int corePoolSize) { super(corePoolSize); } @@ -58,4 +54,168 @@ protected void afterExecute(Runnable r, Throwable t) { OLogManager.instance().errorNoDb(this, "Exception in thread '%s'", t, thread.getName()); } } + + @Override + public Future submit(String taskName, Callable task) { + final OTracedExecutionException trace = OTracedExecutionException.prepareTrace(taskName, task); + return super.submit( + () -> { + try { + return task.call(); + } catch (Exception e) { + throw OTracedExecutionException.trace(trace, e, taskName, task); + } + }); + } + + @Override + public Future submit(String taskName, Runnable task) { + final OTracedExecutionException trace = OTracedExecutionException.prepareTrace(taskName, task); + return super.submit( + () -> { + try { + task.run(); + } catch (Exception e) { + throw OTracedExecutionException.trace(trace, e, taskName, task); + } + }); + } + + @Override + public Future submit(String taskName, Runnable task, T result) { + final OTracedExecutionException trace = OTracedExecutionException.prepareTrace(taskName, task); + return super.submit( + () -> { + try { + task.run(); + } catch (Exception e) { + throw OTracedExecutionException.trace(trace, e, taskName, task); + } + }, + result); + } + + @Override + public void execute(String taskName, Runnable command) { + final OTracedExecutionException trace = + OTracedExecutionException.prepareTrace(taskName, command); + super.execute( + () -> { + try { + command.run(); + } catch (Exception e) { + throw OTracedExecutionException.trace(trace, e, taskName, command); + } + }); + } + + @Override + public Future submit(Runnable task) { + return submit((String) null, task); + } + + @Override + public Future submit(Callable task) { + return submit((String) null, task); + } + + @Override + public Future submit(Runnable task, T result) { + return submit(null, task, result); + } + + @Override + public void execute(Runnable command) { + execute(null, command); + } + + @Override + public ScheduledFuture schedule(String taskName, Runnable command, long delay, TimeUnit unit) { + final OTracedExecutionException trace = + OTracedExecutionException.prepareTrace(taskName, command); + return super.schedule( + () -> { + try { + command.run(); + } catch (Exception e) { + throw OTracedExecutionException.trace(trace, e, taskName, command); + } + }, + delay, + unit); + } + + @Override + public ScheduledFuture schedule( + String taskName, Callable task, long delay, TimeUnit unit) { + final OTracedExecutionException trace = OTracedExecutionException.prepareTrace(taskName, task); + return super.schedule( + () -> { + try { + return task.call(); + } catch (Exception e) { + throw OTracedExecutionException.trace(trace, e, taskName, task); + } + }, + delay, + unit); + } + + @Override + public ScheduledFuture scheduleAtFixedRate( + String taskName, Runnable command, long initialDelay, long period, TimeUnit unit) { + final OTracedExecutionException trace = + OTracedExecutionException.prepareTrace(taskName, command); + return super.scheduleAtFixedRate( + () -> { + try { + command.run(); + } catch (Exception e) { + throw OTracedExecutionException.trace(trace, e, taskName, command); + } + }, + initialDelay, + period, + unit); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay( + String taskName, Runnable command, long initialDelay, long delay, TimeUnit unit) { + final OTracedExecutionException trace = + OTracedExecutionException.prepareTrace(taskName, command); + return super.scheduleWithFixedDelay( + () -> { + try { + command.run(); + } catch (Exception e) { + throw OTracedExecutionException.trace(trace, e, taskName, command); + } + }, + initialDelay, + delay, + unit); + } + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return schedule(null, command, delay, unit); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + return schedule(null, callable, delay, unit); + } + + @Override + public ScheduledFuture scheduleAtFixedRate( + Runnable command, long initialDelay, long period, TimeUnit unit) { + return scheduleAtFixedRate(null, command, initialDelay, period, unit); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay( + Runnable command, long initialDelay, long delay, TimeUnit unit) { + return scheduleWithFixedDelay(null, command, initialDelay, delay, unit); + } } diff --git a/core/src/main/java/com/orientechnologies/common/thread/OThreadPoolExecutorWithLogging.java b/core/src/main/java/com/orientechnologies/common/thread/OThreadPoolExecutorWithLogging.java index 200063dd62c..ef8eacf4a2f 100755 --- a/core/src/main/java/com/orientechnologies/common/thread/OThreadPoolExecutorWithLogging.java +++ b/core/src/main/java/com/orientechnologies/common/thread/OThreadPoolExecutorWithLogging.java @@ -2,6 +2,7 @@ import com.orientechnologies.common.log.OLogManager; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -14,7 +15,8 @@ * The same as thread {@link ThreadPoolExecutor} but also logs all exceptions happened inside of the * tasks which caused tasks to stop. */ -public class OThreadPoolExecutorWithLogging extends ThreadPoolExecutor { +public class OThreadPoolExecutorWithLogging extends ThreadPoolExecutor + implements TracingExecutorService { public OThreadPoolExecutorWithLogging( int corePoolSize, int maximumPoolSize, @@ -79,4 +81,78 @@ protected void afterExecute(Runnable r, Throwable t) { OLogManager.instance().errorNoDb(this, "Exception in thread '%s'", t, thread.getName()); } } + + @Override + public Future submit(String taskName, Callable task) { + final OTracedExecutionException trace = OTracedExecutionException.prepareTrace(taskName, task); + return super.submit( + () -> { + try { + return task.call(); + } catch (Exception e) { + throw OTracedExecutionException.trace(trace, e, taskName, task); + } + }); + } + + @Override + public Future submit(String taskName, Runnable task) { + final OTracedExecutionException trace = OTracedExecutionException.prepareTrace(taskName, task); + return super.submit( + () -> { + try { + task.run(); + } catch (Exception e) { + throw OTracedExecutionException.trace(trace, e, taskName, task); + } + }); + } + + @Override + public Future submit(String taskName, Runnable task, T result) { + final OTracedExecutionException trace = OTracedExecutionException.prepareTrace(taskName, task); + return super.submit( + () -> { + try { + task.run(); + } catch (Exception e) { + throw OTracedExecutionException.trace(trace, e, taskName, task); + } + }, + result); + } + + @Override + public void execute(String taskName, Runnable command) { + final OTracedExecutionException trace = + OTracedExecutionException.prepareTrace(taskName, command); + super.execute( + () -> { + try { + command.run(); + } catch (Exception e) { + throw OTracedExecutionException.trace(trace, e, taskName, command); + } + }); + } + + @Override + public Future submit(Runnable task) { + return submit((String) null, task); + } + + @Override + public Future submit(Callable task) { + return submit((String) null, task); + } + + @Override + public Future submit(Runnable task, T result) { + return submit(null, task, result); + } + + @Override + public void execute(Runnable command) { + execute(null, command); + } } diff --git a/core/src/main/java/com/orientechnologies/common/thread/OThreadPoolExecutors.java b/core/src/main/java/com/orientechnologies/common/thread/OThreadPoolExecutors.java index 7ea40c3369b..12de74f1032 100644 --- a/core/src/main/java/com/orientechnologies/common/thread/OThreadPoolExecutors.java +++ b/core/src/main/java/com/orientechnologies/common/thread/OThreadPoolExecutors.java @@ -6,7 +6,7 @@ public class OThreadPoolExecutors { private OThreadPoolExecutors() {} - public static ExecutorService newScalingThreadPool( + public static TracingExecutorService newScalingThreadPool( String threadName, int corePoolSize, int maxPoolSize, @@ -23,7 +23,7 @@ public static ExecutorService newScalingThreadPool( timeoutUnit); } - public static ExecutorService newScalingThreadPool( + public static TracingExecutorService newScalingThreadPool( String threadName, ThreadGroup parentThreadGroup, int corePoolSize, @@ -40,7 +40,7 @@ public static ExecutorService newScalingThreadPool( new NamedThreadFactory(threadName, parentThreadGroup)); } - public static ExecutorService newBlockingScalingThreadPool( + public static TracingExecutorService newBlockingScalingThreadPool( String threadName, ThreadGroup parentThreadGroup, int corePoolSize, @@ -59,11 +59,11 @@ public static ExecutorService newBlockingScalingThreadPool( new NamedThreadFactory(threadName, parentThreadGroup)); } - public static ExecutorService newFixedThreadPool(String threadName, int poolSize) { + public static TracingExecutorService newFixedThreadPool(String threadName, int poolSize) { return newFixedThreadPool(threadName, Thread.currentThread().getThreadGroup(), poolSize); } - public static ExecutorService newFixedThreadPool( + public static TracingExecutorService newFixedThreadPool( String threadName, ThreadGroup parentThreadGroup, int poolSize) { return new OThreadPoolExecutorWithLogging( poolSize, @@ -74,16 +74,16 @@ public static ExecutorService newFixedThreadPool( new NamedThreadFactory(threadName, parentThreadGroup)); } - public static ExecutorService newCachedThreadPool(String threadName) { + public static TracingExecutorService newCachedThreadPool(String threadName) { return newCachedThreadPool(threadName, Thread.currentThread().getThreadGroup()); } - public static ExecutorService newCachedThreadPool( + public static TracingExecutorService newCachedThreadPool( String threadName, ThreadGroup parentThreadGroup) { return newCachedThreadPool(threadName, parentThreadGroup, Integer.MAX_VALUE, 0); } - public static ExecutorService newCachedThreadPool( + public static TracingExecutorService newCachedThreadPool( String threadName, ThreadGroup parentThreadGroup, int maxThreads, int maxQueue) { return new OThreadPoolExecutorWithLogging( 0, @@ -94,11 +94,11 @@ public static ExecutorService newCachedThreadPool( new NamedThreadFactory(threadName, parentThreadGroup)); } - public static ExecutorService newSingleThreadPool(String threadName) { + public static TracingExecutorService newSingleThreadPool(String threadName) { return newSingleThreadPool(threadName, Thread.currentThread().getThreadGroup()); } - public static ExecutorService newSingleThreadPool( + public static TracingExecutorService newSingleThreadPool( String threadName, ThreadGroup parentThreadGroup) { return new OThreadPoolExecutorWithLogging( 1, @@ -109,7 +109,7 @@ public static ExecutorService newSingleThreadPool( new SingletonNamedThreadFactory(threadName, parentThreadGroup)); } - public static ExecutorService newSingleThreadPool( + public static TracingExecutorService newSingleThreadPool( String threadName, int maxQueue, RejectedExecutionHandler rejectHandler) { return new OThreadPoolExecutorWithLogging( 1, @@ -121,13 +121,30 @@ public static ExecutorService newSingleThreadPool( rejectHandler); } - public static ScheduledExecutorService newSingleThreadScheduledPool(String threadName) { + public static TracingScheduledExecutorService newSingleThreadScheduledPool(String threadName) { return newSingleThreadScheduledPool(threadName, Thread.currentThread().getThreadGroup()); } - public static ScheduledExecutorService newSingleThreadScheduledPool( + public static TracingScheduledExecutorService newSingleThreadScheduledPool( String threadName, ThreadGroup parentThreadGroup) { return new OScheduledThreadPoolExecutorWithLogging( 1, new SingletonNamedThreadFactory(threadName, parentThreadGroup)); } + + private static final TracingExecutorService GLOBAL_EXECUTOR = + newCachedThreadPool("OrientDB-Global"); + + public static void executeUnbound(Runnable task, String name) { + GLOBAL_EXECUTOR.submit( + name, + () -> { + final String poolThreadName = Thread.currentThread().getName(); + try { + Thread.currentThread().setName(name); + task.run(); + } finally { + Thread.currentThread().setName(poolThreadName); + } + }); + } } diff --git a/core/src/main/java/com/orientechnologies/common/thread/OTracedExecutionException.java b/core/src/main/java/com/orientechnologies/common/thread/OTracedExecutionException.java new file mode 100755 index 00000000000..be8f289c11a --- /dev/null +++ b/core/src/main/java/com/orientechnologies/common/thread/OTracedExecutionException.java @@ -0,0 +1,51 @@ +package com.orientechnologies.common.thread; + +import com.orientechnologies.common.exception.OException; +import com.orientechnologies.common.log.OLogManager; + +public class OTracedExecutionException extends OException { + + public OTracedExecutionException(String message, Exception cause) { + super(message); + initCause(cause); + } + + public OTracedExecutionException(String message) { + super(message); + } + + private static String taskName(String taskName, Object task) { + if (taskName != null) { + return taskName; + } + if (task != null) { + return task.getClass().getSimpleName(); + } + return "?"; + } + + public static OTracedExecutionException prepareTrace(String taskName, Object task) { + final OTracedExecutionException trace; + if (OLogManager.instance().isDebugEnabled()) { + trace = + new OTracedExecutionException( + String.format("Async task [%s] failed", taskName(taskName, task))); + trace.fillInStackTrace(); + } else { + trace = null; + } + return trace; + } + + public static OTracedExecutionException trace( + OTracedExecutionException trace, Exception e, String taskName, Object task) + throws OTracedExecutionException { + if (trace != null) { + trace.initCause(e); + return trace; + } else { + return new OTracedExecutionException( + String.format("Async task [%s] failed", taskName(taskName, task)), e); + } + } +} diff --git a/core/src/main/java/com/orientechnologies/common/thread/TracingExecutorService.java b/core/src/main/java/com/orientechnologies/common/thread/TracingExecutorService.java new file mode 100644 index 00000000000..2e01b05ed6d --- /dev/null +++ b/core/src/main/java/com/orientechnologies/common/thread/TracingExecutorService.java @@ -0,0 +1,16 @@ +package com.orientechnologies.common.thread; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +public interface TracingExecutorService extends ExecutorService { + + Future submit(String taskName, Callable task); + + Future submit(String taskName, Runnable task); + + Future submit(String taskName, Runnable task, T result); + + void execute(String taskName, Runnable command); +} diff --git a/core/src/main/java/com/orientechnologies/common/thread/TracingScheduledExecutorService.java b/core/src/main/java/com/orientechnologies/common/thread/TracingScheduledExecutorService.java new file mode 100644 index 00000000000..45974304f85 --- /dev/null +++ b/core/src/main/java/com/orientechnologies/common/thread/TracingScheduledExecutorService.java @@ -0,0 +1,20 @@ +package com.orientechnologies.common.thread; + +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public interface TracingScheduledExecutorService + extends TracingExecutorService, ScheduledExecutorService { + + ScheduledFuture schedule(String taskName, Runnable command, long delay, TimeUnit unit); + + ScheduledFuture schedule(String taskName, Callable callable, long delay, TimeUnit unit); + + ScheduledFuture scheduleAtFixedRate( + String taskName, Runnable command, long initialDelay, long period, TimeUnit unit); + + ScheduledFuture scheduleWithFixedDelay( + String taskName, Runnable command, long initialDelay, long delay, TimeUnit unit); +} diff --git a/core/src/main/java/com/orientechnologies/orient/core/Orient.java b/core/src/main/java/com/orientechnologies/orient/core/Orient.java index a7018c900b4..8d681300eb9 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/Orient.java +++ b/core/src/main/java/com/orientechnologies/orient/core/Orient.java @@ -411,6 +411,11 @@ public Orient shutdown() { } public TimerTask scheduleTask(final Runnable task, final long delay, final long period) { + return scheduleTask(task.getClass().getSimpleName(), task, delay, period); + } + + public TimerTask scheduleTask( + final String taskName, final Runnable task, final long delay, final long period) { engineLock.readLock().lock(); try { final TimerTask timerTask = @@ -421,16 +426,10 @@ public void run() { task.run(); } catch (Exception e) { OLogManager.instance() - .error( - this, - "Error during execution of task " + task.getClass().getSimpleName(), - e); + .error(Orient.this, "Error during execution of task " + taskName, e); } catch (Error e) { OLogManager.instance() - .error( - this, - "Error during execution of task " + task.getClass().getSimpleName(), - e); + .error(Orient.this, "Error during execution of task " + taskName, e); throw e; } } @@ -453,45 +452,7 @@ public void run() { } public TimerTask scheduleTask(final Runnable task, final Date firstTime, final long period) { - engineLock.readLock().lock(); - try { - final TimerTask timerTask = - new TimerTask() { - @Override - public void run() { - try { - task.run(); - } catch (Exception e) { - OLogManager.instance() - .error( - this, - "Error during execution of task " + task.getClass().getSimpleName(), - e); - } catch (Error e) { - OLogManager.instance() - .error( - this, - "Error during execution of task " + task.getClass().getSimpleName(), - e); - throw e; - } - } - }; - - if (active) { - if (period > 0) { - timer.schedule(timerTask, firstTime, period); - } else { - timer.schedule(timerTask, firstTime); - } - } else { - OLogManager.instance().warn(this, "OrientDB engine is down. Task will not be scheduled."); - } - - return timerTask; - } finally { - engineLock.readLock().unlock(); - } + return scheduleTask(task, firstTime.getTime() - System.currentTimeMillis(), period); } public boolean isActive() { diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBEmbedded.java b/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBEmbedded.java index ed7d3dff59c..c4136b1ef5c 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBEmbedded.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBEmbedded.java @@ -28,6 +28,7 @@ import com.orientechnologies.common.io.OIOUtils; import com.orientechnologies.common.log.OLogManager; import com.orientechnologies.common.thread.OThreadPoolExecutors; +import com.orientechnologies.common.thread.TracingExecutorService; import com.orientechnologies.orient.core.Orient; import com.orientechnologies.orient.core.command.OCommandOutputListener; import com.orientechnologies.orient.core.command.script.OScriptManager; @@ -67,7 +68,6 @@ import java.util.TimerTask; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -92,7 +92,7 @@ public class OrientDBEmbedded implements OrientDBInternal { protected final Orient orient; protected final OCachedDatabasePoolFactory cachedPoolFactory; private volatile boolean open = true; - private ExecutorService executor; + private TracingExecutorService executor; private Timer timer; private TimerTask autoCloseTimer = null; private final OScriptManager scriptManager = new OScriptManager(); @@ -414,25 +414,23 @@ private long calculateDoubleWriteLogMaxSegSize(Path storagePath) throws IOExcept return maxSegSize; } - @Override - public ODatabaseDocumentInternal open(String name, String user, String password) { - return open(name, user, password, null); - } - - public ODatabaseDocumentEmbedded openNoAuthenticate(String name, String user) { + private ODatabaseDocumentEmbedded doOpen( + String name, String user, String password, boolean checkPassword, OrientDBConfig config) { checkDatabaseName(name); + checkOpen(); try { final ODatabaseDocumentEmbedded embedded; - OrientDBConfig config = solveConfig(null); + config = solveConfig(config); synchronized (this) { - checkOpen(); OAbstractPaginatedStorage storage = getAndOpenStorage(name, config); storage.incOnOpen(); embedded = newSessionInstance(storage); embedded.init(config, getOrCreateSharedContext(storage)); } embedded.rebuildIndexes(); - embedded.internalOpen(user, "nopwd", false); + if (user != null) { + embedded.internalOpen(user, password, checkPassword); + } embedded.callOnOpenListeners(); return embedded; } catch (Exception e) { @@ -441,55 +439,34 @@ public ODatabaseDocumentEmbedded openNoAuthenticate(String name, String user) { } } - protected ODatabaseDocumentEmbedded newSessionInstance(OAbstractPaginatedStorage storage) { - return new ODatabaseDocumentEmbedded(storage); - } - - public ODatabaseDocumentEmbedded openNoAuthorization(String name) { - checkDatabaseName(name); - try { - final ODatabaseDocumentEmbedded embedded; - OrientDBConfig config = solveConfig(null); - synchronized (this) { - checkOpen(); - OAbstractPaginatedStorage storage = getAndOpenStorage(name, config); - storage.incOnOpen(); - embedded = newSessionInstance(storage); - embedded.init(config, getOrCreateSharedContext(storage)); - } - embedded.rebuildIndexes(); - embedded.callOnOpenListeners(); - return embedded; - } catch (Exception e) { - throw OException.wrapException( - new ODatabaseException("Cannot open database '" + name + "'"), e); - } + @Override + public final ODatabaseDocumentInternal open(String name, String user, String password) { + return open(name, user, password, null); } @Override public ODatabaseDocumentInternal open( String name, String user, String password, OrientDBConfig config) { - checkDatabaseName(name); checkDefaultPassword(name, user, password); - try { - final ODatabaseDocumentEmbedded embedded; - synchronized (this) { - checkOpen(); - config = solveConfig(config); - OAbstractPaginatedStorage storage = getAndOpenStorage(name, config); + return doOpen(name, user, password, true, config); + } - embedded = newSessionInstance(storage); - embedded.init(config, getOrCreateSharedContext(storage)); - storage.incOnOpen(); - } - embedded.rebuildIndexes(); - embedded.internalOpen(user, password); - embedded.callOnOpenListeners(); - return embedded; - } catch (Exception e) { - throw OException.wrapException( - new ODatabaseException("Cannot open database '" + name + "'"), e); - } + @Override + public final ODatabaseDocumentEmbedded openInternal(String name, String user) { + return doOpen(name, user, null, false, null); + } + + public ODatabaseDocumentEmbedded openNoAuthenticate(String name, String user) { + return doOpen(name, user, null, false, null); + } + + @Override + public ODatabaseDocumentEmbedded openNoAuthorization(String name) { + return doOpen(name, null, null, false, null); + } + + protected ODatabaseDocumentEmbedded newSessionInstance(OAbstractPaginatedStorage storage) { + return new ODatabaseDocumentEmbedded(storage); } private OAbstractPaginatedStorage getAndOpenStorage(String name, OrientDBConfig config) { @@ -539,6 +516,7 @@ public ODatabaseDocumentInternal poolOpen( String name, String user, String password, ODatabasePoolInternal pool) { final ODatabaseDocumentEmbedded embedded; synchronized (this) { + checkDatabaseName(name); checkOpen(); OAbstractPaginatedStorage storage = getAndOpenStorage(name, pool.getConfig()); embedded = newPooledSessionInstance(pool, storage); @@ -603,7 +581,7 @@ protected String buildName(String name) { return basePath + "/" + name; } - public void create(String name, String user, String password, ODatabaseType type) { + public final void create(String name, String user, String password, ODatabaseType type) { create(name, user, password, type, null); } @@ -810,10 +788,8 @@ public synchronized boolean exists(String name, String user, String password) { @Override public void drop(String name, String user, String password) { - synchronized (this) { - checkOpen(); - } checkDatabaseName(name); + checkOpen(); ODatabaseDocumentInternal current = ODatabaseRecordThreadLocal.instance().getIfDefined(); try { ODatabaseDocumentInternal db = openNoAuthenticate(name, user); @@ -873,7 +849,7 @@ public synchronized void loadAllDatabases() { } } - public ODatabasePoolInternal openPool(String name, String user, String password) { + public final ODatabasePoolInternal openPool(String name, String user, String password) { return openPool(name, user, password, null); } @@ -888,7 +864,7 @@ public ODatabasePoolInternal openPool( } @Override - public ODatabasePoolInternal cachedPool(String database, String user, String password) { + public final ODatabasePoolInternal cachedPool(String database, String user, String password) { return cachedPool(database, user, password, null); } @@ -1087,9 +1063,31 @@ public Future execute(String database, String user, ODatabaseTask task }); } + public Future executeInternalNoAuthorization( + String taskName, String database, ODatabaseTask task) { + return executeInternal(taskName, database, null, task); + } + + public Future executeInternal( + String taskName, String database, String user, ODatabaseTask task) { + return executor.submit( + taskName, + () -> { + try (ODatabaseSession session = openInternal(database, null)) { + return task.call(session); + } + }); + } + @Override public Future executeNoAuthorization(String database, ODatabaseTask task) { + return executeNoAuthorization(null, database, task); + } + + public Future executeNoAuthorization( + String taskName, String database, ODatabaseTask task) { return executor.submit( + taskName, () -> { try (ODatabaseSession session = openNoAuthorization(database)) { return task.call(session); @@ -1098,7 +1096,11 @@ public Future executeNoAuthorization(String database, ODatabaseTask ta } public Future executeNoDb(Callable callable) { - return executor.submit(callable); + return executeNoDb(null, callable); + } + + public Future executeNoDb(String taskName, Callable callable) { + return executor.submit(taskName, callable); } public OScriptManager getScriptManager() { diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBInternal.java b/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBInternal.java index 8b1f35cd5cc..09877ee7b3a 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBInternal.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBInternal.java @@ -288,6 +288,8 @@ static OrientDBInternal extract(OrientDB orientDB) { return orientDB.internal; } + ODatabaseDocumentInternal openInternal(String iDbUrl, String user); + ODatabaseDocumentInternal openNoAuthenticate(String iDbUrl, String user); ODatabaseDocumentInternal openNoAuthorization(String name); diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/viewmanager/ViewManager.java b/core/src/main/java/com/orientechnologies/orient/core/db/viewmanager/ViewManager.java index 80df1b779b9..679f75e94fa 100644 --- a/core/src/main/java/com/orientechnologies/orient/core/db/viewmanager/ViewManager.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/viewmanager/ViewManager.java @@ -1,5 +1,6 @@ package com.orientechnologies.orient.core.db.viewmanager; +import com.orientechnologies.common.concur.OOfflineNodeException; import com.orientechnologies.common.concur.lock.OInterruptedException; import com.orientechnologies.common.exception.OException; import com.orientechnologies.common.log.OLogManager; @@ -10,6 +11,7 @@ import com.orientechnologies.orient.core.db.ODatabaseSession; import com.orientechnologies.orient.core.db.OLiveQueryResultListener; import com.orientechnologies.orient.core.db.OScenarioThreadLocal; +import com.orientechnologies.orient.core.db.OrientDBEmbedded; import com.orientechnologies.orient.core.db.OrientDBInternal; import com.orientechnologies.orient.core.db.document.ODatabaseDocument; import com.orientechnologies.orient.core.db.document.ODatabaseDocumentEmbedded; @@ -97,15 +99,17 @@ public ViewManager(OrientDBInternal orientDb, String dbName) { } protected void init() { - orientDB.executeNoAuthorization( - dbName, - (db) -> { - // do this to make sure that the storage is already initialized and so is the shared - // context. - // you just don't need the db passed as a param here - registerLiveUpdates(db); - return null; - }); + ((OrientDBEmbedded) orientDB) + .executeInternalNoAuthorization( + "ViewManager.registerLiveUpdates", + dbName, + (db) -> { + // do this to make sure that the storage is already initialized and so is the shared + // context. + // you just don't need the db passed as a param here + registerLiveUpdates(db); + return null; + }); } private synchronized void registerLiveUpdates(ODatabaseSession db) { @@ -146,12 +150,22 @@ private void schedule() { public void run() { if (closed) return; lastTask = - orientDB.executeNoAuthorization( - dbName, - (db) -> { - ViewManager.this.updateViews((ODatabaseDocumentInternal) db); - return null; - }); + ((OrientDBEmbedded) orientDB) + .executeNoDb( + "ViewManager.updateViews", + () -> { + try (ODatabaseDocumentInternal db = + orientDB.openNoAuthorization(dbName)) { + ViewManager.this.updateViews(db); + } catch (OOfflineNodeException e) { + OLogManager.instance() + .debug(this, "Node offline when updating views", e); + } finally { + // When the run is finished schedule the next run. + schedule(); + } + return null; + }); } }; this.orientDB.scheduleOnce(timerTask, 1000); @@ -165,8 +179,6 @@ private void updateViews(ODatabaseDocumentInternal db) { if (view != null) { updateView(view, db); } - // When the run is finished schedule the next run. - schedule(); } catch (Exception e) { OLogManager.instance().warn(this, "Failed to update views", e); } @@ -178,18 +190,20 @@ public void close() { } if (lastTask != null) { try { - try { + // Try to cancel last task before it runs, otherwise wait for completion + if (!lastTask.cancel(false)) { lastTask.get(20, TimeUnit.SECONDS); - } catch (TimeoutException e) { - lastTask.cancel(true); - lastTask.get(); } - + } catch (TimeoutException e) { + OLogManager.instance() + .warn( + this, + "Timeout waiting for last task to complete view update background operations"); } catch (InterruptedException e) { throw OException.wrapException( new OInterruptedException("Terminated while waiting update view to finis"), e); } catch (ExecutionException e) { - OLogManager.instance().warn(this, "Issue terminating view update background operations", e); + // Will already have been logged by thread pool executor } } diff --git a/distributed/src/main/java/com/orientechnologies/orient/core/db/OrientDBDistributed.java b/distributed/src/main/java/com/orientechnologies/orient/core/db/OrientDBDistributed.java index 544aa508ae8..5a612a75b35 100755 --- a/distributed/src/main/java/com/orientechnologies/orient/core/db/OrientDBDistributed.java +++ b/distributed/src/main/java/com/orientechnologies/orient/core/db/OrientDBDistributed.java @@ -48,6 +48,13 @@ public void init(OServer server) { this.server = server; } + private boolean isDistributedEnabled() { + if (server == null) { + return false; + } + return server.isDistributedPluginEnabled(); + } + public synchronized OHazelcastPlugin getPlugin() { if (plugin == null) { if (server != null && server.isActive()) plugin = server.getPlugin("cluster"); @@ -55,21 +62,29 @@ public synchronized OHazelcastPlugin getPlugin() { return plugin; } + @Override + public void create( + String name, String user, String password, ODatabaseType type, OrientDBConfig config) { + if (isDistributedEnabled() && (plugin == null)) { + throw new OOfflineNodeException("Distributed plugin is not active"); + } + super.create(name, user, password, type, config); + } + protected OSharedContext createSharedContext(OAbstractPaginatedStorage storage) { - if (OSystemDatabase.SYSTEM_DB_NAME.equals(storage.getName()) - || getPlugin() == null - || !getPlugin().isRunning()) { + if (!isDistributedEnabled() || OSystemDatabase.SYSTEM_DB_NAME.equals(storage.getName())) { return new OSharedContextEmbedded(storage, this); } return new OSharedContextDistributed(storage, this); } protected ODatabaseDocumentEmbedded newSessionInstance(OAbstractPaginatedStorage storage) { - if (OSystemDatabase.SYSTEM_DB_NAME.equals(storage.getName()) - || getPlugin() == null - || !getPlugin().isRunning()) { + if (!isDistributedEnabled() || OSystemDatabase.SYSTEM_DB_NAME.equals(storage.getName())) { return new ODatabaseDocumentEmbedded(storage); } + if (plugin == null) { + throw new OOfflineNodeException("Distributed plugin is not active"); + } plugin.registerNewDatabaseIfNeeded( storage.getName(), plugin.getDatabaseConfiguration(storage.getName())); return new ODatabaseDocumentDistributed(plugin.getStorage(storage.getName(), storage), plugin); @@ -77,11 +92,12 @@ protected ODatabaseDocumentEmbedded newSessionInstance(OAbstractPaginatedStorage protected ODatabaseDocumentEmbedded newPooledSessionInstance( ODatabasePoolInternal pool, OAbstractPaginatedStorage storage) { - if (OSystemDatabase.SYSTEM_DB_NAME.equals(storage.getName()) - || getPlugin() == null - || !getPlugin().isRunning()) { + if (!isDistributedEnabled() || OSystemDatabase.SYSTEM_DB_NAME.equals(storage.getName())) { return new ODatabaseDocumentEmbeddedPooled(pool, storage); } + if (plugin == null) { + throw new OOfflineNodeException("Distributed plugin is not active"); + } plugin.registerNewDatabaseIfNeeded( storage.getName(), plugin.getDatabaseConfiguration(storage.getName())); return new ODatabaseDocumentDistributedPooled( @@ -163,8 +179,22 @@ public OStorage fullSync(String dbName, InputStream backupStream, OrientDBConfig @Override public ODatabaseDocumentInternal poolOpen( String name, String user, String password, ODatabasePoolInternal pool) { - ODatabaseDocumentInternal session = super.poolOpen(name, user, password, pool); - return session; + checkDbAvailable(name); + return super.poolOpen(name, user, password, pool); + } + + @Override + public ODatabasePoolInternal openPool( + String name, String user, String password, OrientDBConfig config) { + checkDbAvailable(name); + return super.openPool(name, user, password, config); + } + + @Override + public ODatabasePoolInternal cachedPool( + String name, String user, String password, OrientDBConfig config) { + checkDbAvailable(name); + return super.cachedPool(name, user, password, config); } @Override @@ -204,10 +234,14 @@ public void drop(String name, String user, String password) { } private void checkDbAvailable(String name) { - if (getPlugin() == null || !getPlugin().isRunning()) { + if (!isDistributedEnabled() || OSystemDatabase.SYSTEM_DB_NAME.equals(name)) { return; } - if (OSystemDatabase.SYSTEM_DB_NAME.equals(name)) return; + if (getPlugin() == null || !getPlugin().isRunning()) { + // The configuration specifies distributed mode, but the distributed plugin has not + // started yet (and a client has requested database access before the server is up) + throw new OOfflineNodeException("Distributed server is not yet ONLINE"); + } ODistributedServerManager.DB_STATUS dbStatus = plugin.getDatabaseStatus(plugin.getLocalNodeName(), name); if (dbStatus != ODistributedServerManager.DB_STATUS.ONLINE @@ -218,16 +252,22 @@ private void checkDbAvailable(String name) { } @Override - public ODatabaseDocumentInternal open(String name, String user, String password) { + public ODatabaseDocumentInternal open( + String name, String user, String password, OrientDBConfig config) { checkDbAvailable(name); - return super.open(name, user, password); + return super.open(name, user, password, config); } @Override - public ODatabaseDocumentInternal open( - String name, String user, String password, OrientDBConfig config) { + public ODatabaseDocumentEmbedded openNoAuthenticate(String name, String user) { checkDbAvailable(name); - return super.open(name, user, password, config); + return super.openNoAuthenticate(name, user); + } + + @Override + public ODatabaseDocumentEmbedded openNoAuthorization(String name) { + checkDbAvailable(name); + return super.openNoAuthorization(name); } @Override diff --git a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedAbstractPlugin.java b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedAbstractPlugin.java index acbd11e9347..c9c1b507156 100755 --- a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedAbstractPlugin.java +++ b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedAbstractPlugin.java @@ -20,6 +20,8 @@ package com.orientechnologies.orient.server.distributed.impl; import static com.orientechnologies.orient.core.config.OGlobalConfiguration.DISTRIBUTED_MAX_STARTUP_DELAY; +import static com.orientechnologies.orient.server.distributed.ODistributedServerManager.NODE_STATUS.OFFLINE; +import static com.orientechnologies.orient.server.distributed.ODistributedServerManager.NODE_STATUS.SHUTTINGDOWN; import com.hazelcast.core.HazelcastException; import com.hazelcast.core.HazelcastInstanceNotActiveException; @@ -1006,7 +1008,7 @@ public boolean installDatabase( // DON'T REPLICATE SYSTEM BECAUSE IS DIFFERENT AND PER SERVER return false; - if (installingDatabases.contains(databaseName)) { + if (!installingDatabases.add(databaseName)) { return false; } @@ -1014,7 +1016,6 @@ public boolean installDatabase( messageService.registerDatabase(databaseName, null); try { - installingDatabases.add(databaseName); return executeInDistributedDatabaseLock( databaseName, 20000, @@ -1978,6 +1979,10 @@ public T executeInDistributedDatabaseLock( OModifiableDistributedConfiguration lastCfg, final OCallable iCallback) { + if (getNodeStatus() == SHUTTINGDOWN || getNodeStatus() == OFFLINE) { + throw new OOfflineNodeException( + String.format("Node %s is shutting down or offline.", nodeName)); + } boolean updated; T result; getLockManagerExecutor().acquireExclusiveLock(databaseName, nodeName, timeoutLocking); diff --git a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedDatabaseImpl.java b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedDatabaseImpl.java index 14fe9d1db82..28f2931724b 100755 --- a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedDatabaseImpl.java +++ b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedDatabaseImpl.java @@ -31,6 +31,7 @@ import com.orientechnologies.common.profiler.OAbstractProfiler; import com.orientechnologies.common.profiler.OProfiler; import com.orientechnologies.common.thread.OThreadPoolExecutors; +import com.orientechnologies.common.thread.TracingExecutorService; import com.orientechnologies.common.util.OCallable; import com.orientechnologies.orient.core.Orient; import com.orientechnologies.orient.core.command.OCommandDistributedReplicateRequest; @@ -44,23 +45,9 @@ import com.orientechnologies.orient.core.tx.OTransactionSequenceStatus; import com.orientechnologies.orient.core.tx.OTxMetadataHolder; import com.orientechnologies.orient.core.tx.ValidationResult; -import com.orientechnologies.orient.server.OServer; import com.orientechnologies.orient.server.OSystemDatabase; -import com.orientechnologies.orient.server.distributed.ODistributedConfiguration; -import com.orientechnologies.orient.server.distributed.ODistributedDatabase; -import com.orientechnologies.orient.server.distributed.ODistributedException; -import com.orientechnologies.orient.server.distributed.ODistributedRequest; -import com.orientechnologies.orient.server.distributed.ODistributedRequestId; -import com.orientechnologies.orient.server.distributed.ODistributedResponse; -import com.orientechnologies.orient.server.distributed.ODistributedResponseManager; -import com.orientechnologies.orient.server.distributed.ODistributedResponseManagerImpl; -import com.orientechnologies.orient.server.distributed.ODistributedServerLog; +import com.orientechnologies.orient.server.distributed.*; import com.orientechnologies.orient.server.distributed.ODistributedServerLog.DIRECTION; -import com.orientechnologies.orient.server.distributed.ODistributedServerManager; -import com.orientechnologies.orient.server.distributed.ODistributedSyncConfiguration; -import com.orientechnologies.orient.server.distributed.ODistributedTxContext; -import com.orientechnologies.orient.server.distributed.OModifiableDistributedConfiguration; -import com.orientechnologies.orient.server.distributed.ORemoteServerController; import com.orientechnologies.orient.server.distributed.impl.lock.OFreezeGuard; import com.orientechnologies.orient.server.distributed.impl.lock.OLockGuard; import com.orientechnologies.orient.server.distributed.impl.lock.OLockManager; @@ -74,16 +61,7 @@ import com.orientechnologies.orient.server.hazelcast.OHazelcastPlugin; import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.SortedSet; -import java.util.TimerTask; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; @@ -107,8 +85,8 @@ public class ODistributedDatabaseImpl implements ODistributedDatabase { protected Map activeTxContexts = new ConcurrentHashMap<>(64); - private AtomicLong totalSentRequests = new AtomicLong(); - private AtomicLong totalReceivedRequests = new AtomicLong(); + private final AtomicLong totalSentRequests = new AtomicLong(); + private final AtomicLong totalReceivedRequests = new AtomicLong(); private TimerTask txTimeoutTask = null; private volatile boolean running = true; private volatile boolean parsing = true; @@ -116,12 +94,12 @@ public class ODistributedDatabaseImpl implements ODistributedDatabase { private final String localNodeName; private final OSimpleLockManager recordLockManager; private final OSimpleLockManager indexKeyLockManager; - private AtomicLong operationsRunnig = new AtomicLong(0); - private ODistributedSynchronizedSequence sequenceManager; + private final AtomicLong operationsRunnig = new AtomicLong(0); + private final ODistributedSynchronizedSequence sequenceManager; private final AtomicLong pending = new AtomicLong(); - private ThreadPoolExecutor requestExecutor; - private OLockManager lockManager = new OLockManagerImpl(); - private Set inQueue = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final TracingExecutorService requestExecutor; + private final OLockManager lockManager = new OLockManagerImpl(); + private final Set inQueue = Collections.newSetFromMap(new ConcurrentHashMap<>()); private OFreezeGuard freezeGuard; public static boolean sendResponseBack( @@ -186,34 +164,50 @@ public void endOperation() { operationsRunnig.decrementAndGet(); } - public ODistributedDatabaseImpl( + ODistributedDatabaseImpl( final OHazelcastPlugin manager, final ODistributedMessageServiceImpl msgService, - final String iDatabaseName, - final ODistributedConfiguration cfg, - OServer server) { + final String iDatabaseName) { this.manager = manager; this.msgService = msgService; this.databaseName = iDatabaseName; this.localNodeName = manager.getLocalNodeName(); - // SELF REGISTERING ITSELF HERE BECAUSE IT'S NEEDED FURTHER IN THE CALL CHAIN - final ODistributedDatabaseImpl prev = msgService.databases.put(iDatabaseName, this); - if (prev != null) { - // KILL THE PREVIOUS ONE - prev.shutdown(); - } - - startAcceptingRequests(); + this.requestExecutor = + OThreadPoolExecutors.newScalingThreadPool( + String.format( + "OrientDB DistributedWorker node=%s db=%s", getLocalNodeName(), databaseName), + 0, + calculateWorkers(manager.getManagedDatabases().size()), + 0, + 1, + TimeUnit.HOURS); if (iDatabaseName.equals(OSystemDatabase.SYSTEM_DB_NAME)) { recordLockManager = null; indexKeyLockManager = null; - return; + sequenceManager = null; + } else { + long timeout = + manager + .getServerInstance() + .getContextConfiguration() + .getValueAsLong(DISTRIBUTED_ATOMIC_LOCK_TIMEOUT); + int sequenceSize = + manager + .getServerInstance() + .getContextConfiguration() + .getValueAsInteger(DISTRIBUTED_TRANSACTION_SEQUENCE_SET_SIZE); + recordLockManager = new OSimpleLockManagerImpl<>(timeout); + indexKeyLockManager = new OSimpleLockManagerImpl<>(timeout); + sequenceManager = new ODistributedSynchronizedSequence(localNodeName, sequenceSize); + + startTxTimeoutTimerTask(); + registerHooks(); } + } - startTxTimeoutTimerTask(); - + private void registerHooks() { Orient.instance() .getProfiler() .registerHookValue( @@ -265,7 +259,7 @@ public Object getValue() { new OAbstractProfiler.OProfilerHookValue() { @Override public Object getValue() { - return (long) requestExecutor.getPoolSize(); + return (long) ((ThreadPoolExecutor) requestExecutor).getPoolSize(); } }, "distributed.db.*.workerThreads"); @@ -283,20 +277,6 @@ public Object getValue() { } }, "distributed.db.*.recordLocks"); - - long timeout = - manager - .getServerInstance() - .getContextConfiguration() - .getValueAsLong(DISTRIBUTED_ATOMIC_LOCK_TIMEOUT); - int sequenceSize = - manager - .getServerInstance() - .getContextConfiguration() - .getValueAsInteger(DISTRIBUTED_TRANSACTION_SEQUENCE_SET_SIZE); - recordLockManager = new OSimpleLockManagerImpl<>(timeout); - indexKeyLockManager = new OSimpleLockManagerImpl<>(timeout); - sequenceManager = new ODistributedSynchronizedSequence(localNodeName, sequenceSize); } @Override @@ -326,6 +306,7 @@ public void reEnqueue( pending.incrementAndGet(); Orient.instance() .scheduleTask( + String.format("DistributedDatabase[%s].reEnqueue", databaseName), () -> { try { processRequest( @@ -364,6 +345,9 @@ public void processRequest( + "' discarding"); } } + final String taskName = + String.format( + "DistributedDatabase[%s].processRequest.%s", getDatabaseName(), task.getName()); synchronized (this) { task.received(request, this); manager.messageReceived(request); @@ -385,7 +369,7 @@ public void processRequest( } }; try { - this.requestExecutor.submit(executeTask); + this.requestExecutor.submit(taskName, executeTask); } catch (RejectedExecutionException e) { task.finished(this); this.lockManager.unlock(guards); @@ -401,6 +385,7 @@ public void processRequest( } else { try { this.requestExecutor.submit( + taskName, () -> { execute(request); }); @@ -903,9 +888,7 @@ public String getDatabaseName() { @Override public ODatabaseDocumentInternal getDatabaseInstance() { - return manager - .getServerInstance() - .openDatabase(databaseName, "internal", "internal", null, true); + return manager.getServerInstance().openDatabase(databaseName); } @Override @@ -915,7 +898,7 @@ public long getReceivedRequests() { @Override public long getProcessedRequests() { - return requestExecutor.getCompletedTaskCount(); + return ((ThreadPoolExecutor) requestExecutor).getCompletedTaskCount(); } public void onDropShutdown() { @@ -927,7 +910,10 @@ public void shutdown() { shutdown(true); } - public void shutdown(boolean wait) { + public synchronized void shutdown(boolean wait) { + if (!running) { + return; + } waitPending(); running = false; @@ -1165,15 +1151,14 @@ protected String getLocalNodeName() { return localNodeName; } - private void startAcceptingRequests() { - // START ALL THE WORKER THREADS (CONFIGURABLE) + private static int calculateWorkers(int managedDbCount) { int totalWorkers = OGlobalConfiguration.DISTRIBUTED_DB_WORKERTHREADS.getValueAsInteger(); if (totalWorkers < 0) throw new ODistributedException( "Cannot create configured distributed workers (" + totalWorkers + ")"); else if (totalWorkers == 0) { // AUTOMATIC - final int totalDatabases = manager.getManagedDatabases().size() + 1; + final int totalDatabases = managedDbCount + 1; final int cpus = Runtime.getRuntime().availableProcessors(); @@ -1181,19 +1166,7 @@ else if (totalWorkers == 0) { if (totalWorkers == 0) totalWorkers = 1; } - - synchronized (this) { - this.requestExecutor = - (ThreadPoolExecutor) - OThreadPoolExecutors.newScalingThreadPool( - String.format( - "OrientDB DistributedWorker node=%s db=%s", getLocalNodeName(), databaseName), - 0, - totalWorkers, - 0, - 1, - TimeUnit.HOURS); - } + return totalWorkers; } @Override @@ -1346,7 +1319,7 @@ public String dump() { "\n\nDATABASE '" + databaseName + "' ON SERVER '" + manager.getLocalNodeName() + "'"); buffer.append("\n- MESSAGES IN QUEUES"); - buffer.append(" (" + (requestExecutor.getPoolSize()) + " WORKERS):"); + buffer.append(" (" + (((ThreadPoolExecutor) requestExecutor).getPoolSize()) + " WORKERS):"); return buffer.toString(); } diff --git a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedMessageServiceImpl.java b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedMessageServiceImpl.java index bae840642bd..d3c2693d79d 100755 --- a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedMessageServiceImpl.java +++ b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedMessageServiceImpl.java @@ -57,7 +57,7 @@ public class ODistributedMessageServiceImpl implements ODistributedMessageServic private final ConcurrentHashMap responsesByRequestIds; private final TimerTask asynchMessageManager; protected final ConcurrentHashMap databases = - new ConcurrentHashMap(); + new ConcurrentHashMap<>(); private Thread responseThread; private long[] responseTimeMetrics = new long[10]; private volatile boolean running = true; @@ -153,11 +153,8 @@ public long getAverageResponseTime() { /** Creates a distributed database instance if not defined yet. */ public ODistributedDatabaseImpl registerDatabase( final String iDatabaseName, ODistributedConfiguration cfg) { - final ODistributedDatabaseImpl ddb = databases.get(iDatabaseName); - if (ddb != null) return ddb; - - return new ODistributedDatabaseImpl( - manager, this, iDatabaseName, cfg, manager.getServerInstance()); + return databases.computeIfAbsent( + iDatabaseName, db -> new ODistributedDatabaseImpl(manager, this, iDatabaseName)); } public ODistributedDatabaseImpl unregisterDatabase(final String iDatabaseName) { diff --git a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/task/OTransactionPhase2Task.java b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/task/OTransactionPhase2Task.java index daf5125b0a4..599a09b143b 100644 --- a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/task/OTransactionPhase2Task.java +++ b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/task/OTransactionPhase2Task.java @@ -147,7 +147,8 @@ public Object execute( OLogManager.instance() .info( OTransactionPhase2Task.this, - "Received second phase but not yet first phase, re-enqueue second phase"); + "Received second phase but not yet first phase for commit tx:%s, re-enqueue second phase", + firstPhaseId); ((ODatabaseDocumentDistributed) database) .getStorageDistributed() .getLocalDistributedDatabase() diff --git a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/sql/OCommandExecutorSQLHASyncCluster.java b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/sql/OCommandExecutorSQLHASyncCluster.java index 8c06898ece1..ed80b8355bf 100755 --- a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/sql/OCommandExecutorSQLHASyncCluster.java +++ b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/sql/OCommandExecutorSQLHASyncCluster.java @@ -257,7 +257,7 @@ public static Object replaceCluster( ODatabaseDocumentInternal db = ODatabaseRecordThreadLocal.instance().getIfDefined(); final boolean openDatabaseHere = db == null; - if (db == null) db = serverInstance.openDatabase("plocal:" + dbPath, "", "", null, true); + if (db == null) db = serverInstance.openDatabase("plocal:" + dbPath); try { diff --git a/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastPlugin.java b/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastPlugin.java index aea40e635bc..dd66af2809d 100755 --- a/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastPlugin.java +++ b/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastPlugin.java @@ -44,6 +44,7 @@ import com.orientechnologies.common.io.OUtils; import com.orientechnologies.common.log.OLogManager; import com.orientechnologies.common.parser.OSystemVariableResolver; +import com.orientechnologies.common.thread.OThreadPoolExecutors; import com.orientechnologies.common.util.OCallable; import com.orientechnologies.common.util.OCallableNoParamNoReturn; import com.orientechnologies.common.util.OCallableUtils; @@ -163,7 +164,17 @@ public void config(final OServer iServer, final OServerParameterConfiguration[] public void startup() { if (!enabled) return; - running = true; + try { + final String delayEnv = System.getenv("HAZELCAST_PLUGIN_STARTUP_DELAY"); + if (delayEnv != null) { + long delay = Long.parseLong(delayEnv); + OLogManager.instance().info(this, "Delaying HazelcastPlugin startup by '%d' ms", delay); + Thread.sleep(delay); + } + } catch (Throwable t) { + t.printStackTrace(); + } + if (serverInstance.getDatabases() instanceof OrientDBDistributed) ((OrientDBDistributed) serverInstance.getDatabases()).setPlugin(this); @@ -312,17 +323,6 @@ public void startup() { publishLocalNodeConfiguration(); - new Thread( - () -> { - try { - installNewDatabasesFromCluster(); - loadLocalDatabases(); - } finally { - serverStarted.countDown(); - } - }) - .start(); - membershipListenerMapRegistration = configurationMap.getHazelcastMap().addEntryListener(this, true); membershipListenerRegistration = hazelcastInstance.getCluster().addMembershipListener(this); @@ -372,6 +372,20 @@ public void onSignal(final Signal signal) { new ODistributedStartupException("Error on starting distributed plugin"), e); } + running = true; + OThreadPoolExecutors.executeUnbound( + () -> { + try { + installNewDatabasesFromCluster(); + + // FIXME: installNewDatabasesFromCluster catches exceptions, and loadLocalDatabases + // fails with same exception + loadLocalDatabases(); + } finally { + serverStarted.countDown(); + } + }, + "OHazelcastPlugin InstallDBs"); dumpServersStatus(); } diff --git a/server/src/main/java/com/orientechnologies/orient/server/OServer.java b/server/src/main/java/com/orientechnologies/orient/server/OServer.java index 19e0958573a..f3c36b00a9d 100755 --- a/server/src/main/java/com/orientechnologies/orient/server/OServer.java +++ b/server/src/main/java/com/orientechnologies/orient/server/OServer.java @@ -108,6 +108,9 @@ public class OServer { protected OServerPluginManager pluginManager; protected OConfigurableHooksManager hookManager; protected ODistributedServerManager distributedManager; + /** Whether a distributed plugin is registered in the configuration */ + private volatile boolean distributedPluginEnabled; + protected OServerSecurity serverSecurity; private SecureRandom random = new SecureRandom(); private Map variables = new HashMap(); @@ -446,6 +449,15 @@ public OServer activate() for (OServerLifecycleListener l : lifecycleListeners) l.onBeforeActivate(); + // Load server plugin config and determine if participants in activation + // need to be aware if distributed plugin will be part of server + final List serverPlugins = loadServerPlugins(); + for (ServerPluginConfig serverPlugin : serverPlugins) { + if (ODistributedServerManager.class.isAssignableFrom(serverPlugin.clazz)) { + distributedPluginEnabled = true; + } + } + final OServerConfiguration configuration = serverCfg.getConfiguration(); tokenHandler = new OTokenHandlerImpl(this); @@ -497,7 +509,7 @@ public OServer activate() throw OException.wrapException(new OConfigurationException(message), e); } - registerPlugins(); + registerPlugins(serverPlugins); for (OServerLifecycleListener l : lifecycleListeners) l.onAfterActivate(); @@ -965,7 +977,7 @@ public ODatabaseDocumentInternal openDatabase( // TODO: final String path = getStoragePath(iDbUrl); it use to resolve the path in some way boolean serverAuth = false; if (iBypassAccess) { - database = databases.openNoAuthenticate(iDbUrl, user); + database = databases.openInternal(iDbUrl, user); serverAuth = true; } else { OServerUserConfiguration serverUser = serverLogin(user, password, "database.passthrough"); @@ -1000,6 +1012,10 @@ public ODatabaseDocumentInternal openDatabase(String database) { return openDatabase(database, "internal", "internal", null, true); } + public boolean isDistributedPluginEnabled() { + return distributedPluginEnabled; + } + public ODistributedServerManager getDistributedManager() { return distributedManager; } @@ -1203,21 +1219,23 @@ public OServerPluginManager getPluginManager() { return pluginManager; } - protected void registerPlugins() - throws InstantiationException, IllegalAccessException, ClassNotFoundException { - pluginManager = new OServerPluginManager(); - pluginManager.config(this); - pluginManager.startup(); + private static class ServerPluginConfig { + private final Class clazz; + private OServerParameterConfiguration[] parameters; - if (serverSecurity != null) serverSecurity.onAfterDynamicPlugins(); + private ServerPluginConfig(Class clazz, OServerParameterConfiguration[] parameters) { + this.clazz = clazz; + this.parameters = parameters; + } + } + + private List loadServerPlugins() throws ClassNotFoundException { + // PLUGINS CONFIGURED IN XML and enabled + List plugins = new ArrayList<>(); - // PLUGINS CONFIGURED IN XML final OServerConfiguration configuration = serverCfg.getConfiguration(); if (configuration.handlers != null) { - // ACTIVATE PLUGINS - final List plugins = new ArrayList(); - for (OServerHandlerConfiguration h : configuration.handlers) { if (h.parameters != null) { // CHECK IF IT'S ENABLED @@ -1244,27 +1262,43 @@ protected void registerPlugins() continue; } - final OServerPlugin plugin = (OServerPlugin) loadClass(h.clazz).newInstance(); + Class pluginClass = loadClass(h.clazz); + plugins.add(new ServerPluginConfig(pluginClass, h.parameters)); + } + } + return plugins; + } + + protected void registerPlugins(List serverPlugins) + throws InstantiationException, IllegalAccessException, ClassNotFoundException { + pluginManager = new OServerPluginManager(); + pluginManager.config(this); + pluginManager.startup(); - if (plugin instanceof ODistributedServerManager) - distributedManager = (ODistributedServerManager) plugin; + if (serverSecurity != null) serverSecurity.onAfterDynamicPlugins(); - pluginManager.registerPlugin( - new OServerPluginInfo(plugin.getName(), null, null, null, plugin, null, 0, null)); + final List plugins = new ArrayList<>(); + for (ServerPluginConfig serverPlugin : serverPlugins) { + final OServerPlugin plugin = (OServerPlugin) serverPlugin.clazz.newInstance(); - pluginManager.callListenerBeforeConfig(plugin, h.parameters); - plugin.config(this, h.parameters); - pluginManager.callListenerAfterConfig(plugin, h.parameters); + if (plugin instanceof ODistributedServerManager) + distributedManager = (ODistributedServerManager) plugin; - plugins.add(plugin); - } + pluginManager.registerPlugin( + new OServerPluginInfo(plugin.getName(), null, null, null, plugin, null, 0, null)); - // START ALL THE CONFIGURED PLUGINS - for (OServerPlugin plugin : plugins) { - pluginManager.callListenerBeforeStartup(plugin); - plugin.startup(); - pluginManager.callListenerAfterStartup(plugin); - } + pluginManager.callListenerBeforeConfig(plugin, serverPlugin.parameters); + plugin.config(this, serverPlugin.parameters); + pluginManager.callListenerAfterConfig(plugin, serverPlugin.parameters); + + plugins.add(plugin); + } + + // START ALL THE CONFIGURED PLUGINS + for (OServerPlugin plugin : plugins) { + pluginManager.callListenerBeforeStartup(plugin); + plugin.startup(); + pluginManager.callListenerAfterStartup(plugin); } } diff --git a/server/src/main/java/com/orientechnologies/orient/server/handler/OAutomaticBackup.java b/server/src/main/java/com/orientechnologies/orient/server/handler/OAutomaticBackup.java index 74dbcc52113..43b6c70e6cb 100755 --- a/server/src/main/java/com/orientechnologies/orient/server/handler/OAutomaticBackup.java +++ b/server/src/main/java/com/orientechnologies/orient/server/handler/OAutomaticBackup.java @@ -183,7 +183,7 @@ public void run() { if (include) { ODatabaseDocumentInternal db = null; try { - db = serverInstance.openDatabase(dbName, null, null, null, true); + db = serverInstance.openDatabase(dbName); final long begin = System.currentTimeMillis();