From bfa0d60e2289b3ff8ef6671efb3e1f58f5f00c80 Mon Sep 17 00:00:00 2001 From: laa Date: Mon, 7 Sep 2020 14:57:44 +0300 Subject: [PATCH] Issue #9387 has been implemented. --- .../core/config/OGlobalConfiguration.java | 18 +++ .../orient/server/OServer.java | 19 +++ .../memorymanager/MXBeanMemoryManager.java | 110 ++++++++++++++++++ .../server/memorymanager/MemoryManager.java | 10 ++ .../memorymanager/NoOpMemoryManager.java | 12 ++ .../binary/ONetworkProtocolBinary.java | 2 + .../http/ONetworkProtocolHttpAbstract.java | 2 + 7 files changed, 173 insertions(+) create mode 100644 server/src/main/java/com/orientechnologies/orient/server/memorymanager/MXBeanMemoryManager.java create mode 100644 server/src/main/java/com/orientechnologies/orient/server/memorymanager/MemoryManager.java create mode 100644 server/src/main/java/com/orientechnologies/orient/server/memorymanager/NoOpMemoryManager.java diff --git a/core/src/main/java/com/orientechnologies/orient/core/config/OGlobalConfiguration.java b/core/src/main/java/com/orientechnologies/orient/core/config/OGlobalConfiguration.java index 8c7debec7fd..10434292fa1 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/config/OGlobalConfiguration.java +++ b/core/src/main/java/com/orientechnologies/orient/core/config/OGlobalConfiguration.java @@ -1456,6 +1456,24 @@ public void change(final Object iCurrentValue, final Object iNewValue) { true, false), + SERVER_HEAP_USAGE_LIMIT( + "server.heapUsageLimit", + "Once server heap usage reaches provided limit (in percent), " + + "sever will stop to accept new requests from client " + + "till heap usage will drop bellow given limit", + Integer.class, + 85, + false, + false), + + SERVER_HEAP_USAGE_SLEEP_INTERVAL( + "server.heapUsageSleepInterval", + "Once heap usage reaches threshold server will wait given time in ms. before checking memory usage again", + Integer.class, + 20, + false, + false), + // DISTRIBUTED /** @Since 2.2.18 */ DISTRIBUTED_DUMP_STATS_EVERY( diff --git a/server/src/main/java/com/orientechnologies/orient/server/OServer.java b/server/src/main/java/com/orientechnologies/orient/server/OServer.java index 2a21b38ee61..5e4b0d5e39f 100755 --- a/server/src/main/java/com/orientechnologies/orient/server/OServer.java +++ b/server/src/main/java/com/orientechnologies/orient/server/OServer.java @@ -56,6 +56,9 @@ import com.orientechnologies.orient.server.distributed.ODistributedServerManager; import com.orientechnologies.orient.server.distributed.config.ODistributedConfig; import com.orientechnologies.orient.server.handler.OConfigurableHooksManager; +import com.orientechnologies.orient.server.memorymanager.MXBeanMemoryManager; +import com.orientechnologies.orient.server.memorymanager.MemoryManager; +import com.orientechnologies.orient.server.memorymanager.NoOpMemoryManager; import com.orientechnologies.orient.server.network.OServerNetworkListener; import com.orientechnologies.orient.server.network.OServerSocketFactory; import com.orientechnologies.orient.server.network.protocol.ONetworkProtocol; @@ -116,6 +119,7 @@ public class OServer { private OClientConnectionManager clientConnectionManager; private OHttpSessionManager httpSessionManager; private OPushManager pushManager; + private volatile MemoryManager memoryManager; private ClassLoader extensionClassLoader; private OTokenHandler tokenHandler; private OSystemDatabase systemDatabase; @@ -448,6 +452,16 @@ public OServer activate() final OServerConfiguration configuration = serverCfg.getConfiguration(); tokenHandler = new OTokenHandlerImpl(this); + if (OGlobalConfiguration.SERVER_HEAP_USAGE_LIMIT.getValueAsInteger() > 0) { + memoryManager = + new MXBeanMemoryManager( + OGlobalConfiguration.SERVER_HEAP_USAGE_LIMIT.getValueAsInteger(), + OGlobalConfiguration.SERVER_HEAP_USAGE_SLEEP_INTERVAL.getValueAsInteger()); + } else { + memoryManager = new NoOpMemoryManager(); + } + + memoryManager.start(); if (configuration.network != null) { // REGISTER/CREATE SOCKET FACTORIES @@ -603,6 +617,7 @@ protected boolean deinit() { pushManager.shutdown(); clientConnectionManager.shutdown(); httpSessionManager.shutdown(); + memoryManager.shutdown(); if (pluginManager != null) pluginManager.shutdown(); @@ -1284,6 +1299,10 @@ public OrientDB getContext() { return context; } + public MemoryManager getMemoryManager() { + return memoryManager; + } + public void dropDatabase(String databaseName) { if (databases.exists(databaseName, null, null)) { databases.drop(databaseName, null, null); diff --git a/server/src/main/java/com/orientechnologies/orient/server/memorymanager/MXBeanMemoryManager.java b/server/src/main/java/com/orientechnologies/orient/server/memorymanager/MXBeanMemoryManager.java new file mode 100644 index 00000000000..8b180e7a83c --- /dev/null +++ b/server/src/main/java/com/orientechnologies/orient/server/memorymanager/MXBeanMemoryManager.java @@ -0,0 +1,110 @@ +package com.orientechnologies.orient.server.memorymanager; + +import com.orientechnologies.common.log.OLogManager; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryNotificationInfo; +import java.lang.management.MemoryPoolMXBean; +import java.lang.management.MemoryType; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.LockSupport; +import javax.management.ListenerNotFoundException; +import javax.management.Notification; +import javax.management.NotificationEmitter; +import javax.management.NotificationListener; +import javax.management.openmbean.CompositeData; + +public final class MXBeanMemoryManager implements NotificationListener, MemoryManager { + private final int memoryThreshold; + private final int sleepInterval; + + private final ConcurrentLinkedQueue poolsWithOverhead = new ConcurrentLinkedQueue<>(); + private volatile HashMap memoryBeans; + + public MXBeanMemoryManager(final int memoryThreshold, final int sleepInterval) { + this.memoryThreshold = memoryThreshold; + this.sleepInterval = sleepInterval; + + assert sleepInterval >= 0; + assert memoryThreshold >= 0 && memoryThreshold < 100; + } + + @Override + public void start() { + if (memoryThreshold <= 0) { + return; + } + + final HashMap beansByName = new HashMap<>(); + + final List mxBeans = ManagementFactory.getMemoryPoolMXBeans(); + for (final MemoryPoolMXBean mxBean : mxBeans) { + if (mxBean.isUsageThresholdSupported() && mxBean.getType() == MemoryType.HEAP) { + final long maxMemory = mxBean.getUsage().getMax(); + if (maxMemory > 0) { + final long threshold = maxMemory * memoryThreshold / 100; + mxBean.setUsageThreshold(threshold); + + final NotificationEmitter emitter = (NotificationEmitter) mxBean; + emitter.addNotificationListener(this, null, null); + + beansByName.put(mxBean.getName(), mxBean); + + OLogManager.instance() + .infoNoDb( + this, + "Memory usage threshold for memory pool '%s' is set to %d bytes", + mxBean.getName(), + threshold); + } + } + } + + this.memoryBeans = beansByName; + } + + @Override + public void shutdown() { + for (final MemoryPoolMXBean mxBean : memoryBeans.values()) { + final NotificationEmitter emitter = (NotificationEmitter) mxBean; + try { + emitter.removeNotificationListener(this); + } catch (final ListenerNotFoundException e) { + throw new IllegalStateException( + "Memory bean " + + mxBean.getName() + + " was processed by memory manager but manager was not added as a listener", + e); + } + } + } + + @Override + public void checkAndWaitMemoryThreshold() { + if (poolsWithOverhead.isEmpty()) { + return; + } + + while (!poolsWithOverhead.isEmpty()) { + final String poolName = poolsWithOverhead.peek(); + final MemoryPoolMXBean bean = memoryBeans.get(poolName); + + while (bean.isUsageThresholdExceeded()) { + LockSupport.parkNanos(sleepInterval * 1_000_000L); + } + + poolsWithOverhead.poll(); + } + } + + @Override + public void handleNotification(final Notification notification, final Object handback) { + final String notificationType = notification.getType(); + if (notificationType.equals(MemoryNotificationInfo.MEMORY_THRESHOLD_EXCEEDED)) { + final CompositeData cd = (CompositeData) notification.getUserData(); + final MemoryNotificationInfo info = MemoryNotificationInfo.from(cd); + poolsWithOverhead.add(info.getPoolName()); + } + } +} diff --git a/server/src/main/java/com/orientechnologies/orient/server/memorymanager/MemoryManager.java b/server/src/main/java/com/orientechnologies/orient/server/memorymanager/MemoryManager.java new file mode 100644 index 00000000000..d1fc03f7ab1 --- /dev/null +++ b/server/src/main/java/com/orientechnologies/orient/server/memorymanager/MemoryManager.java @@ -0,0 +1,10 @@ +package com.orientechnologies.orient.server.memorymanager; + +public interface MemoryManager { + void start(); + + void shutdown(); + + void checkAndWaitMemoryThreshold(); +} + diff --git a/server/src/main/java/com/orientechnologies/orient/server/memorymanager/NoOpMemoryManager.java b/server/src/main/java/com/orientechnologies/orient/server/memorymanager/NoOpMemoryManager.java new file mode 100644 index 00000000000..36182cab91c --- /dev/null +++ b/server/src/main/java/com/orientechnologies/orient/server/memorymanager/NoOpMemoryManager.java @@ -0,0 +1,12 @@ +package com.orientechnologies.orient.server.memorymanager; + +public final class NoOpMemoryManager implements MemoryManager { + @Override + public void start() {} + + @Override + public void shutdown() {} + + @Override + public void checkAndWaitMemoryThreshold() {} +} diff --git a/server/src/main/java/com/orientechnologies/orient/server/network/protocol/binary/ONetworkProtocolBinary.java b/server/src/main/java/com/orientechnologies/orient/server/network/protocol/binary/ONetworkProtocolBinary.java index a9da518a7ad..45bc1026389 100755 --- a/server/src/main/java/com/orientechnologies/orient/server/network/protocol/binary/ONetworkProtocolBinary.java +++ b/server/src/main/java/com/orientechnologies/orient/server/network/protocol/binary/ONetworkProtocolBinary.java @@ -198,6 +198,8 @@ protected void execute() throws Exception { // do not remove this or we will get deadlock upon shutdown. if (isShutdownFlag()) return; + server.getMemoryManager().checkAndWaitMemoryThreshold(); + clientTxId = 0; okSent = false; try { diff --git a/server/src/main/java/com/orientechnologies/orient/server/network/protocol/http/ONetworkProtocolHttpAbstract.java b/server/src/main/java/com/orientechnologies/orient/server/network/protocol/http/ONetworkProtocolHttpAbstract.java index 7a83c747659..e840071d3ff 100755 --- a/server/src/main/java/com/orientechnologies/orient/server/network/protocol/http/ONetworkProtocolHttpAbstract.java +++ b/server/src/main/java/com/orientechnologies/orient/server/network/protocol/http/ONetworkProtocolHttpAbstract.java @@ -737,6 +737,8 @@ protected void execute() throws Exception { connection.getData().commandInfo = "Listening"; connection.getData().commandDetail = null; + server.getMemoryManager().checkAndWaitMemoryThreshold(); + try { channel.socket.setSoTimeout(socketTimeout); connection.getStats().lastCommandReceived = -1;