From 1694e6fcbafd726b1815db39eada928349358ed5 Mon Sep 17 00:00:00 2001 From: Tim Whittington Date: Tue, 19 Jul 2022 12:01:25 +1200 Subject: [PATCH 01/25] Detect distributed setup early in activate process. Split loading of enabled server plugins and starting of plugins to allow presence of a distributed server manager to be detected prior to network listeners being established and storage open. This allows guarding of constructs that require the distributed plugin to be present and running, which currently experience a race condition between the network listeners starting and the distributed plugin fully starting. --- .../orient/server/OServer.java | 88 +++++++++++++------ 1 file changed, 61 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/com/orientechnologies/orient/server/OServer.java b/server/src/main/java/com/orientechnologies/orient/server/OServer.java index 19e0958573a..76738c73d6e 100755 --- a/server/src/main/java/com/orientechnologies/orient/server/OServer.java +++ b/server/src/main/java/com/orientechnologies/orient/server/OServer.java @@ -108,6 +108,9 @@ public class OServer { protected OServerPluginManager pluginManager; protected OConfigurableHooksManager hookManager; protected ODistributedServerManager distributedManager; + /** Whether a distributed plugin is registered in the configuration */ + private volatile boolean distributedPluginEnabled; + protected OServerSecurity serverSecurity; private SecureRandom random = new SecureRandom(); private Map variables = new HashMap(); @@ -446,6 +449,15 @@ public OServer activate() for (OServerLifecycleListener l : lifecycleListeners) l.onBeforeActivate(); + // Load server plugin config and determine if participants in activation + // need to be aware if distributed plugin will be part of server + final List serverPlugins = loadServerPlugins(); + for (ServerPluginConfig serverPlugin : serverPlugins) { + if (ODistributedServerManager.class.isAssignableFrom(serverPlugin.clazz)) { + distributedPluginEnabled = true; + } + } + final OServerConfiguration configuration = serverCfg.getConfiguration(); tokenHandler = new OTokenHandlerImpl(this); @@ -497,7 +509,7 @@ public OServer activate() throw OException.wrapException(new OConfigurationException(message), e); } - registerPlugins(); + registerPlugins(serverPlugins); for (OServerLifecycleListener l : lifecycleListeners) l.onAfterActivate(); @@ -1000,6 +1012,10 @@ public ODatabaseDocumentInternal openDatabase(String database) { return openDatabase(database, "internal", "internal", null, true); } + public boolean isDistributedPluginEnabled() { + return distributedPluginEnabled; + } + public ODistributedServerManager getDistributedManager() { return distributedManager; } @@ -1203,21 +1219,23 @@ public OServerPluginManager getPluginManager() { return pluginManager; } - protected void registerPlugins() - throws InstantiationException, IllegalAccessException, ClassNotFoundException { - pluginManager = new OServerPluginManager(); - pluginManager.config(this); - pluginManager.startup(); + private static class ServerPluginConfig { + private final Class clazz; + private OServerParameterConfiguration[] parameters; - if (serverSecurity != null) serverSecurity.onAfterDynamicPlugins(); + private ServerPluginConfig(Class clazz, OServerParameterConfiguration[] parameters) { + this.clazz = clazz; + this.parameters = parameters; + } + } + + private List loadServerPlugins() throws ClassNotFoundException { + // PLUGINS CONFIGURED IN XML and enabled + List plugins = new ArrayList<>(); - // PLUGINS CONFIGURED IN XML final OServerConfiguration configuration = serverCfg.getConfiguration(); if (configuration.handlers != null) { - // ACTIVATE PLUGINS - final List plugins = new ArrayList(); - for (OServerHandlerConfiguration h : configuration.handlers) { if (h.parameters != null) { // CHECK IF IT'S ENABLED @@ -1244,27 +1262,43 @@ protected void registerPlugins() continue; } - final OServerPlugin plugin = (OServerPlugin) loadClass(h.clazz).newInstance(); + Class pluginClass = loadClass(h.clazz); + plugins.add(new ServerPluginConfig(pluginClass, h.parameters)); + } + } + return plugins; + } + + protected void registerPlugins(List serverPlugins) + throws InstantiationException, IllegalAccessException, ClassNotFoundException { + pluginManager = new OServerPluginManager(); + pluginManager.config(this); + pluginManager.startup(); - if (plugin instanceof ODistributedServerManager) - distributedManager = (ODistributedServerManager) plugin; + if (serverSecurity != null) serverSecurity.onAfterDynamicPlugins(); - pluginManager.registerPlugin( - new OServerPluginInfo(plugin.getName(), null, null, null, plugin, null, 0, null)); + final List plugins = new ArrayList<>(); + for (ServerPluginConfig serverPlugin : serverPlugins) { + final OServerPlugin plugin = (OServerPlugin) serverPlugin.clazz.newInstance(); - pluginManager.callListenerBeforeConfig(plugin, h.parameters); - plugin.config(this, h.parameters); - pluginManager.callListenerAfterConfig(plugin, h.parameters); + if (plugin instanceof ODistributedServerManager) + distributedManager = (ODistributedServerManager) plugin; - plugins.add(plugin); - } + pluginManager.registerPlugin( + new OServerPluginInfo(plugin.getName(), null, null, null, plugin, null, 0, null)); - // START ALL THE CONFIGURED PLUGINS - for (OServerPlugin plugin : plugins) { - pluginManager.callListenerBeforeStartup(plugin); - plugin.startup(); - pluginManager.callListenerAfterStartup(plugin); - } + pluginManager.callListenerBeforeConfig(plugin, serverPlugin.parameters); + plugin.config(this, serverPlugin.parameters); + pluginManager.callListenerAfterConfig(plugin, serverPlugin.parameters); + + plugins.add(plugin); + } + + // START ALL THE CONFIGURED PLUGINS + for (OServerPlugin plugin : plugins) { + pluginManager.callListenerBeforeStartup(plugin); + plugin.startup(); + pluginManager.callListenerAfterStartup(plugin); } } From 392c94b7f55c0e508d75a49c520581520216d94a Mon Sep 17 00:00:00 2001 From: Tim Whittington Date: Tue, 19 Jul 2022 12:49:11 +1200 Subject: [PATCH 02/25] Minor consistency fixes in embedded DB input validation. --- .../orientechnologies/orient/core/db/OrientDBEmbedded.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBEmbedded.java b/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBEmbedded.java index ed7d3dff59c..febb15b7f47 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBEmbedded.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBEmbedded.java @@ -539,6 +539,7 @@ public ODatabaseDocumentInternal poolOpen( String name, String user, String password, ODatabasePoolInternal pool) { final ODatabaseDocumentEmbedded embedded; synchronized (this) { + checkDatabaseName(name); checkOpen(); OAbstractPaginatedStorage storage = getAndOpenStorage(name, pool.getConfig()); embedded = newPooledSessionInstance(pool, storage); @@ -810,10 +811,8 @@ public synchronized boolean exists(String name, String user, String password) { @Override public void drop(String name, String user, String password) { - synchronized (this) { - checkOpen(); - } checkDatabaseName(name); + checkOpen(); ODatabaseDocumentInternal current = ODatabaseRecordThreadLocal.instance().getIfDefined(); try { ODatabaseDocumentInternal db = openNoAuthenticate(name, user); From 06cf4748df08b0e37715204c3620ef30ff2da8b4 Mon Sep 17 00:00:00 2001 From: Tim Whittington Date: Tue, 19 Jul 2022 12:51:10 +1200 Subject: [PATCH 03/25] Factor embedded DB open methods into single core implementation --- .../orient/core/db/OrientDBEmbedded.java | 74 ++++++------------- 1 file changed, 23 insertions(+), 51 deletions(-) diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBEmbedded.java b/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBEmbedded.java index febb15b7f47..db65f928f31 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBEmbedded.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBEmbedded.java @@ -414,25 +414,23 @@ private long calculateDoubleWriteLogMaxSegSize(Path storagePath) throws IOExcept return maxSegSize; } - @Override - public ODatabaseDocumentInternal open(String name, String user, String password) { - return open(name, user, password, null); - } - - public ODatabaseDocumentEmbedded openNoAuthenticate(String name, String user) { + private ODatabaseDocumentEmbedded doOpen( + String name, String user, String password, boolean checkPassword, OrientDBConfig config) { checkDatabaseName(name); + checkOpen(); try { final ODatabaseDocumentEmbedded embedded; - OrientDBConfig config = solveConfig(null); + config = solveConfig(config); synchronized (this) { - checkOpen(); OAbstractPaginatedStorage storage = getAndOpenStorage(name, config); storage.incOnOpen(); embedded = newSessionInstance(storage); embedded.init(config, getOrCreateSharedContext(storage)); } embedded.rebuildIndexes(); - embedded.internalOpen(user, "nopwd", false); + if (user != null) { + embedded.internalOpen(user, password, checkPassword); + } embedded.callOnOpenListeners(); return embedded; } catch (Exception e) { @@ -441,55 +439,29 @@ public ODatabaseDocumentEmbedded openNoAuthenticate(String name, String user) { } } - protected ODatabaseDocumentEmbedded newSessionInstance(OAbstractPaginatedStorage storage) { - return new ODatabaseDocumentEmbedded(storage); - } - - public ODatabaseDocumentEmbedded openNoAuthorization(String name) { - checkDatabaseName(name); - try { - final ODatabaseDocumentEmbedded embedded; - OrientDBConfig config = solveConfig(null); - synchronized (this) { - checkOpen(); - OAbstractPaginatedStorage storage = getAndOpenStorage(name, config); - storage.incOnOpen(); - embedded = newSessionInstance(storage); - embedded.init(config, getOrCreateSharedContext(storage)); - } - embedded.rebuildIndexes(); - embedded.callOnOpenListeners(); - return embedded; - } catch (Exception e) { - throw OException.wrapException( - new ODatabaseException("Cannot open database '" + name + "'"), e); - } + @Override + public final ODatabaseDocumentInternal open(String name, String user, String password) { + return open(name, user, password, null); } @Override public ODatabaseDocumentInternal open( String name, String user, String password, OrientDBConfig config) { - checkDatabaseName(name); checkDefaultPassword(name, user, password); - try { - final ODatabaseDocumentEmbedded embedded; - synchronized (this) { - checkOpen(); - config = solveConfig(config); - OAbstractPaginatedStorage storage = getAndOpenStorage(name, config); + return doOpen(name, user, password, true, config); + } - embedded = newSessionInstance(storage); - embedded.init(config, getOrCreateSharedContext(storage)); - storage.incOnOpen(); - } - embedded.rebuildIndexes(); - embedded.internalOpen(user, password); - embedded.callOnOpenListeners(); - return embedded; - } catch (Exception e) { - throw OException.wrapException( - new ODatabaseException("Cannot open database '" + name + "'"), e); - } + public ODatabaseDocumentEmbedded openNoAuthenticate(String name, String user) { + return doOpen(name, user, null, false, null); + } + + @Override + public ODatabaseDocumentEmbedded openNoAuthorization(String name) { + return doOpen(name, null, null, false, null); + } + + protected ODatabaseDocumentEmbedded newSessionInstance(OAbstractPaginatedStorage storage) { + return new ODatabaseDocumentEmbedded(storage); } private OAbstractPaginatedStorage getAndOpenStorage(String name, OrientDBConfig config) { From c6f96fd019aa550290acecab3c7ec0628c747df5 Mon Sep 17 00:00:00 2001 From: Tim Whittington Date: Tue, 19 Jul 2022 12:52:35 +1200 Subject: [PATCH 04/25] Add distinct openInternal operation that bypasses online checks. The current usages of openNoAuthenticate include cases (like DB delta/full syncs) that need to bypass not only auth checks but distributed online status. --- .../com/orientechnologies/orient/core/db/OrientDBRemote.java | 5 +++++ .../orientechnologies/orient/core/db/OrientDBEmbedded.java | 5 +++++ .../orientechnologies/orient/core/db/OrientDBInternal.java | 2 ++ 3 files changed, 12 insertions(+) diff --git a/client/src/main/java/com/orientechnologies/orient/core/db/OrientDBRemote.java b/client/src/main/java/com/orientechnologies/orient/core/db/OrientDBRemote.java index f5c4ec42fb4..8e945113ea4 100755 --- a/client/src/main/java/com/orientechnologies/orient/core/db/OrientDBRemote.java +++ b/client/src/main/java/com/orientechnologies/orient/core/db/OrientDBRemote.java @@ -445,6 +445,11 @@ public void loadAllDatabases() { // In remote does nothing } + @Override + public ODatabaseDocumentInternal openInternal(String iDbUrl, String user) { + throw new UnsupportedOperationException("Open for internal use is not supported in remote"); + } + @Override public ODatabaseDocumentInternal openNoAuthenticate(String iDbUrl, String user) { throw new UnsupportedOperationException( diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBEmbedded.java b/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBEmbedded.java index db65f928f31..752818d1c05 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBEmbedded.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBEmbedded.java @@ -451,6 +451,11 @@ public ODatabaseDocumentInternal open( return doOpen(name, user, password, true, config); } + @Override + public final ODatabaseDocumentEmbedded openInternal(String name, String user) { + return doOpen(name, user, null, false, null); + } + public ODatabaseDocumentEmbedded openNoAuthenticate(String name, String user) { return doOpen(name, user, null, false, null); } diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBInternal.java b/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBInternal.java index 8b1f35cd5cc..09877ee7b3a 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBInternal.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBInternal.java @@ -288,6 +288,8 @@ static OrientDBInternal extract(OrientDB orientDB) { return orientDB.internal; } + ODatabaseDocumentInternal openInternal(String iDbUrl, String user); + ODatabaseDocumentInternal openNoAuthenticate(String iDbUrl, String user); ODatabaseDocumentInternal openNoAuthorization(String name); From 3b88eca8ac580d898eb4f830881cbbe7010e9a34 Mon Sep 17 00:00:00 2001 From: Tim Whittington Date: Tue, 19 Jul 2022 12:53:30 +1200 Subject: [PATCH 05/25] Fix database online checks for distributed database opens. --- .../orient/core/db/OrientDBEmbedded.java | 4 +- .../orient/core/db/OrientDBDistributed.java | 67 ++++++++++++++----- 2 files changed, 51 insertions(+), 20 deletions(-) diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBEmbedded.java b/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBEmbedded.java index 752818d1c05..f6319fab53f 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBEmbedded.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBEmbedded.java @@ -849,7 +849,7 @@ public synchronized void loadAllDatabases() { } } - public ODatabasePoolInternal openPool(String name, String user, String password) { + public final ODatabasePoolInternal openPool(String name, String user, String password) { return openPool(name, user, password, null); } @@ -864,7 +864,7 @@ public ODatabasePoolInternal openPool( } @Override - public ODatabasePoolInternal cachedPool(String database, String user, String password) { + public final ODatabasePoolInternal cachedPool(String database, String user, String password) { return cachedPool(database, user, password, null); } diff --git a/distributed/src/main/java/com/orientechnologies/orient/core/db/OrientDBDistributed.java b/distributed/src/main/java/com/orientechnologies/orient/core/db/OrientDBDistributed.java index 544aa508ae8..2f306dba7be 100755 --- a/distributed/src/main/java/com/orientechnologies/orient/core/db/OrientDBDistributed.java +++ b/distributed/src/main/java/com/orientechnologies/orient/core/db/OrientDBDistributed.java @@ -48,6 +48,13 @@ public void init(OServer server) { this.server = server; } + private boolean isDistributedEnabled() { + if (server == null) { + return false; + } + return server.isDistributedPluginEnabled(); + } + public synchronized OHazelcastPlugin getPlugin() { if (plugin == null) { if (server != null && server.isActive()) plugin = server.getPlugin("cluster"); @@ -56,20 +63,19 @@ public synchronized OHazelcastPlugin getPlugin() { } protected OSharedContext createSharedContext(OAbstractPaginatedStorage storage) { - if (OSystemDatabase.SYSTEM_DB_NAME.equals(storage.getName()) - || getPlugin() == null - || !getPlugin().isRunning()) { + if (!isDistributedEnabled() || OSystemDatabase.SYSTEM_DB_NAME.equals(storage.getName())) { return new OSharedContextEmbedded(storage, this); } return new OSharedContextDistributed(storage, this); } protected ODatabaseDocumentEmbedded newSessionInstance(OAbstractPaginatedStorage storage) { - if (OSystemDatabase.SYSTEM_DB_NAME.equals(storage.getName()) - || getPlugin() == null - || !getPlugin().isRunning()) { + if (!isDistributedEnabled() || OSystemDatabase.SYSTEM_DB_NAME.equals(storage.getName())) { return new ODatabaseDocumentEmbedded(storage); } + if (plugin == null) { + throw new OOfflineNodeException("Distributed plugin is not active"); + } plugin.registerNewDatabaseIfNeeded( storage.getName(), plugin.getDatabaseConfiguration(storage.getName())); return new ODatabaseDocumentDistributed(plugin.getStorage(storage.getName(), storage), plugin); @@ -77,11 +83,12 @@ protected ODatabaseDocumentEmbedded newSessionInstance(OAbstractPaginatedStorage protected ODatabaseDocumentEmbedded newPooledSessionInstance( ODatabasePoolInternal pool, OAbstractPaginatedStorage storage) { - if (OSystemDatabase.SYSTEM_DB_NAME.equals(storage.getName()) - || getPlugin() == null - || !getPlugin().isRunning()) { + if (!isDistributedEnabled() || OSystemDatabase.SYSTEM_DB_NAME.equals(storage.getName())) { return new ODatabaseDocumentEmbeddedPooled(pool, storage); } + if (plugin == null) { + throw new OOfflineNodeException("Distributed plugin is not active"); + } plugin.registerNewDatabaseIfNeeded( storage.getName(), plugin.getDatabaseConfiguration(storage.getName())); return new ODatabaseDocumentDistributedPooled( @@ -163,8 +170,22 @@ public OStorage fullSync(String dbName, InputStream backupStream, OrientDBConfig @Override public ODatabaseDocumentInternal poolOpen( String name, String user, String password, ODatabasePoolInternal pool) { - ODatabaseDocumentInternal session = super.poolOpen(name, user, password, pool); - return session; + checkDbAvailable(name); + return super.poolOpen(name, user, password, pool); + } + + @Override + public ODatabasePoolInternal openPool( + String name, String user, String password, OrientDBConfig config) { + checkDbAvailable(name); + return super.openPool(name, user, password, config); + } + + @Override + public ODatabasePoolInternal cachedPool( + String name, String user, String password, OrientDBConfig config) { + checkDbAvailable(name); + return super.cachedPool(name, user, password, config); } @Override @@ -204,10 +225,14 @@ public void drop(String name, String user, String password) { } private void checkDbAvailable(String name) { - if (getPlugin() == null || !getPlugin().isRunning()) { + if (!isDistributedEnabled() || OSystemDatabase.SYSTEM_DB_NAME.equals(name)) { return; } - if (OSystemDatabase.SYSTEM_DB_NAME.equals(name)) return; + if (getPlugin() == null || !getPlugin().isRunning()) { + // The configuration specifies distributed mode, but the distributed plugin has not + // started yet (and a client has requested database access before the server is up) + throw new OOfflineNodeException("Distributed server is not yet ONLINE"); + } ODistributedServerManager.DB_STATUS dbStatus = plugin.getDatabaseStatus(plugin.getLocalNodeName(), name); if (dbStatus != ODistributedServerManager.DB_STATUS.ONLINE @@ -218,16 +243,22 @@ private void checkDbAvailable(String name) { } @Override - public ODatabaseDocumentInternal open(String name, String user, String password) { + public ODatabaseDocumentInternal open( + String name, String user, String password, OrientDBConfig config) { checkDbAvailable(name); - return super.open(name, user, password); + return super.open(name, user, password, config); } @Override - public ODatabaseDocumentInternal open( - String name, String user, String password, OrientDBConfig config) { + public ODatabaseDocumentEmbedded openNoAuthenticate(String name, String user) { checkDbAvailable(name); - return super.open(name, user, password, config); + return super.openNoAuthenticate(name, user); + } + + @Override + public ODatabaseDocumentEmbedded openNoAuthorization(String name) { + checkDbAvailable(name); + return super.openNoAuthorization(name); } @Override From 58083511d115b065214be9d0229567f99b4fe524 Mon Sep 17 00:00:00 2001 From: Tim Whittington Date: Tue, 19 Jul 2022 12:54:42 +1200 Subject: [PATCH 06/25] Use openInternal path for bypass access. --- .../main/java/com/orientechnologies/orient/server/OServer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/com/orientechnologies/orient/server/OServer.java b/server/src/main/java/com/orientechnologies/orient/server/OServer.java index 76738c73d6e..f3c36b00a9d 100755 --- a/server/src/main/java/com/orientechnologies/orient/server/OServer.java +++ b/server/src/main/java/com/orientechnologies/orient/server/OServer.java @@ -977,7 +977,7 @@ public ODatabaseDocumentInternal openDatabase( // TODO: final String path = getStoragePath(iDbUrl); it use to resolve the path in some way boolean serverAuth = false; if (iBypassAccess) { - database = databases.openNoAuthenticate(iDbUrl, user); + database = databases.openInternal(iDbUrl, user); serverAuth = true; } else { OServerUserConfiguration serverUser = serverLogin(user, password, "database.passthrough"); From 345c9b6a12b21cdfdadb4f93dd99cb550b1ba126 Mon Sep 17 00:00:00 2001 From: Tim Whittington Date: Tue, 19 Jul 2022 12:56:25 +1200 Subject: [PATCH 07/25] Use simpler but equivalent openDatabase calls --- .../server/distributed/impl/ODistributedDatabaseImpl.java | 4 +--- .../distributed/sql/OCommandExecutorSQLHASyncCluster.java | 2 +- .../orient/server/handler/OAutomaticBackup.java | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedDatabaseImpl.java b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedDatabaseImpl.java index 14fe9d1db82..e8f9f7c035c 100755 --- a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedDatabaseImpl.java +++ b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedDatabaseImpl.java @@ -903,9 +903,7 @@ public String getDatabaseName() { @Override public ODatabaseDocumentInternal getDatabaseInstance() { - return manager - .getServerInstance() - .openDatabase(databaseName, "internal", "internal", null, true); + return manager.getServerInstance().openDatabase(databaseName); } @Override diff --git a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/sql/OCommandExecutorSQLHASyncCluster.java b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/sql/OCommandExecutorSQLHASyncCluster.java index 8c06898ece1..ed80b8355bf 100755 --- a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/sql/OCommandExecutorSQLHASyncCluster.java +++ b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/sql/OCommandExecutorSQLHASyncCluster.java @@ -257,7 +257,7 @@ public static Object replaceCluster( ODatabaseDocumentInternal db = ODatabaseRecordThreadLocal.instance().getIfDefined(); final boolean openDatabaseHere = db == null; - if (db == null) db = serverInstance.openDatabase("plocal:" + dbPath, "", "", null, true); + if (db == null) db = serverInstance.openDatabase("plocal:" + dbPath); try { diff --git a/server/src/main/java/com/orientechnologies/orient/server/handler/OAutomaticBackup.java b/server/src/main/java/com/orientechnologies/orient/server/handler/OAutomaticBackup.java index 74dbcc52113..43b6c70e6cb 100755 --- a/server/src/main/java/com/orientechnologies/orient/server/handler/OAutomaticBackup.java +++ b/server/src/main/java/com/orientechnologies/orient/server/handler/OAutomaticBackup.java @@ -183,7 +183,7 @@ public void run() { if (include) { ODatabaseDocumentInternal db = null; try { - db = serverInstance.openDatabase(dbName, null, null, null, true); + db = serverInstance.openDatabase(dbName); final long begin = System.currentTimeMillis(); From 11427e7d5e70c4a4f1c1e358aacaca8d4cfcbc89 Mon Sep 17 00:00:00 2001 From: Tim Whittington Date: Tue, 19 Jul 2022 13:01:43 +1200 Subject: [PATCH 08/25] Add tracing support to executors. Errors in unbound tasks in executors that are launched from common points (e.g. OrientDBEmbedded#execute) are hard to trace. This change allows a task ID to be associated with each execution, which will be reported on any exception, and if debug logging is enabled, a full stack trace identifying the launching call site will be attached. --- .../OThreadPoolExecutorWithLogging.java | 78 ++++++++++++++++++- .../common/thread/OThreadPoolExecutors.java | 22 +++--- .../thread/OTracedExecutionException.java | 51 ++++++++++++ .../common/thread/TracingExecutorService.java | 16 ++++ 4 files changed, 155 insertions(+), 12 deletions(-) create mode 100755 core/src/main/java/com/orientechnologies/common/thread/OTracedExecutionException.java create mode 100644 core/src/main/java/com/orientechnologies/common/thread/TracingExecutorService.java diff --git a/core/src/main/java/com/orientechnologies/common/thread/OThreadPoolExecutorWithLogging.java b/core/src/main/java/com/orientechnologies/common/thread/OThreadPoolExecutorWithLogging.java index 200063dd62c..ef8eacf4a2f 100755 --- a/core/src/main/java/com/orientechnologies/common/thread/OThreadPoolExecutorWithLogging.java +++ b/core/src/main/java/com/orientechnologies/common/thread/OThreadPoolExecutorWithLogging.java @@ -2,6 +2,7 @@ import com.orientechnologies.common.log.OLogManager; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -14,7 +15,8 @@ * The same as thread {@link ThreadPoolExecutor} but also logs all exceptions happened inside of the * tasks which caused tasks to stop. */ -public class OThreadPoolExecutorWithLogging extends ThreadPoolExecutor { +public class OThreadPoolExecutorWithLogging extends ThreadPoolExecutor + implements TracingExecutorService { public OThreadPoolExecutorWithLogging( int corePoolSize, int maximumPoolSize, @@ -79,4 +81,78 @@ protected void afterExecute(Runnable r, Throwable t) { OLogManager.instance().errorNoDb(this, "Exception in thread '%s'", t, thread.getName()); } } + + @Override + public Future submit(String taskName, Callable task) { + final OTracedExecutionException trace = OTracedExecutionException.prepareTrace(taskName, task); + return super.submit( + () -> { + try { + return task.call(); + } catch (Exception e) { + throw OTracedExecutionException.trace(trace, e, taskName, task); + } + }); + } + + @Override + public Future submit(String taskName, Runnable task) { + final OTracedExecutionException trace = OTracedExecutionException.prepareTrace(taskName, task); + return super.submit( + () -> { + try { + task.run(); + } catch (Exception e) { + throw OTracedExecutionException.trace(trace, e, taskName, task); + } + }); + } + + @Override + public Future submit(String taskName, Runnable task, T result) { + final OTracedExecutionException trace = OTracedExecutionException.prepareTrace(taskName, task); + return super.submit( + () -> { + try { + task.run(); + } catch (Exception e) { + throw OTracedExecutionException.trace(trace, e, taskName, task); + } + }, + result); + } + + @Override + public void execute(String taskName, Runnable command) { + final OTracedExecutionException trace = + OTracedExecutionException.prepareTrace(taskName, command); + super.execute( + () -> { + try { + command.run(); + } catch (Exception e) { + throw OTracedExecutionException.trace(trace, e, taskName, command); + } + }); + } + + @Override + public Future submit(Runnable task) { + return submit((String) null, task); + } + + @Override + public Future submit(Callable task) { + return submit((String) null, task); + } + + @Override + public Future submit(Runnable task, T result) { + return submit(null, task, result); + } + + @Override + public void execute(Runnable command) { + execute(null, command); + } } diff --git a/core/src/main/java/com/orientechnologies/common/thread/OThreadPoolExecutors.java b/core/src/main/java/com/orientechnologies/common/thread/OThreadPoolExecutors.java index 7ea40c3369b..204624a4289 100644 --- a/core/src/main/java/com/orientechnologies/common/thread/OThreadPoolExecutors.java +++ b/core/src/main/java/com/orientechnologies/common/thread/OThreadPoolExecutors.java @@ -6,7 +6,7 @@ public class OThreadPoolExecutors { private OThreadPoolExecutors() {} - public static ExecutorService newScalingThreadPool( + public static TracingExecutorService newScalingThreadPool( String threadName, int corePoolSize, int maxPoolSize, @@ -23,7 +23,7 @@ public static ExecutorService newScalingThreadPool( timeoutUnit); } - public static ExecutorService newScalingThreadPool( + public static TracingExecutorService newScalingThreadPool( String threadName, ThreadGroup parentThreadGroup, int corePoolSize, @@ -40,7 +40,7 @@ public static ExecutorService newScalingThreadPool( new NamedThreadFactory(threadName, parentThreadGroup)); } - public static ExecutorService newBlockingScalingThreadPool( + public static TracingExecutorService newBlockingScalingThreadPool( String threadName, ThreadGroup parentThreadGroup, int corePoolSize, @@ -59,11 +59,11 @@ public static ExecutorService newBlockingScalingThreadPool( new NamedThreadFactory(threadName, parentThreadGroup)); } - public static ExecutorService newFixedThreadPool(String threadName, int poolSize) { + public static TracingExecutorService newFixedThreadPool(String threadName, int poolSize) { return newFixedThreadPool(threadName, Thread.currentThread().getThreadGroup(), poolSize); } - public static ExecutorService newFixedThreadPool( + public static TracingExecutorService newFixedThreadPool( String threadName, ThreadGroup parentThreadGroup, int poolSize) { return new OThreadPoolExecutorWithLogging( poolSize, @@ -74,16 +74,16 @@ public static ExecutorService newFixedThreadPool( new NamedThreadFactory(threadName, parentThreadGroup)); } - public static ExecutorService newCachedThreadPool(String threadName) { + public static TracingExecutorService newCachedThreadPool(String threadName) { return newCachedThreadPool(threadName, Thread.currentThread().getThreadGroup()); } - public static ExecutorService newCachedThreadPool( + public static TracingExecutorService newCachedThreadPool( String threadName, ThreadGroup parentThreadGroup) { return newCachedThreadPool(threadName, parentThreadGroup, Integer.MAX_VALUE, 0); } - public static ExecutorService newCachedThreadPool( + public static TracingExecutorService newCachedThreadPool( String threadName, ThreadGroup parentThreadGroup, int maxThreads, int maxQueue) { return new OThreadPoolExecutorWithLogging( 0, @@ -94,11 +94,11 @@ public static ExecutorService newCachedThreadPool( new NamedThreadFactory(threadName, parentThreadGroup)); } - public static ExecutorService newSingleThreadPool(String threadName) { + public static TracingExecutorService newSingleThreadPool(String threadName) { return newSingleThreadPool(threadName, Thread.currentThread().getThreadGroup()); } - public static ExecutorService newSingleThreadPool( + public static TracingExecutorService newSingleThreadPool( String threadName, ThreadGroup parentThreadGroup) { return new OThreadPoolExecutorWithLogging( 1, @@ -109,7 +109,7 @@ public static ExecutorService newSingleThreadPool( new SingletonNamedThreadFactory(threadName, parentThreadGroup)); } - public static ExecutorService newSingleThreadPool( + public static TracingExecutorService newSingleThreadPool( String threadName, int maxQueue, RejectedExecutionHandler rejectHandler) { return new OThreadPoolExecutorWithLogging( 1, diff --git a/core/src/main/java/com/orientechnologies/common/thread/OTracedExecutionException.java b/core/src/main/java/com/orientechnologies/common/thread/OTracedExecutionException.java new file mode 100755 index 00000000000..be8f289c11a --- /dev/null +++ b/core/src/main/java/com/orientechnologies/common/thread/OTracedExecutionException.java @@ -0,0 +1,51 @@ +package com.orientechnologies.common.thread; + +import com.orientechnologies.common.exception.OException; +import com.orientechnologies.common.log.OLogManager; + +public class OTracedExecutionException extends OException { + + public OTracedExecutionException(String message, Exception cause) { + super(message); + initCause(cause); + } + + public OTracedExecutionException(String message) { + super(message); + } + + private static String taskName(String taskName, Object task) { + if (taskName != null) { + return taskName; + } + if (task != null) { + return task.getClass().getSimpleName(); + } + return "?"; + } + + public static OTracedExecutionException prepareTrace(String taskName, Object task) { + final OTracedExecutionException trace; + if (OLogManager.instance().isDebugEnabled()) { + trace = + new OTracedExecutionException( + String.format("Async task [%s] failed", taskName(taskName, task))); + trace.fillInStackTrace(); + } else { + trace = null; + } + return trace; + } + + public static OTracedExecutionException trace( + OTracedExecutionException trace, Exception e, String taskName, Object task) + throws OTracedExecutionException { + if (trace != null) { + trace.initCause(e); + return trace; + } else { + return new OTracedExecutionException( + String.format("Async task [%s] failed", taskName(taskName, task)), e); + } + } +} diff --git a/core/src/main/java/com/orientechnologies/common/thread/TracingExecutorService.java b/core/src/main/java/com/orientechnologies/common/thread/TracingExecutorService.java new file mode 100644 index 00000000000..2e01b05ed6d --- /dev/null +++ b/core/src/main/java/com/orientechnologies/common/thread/TracingExecutorService.java @@ -0,0 +1,16 @@ +package com.orientechnologies.common.thread; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +public interface TracingExecutorService extends ExecutorService { + + Future submit(String taskName, Callable task); + + Future submit(String taskName, Runnable task); + + Future submit(String taskName, Runnable task, T result); + + void execute(String taskName, Runnable command); +} From fb9170df9f610a986fa701d6784ee34beb6a26c5 Mon Sep 17 00:00:00 2001 From: Tim Whittington Date: Tue, 19 Jul 2022 23:23:39 +1200 Subject: [PATCH 09/25] Add tracing support to scheduled executor service. --- ...cheduledThreadPoolExecutorWithLogging.java | 174 +++++++++++++++++- .../common/thread/OThreadPoolExecutors.java | 4 +- .../TracingScheduledExecutorService.java | 20 ++ 3 files changed, 189 insertions(+), 9 deletions(-) create mode 100644 core/src/main/java/com/orientechnologies/common/thread/TracingScheduledExecutorService.java diff --git a/core/src/main/java/com/orientechnologies/common/thread/OScheduledThreadPoolExecutorWithLogging.java b/core/src/main/java/com/orientechnologies/common/thread/OScheduledThreadPoolExecutorWithLogging.java index 67eb0014c7d..4a8e106d10f 100755 --- a/core/src/main/java/com/orientechnologies/common/thread/OScheduledThreadPoolExecutorWithLogging.java +++ b/core/src/main/java/com/orientechnologies/common/thread/OScheduledThreadPoolExecutorWithLogging.java @@ -1,18 +1,14 @@ package com.orientechnologies.common.thread; import com.orientechnologies.common.log.OLogManager; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; +import java.util.concurrent.*; /** * The same as thread {@link ScheduledThreadPoolExecutor} but also logs all exceptions happened * inside of the tasks which caused tasks to stop. */ -public class OScheduledThreadPoolExecutorWithLogging extends ScheduledThreadPoolExecutor { +public class OScheduledThreadPoolExecutorWithLogging extends ScheduledThreadPoolExecutor + implements TracingScheduledExecutorService { public OScheduledThreadPoolExecutorWithLogging(int corePoolSize) { super(corePoolSize); } @@ -58,4 +54,168 @@ protected void afterExecute(Runnable r, Throwable t) { OLogManager.instance().errorNoDb(this, "Exception in thread '%s'", t, thread.getName()); } } + + @Override + public Future submit(String taskName, Callable task) { + final OTracedExecutionException trace = OTracedExecutionException.prepareTrace(taskName, task); + return super.submit( + () -> { + try { + return task.call(); + } catch (Exception e) { + throw OTracedExecutionException.trace(trace, e, taskName, task); + } + }); + } + + @Override + public Future submit(String taskName, Runnable task) { + final OTracedExecutionException trace = OTracedExecutionException.prepareTrace(taskName, task); + return super.submit( + () -> { + try { + task.run(); + } catch (Exception e) { + throw OTracedExecutionException.trace(trace, e, taskName, task); + } + }); + } + + @Override + public Future submit(String taskName, Runnable task, T result) { + final OTracedExecutionException trace = OTracedExecutionException.prepareTrace(taskName, task); + return super.submit( + () -> { + try { + task.run(); + } catch (Exception e) { + throw OTracedExecutionException.trace(trace, e, taskName, task); + } + }, + result); + } + + @Override + public void execute(String taskName, Runnable command) { + final OTracedExecutionException trace = + OTracedExecutionException.prepareTrace(taskName, command); + super.execute( + () -> { + try { + command.run(); + } catch (Exception e) { + throw OTracedExecutionException.trace(trace, e, taskName, command); + } + }); + } + + @Override + public Future submit(Runnable task) { + return submit((String) null, task); + } + + @Override + public Future submit(Callable task) { + return submit((String) null, task); + } + + @Override + public Future submit(Runnable task, T result) { + return submit(null, task, result); + } + + @Override + public void execute(Runnable command) { + execute(null, command); + } + + @Override + public ScheduledFuture schedule(String taskName, Runnable command, long delay, TimeUnit unit) { + final OTracedExecutionException trace = + OTracedExecutionException.prepareTrace(taskName, command); + return super.schedule( + () -> { + try { + command.run(); + } catch (Exception e) { + throw OTracedExecutionException.trace(trace, e, taskName, command); + } + }, + delay, + unit); + } + + @Override + public ScheduledFuture schedule( + String taskName, Callable task, long delay, TimeUnit unit) { + final OTracedExecutionException trace = OTracedExecutionException.prepareTrace(taskName, task); + return super.schedule( + () -> { + try { + return task.call(); + } catch (Exception e) { + throw OTracedExecutionException.trace(trace, e, taskName, task); + } + }, + delay, + unit); + } + + @Override + public ScheduledFuture scheduleAtFixedRate( + String taskName, Runnable command, long initialDelay, long period, TimeUnit unit) { + final OTracedExecutionException trace = + OTracedExecutionException.prepareTrace(taskName, command); + return super.scheduleAtFixedRate( + () -> { + try { + command.run(); + } catch (Exception e) { + throw OTracedExecutionException.trace(trace, e, taskName, command); + } + }, + initialDelay, + period, + unit); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay( + String taskName, Runnable command, long initialDelay, long delay, TimeUnit unit) { + final OTracedExecutionException trace = + OTracedExecutionException.prepareTrace(taskName, command); + return super.scheduleWithFixedDelay( + () -> { + try { + command.run(); + } catch (Exception e) { + throw OTracedExecutionException.trace(trace, e, taskName, command); + } + }, + initialDelay, + delay, + unit); + } + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return schedule(null, command, delay, unit); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + return schedule(null, callable, delay, unit); + } + + @Override + public ScheduledFuture scheduleAtFixedRate( + Runnable command, long initialDelay, long period, TimeUnit unit) { + return scheduleAtFixedRate(null, command, initialDelay, period, unit); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay( + Runnable command, long initialDelay, long delay, TimeUnit unit) { + return scheduleWithFixedDelay(null, command, initialDelay, delay, unit); + } } diff --git a/core/src/main/java/com/orientechnologies/common/thread/OThreadPoolExecutors.java b/core/src/main/java/com/orientechnologies/common/thread/OThreadPoolExecutors.java index 204624a4289..b8e5dc56786 100644 --- a/core/src/main/java/com/orientechnologies/common/thread/OThreadPoolExecutors.java +++ b/core/src/main/java/com/orientechnologies/common/thread/OThreadPoolExecutors.java @@ -121,11 +121,11 @@ public static TracingExecutorService newSingleThreadPool( rejectHandler); } - public static ScheduledExecutorService newSingleThreadScheduledPool(String threadName) { + public static TracingScheduledExecutorService newSingleThreadScheduledPool(String threadName) { return newSingleThreadScheduledPool(threadName, Thread.currentThread().getThreadGroup()); } - public static ScheduledExecutorService newSingleThreadScheduledPool( + public static TracingScheduledExecutorService newSingleThreadScheduledPool( String threadName, ThreadGroup parentThreadGroup) { return new OScheduledThreadPoolExecutorWithLogging( 1, new SingletonNamedThreadFactory(threadName, parentThreadGroup)); diff --git a/core/src/main/java/com/orientechnologies/common/thread/TracingScheduledExecutorService.java b/core/src/main/java/com/orientechnologies/common/thread/TracingScheduledExecutorService.java new file mode 100644 index 00000000000..45974304f85 --- /dev/null +++ b/core/src/main/java/com/orientechnologies/common/thread/TracingScheduledExecutorService.java @@ -0,0 +1,20 @@ +package com.orientechnologies.common.thread; + +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public interface TracingScheduledExecutorService + extends TracingExecutorService, ScheduledExecutorService { + + ScheduledFuture schedule(String taskName, Runnable command, long delay, TimeUnit unit); + + ScheduledFuture schedule(String taskName, Callable callable, long delay, TimeUnit unit); + + ScheduledFuture scheduleAtFixedRate( + String taskName, Runnable command, long initialDelay, long period, TimeUnit unit); + + ScheduledFuture scheduleWithFixedDelay( + String taskName, Runnable command, long initialDelay, long delay, TimeUnit unit); +} From 6ff2afd73b27ba9da020cb72f881c47d0397166c Mon Sep 17 00:00:00 2001 From: Tim Whittington Date: Tue, 19 Jul 2022 13:03:11 +1200 Subject: [PATCH 10/25] Add a global executor for use instead of general one-off threads. This allows improved logging and tracing consistency over general use of new Thread() --- .../common/thread/OThreadPoolExecutors.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/core/src/main/java/com/orientechnologies/common/thread/OThreadPoolExecutors.java b/core/src/main/java/com/orientechnologies/common/thread/OThreadPoolExecutors.java index b8e5dc56786..12de74f1032 100644 --- a/core/src/main/java/com/orientechnologies/common/thread/OThreadPoolExecutors.java +++ b/core/src/main/java/com/orientechnologies/common/thread/OThreadPoolExecutors.java @@ -130,4 +130,21 @@ public static TracingScheduledExecutorService newSingleThreadScheduledPool( return new OScheduledThreadPoolExecutorWithLogging( 1, new SingletonNamedThreadFactory(threadName, parentThreadGroup)); } + + private static final TracingExecutorService GLOBAL_EXECUTOR = + newCachedThreadPool("OrientDB-Global"); + + public static void executeUnbound(Runnable task, String name) { + GLOBAL_EXECUTOR.submit( + name, + () -> { + final String poolThreadName = Thread.currentThread().getName(); + try { + Thread.currentThread().setName(name); + task.run(); + } finally { + Thread.currentThread().setName(poolThreadName); + } + }); + } } From 0ce5af1a9dd1ee8cbd267408d3a81b04b8ceee4a Mon Sep 17 00:00:00 2001 From: Tim Whittington Date: Tue, 19 Jul 2022 13:06:11 +1200 Subject: [PATCH 11/25] Defer setting running to end of distributed plugin startup. This prevents storage tasks that require the distributed status to be online from accessing distributed lifecycle objects that have not yet been set up (which shows up as NPEs during execution). --- .../orient/server/hazelcast/OHazelcastPlugin.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastPlugin.java b/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastPlugin.java index aea40e635bc..9a850970251 100755 --- a/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastPlugin.java +++ b/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastPlugin.java @@ -163,7 +163,6 @@ public void config(final OServer iServer, final OServerParameterConfiguration[] public void startup() { if (!enabled) return; - running = true; if (serverInstance.getDatabases() instanceof OrientDBDistributed) ((OrientDBDistributed) serverInstance.getDatabases()).setPlugin(this); @@ -372,6 +371,7 @@ public void onSignal(final Signal signal) { new ODistributedStartupException("Error on starting distributed plugin"), e); } + running = true; dumpServersStatus(); } From 1c49c99ddd21bea3e40e331a4d9c7389151e4386 Mon Sep 17 00:00:00 2001 From: Tim Whittington Date: Tue, 19 Jul 2022 13:07:41 +1200 Subject: [PATCH 12/25] Defer installation of databases until distributed plugin is online. This avoids accesses to uninitialised distributed state during initial database setup from cluster. --- .../server/hazelcast/OHazelcastPlugin.java | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastPlugin.java b/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastPlugin.java index 9a850970251..5e7cd8d356b 100755 --- a/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastPlugin.java +++ b/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastPlugin.java @@ -44,6 +44,7 @@ import com.orientechnologies.common.io.OUtils; import com.orientechnologies.common.log.OLogManager; import com.orientechnologies.common.parser.OSystemVariableResolver; +import com.orientechnologies.common.thread.OThreadPoolExecutors; import com.orientechnologies.common.util.OCallable; import com.orientechnologies.common.util.OCallableNoParamNoReturn; import com.orientechnologies.common.util.OCallableUtils; @@ -311,17 +312,6 @@ public void startup() { publishLocalNodeConfiguration(); - new Thread( - () -> { - try { - installNewDatabasesFromCluster(); - loadLocalDatabases(); - } finally { - serverStarted.countDown(); - } - }) - .start(); - membershipListenerMapRegistration = configurationMap.getHazelcastMap().addEntryListener(this, true); membershipListenerRegistration = hazelcastInstance.getCluster().addMembershipListener(this); @@ -372,6 +362,19 @@ public void onSignal(final Signal signal) { } running = true; + OThreadPoolExecutors.executeUnbound( + () -> { + try { + installNewDatabasesFromCluster(); + + // FIXME: installNewDatabasesFromCluster catches exceptions, and loadLocalDatabases + // fails with same exception + loadLocalDatabases(); + } finally { + serverStarted.countDown(); + } + }, + "OHazelcastPlugin InstallDBs"); dumpServersStatus(); } From cdcec5b838c70a3395a41aa61cffb33f6eee2009 Mon Sep 17 00:00:00 2001 From: Tim Whittington Date: Tue, 19 Jul 2022 13:08:21 +1200 Subject: [PATCH 13/25] Sanity check that distributed plugin is online before attempting distributed lock. --- .../server/distributed/impl/ODistributedAbstractPlugin.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedAbstractPlugin.java b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedAbstractPlugin.java index acbd11e9347..f5a3cb67bf6 100755 --- a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedAbstractPlugin.java +++ b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedAbstractPlugin.java @@ -20,6 +20,8 @@ package com.orientechnologies.orient.server.distributed.impl; import static com.orientechnologies.orient.core.config.OGlobalConfiguration.DISTRIBUTED_MAX_STARTUP_DELAY; +import static com.orientechnologies.orient.server.distributed.ODistributedServerManager.NODE_STATUS.OFFLINE; +import static com.orientechnologies.orient.server.distributed.ODistributedServerManager.NODE_STATUS.SHUTTINGDOWN; import com.hazelcast.core.HazelcastException; import com.hazelcast.core.HazelcastInstanceNotActiveException; @@ -1978,6 +1980,10 @@ public T executeInDistributedDatabaseLock( OModifiableDistributedConfiguration lastCfg, final OCallable iCallback) { + if (getNodeStatus() == SHUTTINGDOWN || getNodeStatus() == OFFLINE) { + throw new OOfflineNodeException( + String.format("Node %s is shutting down or offline.", nodeName)); + } boolean updated; T result; getLockManagerExecutor().acquireExclusiveLock(databaseName, nodeName, timeoutLocking); From 3c533200732e879e2a17200675934e383f34ff7c Mon Sep 17 00:00:00 2001 From: Tim Whittington Date: Tue, 19 Jul 2022 13:09:26 +1200 Subject: [PATCH 14/25] Fix waiting for last task in ViewManager close. Prefer attempting to cancel task before execution before waiting. Also removes double logging of execution exception, and avoids problem where get cannot be called after cancel. --- .../orient/core/db/viewmanager/ViewManager.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/viewmanager/ViewManager.java b/core/src/main/java/com/orientechnologies/orient/core/db/viewmanager/ViewManager.java index 80df1b779b9..a585e810d81 100644 --- a/core/src/main/java/com/orientechnologies/orient/core/db/viewmanager/ViewManager.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/viewmanager/ViewManager.java @@ -178,18 +178,20 @@ public void close() { } if (lastTask != null) { try { - try { + // Try to cancel last task before it runs, otherwise wait for completion + if (!lastTask.cancel(false)) { lastTask.get(20, TimeUnit.SECONDS); - } catch (TimeoutException e) { - lastTask.cancel(true); - lastTask.get(); } - + } catch (TimeoutException e) { + OLogManager.instance() + .warn( + this, + "Timeout waiting for last task to complete view update background operations"); } catch (InterruptedException e) { throw OException.wrapException( new OInterruptedException("Terminated while waiting update view to finis"), e); } catch (ExecutionException e) { - OLogManager.instance().warn(this, "Issue terminating view update background operations", e); + // Will already have been logged by thread pool executor } } From db457608a7124637442a6acd19f1417e9479f09c Mon Sep 17 00:00:00 2001 From: Tim Whittington Date: Tue, 19 Jul 2022 13:20:29 +1200 Subject: [PATCH 15/25] Use tracing executor service in OrientDBEmbedded Provide tracing overrides to aid in tracking async errors. --- .../orient/core/db/OrientDBEmbedded.java | 32 +++++++++++++++++-- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBEmbedded.java b/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBEmbedded.java index f6319fab53f..e6e053734a3 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBEmbedded.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBEmbedded.java @@ -28,6 +28,7 @@ import com.orientechnologies.common.io.OIOUtils; import com.orientechnologies.common.log.OLogManager; import com.orientechnologies.common.thread.OThreadPoolExecutors; +import com.orientechnologies.common.thread.TracingExecutorService; import com.orientechnologies.orient.core.Orient; import com.orientechnologies.orient.core.command.OCommandOutputListener; import com.orientechnologies.orient.core.command.script.OScriptManager; @@ -67,7 +68,6 @@ import java.util.TimerTask; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -92,7 +92,7 @@ public class OrientDBEmbedded implements OrientDBInternal { protected final Orient orient; protected final OCachedDatabasePoolFactory cachedPoolFactory; private volatile boolean open = true; - private ExecutorService executor; + private TracingExecutorService executor; private Timer timer; private TimerTask autoCloseTimer = null; private final OScriptManager scriptManager = new OScriptManager(); @@ -1063,9 +1063,31 @@ public Future execute(String database, String user, ODatabaseTask task }); } + public Future executeInternalNoAuthorization( + String taskName, String database, ODatabaseTask task) { + return executeInternal(taskName, database, null, task); + } + + public Future executeInternal( + String taskName, String database, String user, ODatabaseTask task) { + return executor.submit( + taskName, + () -> { + try (ODatabaseSession session = openInternal(database, null)) { + return task.call(session); + } + }); + } + @Override public Future executeNoAuthorization(String database, ODatabaseTask task) { + return executeNoAuthorization(null, database, task); + } + + public Future executeNoAuthorization( + String taskName, String database, ODatabaseTask task) { return executor.submit( + taskName, () -> { try (ODatabaseSession session = openNoAuthorization(database)) { return task.call(session); @@ -1074,7 +1096,11 @@ public Future executeNoAuthorization(String database, ODatabaseTask ta } public Future executeNoDb(Callable callable) { - return executor.submit(callable); + return executeNoDb(null, callable); + } + + public Future executeNoDb(String taskName, Callable callable) { + return executor.submit(taskName, callable); } public OScriptManager getScriptManager() { From ad6363f2d44d14c67182b4b4beda577d0b9e6184 Mon Sep 17 00:00:00 2001 From: Tim Whittington Date: Tue, 19 Jul 2022 13:21:33 +1200 Subject: [PATCH 16/25] Use internal open for ViewManager init Allows registering live updates to succeed when distributed plugin not online. --- .../core/db/viewmanager/ViewManager.java | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/viewmanager/ViewManager.java b/core/src/main/java/com/orientechnologies/orient/core/db/viewmanager/ViewManager.java index a585e810d81..f99bf2215ab 100644 --- a/core/src/main/java/com/orientechnologies/orient/core/db/viewmanager/ViewManager.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/viewmanager/ViewManager.java @@ -97,15 +97,17 @@ public ViewManager(OrientDBInternal orientDb, String dbName) { } protected void init() { - orientDB.executeNoAuthorization( - dbName, - (db) -> { - // do this to make sure that the storage is already initialized and so is the shared - // context. - // you just don't need the db passed as a param here - registerLiveUpdates(db); - return null; - }); + ((OrientDBEmbedded) orientDB) + .executeInternalNoAuthorization( + "ViewManager.registerLiveUpdates", + dbName, + (db) -> { + // do this to make sure that the storage is already initialized and so is the shared + // context. + // you just don't need the db passed as a param here + registerLiveUpdates(db); + return null; + }); } private synchronized void registerLiveUpdates(ODatabaseSession db) { From ede8fbf52719f497cb8199296a7351372967b5fb Mon Sep 17 00:00:00 2001 From: Tim Whittington Date: Tue, 19 Jul 2022 13:22:36 +1200 Subject: [PATCH 17/25] Make ViewManager updates resilient to offline DB status. View update uses distributed state, which can break if view update occurs during a distributed state change, breaking the update loop. --- .../core/db/viewmanager/ViewManager.java | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/viewmanager/ViewManager.java b/core/src/main/java/com/orientechnologies/orient/core/db/viewmanager/ViewManager.java index f99bf2215ab..679f75e94fa 100644 --- a/core/src/main/java/com/orientechnologies/orient/core/db/viewmanager/ViewManager.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/viewmanager/ViewManager.java @@ -1,5 +1,6 @@ package com.orientechnologies.orient.core.db.viewmanager; +import com.orientechnologies.common.concur.OOfflineNodeException; import com.orientechnologies.common.concur.lock.OInterruptedException; import com.orientechnologies.common.exception.OException; import com.orientechnologies.common.log.OLogManager; @@ -10,6 +11,7 @@ import com.orientechnologies.orient.core.db.ODatabaseSession; import com.orientechnologies.orient.core.db.OLiveQueryResultListener; import com.orientechnologies.orient.core.db.OScenarioThreadLocal; +import com.orientechnologies.orient.core.db.OrientDBEmbedded; import com.orientechnologies.orient.core.db.OrientDBInternal; import com.orientechnologies.orient.core.db.document.ODatabaseDocument; import com.orientechnologies.orient.core.db.document.ODatabaseDocumentEmbedded; @@ -148,12 +150,22 @@ private void schedule() { public void run() { if (closed) return; lastTask = - orientDB.executeNoAuthorization( - dbName, - (db) -> { - ViewManager.this.updateViews((ODatabaseDocumentInternal) db); - return null; - }); + ((OrientDBEmbedded) orientDB) + .executeNoDb( + "ViewManager.updateViews", + () -> { + try (ODatabaseDocumentInternal db = + orientDB.openNoAuthorization(dbName)) { + ViewManager.this.updateViews(db); + } catch (OOfflineNodeException e) { + OLogManager.instance() + .debug(this, "Node offline when updating views", e); + } finally { + // When the run is finished schedule the next run. + schedule(); + } + return null; + }); } }; this.orientDB.scheduleOnce(timerTask, 1000); @@ -167,8 +179,6 @@ private void updateViews(ODatabaseDocumentInternal db) { if (view != null) { updateView(view, db); } - // When the run is finished schedule the next run. - schedule(); } catch (Exception e) { OLogManager.instance().warn(this, "Failed to update views", e); } From 4ec87b1c3fe9162ee9678d58af569ab824b76fd5 Mon Sep 17 00:00:00 2001 From: Tim Whittington Date: Thu, 28 Jul 2022 12:11:54 +1200 Subject: [PATCH 18/25] Log transaction ID on re-enqueue --- .../server/distributed/impl/task/OTransactionPhase2Task.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/task/OTransactionPhase2Task.java b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/task/OTransactionPhase2Task.java index daf5125b0a4..599a09b143b 100644 --- a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/task/OTransactionPhase2Task.java +++ b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/task/OTransactionPhase2Task.java @@ -147,7 +147,8 @@ public Object execute( OLogManager.instance() .info( OTransactionPhase2Task.this, - "Received second phase but not yet first phase, re-enqueue second phase"); + "Received second phase but not yet first phase for commit tx:%s, re-enqueue second phase", + firstPhaseId); ((ODatabaseDocumentDistributed) database) .getStorageDistributed() .getLocalDistributedDatabase() From 8c8e67c4fe02821463e774356c4d0fa7608751e3 Mon Sep 17 00:00:00 2001 From: Tim Whittington Date: Thu, 28 Jul 2022 12:18:53 +1200 Subject: [PATCH 19/25] Move DistributedDatabase registration out of constructor. ODistributedDatabaseImpl construction registered the instance, leaking the this reference, and shut down the previous instance if present. The previous instance may not have been constructed fully however, so shutdown could NPE, resulting in the construction of the current instance aborting with uninitialised state, which would then be picked up by other threads finding it registered in the message service. This change externalises the construction into an atomic operation in the message service, and makes the state in the distributed database impl final. The warning about needing registration because of use "further in the call chain" appears to be spurious. --- .../impl/ODistributedDatabaseImpl.java | 120 +++++++----------- .../impl/ODistributedMessageServiceImpl.java | 9 +- 2 files changed, 46 insertions(+), 83 deletions(-) diff --git a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedDatabaseImpl.java b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedDatabaseImpl.java index e8f9f7c035c..fbf765dbca0 100755 --- a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedDatabaseImpl.java +++ b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedDatabaseImpl.java @@ -44,23 +44,9 @@ import com.orientechnologies.orient.core.tx.OTransactionSequenceStatus; import com.orientechnologies.orient.core.tx.OTxMetadataHolder; import com.orientechnologies.orient.core.tx.ValidationResult; -import com.orientechnologies.orient.server.OServer; import com.orientechnologies.orient.server.OSystemDatabase; -import com.orientechnologies.orient.server.distributed.ODistributedConfiguration; -import com.orientechnologies.orient.server.distributed.ODistributedDatabase; -import com.orientechnologies.orient.server.distributed.ODistributedException; -import com.orientechnologies.orient.server.distributed.ODistributedRequest; -import com.orientechnologies.orient.server.distributed.ODistributedRequestId; -import com.orientechnologies.orient.server.distributed.ODistributedResponse; -import com.orientechnologies.orient.server.distributed.ODistributedResponseManager; -import com.orientechnologies.orient.server.distributed.ODistributedResponseManagerImpl; -import com.orientechnologies.orient.server.distributed.ODistributedServerLog; +import com.orientechnologies.orient.server.distributed.*; import com.orientechnologies.orient.server.distributed.ODistributedServerLog.DIRECTION; -import com.orientechnologies.orient.server.distributed.ODistributedServerManager; -import com.orientechnologies.orient.server.distributed.ODistributedSyncConfiguration; -import com.orientechnologies.orient.server.distributed.ODistributedTxContext; -import com.orientechnologies.orient.server.distributed.OModifiableDistributedConfiguration; -import com.orientechnologies.orient.server.distributed.ORemoteServerController; import com.orientechnologies.orient.server.distributed.impl.lock.OFreezeGuard; import com.orientechnologies.orient.server.distributed.impl.lock.OLockGuard; import com.orientechnologies.orient.server.distributed.impl.lock.OLockManager; @@ -74,16 +60,7 @@ import com.orientechnologies.orient.server.hazelcast.OHazelcastPlugin; import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.SortedSet; -import java.util.TimerTask; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; @@ -107,8 +84,8 @@ public class ODistributedDatabaseImpl implements ODistributedDatabase { protected Map activeTxContexts = new ConcurrentHashMap<>(64); - private AtomicLong totalSentRequests = new AtomicLong(); - private AtomicLong totalReceivedRequests = new AtomicLong(); + private final AtomicLong totalSentRequests = new AtomicLong(); + private final AtomicLong totalReceivedRequests = new AtomicLong(); private TimerTask txTimeoutTask = null; private volatile boolean running = true; private volatile boolean parsing = true; @@ -116,12 +93,12 @@ public class ODistributedDatabaseImpl implements ODistributedDatabase { private final String localNodeName; private final OSimpleLockManager recordLockManager; private final OSimpleLockManager indexKeyLockManager; - private AtomicLong operationsRunnig = new AtomicLong(0); - private ODistributedSynchronizedSequence sequenceManager; + private final AtomicLong operationsRunnig = new AtomicLong(0); + private final ODistributedSynchronizedSequence sequenceManager; private final AtomicLong pending = new AtomicLong(); - private ThreadPoolExecutor requestExecutor; - private OLockManager lockManager = new OLockManagerImpl(); - private Set inQueue = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final ThreadPoolExecutor requestExecutor; + private final OLockManager lockManager = new OLockManagerImpl(); + private final Set inQueue = Collections.newSetFromMap(new ConcurrentHashMap<>()); private OFreezeGuard freezeGuard; public static boolean sendResponseBack( @@ -186,34 +163,50 @@ public void endOperation() { operationsRunnig.decrementAndGet(); } - public ODistributedDatabaseImpl( + ODistributedDatabaseImpl( final OHazelcastPlugin manager, final ODistributedMessageServiceImpl msgService, - final String iDatabaseName, - final ODistributedConfiguration cfg, - OServer server) { + final String iDatabaseName) { this.manager = manager; this.msgService = msgService; this.databaseName = iDatabaseName; this.localNodeName = manager.getLocalNodeName(); - // SELF REGISTERING ITSELF HERE BECAUSE IT'S NEEDED FURTHER IN THE CALL CHAIN - final ODistributedDatabaseImpl prev = msgService.databases.put(iDatabaseName, this); - if (prev != null) { - // KILL THE PREVIOUS ONE - prev.shutdown(); - } - - startAcceptingRequests(); + this.requestExecutor = + (ThreadPoolExecutor)OThreadPoolExecutors.newScalingThreadPool( + String.format( + "OrientDB DistributedWorker node=%s db=%s", getLocalNodeName(), databaseName), + 0, + calculateWorkers(manager.getManagedDatabases().size()), + 0, + 1, + TimeUnit.HOURS); if (iDatabaseName.equals(OSystemDatabase.SYSTEM_DB_NAME)) { recordLockManager = null; indexKeyLockManager = null; - return; + sequenceManager = null; + } else { + long timeout = + manager + .getServerInstance() + .getContextConfiguration() + .getValueAsLong(DISTRIBUTED_ATOMIC_LOCK_TIMEOUT); + int sequenceSize = + manager + .getServerInstance() + .getContextConfiguration() + .getValueAsInteger(DISTRIBUTED_TRANSACTION_SEQUENCE_SET_SIZE); + recordLockManager = new OSimpleLockManagerImpl<>(timeout); + indexKeyLockManager = new OSimpleLockManagerImpl<>(timeout); + sequenceManager = new ODistributedSynchronizedSequence(localNodeName, sequenceSize); + + startTxTimeoutTimerTask(); + registerHooks(); } + } - startTxTimeoutTimerTask(); - + private void registerHooks() { Orient.instance() .getProfiler() .registerHookValue( @@ -283,20 +276,6 @@ public Object getValue() { } }, "distributed.db.*.recordLocks"); - - long timeout = - manager - .getServerInstance() - .getContextConfiguration() - .getValueAsLong(DISTRIBUTED_ATOMIC_LOCK_TIMEOUT); - int sequenceSize = - manager - .getServerInstance() - .getContextConfiguration() - .getValueAsInteger(DISTRIBUTED_TRANSACTION_SEQUENCE_SET_SIZE); - recordLockManager = new OSimpleLockManagerImpl<>(timeout); - indexKeyLockManager = new OSimpleLockManagerImpl<>(timeout); - sequenceManager = new ODistributedSynchronizedSequence(localNodeName, sequenceSize); } @Override @@ -1163,15 +1142,14 @@ protected String getLocalNodeName() { return localNodeName; } - private void startAcceptingRequests() { - // START ALL THE WORKER THREADS (CONFIGURABLE) + private static int calculateWorkers(int managedDbCount) { int totalWorkers = OGlobalConfiguration.DISTRIBUTED_DB_WORKERTHREADS.getValueAsInteger(); if (totalWorkers < 0) throw new ODistributedException( "Cannot create configured distributed workers (" + totalWorkers + ")"); else if (totalWorkers == 0) { // AUTOMATIC - final int totalDatabases = manager.getManagedDatabases().size() + 1; + final int totalDatabases = managedDbCount + 1; final int cpus = Runtime.getRuntime().availableProcessors(); @@ -1179,19 +1157,7 @@ else if (totalWorkers == 0) { if (totalWorkers == 0) totalWorkers = 1; } - - synchronized (this) { - this.requestExecutor = - (ThreadPoolExecutor) - OThreadPoolExecutors.newScalingThreadPool( - String.format( - "OrientDB DistributedWorker node=%s db=%s", getLocalNodeName(), databaseName), - 0, - totalWorkers, - 0, - 1, - TimeUnit.HOURS); - } + return totalWorkers; } @Override diff --git a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedMessageServiceImpl.java b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedMessageServiceImpl.java index bae840642bd..d3c2693d79d 100755 --- a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedMessageServiceImpl.java +++ b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedMessageServiceImpl.java @@ -57,7 +57,7 @@ public class ODistributedMessageServiceImpl implements ODistributedMessageServic private final ConcurrentHashMap responsesByRequestIds; private final TimerTask asynchMessageManager; protected final ConcurrentHashMap databases = - new ConcurrentHashMap(); + new ConcurrentHashMap<>(); private Thread responseThread; private long[] responseTimeMetrics = new long[10]; private volatile boolean running = true; @@ -153,11 +153,8 @@ public long getAverageResponseTime() { /** Creates a distributed database instance if not defined yet. */ public ODistributedDatabaseImpl registerDatabase( final String iDatabaseName, ODistributedConfiguration cfg) { - final ODistributedDatabaseImpl ddb = databases.get(iDatabaseName); - if (ddb != null) return ddb; - - return new ODistributedDatabaseImpl( - manager, this, iDatabaseName, cfg, manager.getServerInstance()); + return databases.computeIfAbsent( + iDatabaseName, db -> new ODistributedDatabaseImpl(manager, this, iDatabaseName)); } public ODistributedDatabaseImpl unregisterDatabase(final String iDatabaseName) { From d533fc8ea029b8461a07cd74104e753210c174de Mon Sep 17 00:00:00 2001 From: Tim Whittington Date: Thu, 28 Jul 2022 12:20:48 +1200 Subject: [PATCH 20/25] Add task name based tracing to scheduled tasks. --- .../orientechnologies/orient/core/Orient.java | 15 +++++++-------- .../impl/ODistributedDatabaseImpl.java | 18 ++++++++++++------ 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/com/orientechnologies/orient/core/Orient.java b/core/src/main/java/com/orientechnologies/orient/core/Orient.java index a7018c900b4..e0793197d07 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/Orient.java +++ b/core/src/main/java/com/orientechnologies/orient/core/Orient.java @@ -411,6 +411,11 @@ public Orient shutdown() { } public TimerTask scheduleTask(final Runnable task, final long delay, final long period) { + return scheduleTask(task.getClass().getSimpleName(), task, delay, period); + } + + public TimerTask scheduleTask( + final String taskName, final Runnable task, final long delay, final long period) { engineLock.readLock().lock(); try { final TimerTask timerTask = @@ -421,16 +426,10 @@ public void run() { task.run(); } catch (Exception e) { OLogManager.instance() - .error( - this, - "Error during execution of task " + task.getClass().getSimpleName(), - e); + .error(Orient.this, "Error during execution of task " + taskName, e); } catch (Error e) { OLogManager.instance() - .error( - this, - "Error during execution of task " + task.getClass().getSimpleName(), - e); + .error(Orient.this, "Error during execution of task " + taskName, e); throw e; } } diff --git a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedDatabaseImpl.java b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedDatabaseImpl.java index fbf765dbca0..0ade32bf810 100755 --- a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedDatabaseImpl.java +++ b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedDatabaseImpl.java @@ -31,6 +31,7 @@ import com.orientechnologies.common.profiler.OAbstractProfiler; import com.orientechnologies.common.profiler.OProfiler; import com.orientechnologies.common.thread.OThreadPoolExecutors; +import com.orientechnologies.common.thread.TracingExecutorService; import com.orientechnologies.common.util.OCallable; import com.orientechnologies.orient.core.Orient; import com.orientechnologies.orient.core.command.OCommandDistributedReplicateRequest; @@ -96,7 +97,7 @@ public class ODistributedDatabaseImpl implements ODistributedDatabase { private final AtomicLong operationsRunnig = new AtomicLong(0); private final ODistributedSynchronizedSequence sequenceManager; private final AtomicLong pending = new AtomicLong(); - private final ThreadPoolExecutor requestExecutor; + private final TracingExecutorService requestExecutor; private final OLockManager lockManager = new OLockManagerImpl(); private final Set inQueue = Collections.newSetFromMap(new ConcurrentHashMap<>()); private OFreezeGuard freezeGuard; @@ -173,7 +174,7 @@ public void endOperation() { this.localNodeName = manager.getLocalNodeName(); this.requestExecutor = - (ThreadPoolExecutor)OThreadPoolExecutors.newScalingThreadPool( + OThreadPoolExecutors.newScalingThreadPool( String.format( "OrientDB DistributedWorker node=%s db=%s", getLocalNodeName(), databaseName), 0, @@ -258,7 +259,7 @@ public Object getValue() { new OAbstractProfiler.OProfilerHookValue() { @Override public Object getValue() { - return (long) requestExecutor.getPoolSize(); + return (long) ((ThreadPoolExecutor) requestExecutor).getPoolSize(); } }, "distributed.db.*.workerThreads"); @@ -305,6 +306,7 @@ public void reEnqueue( pending.incrementAndGet(); Orient.instance() .scheduleTask( + String.format("DistributedDatabase[%s].reEnqueue", databaseName), () -> { try { processRequest( @@ -343,6 +345,9 @@ public void processRequest( + "' discarding"); } } + final String taskName = + String.format( + "DistributedDatabase[%s].processRequest.%s", getDatabaseName(), task.getName()); synchronized (this) { task.received(request, this); manager.messageReceived(request); @@ -364,7 +369,7 @@ public void processRequest( } }; try { - this.requestExecutor.submit(executeTask); + this.requestExecutor.submit(taskName, executeTask); } catch (RejectedExecutionException e) { task.finished(this); this.lockManager.unlock(guards); @@ -380,6 +385,7 @@ public void processRequest( } else { try { this.requestExecutor.submit( + taskName, () -> { execute(request); }); @@ -892,7 +898,7 @@ public long getReceivedRequests() { @Override public long getProcessedRequests() { - return requestExecutor.getCompletedTaskCount(); + return ((ThreadPoolExecutor) requestExecutor).getCompletedTaskCount(); } public void onDropShutdown() { @@ -1310,7 +1316,7 @@ public String dump() { "\n\nDATABASE '" + databaseName + "' ON SERVER '" + manager.getLocalNodeName() + "'"); buffer.append("\n- MESSAGES IN QUEUES"); - buffer.append(" (" + (requestExecutor.getPoolSize()) + " WORKERS):"); + buffer.append(" (" + (((ThreadPoolExecutor) requestExecutor).getPoolSize()) + " WORKERS):"); return buffer.toString(); } From 99a0bfb0bed25109ebf031ce70a3cd161bdbbb8c Mon Sep 17 00:00:00 2001 From: Tim Whittington Date: Thu, 28 Jul 2022 12:21:15 +1200 Subject: [PATCH 21/25] Avoid repeated distributed database shutdowns. --- .../server/distributed/impl/ODistributedDatabaseImpl.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedDatabaseImpl.java b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedDatabaseImpl.java index 0ade32bf810..28f2931724b 100755 --- a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedDatabaseImpl.java +++ b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedDatabaseImpl.java @@ -910,7 +910,10 @@ public void shutdown() { shutdown(true); } - public void shutdown(boolean wait) { + public synchronized void shutdown(boolean wait) { + if (!running) { + return; + } waitPending(); running = false; From 3d6d5e5853f2f6649eb2fdddfa28accbd2b34d87 Mon Sep 17 00:00:00 2001 From: Tim Whittington Date: Thu, 28 Jul 2022 12:21:49 +1200 Subject: [PATCH 22/25] Fix race in registering installing database to guard against concurrent installs. --- .../server/distributed/impl/ODistributedAbstractPlugin.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedAbstractPlugin.java b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedAbstractPlugin.java index f5a3cb67bf6..c9c1b507156 100755 --- a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedAbstractPlugin.java +++ b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODistributedAbstractPlugin.java @@ -1008,7 +1008,7 @@ public boolean installDatabase( // DON'T REPLICATE SYSTEM BECAUSE IS DIFFERENT AND PER SERVER return false; - if (installingDatabases.contains(databaseName)) { + if (!installingDatabases.add(databaseName)) { return false; } @@ -1016,7 +1016,6 @@ public boolean installDatabase( messageService.registerDatabase(databaseName, null); try { - installingDatabases.add(databaseName); return executeInDistributedDatabaseLock( databaseName, 20000, From d042c99d1e5246c0340339d683191ed15eacf45b Mon Sep 17 00:00:00 2001 From: Tim Whittington Date: Thu, 28 Jul 2022 12:23:14 +1200 Subject: [PATCH 23/25] Eliminate duplicate scheduleTask code. --- .../orientechnologies/orient/core/Orient.java | 40 +------------------ 1 file changed, 1 insertion(+), 39 deletions(-) diff --git a/core/src/main/java/com/orientechnologies/orient/core/Orient.java b/core/src/main/java/com/orientechnologies/orient/core/Orient.java index e0793197d07..8d681300eb9 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/Orient.java +++ b/core/src/main/java/com/orientechnologies/orient/core/Orient.java @@ -452,45 +452,7 @@ public void run() { } public TimerTask scheduleTask(final Runnable task, final Date firstTime, final long period) { - engineLock.readLock().lock(); - try { - final TimerTask timerTask = - new TimerTask() { - @Override - public void run() { - try { - task.run(); - } catch (Exception e) { - OLogManager.instance() - .error( - this, - "Error during execution of task " + task.getClass().getSimpleName(), - e); - } catch (Error e) { - OLogManager.instance() - .error( - this, - "Error during execution of task " + task.getClass().getSimpleName(), - e); - throw e; - } - } - }; - - if (active) { - if (period > 0) { - timer.schedule(timerTask, firstTime, period); - } else { - timer.schedule(timerTask, firstTime); - } - } else { - OLogManager.instance().warn(this, "OrientDB engine is down. Task will not be scheduled."); - } - - return timerTask; - } finally { - engineLock.readLock().unlock(); - } + return scheduleTask(task, firstTime.getTime() - System.currentTimeMillis(), period); } public boolean isActive() { From 58991019478c1db9427dbc68db8cbdc2cda51bf3 Mon Sep 17 00:00:00 2001 From: Tim Whittington Date: Tue, 2 Aug 2022 13:00:55 +1200 Subject: [PATCH 24/25] Guard database create to avoid partially initialised storage being created. If the plugin isn't online, initialisation of newly created database will fail, resulting in a partially initialised database that will break when used (usually because the schema hasn't been loaded). --- .../orient/core/db/OrientDBEmbedded.java | 2 +- .../orient/core/db/OrientDBDistributed.java | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBEmbedded.java b/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBEmbedded.java index e6e053734a3..c4136b1ef5c 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBEmbedded.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/OrientDBEmbedded.java @@ -581,7 +581,7 @@ protected String buildName(String name) { return basePath + "/" + name; } - public void create(String name, String user, String password, ODatabaseType type) { + public final void create(String name, String user, String password, ODatabaseType type) { create(name, user, password, type, null); } diff --git a/distributed/src/main/java/com/orientechnologies/orient/core/db/OrientDBDistributed.java b/distributed/src/main/java/com/orientechnologies/orient/core/db/OrientDBDistributed.java index 2f306dba7be..5a612a75b35 100755 --- a/distributed/src/main/java/com/orientechnologies/orient/core/db/OrientDBDistributed.java +++ b/distributed/src/main/java/com/orientechnologies/orient/core/db/OrientDBDistributed.java @@ -62,6 +62,15 @@ public synchronized OHazelcastPlugin getPlugin() { return plugin; } + @Override + public void create( + String name, String user, String password, ODatabaseType type, OrientDBConfig config) { + if (isDistributedEnabled() && (plugin == null)) { + throw new OOfflineNodeException("Distributed plugin is not active"); + } + super.create(name, user, password, type, config); + } + protected OSharedContext createSharedContext(OAbstractPaginatedStorage storage) { if (!isDistributedEnabled() || OSystemDatabase.SYSTEM_DB_NAME.equals(storage.getName())) { return new OSharedContextEmbedded(storage, this); From 61e3af58f4d52fba78f35845ab6a4a160b3b77cd Mon Sep 17 00:00:00 2001 From: Tim Whittington Date: Wed, 20 Jul 2022 09:10:23 +1200 Subject: [PATCH 25/25] DEV: Expand HazelcastPlugin startup time to widen window to expose race conditions on startup. --- .../orient/server/hazelcast/OHazelcastPlugin.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastPlugin.java b/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastPlugin.java index 5e7cd8d356b..dd66af2809d 100755 --- a/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastPlugin.java +++ b/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastPlugin.java @@ -164,6 +164,17 @@ public void config(final OServer iServer, final OServerParameterConfiguration[] public void startup() { if (!enabled) return; + try { + final String delayEnv = System.getenv("HAZELCAST_PLUGIN_STARTUP_DELAY"); + if (delayEnv != null) { + long delay = Long.parseLong(delayEnv); + OLogManager.instance().info(this, "Delaying HazelcastPlugin startup by '%d' ms", delay); + Thread.sleep(delay); + } + } catch (Throwable t) { + t.printStackTrace(); + } + if (serverInstance.getDatabases() instanceof OrientDBDistributed) ((OrientDBDistributed) serverInstance.getDatabases()).setPlugin(this);